`
frank-liu
  • 浏览: 1666190 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Cancel, Reject tasks in ThreadPool

 
阅读更多

简介

    在前面的文章中我们讨论过提交task到线程池中间执行。那里主要是考虑任务提交后一切都正常执行的情况。但是,如果在某些情况下如果我们想要取消或者拒绝task的提交和执行。则需要用到一些其他的特性。

Cancel Task

    Cancel task在线程池里有比较方便的支持。我们提交task的时候,一般会期待线程池的执行返回Future<T>类型的结果。在执行线程池的submit方法后我们就可以取得Future<T>结果的引用了。所以如果我们想要取消运行的话,可以直接调用该引用的cancel方法。

    我们来看一个具体的示例:

    首先我们定义一个可以提交给线程池的task,它需要实现Callable接口。

import java.util.concurrent.Callable;

public class Task implements Callable<String> {
	@Override
	public String call() throws Exception {
		while(true) {
			System.out.printf("Task: Test\n");
			Thread.sleep(100);
		}
	}
}

     这部分代码里有一个看起来比较诡异的地方。就是我们在call方法的实现里并没有返回任何String类型的结果。它只是一个无限循环。看起来它可能会报错。实际上因为会一直循环下去。代码还是可以通过编译并且运行的。

    然后,我们再来定义使用它的场景代码:

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {
		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
		Task task = new Task();
		System.out.printf("Main: Executing the Task\n");
		Future<String> result = executor.submit(task);
		
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch(InterruptedException e) {
			e.printStackTrace();
		}
		System.out.printf("Main: Canceling the task\n");
		result.cancel(true);
		System.out.printf("Main: Canceled: %s\n", result.isCancelled());
		System.out.printf("Main: Done: %s\n", result.isDone());
		executor.shutdown();
		System.out.printf("Main: The executor has finished\n");
	}
}

     在前面的代码里,我们创建了一个线程池,然后通过submit方法提交task。在sleep两秒钟之后又通过result.cancel(true)方法取消线程的执行。下面是程序的执行结果:

Main: Executing the Task
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Task: Test
Main: Canceling the task
Main: Canceled: true
Main: Done: true
Main: The executor has finished

 

Reject Task

    在前面cancel task的场景中,我们是基于一个thread pool可以接收task并且能够执行。在某些情况下thread pool可能会拒绝task的执行。一般来说,当我们使用线程池结束的时候会调用线程池的shutdown方法。这个方法不会马上结束,它会等待里面的线程执行结束之后再结束。在这个时候,如果再向线程池提交task的话会被线程池拒绝。在thread pool里面提供了一种机制,当提交的task被拒绝之后,它将会被调用。

     在线程池里有一个setRejectedExecutionHandler,这个方法相当于一个事件的注册机制。它接收RejectedExecutionHandler类型的参数。在线程被reject之后该接口的rejectedExecution方法会被触发。

    我们现在来看看具体的实现示例:

    首先,实现一个继承RejectedExecutionHandler的类:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;


public class RejectedTaskController implements RejectedExecutionHandler {
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.printf("RejectedTaskController: The task %s has" +
				"been rejected\n", r.toString());
		System.out.printf("RejectedTaskController: %s\n", executor.toString());
		System.out.printf("RejectedTaskController: Terminating: %s\n", 
				executor.isTerminating());
		System.out.printf("RejectedTaskController: Terminated: %s\n", 
				executor.isTerminated());
	}
}

     然后定义一个提交的线程:

import java.util.concurrent.TimeUnit;


public class Task implements Runnable {
	private String name;
	public Task(String name) {
		this.name = name;
	}
	
	@Override
	public void run() {
		System.out.printf("Task " + name + ": Starting");
		
		try {
			long duration = (long)(Math.random() * 10);
			System.out.printf("Task %s: ReportGenerator: Generating " +
					"a report during %d seconds\n", name, duration);
			TimeUnit.SECONDS.sleep(duration);
		} catch(InterruptedException e) {
			e.printStackTrace();
		}
		System.out.printf("Task %s: Ending\n", name);
	}
	
	public String toString() {
		return name;
	}
}

   该线程只是打印一些相关的信息并sleep几秒钟。

    具体调用实现的方法如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Main {

	public static void main(String[] args) {
		RejectedTaskController controller = 
				new RejectedTaskController();
		ThreadPoolExecutor executor = 
				(ThreadPoolExecutor) Executors.newCachedThreadPool();
		executor.setRejectedExecutionHandler(controller);
		System.out.printf("Main: Starting.\n");
		for(int i = 0; i < 3; i++) {
			Task task = new Task("Task" + i);
			executor.submit(task);
		}
		
		System.out.printf("Main: Shutting down the Executor.\n");
		executor.shutdown();
		
		System.out.printf("Main: Sending another Task.\n");
		Task task = new Task("RejectedTask");
		executor.submit(task);
		System.out.printf("Main: End.\n");
	}
}

    前面启动了3个线程让他们正常执行。然后再调用线程池的shutdown方法来中止线程池的运行。后面当我们再尝试提交线程的时候,我们可以看到rejectedExecution方法被触发。 

     下面是程序执行的结果:

Main: Starting.
Task Task0: StartingMain: Shutting down the Executor.
Task Task2: StartingTask Task1: StartingMain: Sending another Task.
RejectedTaskController: The task java.util.concurrent.FutureTask@18aa5e75 hasbeen rejected
Task Task1: ReportGenerator: Generating a report during 2 seconds
Task Task2: ReportGenerator: Generating a report during 8 seconds
Task Task0: ReportGenerator: Generating a report during 3 seconds
RejectedTaskController: java.util.concurrent.ThreadPoolExecutor@4b26f29f[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
RejectedTaskController: Terminating: true
RejectedTaskController: Terminated: false
Main: End.
Task Task1: Ending
Task Task0: Ending
Task Task2: Ending

 

总结

    对于提交给线程池执行的线程来说,如果我们希望对线程的执行有更多的控制,比如说取消或者拒绝线程的执行。我们可以通过取得的Future<T>引用调用cancel方法来实现。另外,在线程池已经调用shutdown方法之后,我们继续提交的task会被拒绝。为了知晓线程提交后被拒绝的消息以及做后续的处理,我们可以通过实现接口RejectedExecutionHandler,并注册该实现到线程池中。

参考材料

Java 7 concurrency cookbook

1
3
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics