Come and Meet Kotlin Coroutine
Tags of Kotlin Coroutine
- 挂起和规复
// suspend functionsuspend fun function1() { delay(1000L) println("suspend function1")}// normal functionfun function2() {// delay(2000L) not satisfy structural concurrency println("suspend function2")}// type check:val funcVal1: suspend () -> Unit = ::function1val funcVal2: () -> Unit = ::function2相比平凡的函数,suspend函数可以明白为一种新的函数范例。
launch async runBlocking是三种常见的协程构建器,我们从函数署名上【感性】地熟悉一下他们的区别:
// launchpublic fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): Job// aysncpublic fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T): Deferred<T> // runBlockingpublic fun <T> runBlocking( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
我们通常可以利用launch启动一个协程,他的返回值Job可以用于控制这个协程的生命周期, async可以看做是一个升级版的launch,他的block的返回值会被放在Deferred中。Deferred是Job的子类,可以通过await方法获取返回值:
fun main() = runBlocking { val job = launch { println("execute of job") "execute of job" // launch中的block不思量返回值,lambda的返回值会被忽略 } val deferred = async { println("execute of deferred") "result of deferred" } println(deferred.await()) Unit}/** execute of job execute of deferred result of deferred**/launch和async默认都是写了之后立刻启动(这一点非常紧张,aysnc并不必要await触发实验),可以通过调解CoroutineStart参数变更启动方式:
fun main() = runBlocking { val lazyJob = launch(start = CoroutineStart.LAZY) { println("execute now!") } println("before lazy starts") // 通过delay先让父协程挂起,显着去别处launch没有立刻实验 println("parent sleeps") delay(1000L) println("parent wakes up") lazyJob.start() Unit}/** before lazy starts parent sleeps parent wakes up execute now!**/下面我分别在两个suspend函数和两个由launch发起的协程中delay两秒,叨教main函数实验完因素别必要几秒?
fun main() = runBlocking { getUserInfo() getFriendList()}suspend fun getUserInfo() { println("getUserInfo: start time: ${System.currentTimeMillis()}") delay(2000L) println("getUserInfo: end time: ${System.currentTimeMillis()}") logX("suspend function1")}suspend fun getFriendList() { println("getFriendList: start time: ${System.currentTimeMillis()}") delay(2000L) println("getFriendList end time: ${System.currentTimeMillis()}") logX("suspend function2")}
fun main() = runBlocking { launch { println("launch1: start time: ${System.currentTimeMillis()}") delay(2000L) println("launch1: end time: ${System.currentTimeMillis()}") logX("launch1") } launch { println("launch2: start time: ${System.currentTimeMillis()}") delay(2000L) println("launch2: end time: ${System.currentTimeMillis()}") logX("launch2") } Unit}答案发表时间:
public suspend fun await(): Tawait是一个挂起函数,后续的流程会像上图以上被挂起,我们来看这个例子:
fun main() = runBlocking { val def = async { println("async starts") delay(2000L) println("async end") "hello world" } println("message from main") println(def.await()) println("end of story")}/** message from main async starts async end hello world // end of story的输出被挂起到await实验完成再规复 end of story**/suspend函数到主动机的转换在末了一节会阐明。Kotlin Coroutine狭义的协程指的是通过构建器启动的协程,后文不再阐明。
- 轻量级的线程
fun main() = runBlocking { repeat(1000_000_000) { launch { //常见的协程 delay(1000000) } } delay(10000L)}
- Kotlin Coroutine VS Thread
fun main() = runBlocking { logX("main ") val job = launch(Dispatchers.IO) { logX("launch 1") }}/**================================main Thread:main @coroutine#1================================================================launch 1Thread efaultDispatcher-worker-1 @coroutine#2================================**/ Dispatchers就可以指定运行的线程池。
Structured Concurrency
布局化并发的头脑贯穿于Kotlin coroutine的始终,我通过一句话概述:控制协程实验的范围。这个范围利用CoroutineScope实现。由于上面的代码都运行在runBlocking中,传入参数的时间直接将block设置为CoroutineScope的扩展lambda,以是不必要再指定scope:
// runBlockingpublic fun <T> runBlocking( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T 包罗suspend函数也必要运行在scope中,否则就会在编译期报错。
Suspend Function : A CPS Transformation
- 编译后新增长的匿名内部类:TestContinuation
- 看【挂起】和【规复】的逻辑:invokeSuspend
// 编译前的代码suspend fun testCoroutine() { log("start") val user = getUserInfo() log(user) val friendList = getFriendList(user) log(friendList) val feedList = getFeedList(friendList) log(feedList)}// 编译后的代码fun testCoroutine(completion: Continuation<Any?>): Any? { // TestContinuation本质上是匿名内部类 class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) { // 表示协程状态机当前的状态 var label: Int = 0 // 三个变量,对应原函数的三个变量 lateinit var user: String lateinit var friendList: String lateinit var feedList: String // result 吸收协程的运行结果 var result = continuation.result // suspendReturn 吸收挂起函数的返回值 var suspendReturn: Any? = null // CoroutineSingletons 是个摆列类 // COROUTINE_SUSPENDED 代表当前函数被挂起了 val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED // invokeSuspend 是协程的关键 // 它终极会调用 testCoroutine(this) 开启协程状态机 // 状态机干系代码就是反面的 when 语句 // 协程的本质,可以说就是 CPS + 状态机 override fun invokeSuspend(_result: Result<Any?>): Any? { result = _result label = label or Int.Companion.MIN_VALUE return testCoroutine(this) } } // ... val continuation = if (completion is TestContinuation) { completion } else { // 作为参数 // ↓ TestContinuation(completion) }}testCoroutine运行的逻辑如下:
when (continuation.label) { 0 -> { // 检测非常 throwOnFailure(result) log("start") // 将 label 置为 1,预备进入下一次状态 continuation.label = 1 // 实验 getUserInfo suspendReturn = getUserInfo(continuation) // 判定是否挂起 if (suspendReturn == sFlag) { return suspendReturn } else { result = suspendReturn //go to next state } } 1 -> { throwOnFailure(result) // 获取 user 值 user = result as String log(user) // 预备进入下一个状态 continuation.label = 2 // 实验 getFriendList suspendReturn = getFriendList(user, continuation) // 判定是否挂起 if (suspendReturn == sFlag) { return suspendReturn } else { result = suspendReturn //go to next state } } 2 -> { throwOnFailure(result) user = continuation.mUser as String // 获取 friendList 的值 friendList = result as String log(friendList) // 预备进入下一个状态 continuation.label = 3 // 实验 getFeedList suspendReturn = getFeedList(user, friendList, continuation) // 判定是否挂起 if (suspendReturn == sFlag) { return suspendReturn } else { result = suspendReturn //go to next state } } 3 -> { throwOnFailure(result) user = continuation.mUser as String friendList = continuation.mFriendList as String feedList = continuation.result as String log(feedList) loop = false }}我们来捋一下此中的次序,最开始先构建一个TestContinuation的实例,留意,Continuation的这个实例是三个挂起函数的公共参数。
开始时label = 0, 此时进入逻辑,先举行非常的查抄,设置下一次的入口label=1,实验getUserInfo:
when (continuation.label) { 0 -> { // ... continuation.label = 1 // 实验 getUserInfo suspendReturn = getUserInfo(continuation) // 判定是否挂起 if (suspendReturn == sFlag) { return suspendReturn } else { result = suspendReturn //go to next state } } // ...}在Kotlin编译器CPS转换之后的getUserInfo方法中,由于传入了continuation参数,必要再实验一次Continuation#invokeSuspend,这个方法同时也将结果纪录在了result处
override fun invokeSuspend(_result: Result<Any?>): Any? { result = _result label = label or Int.Companion.MIN_VALUE return testCoroutine(this) }相当于【递归】地实验一次如许的逻辑(个人以为这个逻辑和通报变乱的分发有点相似)。此时getUserInfo实验完成返回的结果是CoroutineSingletons.COROUTINE_SUSPEND,以是继承实验下个when的case。
必要留意的是,以上的挂起函数都是【真正的】挂起函数,suspend function中都带有挂起的操纵,但是Kotlin编译器在举行CPS转换的时间只认supsend关键字,对于伪suspend函数,走else分支,节流开销:
if (suspendReturn == sFlag) { return suspendReturn } else { result = suspendReturn //go to next state } |