引子
App 开发中,等待多个异步结果的场景很多见,
比如并发地在后台执行若干个运算,待所有运算执行完毕后归总结果。
比如并发地请求若干个接口,待所有结果返回后刷新界面。
比如统计相册页并发加载 20 张图片的耗时。
其实把若干异步任务串行化是最简单的解决办法,即前一个异步任务执行完毕后再执行下一个。但这样就无法利用多核性能,执行时间被拉长,此时的执行总时长 = 所有任务执行时长的和。
若允许任务并发,则执行总时长 = 执行时间最长任务的耗时。时间性能得以优化,但随之而来的一个复杂度是:“如何等待所有异步结果”。
本文会介绍几种解决方案,并将它们运用到不同的业务场景,比对一下哪个方案适用于哪个场景。
等待并发网络请求
布尔值
假设有如下两个网络请求:
// 拉取新闻
fun fetchNews() {
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... }
})
}
// 拉取广告
fun fetchAd() {
newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
override fun onFailure(call: Call<List<Ad>>, t: Throwable) { ... }
override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) { ... }
})
}
广告需要按一定规则插入到新闻列表中。
最简单的做法是,先请求新闻,待其返回后再请求广告。显然这会增加用户等待时间。而且会写出这样的代码:
// 拉取新闻
fun fetchNews() {
newsApi.fetchNews().enqueue(object : Callback<News> {
override fun onFailure(call: Call<News>, t: Throwable) { ... }
override fun onResponse(call: Call<News>, response: Response<News>) {
// 拉取广告
newsApi.fetchAd().enqueue(object : Callback<Ad> {
override fun onFailure(call: Call<Ad>, t: Throwable) { ... }
override fun onResponse(call: Call<Ad>, response: Response<Ad>) { ... }
})
}
})
}
嵌套回调,若再加一个接口,回调层次就会再加一层,不能忍。 用户和程序员的体验都不好,得想办法解决。
第一个想到的方案是布尔值:
var isNewsDone = false
var isAdDone = false
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() {
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) {
isNewsDone = true
tryRefresh(news, ad)
}
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) {
isNewsDone = true
news = response.body().result
tryRefresh(news, ad)
}
})
}
// 拉取广告
fun fetchAd() {
newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
override fun onFailure(call: Call<List<Ad>>, t: Throwable) {
isAdDone = true
tryRefresh(news, ad)
}
override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) {
isAdDone = true
ads = response.body().result
tryRefresh(news, ad)
}
})
}
// 尝试刷新界面(只有当两个请求都返回时才刷新)
fun tryRefresh(news: List<News>, ads: List<Ad>) {
if(isNewsDone && isAdDone){ //刷新界面 }
}
设置两个布尔值分别对应两个请求是否返回,并且在每个请求返回时检测两个布尔值,若都为 true 则进行刷新界面。
网络库通常会将请求成功的回调抛到主线程执行,所以这里没有线程安全问题。但如果不是网络请求,而是后台任务,此时需要将布尔值声明为volatile
保证其可见性,关于 volatile 更详细的解释可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?。
这个方案能解决问题,但只适用于并发请求数量很少的请求,因为每个请求都要声明一个布尔值。而且每增加一个请求都要修改其余请求的代码,可维护性差。
CountdownLatch
更好的方案是CountDownLatch
,它是java.util.concurrent
包下的一个类,用来等待多个异步结果,用法如下:
val countdownLatch = CountDownLatch(2)//初始化,等待2个异步结果
var news = emptyList()
var ads = emptyList()
// 拉取新闻
fun fetchNews() {
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) {
countdownLatch.countDown()
}
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) {
news = response.body().result
countdownLatch.countDown()
}
})
}
// 拉取广告
fun fetchAd() {
newsApi.fetchAd().enqueue(object : Callback<List<Ad>> {
override fun onFailure(call: Call<List<Ad>>, t: Throwable) {
countdownLatch.countDown()
}
override fun onResponse(call: Call<List<Ad>>, response: Response<List<Ad>>) {
ads = response.body().result
countdownLatch.countDown()
}
})
}
// countdownLatch 在新线程中等待
thread {
countdownLatch.await() // 阻塞线程等待两个请求返回
liveData.postValue() // 抛数据到主线程刷刷新界面
}.start()
CountDownLatch 在构造时需传入一个数量,它的语义可以理解为一个计数器。countDown() 将计数器减一,而 await() 会阻塞当前线程直到计数器为 0 才被唤醒。
该计数器是一个 int 值,可能被多线程访问,为了保证线程安全,它被声明为 volatile,并且 countDown() 通过 CAS + 自旋的方式将其减一。
关于 CAS 的介绍可以点击面试题 | 徒手写一个非阻塞线程安全队列 ConcurrentLinkedQueue?。
若新增一个接口,只需要将计数器的值加一,并在新接口返回时调用 countDown() 即可,可维护性陡增。
协程
Kotlin 是降低复杂度的大师,它对于这个问题的解决方案可以让代码看上去更简单。
在 Kotlin 的世界里异步操作应该被定义为suspend
方法,retrofit 就支持这样的操作,比如:
interface NewsApi {
@GET("/xxx")
suspend fun fetchNews(): List<News>
@GET("/xxx")
suspend fun fetchAd(): List<Ad>
}
然后在协程中使用async
启动异步任务:
scope.launch {
// 并发地请求网络
val newsDefered = async { fetchNews() }
val adDefered = async { fetchAd() }
// 等待两个网络请求返回
val news = newsDefered.await()
val ads = adDefered.await()
// 刷新界面
refreshUi(news, ads)
}
不管是写起来还是读起来,体验都非常好。因为协程把回调干掉了,逻辑不会跳来跳去。
其中的async()
是 CoroutineScope 的扩展方法:
// 启动协程,并返回协程执行结果
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T> {
...
}
async() 和 launch() 唯一的不同是它的返回值是Defered
,用于描述协程体执行的结果:
public interface Deferred<out T> : Job {
// 挂起方法: 等待值的计算,但不会阻塞当前线程,计算完成后恢复当前协程执行
public suspend fun await(): T
}
调用async()
启动子协程不会挂起外层协程,而是立即返回一个Deferred
对象,直到调用Deferred.await()
协程的执行才会被挂起。当协程在多个Deferred
对象上被挂起时,只有当它们都恢复后,协程才继续执行。这样就实现了“等待多个并行的异步结果”。
但这样写会问题:当广告拉取抛出异常时,新闻拉取也会被取消。
这是协程的一个默认设定,叫结构化并发,即并发是有结构性的。
Java 中线程的并发是没有结构的,所以做如下事情很困难:
- 结束一个线程时,如何一并结束它所有的子线程?
- 当某个子线程抛出异常时,如何结束和它同一层级的兄弟线程?
- 父线程如何等待所有子线程结束之后才结束?
之所以会很困难,是因为 Java 中的线程是没有级联关系的。而 Kotlin 通过协程域 CoroutineScope 以及协程上下文 CoroutineContext 实现级联关系。
在协程中启动的子协程会继承父协程的协程上下文,除了其中的 Job,一个新的 Job 会被创建并归属于父协程的子 Job。通过这套机制,协程和子协程之间有了级联关系,就能实现结构化并发。(以后会就结构化并发写一个系列,敬请期待~)
关于 CoroutineContext 内部结构的详细剖析可以点击Kotlin 协程 | CoroutineContext 为什么要设计成 indexed set?
但有些业务场景不需要子任务之间相互关联,比如当前场景,广告加载失败不应该影响新闻的拉取,大不了不展示广告。为此 kotlin 提供了supervisorScope
:
scope.launch {
supervisorScope {
// 并发地请求网络
val newsDefered = async { fetchNews() }
val adDefered = async { fetchAd() }
// 等待两个网络请求返回
val news = newsDefered.await()
val ads = adDefered.await()
// 刷新界面
refreshUi(news, ads)
}
}
supervisorScope 新建一个协程域继承父亲的协程上下文,但会将其中的 Job 重写为SupervisorJob
,它的特点就是孩子的失败不会影响父亲,也不会影响兄弟。
现在广告和新闻加载互不影响,各自抛异常都不会影响对方。但就目前的业务场景来说,理想情况是这样的:“广告加载失败不应该影响新闻的加载。但新闻加载失败应该取消广告的加载(因为此时广告也没有展示的机会)”
稍改动下代码:
scope.launch {
supervisorScope {
// 并发地请求网络
val adDefered = async { fetchAd() }
val newsDefered = async { fetchNews() }
// 当新闻请求抛异常时,取消广告请求
newsDefered.invokeOnCompletion { throwable ->
throwable?.let { adDefered.cancel() }
}
// 等待新闻
val news = try {
newsDefered.await()
} catch (e: Exception) {
emptyList()
}
// 等待广告
val ads = try {
adDefered.await()
} catch (e: Exception) {
emptyList()
}
// 刷新界面
refreshUi(news, ads)
}
}
invokeOnCompletion()
相当于注册了一个回调,在异步任务结束时调用,不管是正常结束还是因异常而结束。在该回调中判断,若新闻因异常而结束则取消广告任务。
因为新闻和广告任务都可能抛出异常,且 async 启动的异步任务是在调用 await() 时才会抛出异常,所以它应该包裹在 try-catch 中。Kotlin 中的 try-catch 是一个表达式,即是有返回值的。这个特性让正常和异常情况的值聚合在一个表达式中。
若不使用 try-catch,程序也不会奔溃,因为 supervisorScope 中异常是不会向上传播的,即子协程的异常不会影响兄弟和父亲。但这样就少了异常情况的处理。
若现有代码都是 Callback 形式的,还能不能享受协程的简洁?
能!Kotlin 提供了suspendCoroutine()
,专门用于将回调风格的代码转换成 suspend 方法,以拉取新闻为例:
// Callback 形式
fun fetchNews() {
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) { ... }
})
}
// suspend 形式
suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation ->
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) {
continuation.resumeWithException(t)
}
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) {
continuation.resume(response.body().result)
}
})
}
其中的Continuation
是剩余的计算,从形式上看,它就是一个回调:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>) // 开始剩余的计算
}
每个 suspend 方法被编译成 java 之后,都会在原有方法参数表最后添加一个 Continuation 参数,用于表达这个挂起点之后“剩余的计算”,举个例子:
scope.launch {
fun1() // 普通方法
suspendFun1() // 挂起方法
// --------------------------
fun2() // 普通方法
suspendFun2() // 挂起方法
// --------------------------
}
整个协程体中有四个方法,其中两个是挂起方法,每个挂起方法都是一道水平的分割线,分割线下方的代码就是当前执行点相对于整个协程体剩余的计算,这“剩余的计算”会被包装成 Continuation 并作为参数传入挂起方法。所以上述代码翻译成 java 就类似于:
scope.launch {
fun1()
suspendFun1(new Continuation() {
@override
public void resumeWith(Result<T> result) {
fun2()
suspendFun2(new Continuation() {
@override
public void resumeWith(Result<T> result) {
}
})
}
})
}
所以挂起方法无异于 java 中带回调的方法,它自然不会阻塞当前线程,它只是把协程体中剩下的代码当成回调,该回调会在将来某个时间点被执行。通过这种方式,挂起方法主动让出了 cpu 执行权。
题外话
从业务上讲,将 Callback 方法改造成挂起式可以降低业务复杂度。举个例子:用户可以通过若干动作触发拉取新闻,比如首次进入新闻页、下拉刷新新闻页、上拉加载更多新闻、切换分区。新闻页有一个埋点,当首次展示某分区时,上报此时的新闻。
若没有 suspend 方法,代码应该这样写:
// NewsViewModel.kt
fun fetchNews(isFirstLoad: Boolean, isChangeType: Boolean) {
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) { ... }
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) {
// 将新闻抛给界面刷新
newsLiveData.value = response.body.result
// 只有当首次加载或切换分区时时才埋点
if(isFirstLoad || isChangeType) {
reportNews(response.body.result)
}
}
})
}
// NewsActivity.kt
// 分区切换监听
tab.setOnTabChangeListener { index ->
newsViewModel.fetchNews(false, true)
}
// 首次加载新闻
fun init() {
newsViewModel.fetchNews(true, false)
}
// 下拉刷新
refreshLayout.setOnRefreshListener {
newsViewModel.fetchNews(false, false)
}
// 上拉加载更多
refreshLayout.setOnLoadMoreListener {
newsViewModel.fetchNews(false, false)
}
因为埋点需要带上新闻列表,所以必须在请求返回之后上报。不同业务场景的拉取接口是同一个,所以只能在统一的 onResponse() 中分类讨论,分类讨论依赖于标记位,不得不为 fetchNews() 添加两个参数。
如果将拉取新闻的接口改成 suspend 方式就能化解这类复杂度:
// NewsViewModel.kt
suspend fun fetchNews() = suspendCoroutine<List<News>> { continuation ->
newsApi.fetchNews().enqueue(object : Callback<List<News>> {
override fun onFailure(call: Call<List<News>>, t: Throwable) {
continuation.resumeWithException(t)
}
override fun onResponse(call: Call<List<News>>, response: Response<List<News>>) {
val news = response.body.result
newsLiveData.value = news
continuation.resume(news)
}
})
}
// NewsActivity.kt
fun initNews() {
scope.launch {
val news = viewModel.fetchNews()
reportNews(news)
}
}
fun changeNewsType() {
scope.launch {
val news = viewModel.fetchNews()
reportNews(news)
}
}
fun loadMoreNews() {
scope.launch { viewModel.fetchNews() }
}
fun refreshNews() {
scope.launch { viewModel.fetchNews() }
}
newsViewModel.newsLiveData.observe {news ->
showNews(news)
}
所有界面的刷新还是走 LiveData,但拉取新闻的方法被改造成挂起之后,也会将新闻列表用类似同步的方式返回,所以可以在相关业务点进行单独埋点。
统计相册加载图片耗时
再通过一个更高并发数的场景比对下各个方案代码上的差异,场景如下:
测试并发加载 20 张网络图片的总耗时。该场景下已经无法使用布尔值,因为并发数太多。
CountdownLatch
var start = SystemClock.elapsedRealtime()
var imageUrls = listOf(...)
val countdownLatch = CountDownLatch(imageUrls.size)
// 另起线程等待 CountDownLatch 并输出耗时
scope.launch(Dispatchers.IO) {
countdownLatch.await()
Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" )
}
// 遍历 20 张图片 url
imageUrls.forEach { img ->
ImageView {// 动态构建 ImageView
layout_width = 100
layout_height = 100
Glide.with(this@GlideActivity)
.load(img)
.listener(object : RequestListener<Drawable> {
override fun onLoadFailed( e: GlideException?, model: Any?, target: Target<Drawable>?, isFirstResource: Boolean ): Boolean {
countdownLatch.countDown() // 加载完一张
return false
}
override fun onResourceReady( resource: Drawable?, model: Any?, target: Target<Drawable>?, dataSource: DataSource?, isFirstResource: Boolean ): Boolean {
countdownLatch.countDown() // 加载完一张
return false
}
})
.into(this)
}
}
协程
var imageUrls = listOf(...)
scope.launch {
val start = SystemClock.elapsedRealtime()
// 将每个 url 都变换为一个 Defered
val defers = imageUrls.map { img ->
val imageView = ImageView {
layout_width = 100
layout_height = 100
}
async { imageView.loadImage(img) }
}
defers.awaitAll()//等待所有的异步任务结束
Log.d( "test", "time-consume=${SystemClock.elapsedRealtime() - start}" )
}
// 将 Callback 方式的加载转换为挂起方式
private suspend fun ImageView.loadImage(img: String) = suspendCoroutine<String> { continuation ->
Glide.with(this@GlideActivity)
.load(img)
.listener(object : RequestListener<Drawable> {
override fun onLoadFailed( e: GlideException?, model: Any?, target: Target<Drawable>?, isFirstResource: Boolean ): Boolean {
continuation.resume("")
return false
}
override fun onResourceReady( resource: Drawable?, model: Any?, target: Target<Drawable>?, dataSource: DataSource?, isFirstResource: Boolean ): Boolean {
continuation.resume("")
return false
}
})
.into(this)
}
你更喜欢哪种方式?
参考
我正在参与掘金技术社区创作者签约计划招募活动,点击链接报名投稿。
今天的文章面试题 | 异步任务的各种组合方式(一)分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/15451.html