协程基本概念以及一些使用分析

Kotlin中文社区 的个人主页 - 文章 - 掘金
协程是一个轻量级的线程封装,内部基于线程池API实现。协程能够使用阻塞的方式写出非阻塞的代码,解决并发中常见的回调地狱。

1.协程的使用

GlobalScope.launch(Dispatchers.Main) {
    val res = getResult(2)
    mNumTv.text = res.toString()
}

简单的启动协程,可以看出协程由三个部分构成GlobalScope,Dispathchers,launch,他们分别对应协程的作用域,调度器和协程构建器。

1.1协程作用域

协程的作用域用三种:

  • runBlocking:顶层函数,它和coroutineScope不一样,它会阻塞当前线程来等待,所以这个方法在业务中不适用。
  • GlobalScope:全局协程作用域,可以在整个应用的生命周期中操作,取消的时候需要考虑时机,所以业务中也不适用。
  • 自定义作用域:自定义作用域,不会造成内存泄漏。

补充-》

  • 通过GlobalScope启动的协程单独启动一个协程作用域,也就是如果在一个协程的内部又通过GlobalScope去启动一个协程,那么这个新启动的协程的作用域是和外部协程没有关系的(也就是如果你取消了外部协程那么它不会影响到GS的,这就出问题了),内部的子协程遵从默认的作用域规则。
  • coroutineScope是继承外部Job的上下文创建作用域,在其内部的取消操作是双向传播,子协程未捕获的异常也会向上传递给父协程。它更适合一系列对等的协程并发的完成一项任务,任何一个子协程异常退出,那么整体将退出
  • supervisorScope同样继承外部作用域的上下文,但其内部的取消操作是单向传播,父协程向子协程传播,反过来则不然,这意味这子协程发生了异常不会影响父协程和其他兄弟协程。它更适合一些独立不相干的任务,任何一个出现问题,不会影响到其他的工作。

协程作用域的使用建议:

  1. 对于没有协程作用域,但需要启动协程的时候,适合使用GlobalScope
  2. 对于已经有协程作用域的情况(例如通过GlobalScope启动的协程体内),直接用协程启动器启动
  3. 对于明确要求子协程之间相互独立不干扰时,使用supervisorScope
  4. 对于通过标准库 API 创建的协程,这样的协程比较底层,没有 Job、作用域等概念的支撑,例如我们前面提到过 suspend main 就是这种情况,对于这种情况优先考虑通过 coroutineScope 创建作用域;更进一步,大家尽量不要直接使用标准库 API,除非你对 Kotlin 的协程机制非常熟悉。

既然GlobalScope的作用域是整个应用的生命周期,那么我们在Activity和Fragment中就不能使用GlobalScope,使用的话可以能会造成内存泄漏,自定义作用域:

//声明作用域
 val scope = MainScope()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        val tv = TextView(this)
        setContentView(tv)
        //启动协程
        scope.launch(Dispatchers.Unconfined) {
            val one = getResult(20)
            val two = getResult(40)
            tv.text = (one + two).toString()
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        //销毁时取消作用域
        scope.cancel()
    }
    
    //在协程中调用的方法需要添加suspend
    private suspend fun getResult(num: Int): Int {
        delay(5000)
        return num * num
    }

1.2调度器

调度器的作用是告诉协程体运行在那个线程

  • Dispatchers.Main:指定执行的线程是主线程
  • Dispatchers.IO:指定执行的线程是IO线程
  • Dispatchers.Default:默认的调度器,适合执行CPU密集性的任务
  • Dispathcers.Unconfined:非限制的调度器,指定的线程可能会随着挂起的函数发生变化

但是当我们指定一个协程执行在主线程时,那么我们如何在这个主线程中进行一个网络请求,我们都知道在Android中网络请求是不能在主线程中进行的,那么这个时候我们就需要进行线程的切换,这里就需要使用到withContext函数来进行线程的切换

  // Dispatchers.Main
    suspend fun fetchDocs() {
        // Dispatchers.Main
        val result = get("hahaha")
        // Dispatchers.Main
    }

    // Dispatchers.Main
    suspend fun get(url: String) =
        // Dispatchers.IO
        withContext(Dispatchers.IO) {
            // Dispatchers.IO
        }

