一.网络请求
1、网络请求配置
获取 github 贡献者的列表
/**
 * GitHub account settings shared by the requests below.
 *
 * Arguments, in order: login account, token, organization.
 * A token can be created at https://github.com/settings/tokens/new
 */
private val req = RequestData(
    "zhi*******@163.com",
    "***************",
    "kotlin"
)
2、线程 execute 阻塞
/**
 * Loads the contributors of every repository of the configured organization
 * using blocking `execute()` calls, one request after another.
 *
 * A response without a body is treated as an empty list.
 *
 * @return the contributors of all repositories, concatenated in repository order.
 */
fun loadContributorsBlocking(): List<User> {
    val repositories = service.getOrgReposCall(req.org).execute().body() ?: emptyList()
    val contributors = mutableListOf<User>()
    for (repository in repositories) {
        // Each call blocks the current thread until the response arrives.
        val response = service.getRepoContributorsCall(req.org, repository.name).execute()
        contributors += response.body() ?: emptyList()
    }
    return contributors
}
3、异步 enqueue 回调
/**
 * Loads the contributors of every repository of the configured organization
 * with asynchronous `enqueue` callbacks and hands the aggregated list to
 * [userLoadData] once the last repository response has been processed.
 *
 * @param userLoadData receiver of the final result; it gets an empty list when
 * the organization has no repositories (or the repository response is null).
 */
fun loadContributorsCallbacks(userLoadData: IUserLoadData) {
    service.getOrgReposCall(req.org).enqueue(NetCallback { repos ->
        val listRepos: List<Repo> = repos ?: emptyList()
        if (listRepos.isEmpty()) {
            // Nothing to request: report the empty result and stop here.
            userLoadData.onUserLoad(emptyList())
            return@NetCallback
        }

        // The per-repository callbacks may be delivered on different threads
        // (the delivery thread of NetCallback is not visible here), so the
        // shared list must tolerate concurrent addAll() calls.
        val allUsers: MutableList<User> =
            java.util.Collections.synchronizedList(ArrayList<User>())
        val numberOfProcessed = AtomicInteger()

        for (repo in listRepos) {
            service.getRepoContributorsCall(req.org, repo.name).enqueue(NetCallback { list ->
                if (!list.isNullOrEmpty()) allUsers.addAll(list)
                // The callback that completes the count publishes the result.
                if (numberOfProcessed.incrementAndGet() == listRepos.size) {
                    userLoadData.onUserLoad(allUsers.aggregate())
                }
            })
        }
        // A CountDownLatch(listRepos.size) + await() here would also work,
        // but await() blocks the calling thread, so the counter is used instead.
    })
}
4、协成加载
/**
 * Suspending variant: requests the repositories, then the contributors of each
 * repository one after another, and passes the aggregated list to [userLoadData].
 *
 * A response without a body is treated as an empty list.
 */
suspend fun loadContributorsSuspend(userLoadData: IUserLoadData) {
    val repositories = service.getOrgRepos(req.org).body() ?: emptyList()
    val collected = mutableListOf<User>()
    for (repository in repositories) {
        // Sequential: the next request starts only after this one has returned.
        val response = service.getRepoContributors(req.org, repository.name)
        collected += response.body() ?: emptyList()
    }
    userLoadData.onUserLoad(collected.aggregate())
}
5、协成并行
/**
 * Concurrent variant: starts one `async` request per repository on
 * [Dispatchers.IO], awaits them all, and passes the aggregated list to
 * [userLoadData].
 *
 * The requests are children of a `coroutineScope`, so cancelling the caller
 * (or a failure in one request) cancels the remaining requests instead of
 * leaving them running in detached scopes.
 */
suspend fun loadContributorsConcurrent(userLoadData: IUserLoadData) {
    val repos = service.getOrgRepos(req.org).body() ?: emptyList()
    val list = kotlinx.coroutines.coroutineScope {
        val deferred = repos.map { repo ->
            async(Dispatchers.IO) {
                log(repo.name, coroutineContext)
                service.getRepoContributors(req.org, repo.name).body() ?: emptyList()
            }
        }
        deferred.awaitAll()
    }.flatten().aggregate()
    userLoadData.onUserLoad(list)
}
二. 协成的取消
1、协程的取消
/**
 * Demo: launches a job that logs every 500 ms, waits 1300 ms, then cancels it
 * with `cancel()` without waiting for its completion.
 */
override fun onJobCancel() = onClick {
    log("main: I'm start!")
    val sleeper = mainScope.launch {
        for (tick in 0 until 1000) {
            log("job: I'm sleeping $tick ...")
            delay(500L)
        }
    }
    // Let the job run for a little while before giving up on it.
    delay(1300L)
    log("main: I'm tired of waiting!")
    // Requests cancellation only; does not join.
    sleeper.cancel()
    log("main: Now I can quit.")
}
2、线程取消不起作用,需要执行完成
/**
 * Demo: a CPU-bound loop on [Dispatchers.Default] that contains no suspension
 * point and no cancellation check. `cancelAndJoin()` is requested after
 * 1300 ms, but the loop still runs until it has printed ten messages.
 */
