`
blueswind8306
  • 浏览: 124603 次
  • 来自: ...
社区版块
存档分类
最新评论

并发编程系列-并行子任务的超时控制

阅读更多
在并发编程中,经常会遇到提交多个子任务并行执行的场景。比如一个中心节点同时派发任务给多个子节点,然后中心节点等待所有子节点完成任务后继续主流程。在这个过程中,主节点需要设置一个最大等待超时,当达到超时时间后不再等待未返回的节点结果,做功能降级处理。

对于这种需求,如果子任务是阻塞执行的,则一般会使用一个线程池来执行子任务,但主任务如何唤醒超时呢?直接想到的方式是主任务在提交完所有子任务后进入一个循环,不断判断所有子任务是否已经完成或者到达超时了,但这种方式会导致主任务线程需要频繁唤醒,加大了上下文切换的开销。并且由于子任务是异步执行的,还需要考虑结果对象的安全发布问题,加大了编码的复杂性。

在j.u.c中有一个CompletionService接口恰好可以实现上述需求,并且避免了上下文切换的开销。其基本思路是用ExecutorCompletionService包装Executor,并在内部使用一个BlockingQueue保存所有已经完成的任务。当主任务调用ExecutorCompletionService.submit方法时包装一个FutureTask的子类对象QueueingFuture并传递给内部Executor,此对象覆盖了FutureTask的done方法。当线程池的Worker线程在任务完成后会回调这个done方法,然后这个方法将已经完成的任务注入到BlockingQueue中去。这样外部只需要调用BlockingQueue的take或poll方法就可以取到完成的任务了。

注意:
  • 由于最先完成的任务会先注入BlockingQueue中,所以主线程中取得的任务结果集是按照完成的先后顺序排序的。
  • 由于使用FutureTask,保证了对象在线程间的安全发布,所以主线程得到的任务结果对象不会出现一致性问题。


以下是我写的一个Demo,参考了《Java并发编程实践》第6章的6.3.6节的程序,并加入了超时等待、以及超时后的通知子线程任务中断的机制:
public class TestCompletionService {

	private static final ExecutorService executor = Executors.newFixedThreadPool(10);
	
	private static final Random r = new Random();	// 每个子任务随机等待一个时间,以模拟子任务的执行时间
	
	private static final int TASK_TIMEOUT = 5;	// 设定最长超时时间为5s
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		
		final long startTime = System.currentTimeMillis();
		final long endTime = startTime + (TASK_TIMEOUT * 1000L);
		
		CompletionService<SubTaskResult> cs = new ExecutorCompletionService<SubTaskResult>(executor);
		List<Future<SubTaskResult>> futureList = new ArrayList<Future<SubTaskResult>>();
		for (int i = 0; i < 10; i++) {
			Future<SubTaskResult> f = cs.submit(new Callable<SubTaskResult>() {
				@Override
				public SubTaskResult call() throws Exception {
					try {
						// 子任务等待一个随机时间。如果这里不是+1而是+2,就可以模拟出现超时的情况
						long waitTime = (r.nextInt(TASK_TIMEOUT) + 1) * 1000L;
						Thread.sleep(waitTime);
						return new SubTaskResult(Thread.currentThread().getName() + 
								", sub thread sleepTime=" + waitTime + "ms");	
					} catch (InterruptedException e) {
						System.out.println(Thread.currentThread().getName() + 
								", catch an interrupted exception, interrupted status=" + Thread.interrupted());
						throw e;
					}
				}
			});
			futureList.add(f);
		}
		
		try {
			for (int i = 0; i < 10; i++) {
				long timeLeft = endTime - System.currentTimeMillis();
				try {
					// timeLeft可能为负数,由于j.u.c中所有负数与0等同,所以不用单独对负数做判断
					Future<SubTaskResult> responseFuture = cs.poll(timeLeft, TimeUnit.MILLISECONDS);
					if (responseFuture == null) {
						throw new TimeoutException("waiting timeout");
					}
					SubTaskResult response = responseFuture.get();
					System.out.println(response.getResult() + ", main thread waitFor: " + timeLeft);
				} catch (InterruptedException e) {
					e.printStackTrace();
					Thread.currentThread().interrupt();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}	
			}			
		} catch (TimeoutException e) {
			// 如果超时则终止所有任务(注意cancel只是调用子线程的interrupt方法,至于能不能中断得看子线程是否支持)
			// 因为对于已经完成的任务调用Future.cancel不起效,所以不需要排除那些已经完成的任务
			for (Future<SubTaskResult> future : futureList) {
				future.cancel(true);
			}
			e.printStackTrace();
		} finally {
			executor.shutdown();
			System.out.println("used: " + (System.currentTimeMillis() - startTime));	
		}
	}

}

class SubTaskResult {
	private final String result;
	public SubTaskResult(String result) {
		this.result = result;
	}
	
	public String getResult() {
		return result;
	}
}
分享到:
评论

相关推荐

    cpp-并行执行http请求支持超时设置

    并行执行http请求,支持超时设置

    C#并行编程高级教程:精通.NET 4 Parallel Extensions中文(第2部分)

    这本精品书籍浓墨重彩地描述如何使用C# 4、Visual Studio 2010和.NET Framework 4高效地创建基于任务的并行应用程序,详细讲述最新的单指令、多数据流指令和向量化等并行编程技术,介绍现代并行库,讨论如何珠联璧合...

    C#并行编程高级教程:精通.NET 4 Parallel Extensions中文(第一部分)

    这本精品书籍浓墨重彩地描述如何使用C# 4、Visual Studio 2010和.NET Framework 4高效地创建基于任务的并行应用程序,详细讲述最新的单指令、多数据流指令和向量化等并行编程技术,介绍现代并行库,讨论如何珠联璧合...

    C#并行编程高级教程:精通.NET 4 Parallel Extensions中文(第3部分)

    这本精品书籍浓墨重彩地描述如何使用C# 4、Visual Studio 2010和.NET Framework 4高效地创建基于任务的并行应用程序,详细讲述最新的单指令、多数据流指令和向量化等并行编程技术,介绍现代并行库,讨论如何珠联璧合...

    asyncTool并行框架-其他

    asyncTool特点:1、解决任意的多线程并行、串行、阻塞、依赖、回调的并发框架,可以任意组合各线程的执行顺序,带全链路回调和超时控制。2、其中的A、B、C分别是一个最小执行单元(worker),可以是一段耗时代码、一...

    cpie-cn_r148.pdf

    第5章并行编程 进程的创建 进程间通信 超时 注册进程 “客户端-服务端”模型 进程调度,实时性以及优先级 进程组 第6章分布式编程 动机 分布式机制 注册进程 连接 银行业务示例 第7章错误处理 Catch和...

    fourinone-3.04.25

    这样做的好处是,开发者有更大能力去深入控制并行计算的过程,去保持使用并行计算实现业务逻辑的完整性,而且对各种不同类型的并行计算场景也能灵活处理,不会因为某些特殊场景被map/reduce的框架限制住思维,并且...

    go开发实战.doc

    11. 并发编程 101 11.1 概述 101 11.1.1 并行和并发 101 11.1.2 Go语言并发优势 103 11.2 goroutine 103 11.2.1 goroutine是什么 103 11.2.2 创建goroutine 103 11.2.3 主goroutine先退出 104 11.2.4 ...

    数据库系统实现

    第9章 并发控制 9.1 串行调度和可串行化调度 9.1.1 调度 9.1.2 串行调度 9.1.3 可串行化调度 9.1.4 事务语义的影响 9.1.5 事务和调度的一种记法 习题 9.2 冲突可串行性 9.2.1 冲突 9.2.2 优先...

    Spring攻略(第二版 中文高清版).part2

    2.17 使用TaskExecutor实现并发性 101 2.17.1 问题 101 2.17.2 解决方案 101 2.17.3 工作原理 102 2.18 小结 110 第3章 Spring AOP和AspectJ支持 112 3.1 启用Spring的AspectJ注解支持 113 3.1.1 ...

    Spring攻略(第二版 中文高清版).part1

    2.17 使用TaskExecutor实现并发性 101 2.17.1 问题 101 2.17.2 解决方案 101 2.17.3 工作原理 102 2.18 小结 110 第3章 Spring AOP和AspectJ支持 112 3.1 启用Spring的AspectJ注解支持 113 3.1.1 ...

    精通websphere MQ

    目录.................................................................................................................................2 内容提要...........................................................

Global site tag (gtag.js) - Google Analytics