通过协程可以细粒度的控制线程调度,通过withContext可以控制任意一句代码运行在什么线程上,而不用引入回调来获取结果。这里要提到一个概念,主线程安全,就是每个挂起的函数都能引起主线程卡住的问题,如果它设计到任何磁盘、网络或者CPU密集型任务,那么需要使用withContext来确保主线程调用安全的。

1.3suspend

suspend使用限制:在协程内部使用或者在另外一个suspend方法里使用
suspend方法能够使协程执行暂停,等待执行完毕后返回结果,同时不会阻塞线程。
而且暂停协程里方法的执行,直到方法返回结果,这样就可以不用写callback来获取结果,可以使用同步的方式来写异步代码。
suspend的函数不是只能运行在后台线程的,它的运行线程是根据协程调度器所指定的线程。

16af307135e81673?imageslim

1.4开启一个协程

通过launch和async来开启一个新的协程,launch返回的是一个job对象,我们可以通过调用job.cancel()来取消这个协程。
async是创建一个协程在之后返回一个Deferred对象,我们可以通过Deferred.await()去获取返回值,

launch和async很大的一个区别是异常处理。async期望你通过调用await来获取结果(或异常),所以它默认不会抛出异常。这就意味着使用async启动新的协程,它会悄悄的把异常丢弃。

协程的启动模式

1.DEFAULT 立即执行
2.LAZY 懒加载模式,需要手动调用join或则await不然协程不会执行
3.ATOMIC 立即执行协程体,而且协程体中除了挂起函数,都会无视cancel状态,它的意思就是如果协程的启动模式为ATOMIC的话,那么只要在协程中没有挂起函数,那么哪怕它调用了cancel,它也会全部执行完。

     val job = scope.launch(start = CoroutineStart.ATOMIC) {
            Logger.e("start")
            var count = 0
            for (i in 0 until 10000) {
                count++
            }
            Logger.e("end:$count")
        }
        job.cancel()
         //它的输出为
        //start
        //end:10000
        //也就是协程启动模式为ATOMIC的哪怕是协程调用取消,只有协程中没有挂的函数操作,那么它必定会执行完再结束
        val job2 = scope.launch(start = CoroutineStart.ATOMIC) {
            Logger.e("start")
            var count = 0
            for (i in 0 until 10000) {
                count++
            }
            try {
                delay(1000)
            } catch (e: Exception) {
                // kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@d6916df
                e.printStackTrace()
            }
            Logger.e("end:$count")
        }
        job2.cancel()
        //输出:
        //start
        //在这个协程体内有delay的函数执行,而这个会引起协程的挂起,而挂起函数发现当前的协程处于cancel状态,就会抛出JobCancellationException,导致后续
        //代码不能执行,如果使用try、catch包裹delay方法,那么后续代码会继续执行。
        
  1. UNDISPATCHED 立即在当前线程中执行协程体,直到调用第一个挂起函数,挂起函数之后的代码执行的线程取决于当前协程体的上下文线程调度器(GlobalScope除外,因为GlobalScope.launch启动的是第一个顶级协程,无法关联当前协程的上下文)
runBlocking {
    println("print0: " + Thread.currentThread().name)
    launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
        println("print1: " + Thread.currentThread().name)
        delay(2000)//延迟2秒
        println("print2: " + Thread.currentThread().name)
    }
}
//打印结果
print0: main
print1: main
print2: DefaultDispatcher-worker-1

补充-》
启动协程除了launch和async之外,还有actor和produce,其中actor和launch的行为类似,在未捕获的异常出现以后,会被当初未处理的异常抛出。而async和produce则主要用来输出结果,他们内部的异常只在外部消费他们的时候抛出。
协程还有个操作join,但是这个join关心的只是当前的协程是否完成,至于它是怎么完成的,是否是因为错误而引起的结束它都不关心,并且它会吃掉未处理的异常

1.5结构并发

为了避免协程泄漏,kotlin引入了结构化并发。结构化并集合了语言特性和最佳实践,遵循这个原则将帮助你追踪协程中的所有任务。
在Android中使用结构化并发能够做3件事情:

  1. 取消不再需要的任务
  2. 追踪所有正在进行的任务
  3. 协程失败时的错误信号

协程必须运行在 CoroutineScope 中,CoroutineScope会追踪你的协程,即使协程已经被挂起。CoroutineScope不会去执行协程,它仅仅是保证你不会丢失对协程的追踪。CoroutineScope赋予你创建新协程的能力,并且它可以需要所有由它开启的协程。