override fun onCancelComplete() = onClick {
    val startedAt = System.currentTimeMillis()
    val worker = mainScope.launch(Dispatchers.Default) {
        var nextPrintAt = startedAt
        var printed = 0
        // Busy loop: just burns CPU and prints a message twice a second.
        while (printed < 10) {
            if (System.currentTimeMillis() < nextPrintAt) continue
            log("job: I'm sleeping $printed ...")
            printed++
            nextPrintAt += 500L
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    // Cancels the job and waits for its completion.
    worker.cancelAndJoin()
    log("main: Now I can quit.")
}
3、捕获取消协成的异常
/**
 * Demo: the job catches every exception inside its loop, which includes the
 * cancellation exception thrown by `delay` once the job has been cancelled.
 * The caught exception is only logged, so the loop moves on to its next
 * iteration instead of stopping.
 */
override fun onCancelException() = onClick {
    val sleeper = mainScope.launch {
        for (tick in 0 until 5) {
            try {
                // Prints a message twice a second.
                log("job: I'm sleeping $tick ...")
                delay(500)
            } catch (e: Exception) {
                log("job Exception: $e")
            }
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    // Cancels the job and waits for its completion.
    sleeper.cancelAndJoin()
    log("main: Now I can quit.")
}
4、通过 isActive 取消协成
/**
 * Demo: a computation loop that checks `isActive`, so it stops once the job is
 * cancelled. The markers (1), (2) and (3) are the switchable variants that the
 * notes following this snippet refer to.
 */
override fun onActiveCancel() = onClick {
    val startedAt = System.currentTimeMillis()
    val job = mainScope.launch(Dispatchers.Default) { //(1)
    //(2) val job = mainScope.launch {
        var nextPrintAt = startedAt
        var printed = 0
        // Cancellable computation loop: exits as soon as the job is no longer active.
        while (isActive) {
            // Prints a message twice a second.
            if (System.currentTimeMillis() >= nextPrintAt) {
                log("job: I'm sleeping ${printed++} ...",coroutineContext)
                nextPrintAt += 500L
                //(3) delay(500)
            }
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!", currentCoroutineContext())
    // Cancels the job and waits for its completion.
    job.cancelAndJoin()
    log("main: Now I can quit.", currentCoroutineContext())
}
注解
- 如果注释掉 (1) 并打开 (2):协程运行在主线程上,while 循环会一直占用主线程,launch 代码块之后的代码(delay、cancelAndJoin 等)得不到执行
- 如果注释掉 (1)、打开 (2),并同时打开 (3):delay 会挂起协程并释放主线程,launch 代码块之后的代码才能得到执行
5、通过 finally 取消后的操作
/**
 * Demo: the job wraps its loop in try/catch/finally. After `cancelAndJoin()`
 * the caught exception is logged and the `finally` block runs.
 */
override fun onFinallyCancel() = onClick {
    val sleeper = mainScope.launch {
        try {
            for (tick in 0 until 1000) {
                log("job: I'm sleeping $tick ...")
                delay(500L)
            }
        } catch (e: Exception) {
            log("Exception: : $e")
        } finally {
            log("job: I'm running finally")
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    // Cancels the job and waits for its completion.
    sleeper.cancelAndJoin()
    log("main: Now I can quit.")
}
6、通过 withContext(NonCancellable) 设置不可取消,用于释放资源、停止服务等
/**
 * Demo: the `finally` block of a cancelled job switches to `NonCancellable`
 * so that it can still call the suspending `delay` during clean-up.
 */
override fun onNoCancel() = onClick {
    val sleeper = mainScope.launch {
        try {
            for (tick in 0 until 1000) {
                log("job: I'm sleeping $tick ...")
                delay(500L)
            }
        } finally {
            // Runs the clean-up in a context that cannot be cancelled.
            withContext(NonCancellable) {
                log("job: I'm running finally")
                delay(1000L)
                log("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    // Cancels the job and waits for its completion.
    sleeper.cancelAndJoin()
    log("main: Now I can quit.")
}
7、通过 withTimeout 设置超时
/**
 * Demo: runs a loop that logs every 500 ms inside `withTimeout(1300L)`.
 * The timeout exception is not caught here.
 */
override fun onTimeOut() = onClick {
    withTimeout(1300L) {
        for (tick in 0 until 1000) {
            log("I'm sleeping $tick ...")
            delay(500L)
        }
    }
}
8、捕获超时异常
/**
 * Demo: same loop as the plain timeout demo, but the exception raised when
 * the 1300 ms timeout expires is caught and logged.
 */
override fun onTimeOutCatch() = onClick {
    try {
        withTimeout(1300L) {
            for (tick in 0 until 1000) {
                log("I'm sleeping $tick ...")
                delay(500L)
            }
        }
    } catch (e: Exception) {
        log("Exception $e")
    }
}
9、withTimeoutOrNull 超时或返回 null
/**
 * Demo: `withTimeoutOrNull` returns `null` when the timeout expires and the
 * block's own result otherwise. The first block times out; the second one
 * finishes well within its timeout.
 */
override fun onTimeOutCatchOrNull() = onClick {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            log("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // never reached: the block is cancelled by the 1300 ms timeout first
    }
    log("Result is $result")

    val result1 = withTimeoutOrNull(13000L) {
        repeat(5) { i ->
            log("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // reached: five 500 ms delays fit easily into the 13000 ms timeout
    }
    log("Result is $result1")
}
三. Suspend function 介绍
1、串行执行
/**
 * Demo: calls the two suspending functions one after the other and logs the
 * sum together with the elapsed time.
 */
override fun onSequentialDefault() = onClick {
    val elapsed = measureTimeMillis {
        // The second call starts only after the first one has returned.
        val first = doSomethingUsefulOne()
        val second = doSomethingUsefulTwo()
        val sum = first + second
        log("The answer is $sum")
    }
    log("Completed in $elapsed ms")
}
2、并行执行
/**
 * Demo: starts both suspending functions with `async` so that they run
 * concurrently, then awaits both results and logs the sum and elapsed time.
 */
override fun onAsyncOperation() {
    mainScope.launch {
        val elapsed = measureTimeMillis {
            val first = async { doSomethingUsefulOne() }
            val second = async { doSomethingUsefulTwo() }
            val sum = first.await() + second.await()
            log("The answer is $sum")
        }
        log("Completed in $elapsed ms")
    }
}
3、懒加载
/**
 * Demo: both `async` coroutines are created with `CoroutineStart.LAZY`.
 * Only the first one is started explicitly; the second one is not started
 * until it is awaited.
 */
override fun lazyOperation() {
    mainScope.launch {
        val elapsed = measureTimeMillis {
            val first = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
            val second = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
            // Without this call the first coroutine would start at first.await().
            first.start()
            val sum = first.await() + second.await()
            log("The answer is $sum")
        }
        log("Completed in $elapsed ms")
    }
}
注释 :如果不通过 one.start() 开启,则会在 one.await() 时执行
4、错误异步演示
/**
 * Demo of the discouraged "async-style function" pattern: the `...Async`
 * helpers start work in [mainScope] from a non-suspending function and
 * return a `Deferred`.
 */
override fun asyncStyleErro() {
    log("Completed in asyncStyleErro")
    val time = measureTimeMillis {
        // Asynchronous work can be started from outside a coroutine ...
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // ... but waiting for the results needs a suspending (or blocking) context.
        // This demo awaits inside mainScope.launch, which returns immediately, so
        // the measured time covers only starting the coroutines, not the awaits.
        mainScope.launch {
            log("The answer is ${one.await() + two.await()}")
        }
    }
    log("Completed in $time ms")
}

// The return type of somethingUsefulOneAsync is Deferred<Int>.
private fun somethingUsefulOneAsync() = mainScope.async {
    doSomethingUsefulOne()
}

// The return type of somethingUsefulTwoAsync is Deferred<Int>.
private fun somethingUsefulTwoAsync() = mainScope.async {
    doSomethingUsefulTwo()
}

/** Logs its name, suspends for one second, then returns 13. */
private suspend fun doSomethingUsefulOne(): Int {
    log("doSomethingUsefulOne")
    delay(1000L) // pretend we are doing something useful here
    return 13
}

/** Logs its name, suspends for one second, then returns 29. */
private suspend fun doSomethingUsefulTwo(): Int {
    log("doSomethingUsefulTwo")
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
注释 :
这种带有异步函数的编程风格仅供参考,因为这在其它编程语言中是一种受欢迎的风格。在 Kotlin 的协程中使用这种风格是强烈不推荐的, 原因如下所述
考虑一下如果 val one = somethingUsefulOneAsync() 这一行和 one.await() 表达式这里在代码中有逻辑错误, 并且程序抛出了异常以及程序在操作的过程中中止,
将会发生什么。 通常情况下,一个全局的异常处理器会捕获这个异常,将异常打印成日志并报告给开发者,随后程序会继续执行其它操作。
但是此时 somethingUsefulOneAsync 仍然在后台执行,尽管启动它的那次操作已经被终止。使用结构化并发就不会出现这个问题,如下一小节所示。
5、正确异步演示
/**
 * Demo of structured concurrency: runs [failedConcurrentSum] and logs when it
 * fails with an [ArithmeticException].
 */
override fun asyncStyleRight() {
    mainScope.launch {
        try {
            failedConcurrentSum()
        } catch (e: ArithmeticException) {
            log("Computation failed with ArithmeticException",coroutineContext)
        }
    }
}

/**
 * Starts two children inside one `coroutineScope`. The second child throws,
 * and the first child's `finally` block logs that it was cancelled.
 */
suspend fun failedConcurrentSum(): Int = coroutineScope {
    val longRunning = async<Int> {
        try {
            // Simulates a very long computation.
            delay(Long.MAX_VALUE)
            42
        } finally {
            log("First child was cancelled",coroutineContext)
        }
    }
    val failing = async<Int> {
        log("Second child throws an exception",coroutineContext)
        throw ArithmeticException()
    }
    longRunning.await() + failing.await()
}
四. 调度器
1、调度器的介绍
override fun onDispatcherType() { mainScope.launch { // 运行在父协程的上下文中,即 runBlocking 主协程 log("no param",coroutineContext) } mainScope.launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中 log("Dispatchers.Unconfined",coroutineContext) } mainScope.launch(Dispatchers.Default) { // 将会获取默认调度器 log("Dispatchers.Default",coroutineContext) } mainScope.launch(newSingleThreadContext("SingleThread")) { // 将使它获得一个新的线程 log("SingleThread",coroutineContext) } GlobalScope.launch { log("GlobalScope",coroutineContext) }}// Thread[main,5,main]-->StandaloneCoroutine{Active}@341e412-->297380853 : Dispatchers.Unconfined// Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@47975e0-->156964473 : Dispatchers.Default// Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@1d4ff3f-->112631768 : GlobalScope// Thread[SingleThread,5,main]-->StandaloneCoroutine{Active}@730100c-->282246497 : SingleThread// Thread[main,5,main]-->StandaloneCoroutine{Active}@fd1e36a-->494285765 : no param2、非受限调度器 vs 受限调度器
override fun onUnconfinedVsConfined() { mainScope.launch(Dispatchers.Unconfined) { // not confined -- will work with main thread log("Unconfined : start", coroutineContext) delay(500) log("Unconfined : end", coroutineContext) } mainScope.launch { // context of the parent, main runBlocking coroutine log("main runBlocking: start", coroutineContext) delay(1000) log("main runBlocking: end", coroutineContext) }}Thread[main,5,main]-->StandaloneCoroutine{Active}@62f6fa8-->315783128-->null : Unconfined : startThread[main,5,main]-->StandaloneCoroutine{Active}@a27eac1-->232495113-->null : main runBlocking: startThread[kotlinx.coroutines.DefaultExecutor,5,main]-->StandaloneCoroutine{Active}@62f6fa8-->315783128-->null : Unconfined : endThread[main,5,main]-->StandaloneCoroutine{Active}@a27eac1-->232495113-->null : main runBlocking: end3、不同线程间跳转
@OptIn(DelicateCoroutinesApi::class)override fun onJumpThread() { newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { log("Started in ctx1", coroutineContext) withContext(ctx2) { log("Working in ctx2", coroutineContext) } log("Back to ctx1", coroutineContext) } } }}Thread[Ctx1,5,main]-->BlockingCoroutine{Active}@a4ddda1-->310540647-->null : Started in ctx1Thread[Ctx2,5,main]-->DispatchedCoroutine{Active}@67116b4-->213567889-->null : Working in ctx2Thread[Ctx1,5,main]-->BlockingCoroutine{Active}@a4ddda1-->310540647-->null : Back to ctx14、子协成
@OptIn(DelicateCoroutinesApi::class)override fun onChild() { mainScope.launch { // 启动一个协程来处理某种传入请求(request) val request = launch { // 孵化了两个子作业, 其中一个通过 GlobalScope 启动 GlobalScope.launch { log("job1: I run in GlobalScope and execute independently!",coroutineContext) delay(1000) log("job1: I am not affected by cancellation of the request",coroutineContext) } // 另一个则承袭了父协程的上下文 launch { delay(100) log("job2: I am a child of the request coroutine",coroutineContext) delay(1000) log("job2: I will not execute this line if my parent request is cancelled",coroutineContext) } } delay(500) request.cancel() // 取消请求(request)的执行 delay(1000) // 延迟一秒钟来看看发生了什么 log("main: Who has survived request cancellation?",coroutineContext) }}Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@8339fd9-->205189639-->null : job1: I run in GlobalScope and execute independently!Thread[main,5,main]-->StandaloneCoroutine{Active}@883307f-->204922311-->null : job2: I am a child of the request coroutineThread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@8339fd9-->205189639-->null : job1: I am not affected by cancellation of the requestThread[main,5,main]-->StandaloneCoroutine{Active}@9dc884c-->227554708-->null : main: Who has survived request cancellation?5、协成的命名和log
override fun onNamed() { mainScope.launch { log("Started main coroutine",coroutineContext) // 运行两个后台值计算 val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1",coroutineContext) 252 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2",coroutineContext) 6 } launch(Dispatchers.Default + CoroutineName("test")) { log("I'm working in thread ${Thread.currentThread().name}",coroutineContext) } log("The answer for v1 / v2 = ${v1.await() / v2.await()}",coroutineContext) }}fun log(msg: String?, coroutine: CoroutineContext? = null) { val message = msg ?: "null" if (coroutine == null) { Log.e("~~~~", "${Thread.currentThread()} : $message") } else { val tag = "${Thread.currentThread()}-->${coroutine[Job]}-->${coroutine.hashCode()}-->${coroutine[CoroutineName.Key]} :" Log.e("~~~~", "$tag $message") }}五. Asynchronous Flow(异步流)
1、Flow 的介绍
fun onIntroduce(mainScope: CoroutineScope) = mainScope.launch { log("I'm the parent Coroutine") launch { for (k in 1..3) { log("I'm not blocked $k", coroutineContext) delay(2000) } } simple().collect { value -> println(value) } log("I'm the parent Coroutine end")}private fun simple(): Flow<Int> = flow { // 流构建器 for (i in 1..3) { delay(2000) // 假装我们在这里做了一些有用的事情 log("I'm not emit $i", coroutineContext) emit(i) // 发送下一个值 }}Thread[main,5,main] : I'm the parent CoroutineThread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 1Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 1Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 2Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 2Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 3Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 3Thread[main,5,main] : I'm the parent Coroutine end2、Flows are cold
flow does not run until the flow is collected
fun onFlowCold(mainScope: CoroutineScope) = mainScope.launch { log("Calling simple function...",coroutineContext) val flow = simple() log("Calling collect...",coroutineContext) flow.collect { value -> log(value,coroutineContext) } log("Calling collect again...",coroutineContext) flow.collect { value -> log(value,coroutineContext)}}fun simple(): Flow<Int> = flow { log("Flow started",coroutineContext) for (i in 1..3) { delay(100) emit(i) }}3、Flows withTimeout and onTimeoutOrNull
fun onFlowTimeout(mainScope: CoroutineScope) = mainScope.launch { try { withTimeout(250) { // 在 250 毫秒后超时 simple().collect { value -> log("$value") } } } catch (e: Exception) { log("Exception : $e") } log("Done") } private fun simple(): Flow<Int> = flow { // 流构建器 for (i in 1..3) { log("simpleFlow", coroutineContext) delay(100) // 假装我们在这里做了一些有用的事情 emit(i) // 发送下一个值 } } fun onTimeoutOrNull(mainScope: CoroutineScope) = mainScope.launch { withTimeoutOrNull(250) { // 在 250 毫秒后超时 simple().collect { value -> log("$value") } } log("Done") }4、Flow operators
- map : 转换操作符,其代码块中可以调用 suspend 函数
- transform : 通用转换操作符,代码块的接收者是 FlowCollector,可以对每个值调用任意次 emit
- filter :过滤器
- take : 截取长度
- toList : 转换成 List
- toSet : 转化为 Set
- first : 获取第一个数据
- reduce and fold 的区别 :fold 有初始值
fun onTransform(mainScope: CoroutineScope) = mainScope.launch { (1..3).asFlow() // a flow of requests .map { performRequest(it) } .transform { emit(it) emit("Hello emit $it") } .filter { it.length > 2 } .take(5) .collect { response -> log(response) } for (item in (1..3).asFlow().toList()){ log("toList $item") } for (item in (1..3).asFlow().toSet()){ log("toSet $item") } log((1..3).asFlow().first())}fun onReduce(mainScope: CoroutineScope) = mainScope.launch{ val sum = (1..5).asFlow() .map { it * it } // squares of numbers from 1 to 5 .reduce { a, b -> a + b } // sum them (terminal operator) log(sum) val sum1 = (1..5).asFlow() .map { it * it } // squares of numbers from 1 to 5 .fold(100) { a, b -> a + b } // sum them (terminal operator) log(sum1)}suspend fun performRequest(request: Int): String { delay(1000) // imitate long-running asynchronous work return "response $request"}5、Flows are sequential
fun onSequential(mainScope: CoroutineScope) = mainScope.launch { (1..5).asFlow() .filter { log("Filter $it") it % 2 == 0 } .map { log("Map $it") "string $it" }.collect { log("Collect $it") }}6、Flow context Thread
Since simple().collect is called from the main thread, the body of simple's flow is also called in the main thread
fun onThread(mainScope: CoroutineScope) = mainScope.launch { simple().collect { value -> log("${Thread.currentThread()} :Collected $value") }}private fun simple(): Flow<Int> = flow { log("${Thread.currentThread()}:Started simple flow") for (i in 1..3) { emit(i) }}7、Flow 的线程切换
However, the long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default and UI-updating code might need to be executed in the context of Dispatchers.Main
在 flow { } 构建器内部通过 withContext 切换上下文后再 emit 会导致异常,应该通过 flowOn 切换上游的执行上下文
private fun onThreadSwitchErro(mainScope: CoroutineScope) = mainScope.launch { simpleErro().collect { value -> log("collect $value", coroutineContext) }}private fun simpleErro(): Flow<Int> = flow { // The WRONG way to change context for CPU-consuming code in flow builder try { kotlinx.coroutines.withContext(Dispatchers.Default) { for (i in 1..3) { delay(1000) Thread.sleep(1000) // pretend we are computing it in CPU-consuming way log("emit $i", coroutineContext) emit(i) // emit next value } } } catch (e: Exception) { log("Exception $e") } }private fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(1000) Thread.sleep(1000) // pretend we are computing it in CPU-consuming way log("emit $i", coroutineContext) emit(i) // emit next value }}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builderprivate fun onThreadFlowOn(mainScope: CoroutineScope) = mainScope.launch { simple().collect { value -> log("collect $value", coroutineContext) }}8、Buffering
- **buffer**:发射的值先进入缓冲区,发射端不必等待收集端处理完上一个值
- **conflate**:收集端来不及处理时跳过中间值,只处理最新的值
- **collectLatest**:每当有新值到来时,取消对上一个值的处理,重新开始处理最新的值
fun onBuffer(mainScope: CoroutineScope) = mainScope.launch { val time = measureTimeMillis { simple().collect { value -> delay(300) // pretend we are processing it for 300 ms log("Normal $value") } } log("Collected in $time ms Normal",coroutineContext) val time1 = measureTimeMillis { simple().buffer(3).collect { value -> delay(300) // pretend we are processing it for 300 ms log("buffer $value",coroutineContext) } } log("Collected in $time1 ms buffer",coroutineContext) val time2 = measureTimeMillis { simple() .conflate() // conflate emissions, don't process each one .collect { value -> delay(300) // pretend we are processing it for 300 ms log("conflate $value",coroutineContext) } } log("Collected in $time2 ms ",coroutineContext) val time3 = measureTimeMillis { simple() .conflate() // conflate emissions, don't process each one .collectLatest { value -> log("collectLatest start $value",coroutineContext) delay(300) // pretend we are processing it for 300 ms log("collectLatest end $value",coroutineContext) } } log("Collected in $time3 ms ",coroutineContext)}private fun simple(): Flow<Int> = flow { for (i in 1..6) { log("emit: $i") delay(100) // pretend we are asynchronously waiting 100 ms emit(i) // emit next value }}Thread[main,5,main] : flowItem.index 10Thread[main,5,main] : emit: 1Thread[main,5,main] : Normal 1Thread[main,5,main] : emit: 2Thread[main,5,main] : Normal 2Thread[main,5,main] : emit: 3Thread[main,5,main] : Normal 3Thread[main,5,main] : emit: 4Thread[main,5,main] : Normal 4Thread[main,5,main] : emit: 5Thread[main,5,main] : Normal 5Thread[main,5,main] : emit: 6Thread[main,5,main] : Normal 6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 2442 ms NormalThread[main,5,main] : emit: 1Thread[main,5,main] : emit: 2Thread[main,5,main] : emit: 3Thread[main,5,main] : emit: 4Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 1Thread[main,5,main] : emit: 5Thread[main,5,main] : emit: 
6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 2Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 3Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 4Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 5Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 1960 ms bufferThread[main,5,main] : emit: 1Thread[main,5,main] : emit: 2Thread[main,5,main] : emit: 3Thread[main,5,main] : emit: 4Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 1Thread[main,5,main] : emit: 5Thread[main,5,main] : emit: 6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 3Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 1034 msThread[main,5,main] : emit: 1Thread[main,5,main] : emit: 2Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 1Thread[main,5,main] : emit: 3Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 2Thread[main,5,main] : emit: 4Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 3Thread[main,5,main] : emit: 5Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 4Thread[main,5,main] : emit: 6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 5Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest end 
6Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 960 ms9、Composing multiple flows : zip vs Combine
fun onCompose(mainScope: CoroutineScope) = mainScope.launch { val num = (1..3).asFlow() // numbers 1..3 val str = flowOf("one", "two", "three","four") // strings num.zip(str) { a, b -> "$a -> $b" } // compose a single string .collect { log("zip : $it") } // collect and print val num1 = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val str1 = flowOf("one", "two", "three","four").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time num1.zip(str1) { a, b -> "$a -> $b" } // compose a single string with "zip" .collect { value -> // collect and print log("zip delay : $value at ${System.currentTimeMillis() - startTime} ms from start") } num1.combine(str1) { a, b -> "$a -> $b" } // compose a single string with "combine" .collect { value -> // collect and print log("combine delay : $value at ${System.currentTimeMillis() - startTime} ms from start") }}Thread[main,5,main] : zip : 1 -> oneThread[main,5,main] : zip : 2 -> twoThread[main,5,main] : zip : 3 -> threeThread[main,5,main] : zip delay : 1 -> one at 406 ms from startThread[main,5,main] : zip delay : 2 -> two at 806 ms from startThread[main,5,main] : zip delay : 3 -> three at 1207 ms from startThread[main,5,main] : combine delay : 1 -> one at 1617 ms from startThread[main,5,main] : combine delay : 2 -> one at 1821 ms from startThread[main,5,main] : combine delay : 2 -> two at 2020 ms from startThread[main,5,main] : combine delay : 3 -> two at 2125 ms from startThread[main,5,main] : combine delay : 3 -> three at 2423 ms from startThread[main,5,main] : combine delay : 3 -> four at 2827 ms from start10、Flattening flows
- flatMapConcat 顺序展平流
- flatMapMerge 并发收集所有内部流,值一到达就立即发射
- flatMapLatest 新值到来时取消上一个内部流,只收集最新的内部流
/**
 * Demo of `flatMapConcat`: each inner flow returned by `requestFlow` is
 * collected to completion before the next outer value is processed.
 * Runs twice, with the outer flow emitting every 100 ms and every 600 ms.
 * Each log line shows the value, the time since the run started and the time
 * since the previous value.
 */
@OptIn(FlowPreview::class)
fun onFlatMapConcat(mainScope: CoroutineScope) = mainScope.launch {
    var startTime = System.currentTimeMillis() // remember the start time
    var useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
    startTime = System.currentTimeMillis() // remember the start time
    useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(600) } // emit a number every 600 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
}
Thread[main,5,main] : start : 1: First: 110 : 110
Thread[main,5,main] : start : 1: Second: 612 : 501
Thread[main,5,main] : start : 1: Third: 1114 : 501
Thread[main,5,main] : start : 2: First: 1217 : 102
Thread[main,5,main] : start : 2: Second: 1718 : 501
Thread[main,5,main] : start : 2: Third: 2221 : 502
Thread[main,5,main] : start : 3: First: 2328 : 106
Thread[main,5,main] : start : 3: Second: 2831 : 503
Thread[main,5,main] : start : 3: Third: 3334 : 502
Thread[main,5,main] : end : 1: First: 607 : 607
Thread[main,5,main] : end : 1: Second: 1110 : 502
Thread[main,5,main] : end : 1: Third: 1615 : 504
Thread[main,5,main] : end : 2: First: 2220 : 605
Thread[main,5,main] : end : 2: Second: 2723 : 502
Thread[main,5,main] : end : 2: Third: 3226 : 502
Thread[main,5,main] : end : 3: First: 3832 : 605
Thread[main,5,main] : end : 3: Second: 4335 : 502
Thread[main,5,main] : end : 3: Third: 4838 : 502
/**
 * Demo of `flatMapMerge`: the inner flows are collected concurrently and their
 * values are emitted as soon as they arrive.
 * Runs twice, with the outer flow emitting every 800 ms and every 100 ms.
 */
@OptIn(FlowPreview::class)
fun onFlatMapMerge(mainScope: CoroutineScope) = mainScope.launch {
    var startTime = System.currentTimeMillis() // remember the start time
    var useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(800) } // emit a number every 800 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
    startTime = System.currentTimeMillis() // remember the start time
    useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
}
Thread[main,5,main] : start : 1: First: 814 : 814
Thread[main,5,main] : start : 1: Second: 1316 : 501
Thread[main,5,main] : start : 2: First: 1615 : 299
Thread[main,5,main] : start : 1: Third: 1816 : 201
Thread[main,5,main] : start : 2: Second: 2117 : 301
Thread[main,5,main] : start : 3: First: 2416 : 299
Thread[main,5,main] : start : 2: Third: 2623 : 206
Thread[main,5,main] : start : 3: Second: 2921 : 297
Thread[main,5,main] : start : 3: Third: 3424 : 502
Thread[main,5,main] : end : 1: First: 116 : 116
Thread[main,5,main] : end : 2: First: 221 : 104
Thread[main,5,main] : end : 3: First: 326 : 104
Thread[main,5,main] : end : 1: Second: 620 : 293
Thread[main,5,main] : end : 2: Second: 725 : 104
Thread[main,5,main] : end : 3: Second: 829 : 104
Thread[main,5,main] : end : 1: Third: 1124 : 294
Thread[main,5,main] : end : 2: Third: 1229 : 105
Thread[main,5,main] : end : 3: Third: 1333 : 104
/**
 * Demo of `flatMapLatest`: when the outer flow emits a new value, collection
 * of the previous inner flow is cancelled and only the newest one continues.
 * Runs twice, with the outer flow emitting every 800 ms and every 100 ms.
 *
 * NOTE(review): the pasted snippet was a verbatim copy of the flatMapMerge
 * demo; the operator and name were restored from the section heading and the
 * captured output below (inner flows are cut short) - confirm against the project.
 */
@OptIn(ExperimentalCoroutinesApi::class)
fun onFlatMapLatest(mainScope: CoroutineScope) = mainScope.launch {
    var startTime = System.currentTimeMillis() // remember the start time
    var useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(800) } // emit a number every 800 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
    startTime = System.currentTimeMillis() // remember the start time
    useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
}
Thread[main,5,main] : start : 1: First: 804 : 804
Thread[main,5,main] : start : 1: Second: 1305 : 501
Thread[main,5,main] : start : 2: First: 1607 : 301
Thread[main,5,main] : start : 2: Second: 2111 : 504
Thread[main,5,main] : start : 3: First: 2421 : 309
Thread[main,5,main] : start : 3: Second: 2923 : 501
Thread[main,5,main] : start : 3: Third: 3427 : 504
Thread[main,5,main] : end : 1: First: 111 : 111
Thread[main,5,main] : end : 2: First: 224 : 113
Thread[main,5,main] : end : 3: First: 336 : 111
Thread[main,5,main] : end : 3: Second: 838 : 501
Thread[main,5,main] : end : 3: Third: 1342 : 504
11、Declarative handling
- try catch exception 捕获异常
- catch 捕获异常
- onCompletion 表示完成,能展示异常但是不会处理
fun onDeclarative(mainScope: CoroutineScope) = mainScope.launch { try { simple().collect { value -> log("try : $value") } } catch (e:Exception){ log("try : $e") }finally { log("try : Done") } simple() .onCompletion { cause ->log("Flow completed exceptionally $cause") } .catch { cause -> log("Caught exception: $cause") }.collect { value -> log("catch $value") } (1..4).asFlow() .onCompletion { cause -> log("asFlow Flow completed with $cause") } .catch { cause -> log("asFlow Caught exception: $cause") } .collect { value -> log("asFlow $value") }}private fun simple(): Flow<Int> = flow { emit(1) throw RuntimeException()}Thread[main,5,main] : flowItem.index 15Thread[main,5,main] : try : 1Thread[main,5,main] : try : java.lang.RuntimeExceptionThread[main,5,main] : try : DoneThread[main,5,main] : catch 1Thread[main,5,main] : Flow completed exceptionally java.lang.RuntimeExceptionThread[main,5,main] : Caught exception: java.lang.RuntimeExceptionThread[main,5,main] : asFlow 1Thread[main,5,main] : asFlow 2Thread[main,5,main] : asFlow 3Thread[main,5,main] : asFlow 4Thread[main,5,main] : asFlow Flow completed with null12、FLaunching flow
/**
 * Demo: `collect()` is a terminal operator that suspends until the flow has
 * completed, so "Collect Done" is logged after all events.
 */
fun onCollect(mainScope: CoroutineScope) = mainScope.launch {
    events()
        .onEach { event -> log("Collect Event: $event") }
        .collect() // <--- Collecting the flow waits
    log("Collect Done")
}

/** Emits 1, 2, 3 with a 100 ms delay before each value. */
private fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
Thread[main,5,main] : Collect Event: 1
Thread[main,5,main] : Collect Event: 2
Thread[main,5,main] : Collect Event: 3
Thread[main,5,main] : Collect Done
/** Emits 1, 2, 3 with a 100 ms delay before each value. */
private fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

/**
 * Demo: `launchIn` starts the collection in a separate coroutine of the given
 * scope and returns immediately, so "LaunchIn Done" is logged before the events.
 */
fun onLaunchIn(mainScope: CoroutineScope) = mainScope.launch {
    events()
        .onEach { event -> log("LaunchIn Event: $event") }
        .launchIn(mainScope) // <--- Launching the flow in a separate coroutine, does not wait
    log("LaunchIn Done")
}
Thread[main,5,main] : LaunchIn Done
Thread[main,5,main] : LaunchIn Event: 1
Thread[main,5,main] : LaunchIn Event: 2
Thread[main,5,main] : LaunchIn Event: 3
13、Flow cancellation checks
- asFlow:asFlow 在发射时不做取消检查,繁忙循环中的数据流无法取消
- flow {}:flow 构建器在每次 emit 时默认执行 ensureActive 检查,所以可以取消
- cancellable:通过 cancellable 使流可以被取消
- currentCoroutineContext().ensureActive():通过 currentCoroutineContext().ensureActive() 使流可以被取消
/**
 * A busy `asFlow()` flow cannot be cancelled: all five values are still
 * collected although `cancel()` is called at value 3 (see the output below).
 */
fun onAsFlow(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        log("asFlow : $value")
    }
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3
// Thread[main,5,main] : asFlow : 4
// Thread[main,5,main] : asFlow : 5

/**
 * A flow built with the `flow { }` builder checks `ensureActive` on every
 * `emit`, so it can be cancelled: value 4 is never collected (see the output below).
 */
fun onFlow(mainScope: CoroutineScope) = mainScope.launch {
    foo().collect { value ->
        if (value == 3) cancel()
        log("cancel : $value")
    }
}
// Thread[main,5,main] : Emitting 1
// Thread[main,5,main] : cancel : 1
// Thread[main,5,main] : Emitting 2
// Thread[main,5,main] : cancel : 2
// Thread[main,5,main] : Emitting 3
// Thread[main,5,main] : cancel : 3
// Thread[main,5,main] : Emitting 4

/**
 * The `cancellable()` operator makes the `asFlow()` flow cancellable.
 */
fun onCancel(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        log("asFlow : $value")
    }
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3

/**
 * An explicit `currentCoroutineContext().ensureActive()` check in `onEach`
 * makes the `asFlow()` flow cancellable.
 */
fun onEnsureActive(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow().onEach { currentCoroutineContext().ensureActive() }.collect { value ->
        if (value == 3) cancel()
        log("asFlow : $value")
    }
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3

/** Emits 1..5 from a `flow { }` builder, logging before each emission. */
private fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        log("Emitting $i")
        emit(i)
    }
}
六. Channels
1、Building channel producers
@OptIn(ExperimentalCoroutinesApi::class)private fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { for (x in 1..5) { delay(300) send(x * x) }}fun onProducer(mainScope: CoroutineScope) = mainScope.launch { val squares = produceSquares() squares.consumeEach { log(it) } println("Done!")}2、Pipelines
/** Produces an infinite stream of integers starting from 1. */
private fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

/**
 * Receives every value from [numbers] and sends its square downstream.
 *
 * The previous body sent `x * x * x` (the cube), which contradicted the function name
 * and the "squares integers" comment at the call site.
 */
private fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

/**
 * Builds the pipeline numbers -> squares, logs the first five squares,
 * then cancels the child coroutines so the infinite producer stops.
 */
fun onPipeline(mainScope: CoroutineScope) = mainScope.launch {
    val numbers = produceNumbers() // produces integers starting from 1
    val squares = square(numbers) // squares the integers
    repeat(5) {
        log(squares.receive()) // print the first five
    }
    log("Done!") // we are done
    coroutineContext.cancelChildren() // cancel the child coroutines
}
3、Fan-out
/** Produces integers starting from 1, one every 100 ms. */
@OptIn(ExperimentalCoroutinesApi::class)
private fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

/**
 * Launches a processor coroutine that logs every message it receives from [channel].
 *
 * @param id number used to tell the processors apart in the log output
 * @param channel shared channel the processors compete to receive from
 */
private fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        log("Processor #$id received $msg", coroutineContext)
    }
}

/**
 * Fan-out demo: five processors receive from one producer for 950 ms,
 * then the producer is cancelled.
 */
fun fanOut(mainScope: CoroutineScope) = mainScope.launch {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
4、Fan-in
@OptIn(ExperimentalCoroutinesApi::class)fun CoroutineScope.numberProducer(start: Int) = produce<Int> { var index = start while (true) send(index++)}@OptIn(ExperimentalCoroutinesApi::class)fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { for (x in numbers) if (x % prime != 0) send(x)}fun findPrime(mainScope: CoroutineScope) = mainScope.launch { var cur = numberProducer(2) repeat(10) { val prime = cur.receive() log(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren()}4、Buffered channels
fun sendWithBuffer(mainScope:CoroutineScope) = mainScope.launch { val channel = Channel<Int>(4) // create buffered channel val sender = launch { // launch sender coroutine repeat(10) { log("Four Sending $it") // print before sending each element channel.send(it) // will suspend when buffer is full } } // don't receive anything... just wait.... delay(1000) sender.cancel() // cancel sender coroutine val channel1 = Channel<Int>(2) // create buffered channel val sender1 = launch { // launch sender coroutine repeat(10) { log("Two Sending $it") // print before sending each element channel1.send(it) // will suspend when buffer is full } } // don't receive anything... just wait.... delay(1000) sender1.cancel() // cancel sender coroutine}Thread[main,5,main] : Four Sending 0Thread[main,5,main] : Four Sending 1Thread[main,5,main] : Four Sending 2Thread[main,5,main] : Four Sending 3Thread[main,5,main] : Four Sending 4Thread[main,5,main] : Two Sending 0Thread[main,5,main] : Two Sending 1Thread[main,5,main] : Two Sending 25、Channels are fair
发送和接收操作是 公平的 并且尊重调用它们的多个协程。它们遵守先进先出原则
data class Ball(var hits: Int)fun showChannelFair(mainScope:CoroutineScope) = mainScope.launch { val table = Channel<Ball>() // a shared table launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them}suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ log("$name $ball") delay(300) // wait a bit table.send(ball) // send the ball back }}// Thread[main,5,main] : ping Ball(hits=1)// Thread[main,5,main] : pong Ball(hits=2)// Thread[main,5,main] : ping Ball(hits=3)// Thread[main,5,main] : pong Ball(hits=4)6、Ticker channels
计时器通道是一种特别的会合通道,每次经过特定的延迟都会从该通道进行消费并产生 Unit
fun tickerSend(mainScope:CoroutineScope) = mainScope.launch { val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } log("Initial element is available immediately: $nextElement") // no initial delay nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay log("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } log("Next element is ready in 100 ms: $nextElement") // 模拟大量消费延迟 log("Consumer pauses for 150ms") delay(150) // 下一个元素立即可用 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } log("Next element is available immediately after large consumer delay: $nextElement") // 请注意,`receive` 调用之间的暂停被考虑在内,下一个元素的到达速度更快 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } log("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // 表明不再需要更多的元素}// Thread[main,5,main] : Initial element is available immediately: kotlin.Unit// Thread[main,5,main] : Next element is not ready in 50 ms: null// Thread[main,5,main] : Next element is ready in 100 ms: kotlin.Unit// Thread[main,5,main] : Consumer pauses for 150ms// Thread[main,5,main] : Next element is available immediately after large consumer delay: kotlin.Unit// Thread[main,5,main] : Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit七. Coroutine exceptions handling
1、Exception propagation
- propagating exceptions automatically (launch and actor) 自动传播异常
- exposing them to users (async and produce) 向用户暴露异常
@OptIn(DelicateCoroutinesApi::class)fun propagationException(mainScope:CoroutineScope) = mainScope.launch { val deferred = GlobalScope.async { // root coroutine with async log("Throwing exception from async") throw ArithmeticException() // Nothing is printed, relying on user to call await } try { deferred.await() log("Unreached") } catch (e: ArithmeticException) { log("Caught ArithmeticException") } val job = try { GlobalScope.launch { // root coroutine with launch log("Throwing exception from launch") throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler } }catch(e:Exception) { log("Throwing exception from launch") null } job?.join() log("Joined failed job")}Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from asyncThread[main,5,main] : Caught ArithmeticExceptionThread[DefaultDispatcher-worker-1,5,main] : Throwing exception from launch2022-10-28 15:12:21.144 24517-24585/com.zhihaoliang.coroutine E/AndroidRuntime: FATAL EXCEPTION: DefaultDispatcher-worker-1 Process: com.zhihaoliang.coroutine, PID: 24517 java.lang.IndexOutOfBoundsException at com.zhihaoliang.coroutine.buss.excep.func.ExceptionPropagation$propagationException$1$job$1.invokeSuspend(ExceptionPropagation.kt:29) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@65b7906, Dispatchers.Default]2、CoroutineExceptionHandler
private val handler = CoroutineExceptionHandler { _, exception -> log("CoroutineExceptionHandler got $exception") }fun handlerException(mainScope:CoroutineScope) = mainScope.launch { val deferred = GlobalScope.async(handler) { // root coroutine with async log("Throwing exception from async") throw ArithmeticException() // Nothing is printed, relying on user to call await } try { deferred.await() log("Unreached") } catch (e: ArithmeticException) { log("Caught ArithmeticException") } val job = try { GlobalScope.launch(handler) { // root coroutine with launch log("Throwing exception from launch") throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler } }catch(e:Exception) { log("Throwing exception from launch") null } job?.join() log("Joined failed job")}Thread[main,5,main] : com.zhihaoliang.coroutine.buss.excep.ExceptionActivityThread[main,5,main] : exceptionItem.index 1Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from asyncThread[main,5,main] : Caught ArithmeticExceptionThread[DefaultDispatcher-worker-1,5,main] : Throwing exception from launchThread[DefaultDispatcher-worker-1,5,main] : CoroutineExceptionHandler got java.lang.IndexOutOfBoundsExceptionThread[main,5,main] : Joined failed job3、Cancellation and exceptions
CancellationException is ignored by all handlers
/**
 * Cancels a child coroutine and shows that the parent keeps running:
 * after `child.cancel()` and `child.join()` the parent still reaches its final log call.
 */
fun cancelExceptionNormal(mainScope: CoroutineScope) = mainScope.launch {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                log("Child is cancelled")
            }
        }
        yield()
        log("Cancelling child")
        child.cancel()
        child.join()
        yield()
        log("Parent is not cancelled")
    }
    job.join()
}
// Thread[main,5,main] : Cancelling child
// Thread[main,5,main] : Child is cancelled
// Thread[main,5,main] : Parent is not cancelled

/**
 * Launches two children under a root coroutine that has a [CoroutineExceptionHandler].
 *
 * The second child throws an [ArithmeticException]; per the sample output below, the handler
 * is invoked only after the first child has finished its `NonCancellable` cleanup block.
 */
@OptIn(DelicateCoroutinesApi::class)
fun cancelExceptionHander(mainScope: CoroutineScope) = mainScope.launch {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    log("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    log("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            log("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
}
// Thread[DefaultDispatcher-worker-2,5,main] : Second child throws an exception
// Thread[DefaultDispatcher-worker-1,5,main] : Children are cancelled, but exception is not handled until all children terminate
// Thread[DefaultDispatcher-worker-1,5,main] : The first child finished its non cancellable block
// Thread[DefaultDispatcher-worker-1,5,main] : CoroutineExceptionHandler got java.lang.ArithmeticException
4、Exceptions aggregation
当协程的多个子协程因异常而失败时, 一般规则是“取第一个异常”,因此将处理第一个异常。 在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常
@OptIn(DelicateCoroutinesApi::class)fun exceptionAggregationNormal(mainScope: CoroutineScope) = mainScope.launch { val handler = CoroutineExceptionHandler { _, exception -> log( "CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}") } val job = GlobalScope.launch(handler) { launch { try { delay(Long.MAX_VALUE) // 当另一个同级的协程因 IOException 失败时,它将被取消 } finally { throw ArithmeticException() // 第二个异常 } } launch { delay(100) throw IOException() // 首个异常 } delay(Long.MAX_VALUE) } job.join()}// Thread[DefaultDispatcher-worker-3,5,main] : CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]取消异常是透明的,默认情况下是未包装的:handler未捕获到
@OptIn(DelicateCoroutinesApi::class)fun exceptionAggregationCancel(mainScope: CoroutineScope) = mainScope.launch { val handler = CoroutineExceptionHandler { _, exception -> log( "CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}") } val job = GlobalScope.launch(handler) { val inner = launch { // 该栈内的协程都将被取消 launch { launch { throw IOException() // 原始异常 } } } try { inner.join() } catch (e: CancellationException) { log("Rethrowing CancellationException with original cause") throw e // 取消异常被重新抛出,但原始 IOException 得到了处理 } } job.join()}// Thread[DefaultDispatcher-worker-3,5,main] : Rethrowing CancellationException with original cause// Thread[DefaultDispatcher-worker-2,5,main] : CoroutineExceptionHandler got java.io.IOException with suppressed []5、Supervision job
fun supervisionJob(mainScope:CoroutineScope) = mainScope.launch { val supervisor = SupervisorJob() with(CoroutineScope(coroutineContext + supervisor)) { // launch the first child -- its exception is ignored for this example (don't do this in practice!) val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) { log("The first child is failing") throw AssertionError("The first child is cancelled") } // launch the second child val secondChild = launch { firstChild.join() // Cancellation of the first child is not propagated to the second child log("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active") try { delay(Long.MAX_VALUE) } finally { // But cancellation of the supervisor is propagated log("The second child is cancelled because the supervisor was cancelled") } } // wait until the first child fails & completes firstChild.join() log("Cancelling the supervisor") supervisor.cancel() secondChild.join() }}// Thread[main,5,main] : The first child is failing// Thread[main,5,main] : The first child is cancelled: true, but the second one is still active// Thread[main,5,main] : Cancelling the supervisor// Thread[main,5,main] : The second child is cancelled because the supervisor was cancelled6、Supervision scop
fun supervisionScope(mainScope:CoroutineScope) = mainScope.launch { try { supervisorScope { val child = launch { try { log("The child is sleeping") delay(Long.MAX_VALUE) } finally { log("The child is cancelled") } } // Give our child a chance to execute and print using yield yield() log("Throwing an exception from the scope") throw AssertionError() } } catch(e: AssertionError) { log("Caught an assertion error") }}// Thread[main,5,main] : The child is sleeping// Thread[main,5,main] : Throwing an exception from the scope// Thread[main,5,main] : The child is cancelled// Thread[main,5,main] : Caught an assertion error八.Shared mutable state and concurrency
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } log("Completed ${n * k} actions in $time ms")}1、The problem
fun showErro(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) { var counter = 0 massiveRun { counter++ } log("Counter = $counter")}// Thread[DefaultDispatcher-worker-8,5,main] : Completed 100000 actions in 21 ms// Thread[DefaultDispatcher-worker-8,5,main] : Counter = 450042、Volatiles are of no help
@Volatile // in Kotlin `volatile` is an annotationvar counterVolatile = 0fun showVolatilesErro(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) { counterVolatile = 0 massiveRun { counterVolatile++ } log("counterVolatile = $counterVolatile")}// Thread[DefaultDispatcher-worker-7,5,main] : Completed 100000 actions in 77 ms// Thread[DefaultDispatcher-worker-7,5,main] : counterVolatile = 887773、Thread-safe data structures
// Thread[DefaultDispatcher-worker-7,5,main] : Completed 100000 actions in 34 ms// Thread[DefaultDispatcher-worker-7,5,main] : Counter = 1000004、Thread confinement coarse-grained
val counterContext = newSingleThreadContext("CounterContext")private fun runSingleThread(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default){ var counter = 0 withContext(counterContext) { massiveRun { counter++ } } log("Counter = $counter")}// Thread[CounterContext,5,main] : Completed 100000 actions in 65 ms// Thread[DefaultDispatcher-worker-7,5,main] : Counter = 1000005、Mutual exclusion
val mutex = Mutex()private fun useMutual(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default){ var counter = 0 massiveRun { // protect each increment with lock mutex.withLock { counter++ } } log("Counter = $counter")}// Thread[DefaultDispatcher-worker-2,5,main] : Completed 100000 actions in 1547 ms// Thread[DefaultDispatcher-worker-2,5,main] : Counter = 1000006、Actors
// 计数器 Actor 的各种类型sealed class CounterMsgobject IncCounter : CounterMsg() // 递增计数器的单向消息class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求// 这个函数启动一个新的计数器 actorprivate fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor 状态 for (msg in channel) { // 即将到来消息的迭代器 when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } }}private fun useActor(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default){ val counter = counterActor() // 创建该 actor withContext(Dispatchers.Default) { massiveRun { counter.send(IncCounter) } } // 发送一条消息以用来从一个 actor 中获取计数值 val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) log("Counter = ${response.await()}") counter.close() // 关闭该actor}// Thread[DefaultDispatcher-worker-4,5,main] : Completed 100000 actions in 907 ms// Thread[DefaultDispatcher-worker-4,5,main] : Counter = 100000代码地址:https://gitee.com/trainAndroid/coroutine#1exception-propagation |