你是否曾经为了追求任务更高性能而使用多线程,又是否因为多线程任务组合搞得代码散乱。那么一个适合任务编排的神奇来了
一,使用方法
方法太多,层次不同,但有规律可寻。
1. 开始异步调用
runAsync / supplyAsync
- runAsync 无返回值
- supplyAsync 有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test void test0() throws ExecutionException, InterruptedException { CompletableFuture.runAsync(() -> { log.info("异步task1"); });
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { String name = "异步task2"; log.info(name); return name; }); log.info(cf2.get()); }
11:29:09.352 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - 异步task1 11:29:09.352 [ForkJoinPool.commonPool-worker-2] INFO java8.CompletableFutureTest - 异步task2
|
类似的都有一个参数类型为 Executor 的同名方法,其实就是指定执行的线程池。默认的是 ForkJoinPool
1 2 3 4 5 6
|
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
|
2.强大的管道
thenRun / thenApply / thenAccept
- thenRun 无参无输出
- thenApply 入参并返回
- thenAccept 入参无返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Test void test1() { CompletableFuture<Void> completableFuture = new CompletableFuture(); completableFuture.thenRun(() -> { log.info("task 1"); }).supplyAsync(() -> { log.info("task 2"); return "task2"; }).thenApply((param) -> { String result = String.format("[apply param = %s]", param); log.info(result); return result; }).thenAccept((consumeData) -> { log.info("[accept = {}]", consumeData); }); completableFuture.complete(null); }
11:39:19.631 [main] INFO java8.CompletableFutureTest - task 1 11:39:19.631 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - task 2 11:39:19.635 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - [apply param = task2] 11:39:19.635 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - [accept = [apply param = task2]]
|
同样的,对应的方法有 支持异步的, 和 支持自定义执行线程池的
注意:默认情况下,使用的是ForkJoinPool.commonPool
3.CompletableFuture组合
thenCompose / thenCombine / allOf / anyOf
thenCompose 将链式 CompletableFuture 调用拍平,避免了嵌套 get
thenCombine 两个 CompletableFuture 完成后组合调用
CompletableFuture allOf (CompletableFuture<?>… cfs) 多个 CompletableFuture 组合,所有任务完成,异常返回
不会反映过程结果,典型用例为
CompletableFuture.allOf(c1, c2, c3).join();
CompletableFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| CompletableFuture<String> foo1(String id) { return CompletableFuture.supplyAsync(() -> "id = " + id ); }
CompletableFuture<String> foo2(String idKey) { return CompletableFuture.supplyAsync(() -> String.format("desc = [%s]", idKey)); }
@Test void test3() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = foo1("001"); CompletableFuture<CompletableFuture<String>> f2 = f1.thenApplyAsync((key) -> { return foo2(key); }); log.warn("end = " + f2.get().get()); log.info("end2 = " + f1.thenCompose((key) -> foo2(key)).get()); }
15:19:18.838 [main] WARN java8.CompletableFutureTest - end = desc = [id = 001] 15:19:18.842 [main] INFO java8.CompletableFutureTest - end2 = desc = [id = 001]
@Test void test4() throws ExecutionException, InterruptedException { System.out.println(foo1("foo1").thenCombine(foo2("foo2"), (a, b) -> a + "__" + b).get()); }
id = foo1__desc = [foo2]
|
4.异常处理
exceptionly / handle / whenComplete
获取结果(异常处理)
借用一个总结:
|
handle |
whenComplete |
exceptionly |
访问成功 |
Yes |
Yes |
No |
访问失败 |
Yes |
Yes |
Yes |
能从失败中恢复 |
Yes |
No |
Yes |
能转换结果从 T 到 U |
Yes |
No |
No |
成功时触发 |
Yes |
Yes |
No |
失败时触发 |
Yes |
Yes |
Yes |
有异步版本 |
Yes |
Yes |
Yes (12 版本) |
可以看出,handle 各种情况都支持的很好。一般用它就是了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Test void test5() throws ExecutionException, InterruptedException { CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> { log.info("--supplyAsync1--"); return null; }).thenApply((p) -> { log.info("--thenApply2--"); Assert.notNull(p, "参数不允许为空"); return Integer.valueOf(2); }).handle( (res, ex) -> { if(ex != null) { log.error(ex.getMessage()); }else { log.info("--success--"); } return res; }); log.warn("end " + cf.get()); }
15:55:29.020 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - --supplyAsync1-- 15:55:29.024 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - --thenApply2-- 15:55:29.026 [ForkJoinPool.commonPool-worker-9] ERROR java8.CompletableFutureTest - java.lang.IllegalArgumentException: 参数不允许为空 15:55:29.026 [main] WARN java8.CompletableFutureTest - end null
|
注意:
handle 中不允许抛出异常;参数为 BiFunction,有返回值
whenComplete 在 get 的时候,如果有异常会继续抛出异常;参数为 BiConsumer,无返回值
5.结果获取
两者都是 获取结果的,只是 join 更偏向于 Future 组合,其异常会被封装。
主动计算,如果未完成,则会 Set 传入的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Test void test6() throws ExecutionException, InterruptedException { CompletableFuture<String> cf = CompletableFuture.runAsync(() -> { log.info("--111--"); }).supplyAsync(() -> { log.info("--222--"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } log.info("--333---"); return "333"; }); log.info(String.valueOf(cf.complete("123"))); log.warn("end = " + cf.get()); }
16:23:43.479 [main] INFO java8.CompletableFutureTest - true 16:23:43.479 [ForkJoinPool.commonPool-worker-9] INFO java8.CompletableFutureTest - --111-- 16:23:43.484 [main] WARN java8.CompletableFutureTest - end = 123
|
二,示例
2.1 多任务并行,正确或异常
多任务并发执行,获取正确或错误结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Test void multiTaskRunAndReturn() { Long start = System.currentTimeMillis(); final List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5); List<String> resultList = new ArrayList<>(); Map<String, String> errorList = new HashMap<>(); log.warn("start"); Stream<CompletableFuture<String>> completableFutureStream = taskList.stream() .map(num -> { return CompletableFuture .supplyAsync(() -> doubleInteger(num)) .handle((res, th) -> { if (th == null) { log.info("任务" + num + "完成! result=" + res + ", " + LocalTime.now().toString()); resultList.add(res.toString()); } else { log.error("任务" + num + "异常! e=" + th + ", " + LocalTime.now().toString()); errorList.put(num.toString(), th.getMessage()); } return ""; }); } ); CompletableFuture[] completableFutures = completableFutureStream.toArray(CompletableFuture[]::new); CompletableFuture.allOf(completableFutures) .whenComplete((v, th) -> { log.warn("所有任务执行完成触发\n resultList=" + resultList + "\n errorList=" + errorList+ "\n耗时=" + (System.currentTimeMillis() - start)); }).join(); log.warn("end");
}
public static Integer doubleInteger(Integer i) { try { log.warn("任务" + i + " 开始 ..."); TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } if (i ==3) { throw new IllegalArgumentException(""); } return 2 * i; }
|
2.2 任务组,组内并行,组外依赖
两组任务可分组执行,但任务组2依赖任务组1结果。 实现方式很多,但是要实现优雅,易扩展可不容易:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
@Slf4j public class CompletableFutureTest2 {
List<String> task1ResultList = new ArrayList<>();
CompletableFuture<String> runTask1(String name) { return CompletableFuture.supplyAsync(() -> { log.info("taskName={}",name); return name + "_Result_" + Thread.currentThread().getId(); }).handle((res, e) -> { if (e == null) { task1ResultList.add(res); }else { task1ResultList.add(e.getMessage()); } return res; }); }
CompletableFuture<String> runTask2(CompletableFuture preTask, String name) { return preTask.thenRunAsync(() -> { log.info("taskName={}",name); log.warn("preResult=" + task1ResultList.stream().collect(Collectors.joining(","))); }); }
@Test void test3() { List<CompletableFuture> list1 = new ArrayList<>(); for (int i=0; i<3; i++) { CompletableFuture cf1 = runTask1("task1"); list1.add(cf1); } CompletableFuture[] arr1 = list1.toArray(new CompletableFuture[list1.size()]); CompletableFuture<Void> f11 = CompletableFuture.allOf(arr1);
List<CompletableFuture> list2 = new ArrayList<>(); for(int i=0;i<7;i++) { CompletableFuture cf2 = runTask2( f11 , "task2"); list1.add(cf2); } CompletableFuture[] arr2 = list1.toArray(new CompletableFuture[list2.size()]); CompletableFuture.allOf(arr2).join(); }
}
|
2.3 泡茶
设想现在有个任务:泡茶。需要步骤大概可以分为烧水和准备茶具、茶叶,最终泡茶。
在这个任务重,烧水、准备茶具茶叶是分开的两个支线,他们是无无关联,可以并行的。示意如下:
- 任务支线1 洗水壶 –> 烧开水
- 任务支线2 洗茶壶 –> 洗茶杯 –> 取茶叶
- 任务汇总 泡茶
使用CompletableFuture
可以实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
@Slf4j public class TeaMakingExample {
public static void randomSleep() { try { TimeUnit.SECONDS.sleep(new Random().nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } }
final static List<String> resList = new ArrayList<>();
static CompletableFuture buildSupplier(Supplier<String> function) { return CompletableFuture.supplyAsync(function).handle((res, ex) -> { resList.add(res); return res; }); }
public static void main(String[] args) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { log.info("任务11:洗水壶"); randomSleep(); return "干净的水壶"; }).thenApplyAsync( x -> { log.info("任务12:烧开水");
randomSleep(); return "开水"; }).handle((res, ex) -> { resList.add(res); return res; });
CompletableFuture<String> task21 = buildSupplier(() -> { log.info("任务21: 洗茶壶"); randomSleep(); return "干净的茶壶"; });
CompletableFuture<String> task22 = buildSupplier(() -> { log.info("任务22: 洗茶杯"); randomSleep(); return "干净的茶杯"; });
CompletableFuture<String> task23 = buildSupplier(() -> { log.info("任务23: 取茶叶"); randomSleep(); return "干净的茶叶"; });
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task21, task22, task23).thenRunAsync( () -> { log.warn("---所有材料准备齐全---"); log.warn("[{}]", resList.stream().collect(Collectors.joining(","))); log.warn("===最终步骤::泡茶==="); });
allTasks.join();
} }
|