前言
在java8的环境下,CompletableFuture是非常受大家喜欢的api,其强大的异步编排能力是做应用服务的绝佳助手。这个api里面包含了大几十个方法,咱们不挨着展开,本篇默认的都是对这个api有一些基础的小伙伴。
CompletableFuture
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
- 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了Future和CompletionStage接口
使用场景
现在有如下场景。前端传递一个用户对象的集合,拿到之后需要后端挨个处理集合中的对象,且每个对象的操作都非常地耗时,例如需要三秒左右,每一个处理完之后会返回一个结果,我需要汇总这些结果返回给前端。示意图大概如下:
挨个处理完每个对象之后,已经花费了10s,这还没算上此次其它的操作。假设业务上对这个接口的要求是5s,那么显然这样做就不满足要求了,那怎么办呢,也比较容易想到,每个处理对象的操作异步进行,最终把结果汇总下就行了。
思路如下:
结合CompletableFuture实现业务
新建user类:
import lombok.Data;
/** * @author : wuwensheng * @date : 10:47 2021/12/13 */
@Data
public class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
}
springboot整合测试:
import com.teligen.PhoneApplication;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;
import java.util.function.Supplier;
/** * 异步批处理的类,可以有相关的许多变种 * * @author : wuwensheng * @date : 10:36 2021/12/13 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PhoneApplication.class)
@Slf4j
public class CompletableFutureTest {
@Autowired
private ThreadPoolExecutor customThreadPoolExecutor;
}
customThreadPoolExecutor这个对象是我的线程池。
每个对user的处理都是一个CompletableFuture,如下:
public CompletableFuture<Integer> disposeUser(User user) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
log.info("Thread name:{}", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return user.getAge() + 10;
}
}, customThreadPoolExecutor).handleAsync(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = param;
if (throwable == null) {
result = param * 2;
} else {
log.info("throwable is:{}", throwable.getMessage());
}
return result;
}
});
}
这里处理每一个user都让当前线程沉睡了两秒,用来模拟处理业务所花费的时间。
继续编排下:
@Test
public void userTest() {
List<User> users = new ArrayList<>();
users.add(new User("小明", 3));
users.add(new User("小红", 2));
users.add(new User("小芳", 18));
CompletableFuture<Integer>[] completableFutures = users.stream().map(user -> {
return disposeUser(user);
}).toArray(CompletableFuture[]::new);
// 等待所有任务执行完
CompletableFuture.allOf(completableFutures).join();
for (CompletableFuture<Integer> completableFuture : completableFutures) {
try {
log.info("result:{}", completableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
跑一下看看结果:
ok了,57秒开始处理,59秒处理完毕。任务在join之后的确是并行的。这是一种什么感觉呢,所有异步线程出去办事了,有一辆车等着它们回来,最后一个人回来的时候,那便发车。
咱们再验证下。当处理小芳的时候沉睡5秒,看下结果:
这次返回耗费了5秒左右,处理的最慢的那个线程决定了最终的返回时常,这也符合咱们的预期。 大家在处理集合数据并且每一条的处理都比较耗时的话,可以考虑这个手法。
这里要注意下,下方标记处的地方是处理业务代码的地方,在这里要对异常进行详尽的处理,因为这些都在异步线程内,线程的异常应自己处理完。
本篇暂时到此。
今天的文章CompletableFuture异步批处理分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/20301.html