使⽤CompletableFuture.supplyAsync实现异步操作
如何使⽤CompletableFuture.supplyAsync实现异步操作(复杂型)
实现的代码封装在function中,也有简单的说明,如下:
public static void useCompletableFuture_complicated(){
// 这个⽅法时描述⼀般地使⽤CompletableFuture实现异步操作,即复杂的使⽤CompletableFuture实现异步操作
// 假设我们有⼀个Person名字List
List<String> personNameList =new ArrayList<>();
// 为了⽅便测试,我们要构造⼤量的数据add到personNameList,⽤for循环,名字就是1, 2, 3, ...
// 这⾥添加1000个名字到personNameList
for(int i =0; i <1000; i++){
personNameList.add(String.valueOf(i));
}
// 假设我们要做的业务是personNameList⾥的每个⼈都说⼀句Hello World, 但是我们不关⼼他们说这句话的顺序,⽽且我们希望这个业务能够较快速的完成,所以采⽤异步就是⽐较合适的
// 先创建两个活动线程的线程池
ExecutorService executor = wFixedThreadPool(2);
// 开始我们的业务处理
for(String personName : personNameList){
CompletableFuture.supplyAsync(new Supplier<String>(){
@Override
public String get(){
// 模拟业务逻辑,say hello world
System.out.println(personName +": Hello World!");
return"task finished!";
}
}, executor);
}
// 关闭线程池executor
// 说明⼀下executor必须要显⽰关闭(它的⽅法⾥有介绍),不然线程池会⼀直等待任务,会导致main⽅法⼀直运⾏
// 还有就是关闭executor,不会导致之前提交的异步任务被打断或者取消。即之前提交的任务依然会执⾏到底,只是不会再接收新的任务
executor.shutdown();
/* 那么关闭线程池之后,我们怎么确定我们的任务是否都完成了呢,可以使⽤executor.isTerminated()命令
/
/ 可以看看isTerminated这个⽅法的说明,简单的说就是调⽤isTerminated()⽅法之前没有调⽤shutdown()⽅法的话,那么isTerminated()⽅法返回的永远是false。
// 所以isTerminated()⽅法返回true的情况就是在调⽤isTerminated()⽅法之前要先调⽤shutdown()⽅法,且所有的任务都完成了。
// 其实调⽤isTerminated()的⽬的就是我们对异步任务的结果是care, 我们需要等待异步任务的结果以便我们做下⼀步的动作。
// 如果我们不关⼼异步任务的结果的话,完全可以不⽤调⽤isTerminated()。
*/
while(!executor.isTerminated()){
System.out.println("no terminated");
try{
System.out.println("我要休眠⼀下");
TimeUnit.MILLISECONDS.sleep(10);
}catch(InterruptedException e){
jdk怎么使用e.printStackTrace();
}
}
}
如何使⽤CompletableFuture.supplyAsync实现异步操作(简洁型)
简洁的代码如下:
public static void useCompletableFuture_simple(){
// 这个⽅法时描述利⽤1.8新特性,简单使⽤CompletableFuture实现异步操作
// 先创建两个活动线程的线程池
ExecutorService executor = wFixedThreadPool(2);
List<String> nameList =new ArrayList<String>();
for(int i =0; i <1000; i++){
nameList.add(String.valueOf(i));
}
// 使⽤JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
nameList.stream().forEach(name -> CompletableFuture.supplyAsync((Supplier<String>)()->{
print((String) name);// 封装了业务逻辑
return"success";
}, executor).exceptionally(e ->{
System.out.println(e);
return"false";
}));
executor.shutdown();
while(!executor.isTerminated()){
System.out.println("no terminated");
try{
System.out.println("我要休眠⼀下");
TimeUnit.MILLISECONDS.sleep(10);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
两种⽅法输出结果⼀致(不要钻⽜⾓尖)
⽐喻:会输出1000个: x: Hello World!
x的范围是0-999,它们之间是没有顺序,中间会有’no terminated’和’我要休眠⼀下’这两句话。
分析
使⽤JDK1.8新特性代码简洁了很多,格局看起来有上来了。
注意点
1. Supplier< String >(多了空格是因为⽆空格不显⽰)的使⽤我的⽆法去掉,我看有的⽂章不需要Supplier,猜测是JDK版本不⼀致导
致。区别如下:
CompletableFuture.supplyAsync((Supplier<String>)()->{
CompletableFuture.supplyAsync(()->{
2. 使⽤异步其实也是与线程有关系,所以要关注⾃⼰的线程是否及时关闭,以免造成内存泄漏。
更新1
⽇期:2021-8-14
以上两个例⼦使⽤的线程池都是默认的,查看源码:
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可知newFixedThreadPool⽤的是LinkedBlockingQueue,即阻塞的链表队列,再通过源码看看这个队列的默认⼤⼩。
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue(){
this(Integer.MAX_VALUE);
}
默认⼤⼩是Integer.MAX_VALUE,2^31次⽅,2147483648,21亿多,可以认为是⽆限⼤。即这个任务队列是⽆界的。如果这个时候每个任务的执⾏时间是⾮常长,⼜不断的加任务进去,就会因为线程处理不完,导致内存飙升。
那如何修改呢?这个我⽬前也在寻答案。有结果定会更新出来。
更新2(终于来了)
⽇期:2022-1-14
⽬的:解决上⾯使⽤默认的线程池,池⼤⼩是⽆界的问题。
调整了两个部分,请看下⾯:
<1>. executor的调整
原来(AS_IS):
// 先创建两个活动线程的线程池
ExecutorService executor = wFixedThreadPool(2);
修改(TO_BE):
//使⽤阿⾥巴巴推荐的创建线程池的⽅式
//通过ThreadPoolExecutor构造函数⾃定义参数创建
ThreadPoolExecutor executor =new ThreadPoolExecutor(
5,
10,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
简单说明:通过构造函数⾃定义线程池参数,参数的意义⼤家可以⾃⾏百度或者查看源码。
<2>. forEach的调整(事实上与这个问题⽆关)
原来(AS_IS):
// 使⽤JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
nameList.stream().forEach(name -> CompletableFuture.supplyAsync((Supplier<String>)()->{
修改(TO_BE):
// 使⽤JDK 1.8的特性,Lambda表达式: (参数) -> {表达式}
nameList.forEach(name -> CompletableFuture.supplyAsync((Supplier<String>)()->{
总的代码:
public static void useCompletableFuture_simple(){
// 这个⽅法时描述利⽤1.8新特性,简单使⽤CompletableFuture实现异步操作
//使⽤阿⾥巴巴推荐的创建线程池的⽅式
/
/通过ThreadPoolExecutor构造函数⾃定义参数创建
ThreadPoolExecutor executor =new ThreadPoolExecutor(
5,
10,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
List<String> nameList =new ArrayList<String>();
for(int i =0; i <1000; i++){
nameList.add(String.valueOf(i));
}
// 使⽤JDK 1.8的特性,Lambda表达式: (参数) -> {表达式}
nameList.forEach(name -> CompletableFuture.supplyAsync((Supplier<String>)()->{
print((String) name);// 封装了业务逻辑
return"success";
}, executor).exceptionally(e ->{
System.out.println(e);
return"false";
}));
executor.shutdown();
while(!executor.isTerminated()){
System.out.println("no terminated");
try{
System.out.println("我要休眠⼀下");
TimeUnit.MILLISECONDS.sleep(10);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
好了,问题解决了,⼤家有什么疑问都可以留⾔⼀起讨论。
Thanks.
更新3
⽇期:2022-2-11
⽬的:⾃定义线程池和线程的名称。
当我们完成了线程池和线程⼯⼚的定义后,我们的ThreadPoolExecutor就可以使⽤⾃⼰定义的了,如下:// OwlThreadPoolExecutor是使⽤阿⾥巴巴推荐的创建线程池的⽅式,⾃定义的⼀个池,使⽤単例模式的懒汉式
ThreadPoolExecutor executor = ThreadPoolExecutorInstance();
线程池代码:
package;
import ArrayBlockingQueue;
import ThreadPoolExecutor;
import TimeUnit;
public class OwlThreadPoolExecutor {
/**
* default value
*/
private static int corePoolSite =5;
private static int maxPoolSite =10;
private static int queueCapacity =100;
private static Long keepAliveTime =1L;
public static volatile ThreadPoolExecutor threadPoolExecutorInstance =null;
private OwlThreadPoolExecutor(){}
public static void initialize(int corePoolSite,int maxPoolSite,int queueCapacity,long keepAliveTime){ PoolSite = corePoolSite;
OwlThreadPoolExecutor.maxPoolSite = maxPoolSite;
OwlThreadPoolExecutor.queueCapacity = queueCapacity;
OwlThreadPoolExecutor.keepAliveTime = keepAliveTime;
}
public static ThreadPoolExecutor getThreadPoolExecutorInstance(){
if(threadPoolExecutorInstance ==null|| threadPoolExecutorInstance.isShutdown()){
synchronized(OwlThreadPoolExecutor.class){
// double check
if(threadPoolExecutorInstance ==null|| threadPoolExecutorInstance.isShutdown()){
System.out.println("The thread pool instance is empty, so need to create.");
threadPoolExecutorInstance =new ThreadPoolExecutor(
corePoolSite,
maxPoolSite,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
new OwlThreadFactory("ThreadPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
System.out.println("The thread pool instance info: "+ threadPoolExecutorInstance);
}
}
}
return threadPoolExecutorInstance;
}
}
线程⼯⼚代码:
⾃定义线程⼯⼚并给予名字,⽅便监控JVM的⼯具追踪问题。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论