结构化并发保证当协程作用域取消,其中的所有协程都会取消。
结构化并发???
结构化并发保证当挂起函数返回时,它的所有任务都已经完成。
结构化并发的作用:

  • 1.当作用域取消,其中的协程也会取消
  • 2.当挂起函数返回,其中的所有任务都已完成
  • 3.当协程发生错误,其调用者会得到通知

coroutineScope和supervisorScope让你可以安全的在挂起函数中启动协程。

suspend fun fetchTwoDocs() {
    coroutineScope {
        launch { fetchDoc(1) }
        async { fetchDoc(2) }
    }
}

尽管上面的代码没有在任何地方显示的声明要等待协程的任务执行完程,看起来当协程还在运行时候,fetchDoc方法就会返回。为了结构化并发和避免任务鞋扣,我们希望确保当挂起函数(例如fetchDoc)返回时,它的所有任务都已经完成。这就意味着,由fetchDoc启动的协程都会先于它返回之前执行结束。
Kotlin通过coroutineScope构建器确保fetchDocs中的任务不会泄漏。coroutineScope构建器知道在其中的所有协程都执行结束时才会挂起自己。正因为如此,在coroutineScope中的所有协程尚未结束之前就从fetchDoc中返回是不可能的。

coroutineScope和supervisorScope会等待所有子协程执行结束。
使用coroutineScope或者supervisorScope可以让你在任务挂起函数中安全的启动协程,尽管这是一个新的协程,但是不会造成意外的泄漏任务,因为所有新的协程都完成了你才可以挂起调用者。
下面是一个在挂起函数中进行了一个1000次启动子协程的操作的启动和返回过程:
avatar

coroutineScope和supervisorScope它们之间最大的不同就是,当其中任意一个子协程失败时,coroutineScope会取消,但是supervisorScope不会(比如是多个网络请求的请求下,如果不想因为其中一个网络请求的失败影响到其他的,这个时候我们就可以使用supervisorScope)。

首先看一下viewmodel的扩展

