CompletableFuture⽤法详解——多线程系列
⼀起上⼿CompletableFuture吧
前⾔
JAVA⽀持的多线程开启⽅式
根据Oracle官⽅出具的Java⽂档说明,创建线程的⽅式只有两种:继承Thread或者实现Runnable接⼝。
但是这两种⽅法都存在⼀个缺陷,没有返回值,也就是说我们⽆法得知线程执⾏结果。虽然简单场景下已经满⾜,但是当我们需要返回值的时候怎么办呢?
Java 1.5 以后的Callable和Future接⼝就解决了这个问题,我们可以通过向线程池提交⼀个Callable来获取⼀个包含返回值的Future对象,从此,我们的程序逻辑就不再是同步顺序。
下⾯是Java8实战书籍的原⽂:
Future接⼝在Java5中被引⼊,设计初衷是对将来某个时刻会产⽣的结果进⾏建模。它建模了⼀种异步运算,返回⼀个执⾏结果的引⽤,当运算结束后,这个引⽤被返回给调⽤⽅。在Future中触发那些潜在耗时的操作完成。
如下图:
我们从最初的串⾏操作变成了并⾏,在异步的同时,我们还可以做其他事情来节约程序运⾏时间。
机智的⼩伙伴肯定会问了,什么,这篇不是讲JAVA8新特性,CompletableFuture的吗?怎么说起Future了?
不急,看下⾯
Future接⼝的局限性
当我们得到包含结果的Future时,我们可以使⽤get⽅法等待线程完成并获取返回值,注意我加粗的地⽅,Future的get() ⽅法会阻塞主线程。
Future⽂档原⽂如下
A {@code Future} represents the result of an asynchronous
computation. Methods are provided to check if the computation is
complete, to wait for its completion, and to retrieve the result of
the computation.
⾕歌翻译:
{@code Future}代表异步*计算的结果。提供了⼀些⽅法来检查计算是否完成,等待其完成并检索计算结果。
Future执⾏耗时任务
由此我们得知,Future获取得线程执⾏结果前,我们的主线程get()得到结果需要⼀直阻塞等待,即使我们使⽤isDone()⽅询去查看线程执⾏状态,但是这样也⾮常浪费cpu资源。
图⽚来源:Java8实战
当Future的线程进⾏了⼀个⾮常耗时的操作,那我们的主线程也就阻塞了。
当我们在简单业务上,可以使⽤Future的另⼀个重载⽅法get(long,TimeUnit)来设置超时时间,避免我们
的主线程被⽆穷尽地阻塞。
不过,有没有更好的解决⽅案呢?
我们需要更强⼤异步能⼒
不仅如此,当我们在碰到⼀下业务场景的时候,单纯使⽤Future接⼝或者FutureTask类并不能很好地完成以下我们所需的业务将两个异步计算合并为⼀个,这两个异步计算之间相互独⽴,同时第⼆个⼜依赖于第⼀个的结果
等待Future集合种的所有任务都完成。
仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的⽅式计算同⼀个值),并返回它的结果。
通过编程⽅式完成⼀个Future任务的执⾏(即以⼿⼯设定异步操作结果的⽅式)。
应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使⽤Future的计算结果进⾏下⼀步的的操作,不只是简单地阻塞等待操作的结果)
正⽂
神奇的CompletableFuture
什么是CompletableFuture
在Java 8中, 新增加了⼀个包含50个⽅法左右的类: CompletableFuture,结合了Future的优点,提供了⾮常强⼤的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能⼒,可以通过回调的⽅式处理计算结果,并且提供了转换和组合CompletableFuture的⽅法。
CompletableFuture被设计在Java中进⾏异步编程。异步编程意味着在主线程之外创建⼀个独⽴的线程,与主线程分隔开,并在上⾯运⾏⼀个⾮阻塞的任务,然后通知主线程进展,成功或者失败。
通过这种⽅式,你的主线程不⽤为了任务的完成⽽阻塞/等待,你可以⽤主线程去并⾏执⾏其他的任务。
使⽤这种并⾏⽅式,极⼤地提升了程序的表现。
Java8源码doc注释:
译⽂:
当⼀个Future可能需要显⽰地完成时,使⽤CompletionStage接⼝去⽀持完成时触发的函数和操作。
当2个以上线程同时尝试完成、异常完成、取消⼀个CompletableFuture时,只有⼀个能成功。
CompletableFuture实现了CompletionStage接⼝的如下策略:
1.为了完成当前的CompletableFuture接⼝或者其他完成⽅法的回调函数的线程,提供了⾮异步的完成操作。
2.没有显式⼊参Executor的所有async⽅法都使⽤ForkJoinPoolmonPool()为了简化监视、调试和跟踪,
所有⽣成的异步任务都是标记接⼝AsynchronousCompletionTask的实例。
3.所有的CompletionStage⽅法都是独⽴于其他共有⽅法实现的,因此⼀个⽅法的⾏为不会受到⼦类中其他
⽅法的覆盖。
CompletableFuture实现了Futurre接⼝的如下策略:
1.CompletableFuture⽆法直接控制完成,所以cancel操作被视为是另⼀种异常完成形式。
⽅法isCompletedExceptionally可以⽤来确定⼀个CompletableFuture是否以任何异常的⽅式完成。
2.以⼀个CompletionException为例,⽅法get()和get(long,TimeUnit)抛出⼀个ExecutionException,
对应CompletionException。为了在⼤多数上下⽂中简化⽤法,这个类还定义了⽅法join()和getNow,
⽽不是直接在这些情况中直接抛出CompletionException。
CompletableFuture API
想直接例⼦上⼿的⼩伙伴可以跳过去后⾯
实例化CompletableFuture
实例化⽅式
public static<U> CompletableFuture<U>supplyAsync(Supplier<U> supplier);
public static<U> CompletableFuture<U>supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void>runAsync(Runnable runnable);
public static CompletableFuture<Void>runAsync(Runnable runnable, Executor executor);
有两种格式,⼀种是supply开头的⽅法,⼀种是run开头的⽅法
supply开头:这种⽅法,可以返回异步线程执⾏之后的结果
run开头:这种不会返回结果,就只是执⾏线程任务
或者可以通过⼀个简单的⽆参构造器
CompletableFuture<String> completableFuture =new CompletableFuture<String>();
⼩贴⼠:我们注意到,在实例化⽅法中,我们是可以指定Executor参数的,当我们不指定的试话,我们所开的并⾏线程使⽤的是默认系统及公共线程池ForkJoinPool,⽽且这些线程都是守护线程。我们在编程的时候需要谨慎使⽤守护线程,如果将我们普通的⽤户线程设置成守护线程,当我们的程序主线程结束,JVM中不存在其余⽤户线程,那么CompletableFuture的守护线程会直接退出,造成任务⽆法完成的问题,其余的包括守护线程阻塞问题我就不在本篇赘述。
Java8实战:
其中supplyAsync⽤于有返回值的任务,runAsync则⽤于没有返回值的任务。Executor参数可以⼿动指定线程池,否则默认ForkJoinPoolmonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM 关闭时,⽣命周期终⽌。
获取结果
同步获取结果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
简单的例⼦
CompletableFuture<Integer> future =new CompletableFuture<>();
Integer integer = ();
get() ⽅法同样会阻塞直到任务完成,上⾯的代码,主线程会⼀直阻塞,因为这种⽅式创建的future从未完成。有兴趣的⼩伙伴可以打个断点看看,状态会⼀直是not completed
前两个⽅法⽐较通俗易懂,认真看完上⾯Future部分的⼩伙伴肯定知道什么意思。
getNow() 则有所区别,参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定⼀个确定的值。
join() 与get() 区别在于join() 返回计算的结果或者抛出⼀个unchecked异常(CompletionException),⽽get() 返回⼀个具体的异常.
计算完成后续操作1——complete
public CompletableFuture<T>whenComplete(BiConsumer<?super T,?super Throwable> action)
public CompletableFuture<T>whenCompleteAsync(BiConsumer<?super T,?super Throwable> action)
public CompletableFuture<T>whenCompleteAsync(BiConsumer<?super T,?super Throwable> action, Executor executor)
public CompletableFuture<T>exceptionally(Function<Throwable,?extends T> fn)
⽅法1和2的区别在于是否使⽤异步处理,2和3的区别在于是否使⽤⾃定义的线程池,前三个⽅法都会提供⼀个返回结果和可抛出异常,我们可以使⽤lambda表达式的来接收这两个参数,然后⾃⼰处理。
⽅法4,接收⼀个可抛出的异常,且必须return⼀个返回值,类型与钻⽯表达式种的类型⼀样,详见下⽂的exceptionally() 部分,更详细
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
return10086;
});
future.whenComplete((result, error)->{
System.out.println("拨打"+result);
error.printStackTrace();
});
计算完成后续操作2——handle
public<U> CompletableFuture<U>handle(BiFunction<?super T,Throwable,?extends U> fn)
public<U> CompletableFuture<U>handleAsync(BiFunction<?super T,Throwable,?extends U> fn)
public<U> CompletableFuture<U>handleAsync(BiFunction<?super T,Throwable,?extends U> fn, Executor executor)
眼尖的⼩伙伴可能已经发现了,handle⽅法集和上⾯的complete⽅法集没有区别,同样有两个参数⼀个返回结果和可抛出异常,区别就在于返回值,没错,虽然同样返回CompletableFuture类型,但是⾥⾯的参数类型,handle⽅法是可以⾃定义的。
// 开启⼀个异步⽅法
CompletableFuture<List> future = CompletableFuture.supplyAsync(()->{
List<String> list =new ArrayList<>();
list.add("语⽂");
list.add("数学");
/
/ 获取得到今天的所有课程
return list;
});
// 使⽤handle()⽅法接收list数据和error异常
CompletableFuture<Integer> future2 = future.handle((list,error)->{
// 如果报错,就打印出异常
error.printStackTrace();
// 如果不报错,返回⼀个包含Integer的全新的CompletableFuture
return list.size();
// 注意这⾥的两个CompletableFuture包含的返回类型不同
});
计算完成的后续操作3——apply
public<U> CompletableFuture<U>thenApply(Function<?super T,?extends U> fn)
public<U> CompletableFuture<U>thenApplyAsync(Function<?super T,?extends U> fn)
public<U> CompletableFuture<U>thenApplyAsync(Function<?super T,?extends U> fn, Executor executor)
为什么这三个⽅法被称作,计算完成的后续操作2呢,因为apply⽅法和handle⽅法⼀样,都是结束计算之后的后续操作,唯⼀的不同
是,handle⽅法会给出异常,可以让⽤户⾃⼰在内部处理,⽽apply⽅法只有⼀个返回结果,如果异常了,会被直接抛出,交给上⼀层处理。源程序能直接执行吗
如果不想每个链式调⽤都处理异常,那么就使⽤apply吧。
例⼦:请看下⾯的exceptionally() ⽰例
计算完成的后续操作4——accept
public CompletableFuture<Void>thenAccept(Consumer<?super T> action)
public CompletableFuture<Void>thenAcceptAsync(Consumer<?super T> action)
public CompletableFuture<Void>thenAcceptAsync(Consumer<?super T> action, Executor executor)
accept()三个⽅法只做最终结果的消费,注意此时返回的CompletableFuture是空返回。只消费,⽆返回,有点像流式编程的终端操作。
例⼦:请看下⾯的exceptionally() ⽰例
捕获中间产⽣的异常——exceptionally
public CompletableFuture<T>exceptionally(Function<Throwable,?extends T> fn)
exceptionally() 可以帮我们捕捉到所有中间过程的异常,⽅法会给我们⼀个异常作为参数,我们可以处理这个异常,同时返回⼀个默认值,跟服务降级 有点像,默认值的类型和上⼀个操作的返回值相同。
⼩贴⼠ :向线程池提交任务的时候发⽣的异常属于外部异常,是⽆法捕捉到的,毕竟还没有开始执⾏任务。作者也是在触发线程池拒绝策略的时候发现的。exceptionally() ⽆法捕捉RejectedExecutionException()
// 实例化⼀个CompletableFuture,返回值是Integer
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
// 返回null
return null;
});
CompletableFuture<String> exceptionally = future.thenApply(result ->{
// 制造⼀个空指针异常NPE
int i = result;
return i;
}).thenApply(result ->{
// 这⾥不会执⾏,因为上⾯出现了异常
String words ="现在是"+ result +"点钟";
return words;
}).exceptionally(error ->{
// 我们选择在这⾥打印出异常
error.printStackTrace();
// 并且当异常发⽣的时候,我们返回⼀个默认的⽂字
return"出错啊~";
});
exceptionally.thenAccept(System.out::println);
}
最后输出结果
组合式异步编程
组合两个completableFuture
还记得我们上⾯说的Future做不到的事吗
将两个异步计算合并为⼀个,这两个异步计算之间相互独⽴,同时第⼆个⼜依赖于第⼀个的结果。
thenApply()
假设⼀个场景,我是⼀个⼩学⽣,我想知道今天我需要上⼏门课程
此时我需要两个步骤,1.根据我的名字获取我的学⽣信息 2.根据我的学⽣信息查询课程
我们可以⽤下⾯这种⽅式来链式调⽤api,使⽤上⼀步的结果进⾏下⼀步操作
CompletableFuture<List<Lesson>> future = CompletableFuture.supplyAsync(()->{
// 根据学⽣姓名获取学⽣信息
Student(name);
}).thenApply(student ->{
// 再根据学⽣信息获取今天的课程
Lessons(student);
});
我们根据学⽣姓名获取学⽣信息,然后使⽤把得到的学⽣信息student传递到apply() ⽅法再获取得到学⽣今天的课程列表。
将两个异步计算合并为⼀个,这两个异步计算之间相互独⽴,互不依赖
thenCompose()
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论