再談協(xié)程之Callback寫出協(xié)程范兒

點(diǎn)擊上方藍(lán)字關(guān)注我,知識(shí)會(huì)給你力量

協(xié)程的出現(xiàn),顛覆了Java多年的編程風(fēng)格,如果你是一個(gè)第三方庫(kù)的作者,你可能想用Coroutines和Flow使你的基于Java回調(diào)的庫(kù)變得更加Kotlin化、協(xié)程化。從另一方面來(lái)說(shuō),如果你是一個(gè)API消費(fèi)者,你可能更愿意接入Coroutines風(fēng)格的API,使其對(duì)Kotlin更友好,也讓開(kāi)發(fā)邏輯變得更加線性化。
今天來(lái)看下如何使用Coroutine和Flow簡(jiǎn)化API,以及如何使用suspendCancellableCoroutine和callbackFlow API構(gòu)建你自己的協(xié)程風(fēng)格適配器。
Callbacks
Callbacks是異步通信的一個(gè)非常常見(jiàn)的解決方案。事實(shí)上,大部分Java場(chǎng)景下,我們都使用了它們作為Java編程語(yǔ)言的解決方案。然而,Callbacks也有一些缺點(diǎn):這種設(shè)計(jì)導(dǎo)致了嵌套的回調(diào),最終導(dǎo)致了難以理解的代碼,另外,異常處理也比較復(fù)雜。
在Kotlin中,你可以使用Coroutines簡(jiǎn)化調(diào)用Callbacks,但為此你需要建立自己的適配器,將舊的Callback轉(zhuǎn)化為Kotlin風(fēng)格的協(xié)程。
構(gòu)建Adapter
在協(xié)程中,Kotlin提供了suspendCancellableCoroutine來(lái)適配One-shot回調(diào),同時(shí)提供了callbackFlow來(lái)適配數(shù)據(jù)流場(chǎng)景下的回調(diào)。
下面的場(chǎng)景中,將用一個(gè)簡(jiǎn)單的Callbacks例子來(lái)演示下這種轉(zhuǎn)換。
One-shot async calls
假設(shè)我們有一個(gè)「NetAPI.getData」的函數(shù),返回一個(gè)Data Callback,在協(xié)程場(chǎng)景下,我們想讓它返回一個(gè)suspend函數(shù)。
所以,我們給NetAPI設(shè)計(jì)一個(gè)拓展函數(shù),用來(lái)返回Location的suspend函數(shù),如下所示。
suspend?fun?NetAPI.awaitGetData():?Data
由于這是一個(gè)One-shot的異步操作,我們使用可以suspendCancellableCoroutine函數(shù),suspendCancellableCoroutine執(zhí)行作為參數(shù)傳遞給它的代碼塊,然后暫停當(dāng)前Coroutine的執(zhí)行,同時(shí)等待繼續(xù)執(zhí)行的信號(hào)。當(dāng)Coroutine的Continuation對(duì)象中的resume或resumeWithException方法被調(diào)用時(shí),Coroutine將恢復(fù)執(zhí)行。
//?NetAPI的拓展函數(shù),用于返回Data
suspend?fun?NetAPI.awaitGetData():?Data?=
????//?創(chuàng)建一個(gè)可以cancelled??suspendCancellableCoroutine
????suspendCancellableCoroutine<Data>?{?continuation?->
????????val?callback?=?object?:?NetCallback?{
????????????override?fun?success(data:?Data)?{
????????????????//?Resume?coroutine?同時(shí)返回Data
????????????????continuation.resume(data)
????????????}
????????????override?fun?error(e:?String)?{
????????????????//?Resume?the?coroutine?
????????????????continuation.resumeWithException(e)
????????????}
????????}
????????addListener(callback)
????????//?結(jié)束suspendCancellableCoroutine塊的執(zhí)行,直到在任一回調(diào)中調(diào)用continuation參數(shù)
????}
?要注意的是:Coroutines庫(kù)中也能找到suspendCancellableCoroutine的不可取消版本(即suspendCoroutine),但最好總是選擇suspendCancellableCoroutine來(lái)處理Coroutine Scope的取消。
?
suspendCancellableCoroutine背后的原理
從內(nèi)部實(shí)現(xiàn)來(lái)說(shuō),suspendCancellableCoroutine使用suspendCoroutineUninterceptedOrReturn來(lái)獲取suspend函數(shù)中Coroutine的Continuation。這個(gè)Continuation對(duì)象被一個(gè)CancellableContinuation攔截,它可以用來(lái)控制當(dāng)前Coroutine的生命周期。
在這之后,傳遞給suspendCancellableCoroutine的lambda將被執(zhí)行,如果lambda返回一個(gè)結(jié)果,Coroutine將立即恢復(fù),或者將被暫停,直到CancellableContinuation從lambda中手動(dòng)進(jìn)行恢復(fù)。
源碼如下所示。
public?suspend?inline?fun?<T>?suspendCancellableCoroutine(
??crossinline?block:?(CancellableContinuation<T>)?->?Unit
):?T?=
??//?Get?the?Continuation?object?of?the?coroutine?that?it's?running?this?suspend?function
??suspendCoroutineUninterceptedOrReturn?{?uCont?->
????//?Take?over?the?control?of?the?coroutine.?The?Continuation's?been
????//?intercepted?and?it?follows?the?CancellableContinuationImpl?lifecycle?now
????val?cancellable?=?CancellableContinuationImpl(uCont.intercepted(),?...)
????/*?...?*/
?
????//?Call?block?of?code?with?the?cancellable?continuation
????block(cancellable)
????????
????//?Either?suspend?the?coroutine?and?wait?for?the?Continuation?to?be?resumed
????//?manually?in?`block`?or?return?a?result?if?`block`?has?finished?executing
????cancellable.getResult()
??}
Streaming data
如果我們想獲取多個(gè)數(shù)據(jù)流(使用NetAPI.getDataList函數(shù)),我們就需要使用Flow創(chuàng)建一個(gè)數(shù)據(jù)流。理想的API應(yīng)該是這樣的。
fun?NetAPI.getDataListFlow():?Flow<Data>
要將基于回調(diào)的流媒體API轉(zhuǎn)換為Flow,我們需要使用創(chuàng)建Flow的callbackFlow構(gòu)建器。在callbackFlow lambda中,我們處于Coroutine的上下文中,因此,可以調(diào)用suspend函數(shù)。與flow構(gòu)建器不同,callbackFlow允許通過(guò)send函數(shù)從不同CoroutineContext發(fā)出值,或者通過(guò)offer函數(shù)在協(xié)程外發(fā)出值。
通常情況下,使用callbackFlow的流適配器遵循這三個(gè)通用步驟。
- 創(chuàng)建回調(diào),使用offer將元素添加到流中。
- 注冊(cè)該回調(diào)。
- 等待消費(fèi)者取消循環(huán)程序并取消對(duì)回調(diào)的注冊(cè)。
示例代碼如下所示。
//?向consumer發(fā)送Data?updates
fun?NetAPI.getDataListFlow()?=?callbackFlow<Data>?{
??//?當(dāng)前會(huì)在一個(gè)協(xié)程作用域中創(chuàng)建一個(gè)新的Flow
??//?1.?創(chuàng)建回調(diào),使用offer將元素添加到流中
??val?callback?=?object?:?NetCallback()?{
????override?fun?success(result:?Result?)?{
??????result??:?return?//?Ignore?null?responses
??????for?(data?in?result.datas)?{
????????try?{
??????????offer(data)?//?將元素添加至flow
????????}?catch?(t:?Throwable)?{
??????????//?異常處理?
????????}
??????}
????}
??}
??//?2.?注冊(cè)該回調(diào),從而獲取數(shù)據(jù)流
??requestDataUpdates(callback).addOnFailureListener?{?e?->
????close(e)?//?異常時(shí)close
??}
??//?3.?等待消費(fèi)者取消循環(huán)程序并取消對(duì)回調(diào)的注冊(cè),這樣會(huì)suspend當(dāng)前協(xié)程,直到這個(gè)flow被關(guān)閉
??awaitClose?{
????//?移除監(jiān)聽(tīng)
????removeLocationUpdates(callback)
??}
}
callbackFlow背后的原理
在協(xié)程內(nèi)部,callbackFlow會(huì)使用channel,它在概念上與阻塞隊(duì)列非常相似。channel都有容量配置,限定了可緩沖元素?cái)?shù)的上限。
在callbackFlow中所創(chuàng)建channel的默認(rèn)容量為64個(gè)元素,當(dāng)你嘗試向已經(jīng)滿的channel添加新元素時(shí),send函數(shù)會(huì)將數(shù)據(jù)提供方掛起,直到新元素有空間能加入channel為止,而offer不會(huì)將相關(guān)元素添加到channel中,并會(huì)立即返回false。
awaitClose背后的原理
awaitClose的實(shí)現(xiàn)原理其實(shí)和suspendCancellableCoroutine是一樣的,參考下下面的代碼中的注釋。
public?suspend?fun?ProducerScope<*>.awaitClose(block:?()?->?Unit?=?{})?{
??...
??try?{
????//?Suspend?the?coroutine?with?a?cancellable?continuation
????suspendCancellableCoroutine<Unit>?{?cont?->
??????//?Suspend?forever?and?resume?the?coroutine?successfully?only?
??????//?when?the?Flow/Channel?is?closed
??????invokeOnClose?{?cont.resume(Unit)?}
????}
??}?finally?{
????//?Always?execute?caller's?clean?up?code
????block()
??}
}
有啥用?
將基于回調(diào)的API轉(zhuǎn)換為數(shù)據(jù)流,這玩意兒到底有什么用呢?我們拿最常用的View.setOnClickListener來(lái)看下,它既可以看作是一個(gè)One-shot的場(chǎng)景,也可以看作是數(shù)據(jù)流的場(chǎng)景。
我們先把它改寫成suspendCancellableCoroutine形式,代碼如下所示。
suspend?fun?View.awaitClick(block:?()?->?Unit):?View?=?suspendCancellableCoroutine?{?continuation?->
????setOnClickListener?{?view?->
????????if?(view?==?null)?{
????????????continuation.resumeWithException(Exception("error"))
????????}?else?{
????????????block()
????????????continuation.resume(view)
????????}
????}
}
使用:
lifecycleScope.launch?{
????binding.test.awaitClick?{
????????Toast.makeText(this@MainActivity,?"loading",?Toast.LENGTH_LONG).show()
????}
}
嗯,有點(diǎn)一言難盡的感覺(jué),就差脫褲子放屁了。我們?cè)侔阉某蓴?shù)據(jù)流的場(chǎng)景。
fun?View.clickFlow():?Flow<View>?{
????return?callbackFlow?{
????????setOnClickListener?{
????????????trySend(it)?//?offer函數(shù)被Deprecated了,使用trySend替代
????????}
????????awaitClose?{?setOnClickListener(null)?}
????}
}
使用:
lifecycleScope.launch?{
????binding.test.clickFlow().collect?{
????????Toast.makeText(this@MainActivity,?"loading",?Toast.LENGTH_LONG).show()
????}
}
好了,屁是完全放出來(lái)了。
可以發(fā)現(xiàn),這種場(chǎng)景下,強(qiáng)行硬套這種模式,其實(shí)并沒(méi)有什么卵用,反而會(huì)讓別人覺(jué)得你是個(gè)智障。
那么到底什么場(chǎng)景需要使用呢?我們可以想想,為什么需要Callbback。
大部分Callback hell的場(chǎng)景,都是異步請(qǐng)求,也就是帶阻塞的那種,或者就是數(shù)據(jù)流式的數(shù)據(jù)產(chǎn)出,所以這種僅僅是調(diào)用個(gè)閉包的回調(diào),其實(shí)不能叫回調(diào),它只是一個(gè)lambda,所以,我們?cè)賮?lái)看一個(gè)例子。
現(xiàn)在有一個(gè)TextView,顯示來(lái)自一個(gè)Edittext的輸入內(nèi)容。這樣一個(gè)場(chǎng)景就是一個(gè)明確的數(shù)據(jù)流場(chǎng)景,主要是利用Edittext的TextWatcher中的afterTextChanged回調(diào),我們將它改寫成Flow形式,代碼如下所示。
fun?EditText.afterTextChangedFlow():?Flow<Editable?>?{
????return?callbackFlow?{
????????val?watcher?=?object?:?TextWatcher?{
????????????override?fun?afterTextChanged(s:?Editable?)?{
????????????????trySend(s)
????????????}
????????????override?fun?beforeTextChanged(s:?CharSequence?,?start:?Int,?count:?Int,?after:?Int)?{}
????????????override?fun?onTextChanged(s:?CharSequence?,?start:?Int,?before:?Int,?count:?Int)?{}
????????}
????????addTextChangedListener(watcher)
????????awaitClose?{?removeTextChangedListener(watcher)?}
????}
}
使用:
lifecycleScope.launch?{
????with(binding)?{
????????test.afterTextChangedFlow().collect?{?show.text?=?it?}
????}
}
有點(diǎn)意思了,我沒(méi)寫回調(diào),但是也拿到了數(shù)據(jù)流,嗯,其實(shí)有點(diǎn)「強(qiáng)行可以」的感覺(jué)。
但是,一旦這里變成了Flow,這就變得很有味道了,這可是Flow啊,我們可以利用Flow那么多的操作符,做很多有意思的事情了。
舉個(gè)例子,我們可以對(duì)輸入框做限流,這個(gè)場(chǎng)景很常見(jiàn),例如搜索,用戶輸入的內(nèi)容會(huì)自動(dòng)搜索,但是又不能一輸入內(nèi)容就搜索,這樣會(huì)產(chǎn)生大量的無(wú)效搜索內(nèi)容,所以,這個(gè)場(chǎng)景也有個(gè)專有名詞——輸入框防抖。
之前在處理類似的需求時(shí),大部分都是采用RxJava的方式,但現(xiàn)在,我們有了Flow,可以在滿足協(xié)程范API的場(chǎng)景下,依然完成這個(gè)功能。
我們?cè)黾右幌耫ebounce即可。
lifecycleScope.launch?{
????with(binding)?{
????????test.afterTextChangedFlow()
????????????.buffer(Channel.CONFLATED)
????????????.debounce(300)
????????????.collect?{
????????????????show.text?=?it
????????????????//?來(lái)點(diǎn)業(yè)務(wù)處理
????????????????viewModel.getSearchResult(it)
????????????}
????}
}
甚至你還可以增加一個(gè)背壓策略,再來(lái)個(gè)debounce,在流停止后,完成數(shù)據(jù)收集。
?當(dāng)然你還可以把buffer和debounce直接寫到afterTextChangedFlow返回的Flow中,作為當(dāng)前場(chǎng)景的默認(rèn)處理。
?
參考資料:
https://medium.com/androiddevelopers/simplifying-apis-with-coroutines-and-flow-a6fb65338765
向大家推薦下我的網(wǎng)站?https://xuyisheng.top/??點(diǎn)擊原文一鍵直達(dá)
專注 Android-Kotlin-Flutter 歡迎大家訪問(wèn)
往期推薦
本文原創(chuàng)公眾號(hào):群英傳,授權(quán)轉(zhuǎn)載請(qǐng)聯(lián)系微信(Tomcat_xu),授權(quán)后,請(qǐng)?jiān)谠瓌?chuàng)發(fā)表24小時(shí)后轉(zhuǎn)載。< END >作者:徐宜生
更文不易,點(diǎn)個(gè)“三連”支持一下??