val ViewModel.viewModelScope: CoroutineScope
        get() {
            val scope: CoroutineScope? = this.getTag(JOB_KEY)
            if (scope != null) {
                return scope
            }
            return setTagIfAbsent(JOB_KEY,
                CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
        }

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
    override val coroutineContext: CoroutineContext = context

    override fun close() {
        coroutineContext.cancel()
    }
}
//在viewmodel中维护有一个HaseMap的集合
 private final Map<String, Object> mBagOfTags = new HashMap<>();
 //这个集合存放的就是CoroutineScope,当viewmodel的观察者触发页面的onDestory的事件时会有ViewModelStore存储的viewmodel调用clear方法获取所有实现Closeable的对象调用close()
 
 @MainThread
    final void clear() {
        mCleared = true;
        // Since clear() is final, this method is still called on mock objects
        // and in those cases, mBagOfTags is null. It'll always be empty though
        // because setTagIfAbsent and getTag are not final so we can skip
        // clearing it
        if (mBagOfTags != null) {
            synchronized (mBagOfTags) {
                for (Object value : mBagOfTags.values()) {
                    // see comment for the similar call in setTagIfAbsent
                    closeWithRuntimeException(value);
                }
            }
        }
        onCleared();
    }
    
    private static void closeWithRuntimeException(Object obj) {
        if (obj instanceof Closeable) {
            try {
                ((Closeable) obj).close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
 

SupervisorJob()是什么?Dispatchers.Main.immediate是什么?
SupervisorJob() + Dispatchers.Main.immediate作用又是什么?
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
Dispatchers.Main.immediate是viewModelScope的默认CoroutineDispatcher

在 Android 上使用协程(三) :Real Work
这是其中的一段话,那这个样子的话,就应该说我在项目中的封装是有问题,request对象也是不具有生命周期的,协程相关的操作应该还是放在viewmodel中
Repository 是 Android Architecture
Components 架构的可选部分。如果你在 app 中使用了 repository 或者相似作用的层级,它更偏向于使用挂起函数。由于 repository 没有生命周期,它仅仅只是一个对象,所以它没有办法做资源清理工作。在 repository 中启动的协程将有可能泄露。

1.6一次性请求模式

协程在添加到ViewModel、Repositiory和Room中,每一层都有不同的责任。

  • 1.ViewModel在主线程启动协程,一旦有了结果就结束。
  • 2.Repository提供挂起函数并保证它们主线程安全。
  • 3.数据库和网络层提供挂起函数并保证它们主线安全。

ViewModel负责启动协程,保证用户离开界面时取消协程。它本身不做昂贵的操作,而是依赖其他层来做,一旦有了结果,就使用LiveData发送给UI界面。也正因为ViewModel不做昂贵的操作,所以它在主线程启动协程。通过主线程启动,当结果可用它可以更快的响应用户事件。

Repository提供挂起函数来访问数据。它通常不会启动长生命周期的协程,因为它没办法取消它们。无论何时Repository需要做昂贵的操作,它都需要使用withContext来提供主线程安全的接口。

数据层(网络或数据库)总数提供挂起函数。使用Kotlin协程的时候需要保证这些关起函数是主线程安全的,Room和Retrofit都遵循这个一原则。

1.7协程的并发

方案一取消前一个任务
方案二将下一个任务入队
方案三加入前一个任务
对应的操作可以在这个类中查实现,这个是在第三篇文章中提到的概念。
Helpers to control concurrency for one shot requests using Kotlin coroutines. · GitHub

秉心说TM的三篇在Android上使用协程的翻译文章
在 Android 上使用协程(一):Getting The Background
在 Android 上使用协程(二):Getting started
在 Android 上使用协程(三) :Real Work

2在协程中使用流

1.Koltin中的相应编程Flow和RxJava的对比


 fun createFlow(): Flow<Int> =flow {
        for (i in 1..10) {
        //调用emit发射数据
            emit(i)
        }
    }
        //把上面的循环,快速操作符
        //(1..10).asFlow()

 lifecycleScope.launch {
            createFlow()
                //collect作为消费者来消费数据
                .collect { num ->
                    Logger.e("current num$num")
                }
        }
对比 Flow RxJava
数据源 Flow< T > Observable< T >
订阅 collect subscribe

线程切换


lifecycleScope.launch {
    // 创建一个协程 Flow<T>
    createFlow()
        // 将数据发射的操作放到 IO 线程中的协程
        .flowOn(Dispatchers.IO)
        .collect { num ->
            // 具体的消费处理
            // ...
        }
    }
}

和RxJava的对比

操作 Flow RxJava
改变数据发射的线程 flowOn subscribeOn
改变消费数据的线程 launchIn observeOn
那么在协程中消费者所在的线程是怎么确定的,collect因为是挂起函数,那么它必然要在协程作用域内,那么它的运行线程就是协程所在线程。
注意launchIn使用比较特殊。

异常

lifecycleScope.launch {
    flow {
        //...
    }.catch {e->

    }.collect(

    )
}
对比 Flow RxJava
异常 catch onError

完成

lifecycleScope.launch {
    createFlow()
        .onCompletion {
            // 处理完成操作
        }
        .collect {

        }
}
对比 Flow RxJava
完成 onCompletion onComplete

2.Flow的特点

  • 冷流
  • 有序
  • 协作取消

冷流就是当触发collect方法的时候,数据才开始发射,也就是只有消费时才会生产数据流。

lifecycleScope.launch {
//这里虽然流创建好了,但是数据一直到五行collect才开始发射。
    val flow = (1..10).asFlow().flowOn(Dispatchers.Main)

    flow.collect { num ->
            // 具体的消费处理
            // ...
        }
    }
}

有序的概念就是如果在流的过程中有多个处理步骤,那么都是一个数据走完全部流程,下一个数据才会开始走流程。

lifecycleScope.launch {
    flow {
        for(i in 1..3) {
            Log.e("Flow","$i emit")
            emit(i)
        }
    }.filter {
        Log.e("Flow","$it filter")
        it % 2 != 0
    }.map {
        Log.e("Flow","$it map")
        "${it * it} money"
    }.collect {
        Log.e("Flow","i get $it")
    }
}

输出日志
E/Flow: 1 emit
E/Flow: 1 filter
E/Flow: 1 map
E/Flow: i get 1 money
E/Flow: 2 emit
E/Flow: 2 filter
E/Flow: 3 emit
E/Flow: 3 filter
E/Flow: 3 map
E/Flow: i get 9 money

Flow采用和协程一样的协作取消,也就是说,Flow的collect只能在可取消的挂起函数中挂起的时候取消,否则不能取消。

lifecycleScope.launch {
    val f = flow {
        for (i in 1..3) {
            delay(500)
            Log.e(TAG, "emit $i")
            emit(i)
        }
    }
    withTimeoutOrNull(1600) {
        f.collect {
            delay(500)
            Log.e(TAG, "consume $it")
        }
    }
    Log.e(TAG, "cancel")
}

输出日志
emit 1
consume 1
emit 2
cancel 

flow的创建方式有flow{},

flow{
  for(i in 1..3){
      emit(i)
  }
}

flowOf()


flowof(1,2,3).collect{
}

asFlow

listOf(1,2,3).asFlow().collect{
}

channelFlow()

channelFlow{
  for(i in 1..5){
    delay(100)
    send(i)
  }
}.collect{
  
}

其中有区别的是flow和channel ,flow是coloStream,在没有切换线程线程的情况下,生产者和消费者是同步非阻塞的。
而channel是HotStream,而channelFlow实现了生产者和消费者异步非阻塞模型。

补充=》

末端操作符号

collect是flow的最基本末端操作符。还有其他的末端操作符,大体分为两类:

  1. 集合类型转换操作,包括toList、toSet等
  2. 聚合操作,包括将Flow规约到单值的reduce、flod等,以及获得单个元素的操作sigle、singleOrNull、first等。

分离flow的消费和触发,同onEach来实现。

fun createFlow() = flow<Int> {
    (1..3).forEach {
      emit(it)
      delay(100)
    }
  }.onEach { println(it) }

fun main(){
  GlobalScope.launch {
    createFlow().collect()
  }
}

因此还可以使用下面的写法

fun main(){
  createFlow().launchIn(GlobalScope)
}

flow的取消可以直接取消它所在的协程

Flow的背压
在响应式编程中,就存在背压的问题,所谓的背压就是生产者的生产速率高于消费者的处理速率的情况下出现。

2.协程的源码分析

协程的类结构可以分为三个部分:CoroutineScope、CoroutineContext、Continuation
0b1b2e82148c4eab9eeac9082cee3afa~tplv-k3u1fbpfcp-zoom-1.image

当协程中遇到耗时的suspend操作可以挂起,等到任务结束的时候,协程会自动切回来。这个的切换过程就是Continuation在起作用,它可以理解为程序体,可以理解为当每次程序挂起的时候,就将剩余的代码包括起来,等到结束后执行剩余的代码。一个协程的代码块可能会被切割程若干个Continuation,在每个需要挂起的地方都会分配一个Continuation。
4e020b43bf9a4470be7979186a4b8f84~tplv-k3u1fbpfcp-zoom-1.image


/**
 * Interface representing a continuation after a suspension point that returns a value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}



internal abstract class BaseContinuationImpl(
    // 完成后调用的 Continuation
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 1. 执行 suspend 中的代码块
                        val outcome = invokeSuspend(param)
                        // 2. 如果代码挂起就提前返回
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 3. 返回结果
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 3. 返回失败结果
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // 4. 如果 completion 中还有子 completion,递归
                    current = completion
                    param = outcome
                } else {
                    // 5. 结果通知
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}


0e113af10950448d9cfc339361a6763e~tplv-k3u1fbpfcp-zoom-1.image

过程源码分析


lifecycleScope.launch(Dispatchers.Main) {
    val a = async { getResult(1, 2) }
    val b = async { getResult(3, 5) }

    val c = a.await() + b.await()
    Log.e(TAG, "result:$c")
} 

suspend fun getResult(a: Int, b: Int): Int {
    return withContext(Dispatchers.IO) {
        delay(1000)
        return@withContext a + b
    }
}
第一步获取CoroutineScope

上面使用的是lifecycle的协程扩展库,如果不使用的情况下就要使用MainScope,他们的CoroutineContext是一个样的

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            // ...
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            // ...
            return newScope
        }
    }

SupervisorJob 和Dispatchers.Main 很重要,它们分别代表了CoroutineContext 之前提及的 Job 和 ContinuationInterceptor

第二步启动协程
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

第三个参数block是个一个lambda参数,同时表明了它需要被suspend修饰。在launch中做了两件事,组合新的CoroutineContext,再创建一个新的Continuation。

在启动模式为defleat的情况下
AbstractCoroutine#start -> 最终到

即学即用Kotlin - 协程
抽丝剥茧Kotlin - 协程


补充channel

Channel实际就是一个队列,而且是并发安全的,它可以用来连接协程,进行不同协程间的通信。

Select

1.复用多个await

一个场景:获取数据,这个数据的来源于网络以及本地缓存,哪个先获取到就先用哪个,如果是使用的是本地数据的话,那么在网络数据获取成功后更新最新的数据:

fun CoroutineScope.getUserFromApi(login: String) = async(Dispatchers.IO){
    gitHubServiceApi.getUserSuspend(login)
}

fun CoroutineScope.getUserFromLocal(login:String) = async(Dispatchers.IO){
    File(localDir, login).takeIf { it.exists() }?.readText()?.let { gson.fromJson(it, User::class.java) }
}

首先使用select来解决数据选择的问题:

GlobalScope.launch {
    val localDeferred = getUserFromLocal(login)
    val remoteDeferred = getUserFromApi(login)

    val userResponse = select<Response<User?>> {
        localDeferred.onAwait { Response(it, true) }
        remoteDeferred.onAwait { Response(it, false) }
    }
    ...
}.join()

注意在select中挂起函数的调用为onAwait,而不是await,这样就实现了在不论哪个先回调,select都会立即返回对应的结果。
接下来处理先使用的是本地缓存的问题:

GlobalScope.launch {
    ...
    userResponse.value?.let { log(it) }
    userResponse.isLocal.takeIf { it }?.let {
        val userFromApi = remoteDeferred.await()
        cacheUser(login, userFromApi)
        log(userFromApi)
    }
}.join()


在看PokemonGo的项目里一些对协程flow的补充
这里提供了一个flow的使用场景,那就是根据用户的输入来进行搜索操作
提供了两种方式第一个通过channel

    private val mChanncel = ConflatedBroadcastChannel<String>()
    
    // 使用 ConflatedBroadcastChannel 进行搜索
    val searchResultForDb = mChanncel.asFlow()
        .filter {
            Timber.e("local current input:$it")
            return@filter it.isNotEmpty()
        }
        // 避免在单位时间内,快输入造成大量的请求
        .debounce(200)
        //  避免重复的搜索请求。假设正在搜索 dhl,用户删除了 l  然后输入 l。最后的结果还是 dhl。它就不会再执行搜索查询 dhl
        // distinctUntilChanged 对于 StateFlow 任何实例是没有效果的
        .distinctUntilChanged()
        .flatMapLatest { search -> // 只显示最后一次搜索的结果,忽略之前的请求
            Timber.e("local current input:$search")
            pokemonRepository.fetchPokemonByParameter(search).cachedIn(viewModelScope)
        }
        .catch { throwable ->
            //  异常捕获
        }.asLiveData()
        
            // 根据关键词搜索
    fun queryParameterForDb(parameter: String) = mChanncel.offer(parameter)
    

上面的代码就是先声明了一个channel对象,channel通过offer来输入数据,然后在channel的asFlow进行一系列的数据操作,最后给出结果。

另外一种方式是通过MutableStateFlow进行搜索操作


    // 使用 StateFlow 替换 ConflatedBroadcastChannel
    private val _stateFlow = MutableStateFlow<String>("") // 在 MainViewModel 内部使用
    val stateFlow = _stateFlow // 在外部使用
    
      /**
     * 使用 MutableStateFlow 进行网络搜索
     *
     * 因为没有合适的搜索接口,在这里模拟进行网络搜索
     */
    val searchResultMockNetWork =
        // 避免在单位时间内,快输入造成大量的请求
        stateFlow.debounce(200)
            .filter { result ->
                Timber.e("net current input:$result")
                if (result.isEmpty()) { // 过滤掉空字符串等等无效输入
                    return@filter false
                } else {
                    return@filter true
                }
            }
            .distinctUntilChanged()
            .flatMapLatest { // 只显示最后一次搜索的结果,忽略之前的请求
                // 网络请求,这里替换自己的实现即可
                Timber.e("net current str:$it")
                pokemonRepository.fetchPokemonList().cachedIn(viewModelScope)
            }
            .catch { throwable ->
                //  异常捕获
            }
            .asLiveData()
            
                // 根据关键词搜索
    fun queryParameterForNetWork(parameter: String) {
        _stateFlow.value = parameter
    }

这里就是通过MutableStateFlow来触发搜查操作,基本上两者非常像。

文章记录和整理于以下文章:
Kotlin中文社区 的个人主页 - 文章 - 掘金
在 Android 上使用协程(一):Getting The Background
在 Android 上使用协程(二):Getting started
在 Android 上使用协程(三) :Real Work

即学即用Kotlin - 协程
抽丝剥茧Kotlin - 协程