一.MutableSharedFlow接口的实现
1.MutableSharedFlow方法
在Koltin协程:异步热数据流的计划与利用中,提到了可以通过MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:
@Suppress("FunctionName", "UNCHECKED_CAST")public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): MutableSharedFlow<T> { // 参数查抄 require(replay >= 0) { "replay cannot be negative, but was $replay" } require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" } require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) { "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow" } // 相加盘算缓存容量 val bufferCapacity0 = replay + extraBufferCapacity // 假如缓存容量小于0,则设置缓存容量为Int类型的最大值 val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // 创建一个SharedFlowImpl类型的对象并返回 return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)}在MutableSharedFlow方法中,起首将参数replay与参数extraBufferCapacity相加,盘算缓存容量,这与上一篇里提到的“参数replay与参数extraBufferCapacity共同决定缓存最大容量”的计划头脑一致。接着判断缓存容量是否溢出,即小于0,假如小于零则将缓存容量设置为Int类型的最大值。最后根据参数,创建并返回一个SharedFlowImpl类型的对象。
二.SharedFlowImpl类
SharedFlowImpl类是MutableSharedFlow接口的焦点实现,它的继承关系如下图所示:
- AbstractSharedFlow类:提供了对订阅者举行管理的方法。
- CancellableFlow接口:用于标记SharedFlowImpl类型的Flow对象是可取消的。
- MutableSharedFlow接口:体现SharedFlowImpl类型的Flow对象是一个热流。
- FusibleFlow接口:体现SharedFlowImpl类型的Flow对象是可融合的。
1.发射数据的管理
在SharedFlowImpl类中,维护了一个缓存数组,用于生存emit方法发射数据,数据缓存数组分成了buffered values和queued emitters两部分,它的结构如下所示:
- buffered values:体现当前缓存数据的巨细,最大容量为SharedFlowImp类构造方法中bufferCapacity。buffered values由extraBuffer和replayCache两部分构成:
- replayCache的最大容量由MutableSharedFlow方法中参数replay决定。
- extraBuffer的最大容量由MutableSharedFlow方法中参数extraBufferCapacity决定。
- queued emitters:通常情况下,当调用emit方法发射数据时,假如缓存数组的buffered values未到达最大容量,则发射的数据将生存到缓存中,并立即返回emit方法。假如缓存数组的buffered values已到达最大容量,则调用emit方法的协程会被立即挂起,而且它的续体和数据会被封装成一个Emitter类型的对象,生存到缓存数组的queued emitters中。
- 数据缓存的移动:假设上图中,当buffered values中位置为0的数据被全部的订阅者都处置处罚后,buffered values会前移动一位。这时,queued emitters中位置为7的Emitter类型的对象就会被“拆箱”,将此中生存的数据存放到位置7,同时规复此中生存的emit方法地点续体的执行。之后,位置7将作为buffered values的一部分。
为了实现上述模子的运行,在SharedFlowImpl类中利用了许多的全局变量,代码如下:
private class SharedFlowImpl<T>( private val replay: Int, // replayCache的最大容量 private val bufferCapacity: Int, // buffered values的最大容量 private val onBufferOverflow: BufferOverflow // 溢出计谋) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> { // 缓存数组,用于生存emit方法发射的数据,在需要时举行初始化 private var buffer: Array<Any?>? = null // 新的订阅者从replayCache中获取数据的起始位置 private var replayIndex = 0L // 当前全部的订阅者从缓存数组中获取的数据中,对应位置最小的索引 // 假如没有订阅者,则minCollectorIndex的值便是replayIndex private var minCollectorIndex = 0L // 缓存数组中buffered values缓存数据的数量 private var bufferSize = 0 // 缓存数组中queued emitters缓存数据的数量 private var queueSize = 0 // 当前缓存数组的起始位置 private val head: Long get() = minOf(minCollectorIndex, replayIndex) // 当前缓存数组中replayCache缓存数据的数量 private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt() // 当前缓存数组中已经缓存的数据的数量 private val totalSize: Int get() = bufferSize + queueSize // 当前缓存数组中buffered values的最末了位置索引的后一位 private val bufferEndIndex: Long get() = head + bufferSize // 当前数组中queued emitters的最末了位置索引的后一位 private val queueEndIndex: Long get() = head + bufferSize + queueSize ...}上面代码中的全局变量对应到数组中的位置如下图所示:
2.订阅者的管理
在SharedFlowImpl中,AbstractSharedFlow类与AbstractSharedFlowSlot类实现了对订阅者的管理,这两个类都是抽象类。在AbstractSharedFlow类中维护了一个订阅者数组,数组中每一个元素都是一个AbstractSharedFlowSlot类型的对象。
1)AbstractSharedFlowSlot类与SharedFlowSlot类
在AbstractSharedFlowSlot类中,定义了allocateLocked方法与freeLocked方法,用于实现订阅者数组中AbstractSharedFlowSlot类型对象的复用,代码如下:
internal abstract class AbstractSharedFlowSlot<F> { // 用于新订阅者申请利用当前AbstractSharedFlowSlot类型的对象 // 返回true代表申请乐成,返回false代表申请失败 abstract fun allocateLocked(flow: F): Boolean // 用于订阅者开释当前利用的AbstractSharedFlowSlot类型得到对象, // 并以数组的形式返回待规复的续体 abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>}在SharedFlowImpl中,当有新的订阅者出现时,会为它在订阅者数组中分配一个类型为SharedFlowSlot的对象。SharedFlowSlot类继承自AbstractSharedFlowSlot类,代码如下:
private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() { // 体现将要在处置处罚的数据在数组中的索引 // 假如为-1,体现当前可用 @JvmField var index = -1L // 用来生存等候新数据发送的订阅者的续体 @JvmField var cont: Continuation<Unit>? = null // 重写 override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean { // 假如已经被其他订阅者利用,则返回false if (index >= 0) return false // 走到这里分析没有被其他订阅者利用,分配乐成 // 获取当前的新订阅者应该从缓存数组获取数据的初始位置 index = flow.updateNewCollectorIndexLocked() return true } // 重写 override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> { assert { index >= 0 } val oldIndex = index // 打扫索引,体现可用 index = -1L // 打扫续体 cont = null return flow.updateCollectorIndexLocked(oldIndex) }}SharedFlowSlot类在AbstractSharedFlowSlot类的基础上,加入了全局变量index和cont。
当index大于便是0时,体现订阅者应该从缓存数组index对应的位置中获取数据,而当index小于0时,则体现当前SharedFlowSlot类型的对象没有被任何订阅者利用。
当订阅者处置处罚完应该处置处罚的全部数据时,订阅者地点的协程会被挂起,它的续体就会被生存在全局变量cont中。
2)AbstractSharedFlow类
AbstractSharedFlow类内部维护了一个订阅者数组,并管理订阅者数组中AbstractSharedFlowSlot类型的对象。AbstractSharedFlow类定义了两个抽象方法:createSlot方法与createSlotArray方法,createSlot方法用于创建一个类型为AbstractSharedFlowSlot的对象,createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组,代码如下:internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() { @Suppress("UNCHECKED_CAST") // 存放订阅者的数组,在须要时举行初始化 protected var slots: Array<S?>? = null private set // 用于记录订阅者的数量 protected var nCollectors = 0 private set // 用于生存在订阅者数组中查找空位时,下一次要查找的位置 private var nextIndex = 0 // 订阅者数量的状态流,当订阅者数量发生变革时,会举行回调 private var _subscriptionCount: MutableStateFlow<Int>? = null val subscriptionCount: StateFlow<Int> // 加锁 get() = synchronized(this) { // 对_subscriptionCount初始化,nCollectors为初始值 _subscriptionCount ?: MutableStateFlow(nCollectors).also { _subscriptionCount = it } } ... // 创建一个类型为AbstractSharedFlowSlot的对象 protected abstract fun createSlot(): S // createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组 protected abstract fun createSlotArray(size: Int): Array<S?> ... }AbstractSharedFlow类继承自SynchronizedObject类,SynchronizedObject类现实是Any类的别名,代码如下:
@InternalCoroutinesApipublic actual typealias SynchronizedObject = Anya)allocateSlot方法
allocateSlot方法用于为新的订阅者在订阅者数组中分配一个可利用的AbstractSharedFlowSlot类型对象,代码如下:
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() { ... @Suppress("UNCHECKED_CAST") protected fun allocateSlot(): S { // 在锁外创建一个状态流 var subscriptionCount: MutableStateFlow<Int>? = null // 加锁 val slot = synchronized(this) { // 对订阅者数组举行判断处置处罚 val slots = when (val curSlots = slots) { // 为空则初始化,初始化巨细为2,并生存到全局变量 null -> createSlotArray(2).also { slots = it } // 假如容量已满,则举行扩容,扩容前巨细为扩容后巨细的2倍,并生存到全局变量 else -> if (nCollectors >= curSlots.size) { curSlots.copyOf(2 * curSlots.size).also { slots = it } } else { curSlots } } // 获取全局变量 var index = nextIndex var slot: S // 遍历 while (true) { // 获取index位置的AbstractSharedFlowSlot类型的对象, // 假如为空,则调用createSlot方法创建一个,并生存到订阅者数组中 slot = slots[index] ?: createSlot().also { slots[index] = it } // 自增 index++ // 假如遍历到数组的最后一个元素,则重新开始 if (index >= slots.size) index = 0 // 实验对AbstractSharedFlowSlot类型的对象分配订阅者, // 分配乐成则跳出循环 if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break } // 走到这里分析已经分配乐成 // 将下一次要遍历的位置生存到全局变量 nextIndex = index // 订阅者数量自增 nCollectors++ // 获取全局变量 subscriptionCount = _subscriptionCount // 返回分配的AbstractSharedFlowSlot类型的对象 slot } // 订阅者状态流增长1,此方法会触发回调关照 subscriptionCount?.increment(1) // 返回 return slot } ...}b)freeSlot方法
freeSlot方法用于开释已分配给订阅者的AbstractSharedFlowSlot类型的对象,代码如下:
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() { ... @Suppress("UNCHECKED_CAST") protected fun freeSlot(slot: S) { // 在锁外创建一个状态流 var subscriptionCount: MutableStateFlow<Int>? = null // 加锁 val resumes = synchronized(this) { // 订阅者数量自减 nCollectors-- // 获取全局变量 subscriptionCount = _subscriptionCount // 假如订阅者为0,分析订阅者数组里没有订阅者,则下一次重新开始 if (nCollectors == 0) nextIndex = 0 // 开释已分配的AbstractSharedFlowSlot类型对象 (slot as AbstractSharedFlowSlot<Any>).freeLocked(this) } // 对开释后返回的续体举行遍历,规复续体 for (cont in resumes) cont?.resume(Unit) // 订阅者状态流减1,此方法会触发回调关照 subscriptionCount?.increment(-1) } // 用于遍历订阅者数组 protected inline fun forEachSlotLocked(block: (S) -> Unit) { // 假如没有订阅者,则直接返回 if (nCollectors == 0) return // 遍历订阅者数组 slots?.forEach { slot -> if (slot != null) block(slot) } } ...}3.数据的吸收
当调用SharedFlow类型对象的collect方法,会触发订阅过程,吸收emit方法发送的数据,这部分在 SharedFlowImpl中实现,代码如下:
@Suppress("UNCHECKED_CAST")override suspend fun collect(collector: FlowCollector<T>) { // 为当前的订阅者分配一个SharedFlowSlot类型的对象 val slot = allocateSlot() try { // 假如collector类型为SubscribedFlowCollector, // 分析订阅者监听了订阅过程的启动,则先回调 if (collector is SubscribedFlowCollector) collector.onSubscription() // 获取订阅者地点的协程 val collectorJob = currentCoroutineContext()[Job] // 死循环 while (true) { var newValue: Any? // 死循环 while (true) { // 从缓存数组中获取数据 newValue = tryTakeValue(slot) // 假如获取数据乐成,则跳出循环 if (newValue !== NO_VALUE) break // 走到这里,分析获取数据失败, // 挂起订阅者地点协程,等候新数据的到来 awaitValue(slot) } // 走到这里,分析已经获取到了数据 // 判断订阅者地点协程是否是存活的,假如不是则抛出异常 collectorJob?.ensureActive() // 举行类型转换,并向鄙俚发射数据 collector.emit(newValue as T) } } finally { // 开释已分配的SharedFlowSlot类型的对象 freeSlot(slot) }}@SharedImmutable@JvmFieldinternal val NO_VALUE = Symbol("NO_VALUE")1)数据的获取
在collect方法中,通过tryTakeValue方法获取数据,代码如下:
private fun tryTakeValue(slot: SharedFlowSlot): Any? { var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES // 加锁 val value = synchronized(this) { // 从slot中获取index // index体现当前应该从缓存数组的index位置中获取数据 val index = tryPeekLocked(slot) // 假如index小于0,分析没有数据 if (index < 0) { // 返回空数据标识 NO_VALUE } else { // 假如有数据 // 获取当前的slot的index val oldIndex = slot.index // 从缓存数组的index处获取数据 val newValue = getPeekedValueLockedAt(index) // 盘算下一次获取数据的位置,并生存到slot中 slot.index = index + 1 // 更新缓存数组的位置,并获取缓存数组与订阅者数组中可规复的续体 resumes = updateCollectorIndexLocked(oldIndex) // 返回获取的数据 newValue } } // 遍历,规复续体 for (resume in resumes) resume?.resume(Unit) // 返回获取的数据 return value}@JvmField@SharedImmutableinternal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)a)数据获取计谋
在tryTakeValue方法,获取数据之前,起首会调用tryPeekLocked方法,判断数据地点的位置是否符合要求,代码如下:
private fun tryPeekLocked(slot: SharedFlowSlot): Long { // 从slot中获取index val index = slot.index // 假如是在buffered values中获取,则直接返回 if (index < bufferEndIndex) return index // 走到这里分析是要在queued emitters中获取, // 假如buffered values的最大容量大于0,则返回-1 // 在buffered values可以存在的情况下,克制发射者和订阅者接触 if (bufferCapacity > 0) return -1L // 走到这里分析要在queued emitters中获取,同时buffered values的最大容量为0 // 这种情况缓存数组只能有queued emitters, // 因此,只能处置处罚queued emitters中的第一个Emitter类型的对象 // 假如当前订阅者想要处置处罚下一个Emitter类型的对象,则返回-1 if (index > head) return -1L // 走到这里分析要在queued emitters中获取,同时buffered values的最大容量为0 // 而且要获取当前的正在处置处罚的Emmiter类型的对象 // 假如queued emitters为空,分析当前没有Emmiter类型的对象,则返回-1 if (queueSize == 0) return -1L // 满意上述要求,返回index return index}在允许bufferd values存在的情况下,只能从bufferd values获取数据。在不允许bufferd values存在的情况下,只能处置处罚queued emitters的第一个Emitter类型的对象。
b)获取数据
假如数据地点的位置符合要求,则会调用getPeekedValueLockedAt方法获取数据,代码如下:
private fun getPeekedValueLockedAt(index: Long): Any? = // 从缓存数组中index位置获取数据 when (val item = buffer!!.getBufferAt(index)) { // 假如是Emitter类型的,则举行拆箱,获取数据 is Emitter -> item.value // 直接返回 else -> item }Emitter类是SharedFlowImpl类的内部类,用于在挂起调用emit方法地点的协程后,对emit方法发射的数据及挂起后的续体举行封装,代码如下:
private class Emitter( @JvmField val flow: SharedFlowImpl<*>, @JvmField var index: Long, // 当前对象在缓存数组中的位置 @JvmField val value: Any?,// emit方法发射的数据 @JvmField val cont: Continuation<Unit> // 挂起的续体) : DisposableHandle { override fun dispose() = flow.cancelEmitter(this)}2)订阅者协程的挂起
在collect方法中,当订阅者无数据可获取时,则会调用awaitValue方法,挂起订阅者地点的协程,代码如下:
private suspend fun awaitValue(slot: SharedFlowSlot): Unit = // 直接挂起订阅者地点的协程 suspendCancellableCoroutine { cont -> // 加锁 synchronized(this) lock@{ // 再次查抄当前的index是否满意要求 val index = tryPeekLocked(slot) // 假如确实不满意要求 if (index < 0) { // 生存续体到slot中 slot.cont = cont } else { // 假如再次查抄发现index这时满意要求 // 则规复挂起,并返回 cont.resume(Unit) return@lock } // 生存续体到slot中 slot.cont = cont }}4.数据的发射
当需要发射数据时,会调用SharedFlowImpl类的emit方法,代码如下:
override suspend fun emit(value: T) { // 起首实验调用不需要挂起的tryEmit方法,假如发射乐成,则返回 if (tryEmit(value)) return // 走到这里分析需要挂起,则调用emitSuspend方法 emitSuspend(value)}1)以不挂起的方式发射数据
SharedFlowImpl类中实现了MutableSharedFlow接口中tryEmit方法,用于以不挂起的方式发射数据,代码如下:
override fun tryEmit(value: T): Boolean { var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES // 加锁 val emitted = synchronized(this) { // 实验发射数据,假如发射乐成 if (tryEmitLocked(value)) { // 网络已经挂起的订阅者的续体 resumes = findSlotsToResumeLocked(resumes) // 返回true true } else { // 发射失败 // 返回false false } } // 唤起挂起的订阅者 for (cont in resumes) cont?.resume(Unit) // 返回效果 return emitted}tryEmit方法中通过tryEmitLocked方法实验对数据举行发射,假如发射乐成,会调用findSlotsToResumeLocked方法网络已经挂起的订阅者的续体,并叫醒订阅者去吸收消费数据。
tryEmitLocked方法代码如下:
@Suppress("UNCHECKED_CAST")private fun tryEmitLocked(value: T): Boolean { // 假如当前没有订阅者,则调用tryEmitNoCollectorsLocked处置处罚,并返回 // 该方法永世返回true if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // 假如当前有订阅者,同时buffered values已到达最大容量 if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) { // 根据溢出计谋举行判断 when (onBufferOverflow) { // 假如是挂起,则返回fasle BufferOverflow.SUSPEND -> return false // 假如是丢掉最新的数据,则返回true BufferOverflow.DROP_LATEST -> return true // 假如是丢掉最旧的数据,则暂不作处置处罚 BufferOverflow.DROP_OLDEST -> {} } } // 走到这里,有两种情况: // 情况1:buffered values还可以继承添加数据 // 情况2:buffered values已到达最大容量,同时溢出计谋为DROP_OLDEST // 将数据加入到缓存数组中 // 这里由于tryEmit方法不会挂起emit方法地点的协程, // 以是value没有被封装成Emitter类型的对象 enqueueLocked(value) // buffered values的数据数量加1 bufferSize++ // 假如buffered values的数据数量凌驾最大容量的限定, // 分析此时为情况2,则调用dropOldestLocked方法,扬弃最旧的数据 if (bufferSize > bufferCapacity) dropOldestLocked() // 假如replayCache中数据的数量凌驾了最大容量 if (replaySize > replay) { // 更新replayIndex的值,replayIndex向前移动一位 updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex) } // 返回true return true}**buffered values已到达最大容量,同时溢出计谋为DROP_OLDEST情况下数据发射图解**: 在tryEmitLocked方法中,假如当前没有订阅者时,会调用tryEmitNoCollectorsLocked方法,代码如下:
// 假如当前没有订阅者,会调用tryEmitNoCollectorsLocked方法private fun tryEmitNoCollectorsLocked(value: T): Boolean { assert { nCollectors == 0 } // 假如不允许有replayCache,则不处置处罚,直接返回true if (replay == 0) return true // 走到这里分析可以有replayCache // 加入到缓存数组中 enqueueLocked(value) // buffered values的数据数量加1 bufferSize++ // 假如buffered values的数据数量凌驾了replayCache的最大容量 // 则扬弃最旧的数据 // 由于新订阅者只会从replayCache中取数据, // 假如没有订阅者,buffered values的数据数量凌驾replayCache的最大容量没故意义 if (bufferSize > replay) dropOldestLocked() // 重新盘算minCollectorIndex minCollectorIndex = head + bufferSize // 返回true return true}在tryEmitNoCollectorsLocked方法中,假如发现当前buffered values的数据数量凌驾了replayCache的最大容量,则会扬弃最旧的数据,保持buffered values中数据的数量最大为replay。由于当有新的订阅者出现时,起首会从replayCache中获取数据,因此在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存凌驾replayCache最大容量的数据只会占用更多内存,是没故意义的。
通过对tryEmitLocked方法与tryEmitNoCollectorsLocked方法的分析,可以知道数据的发射终极都调用了enqueueLocked方法,代码如下:
private fun enqueueLocked(item: Any?) { // 获取当前缓存数组中缓存的数量 val curSize = totalSize // 判断 val buffer = when (val curBuffer = buffer) { // 缓存数组为空,则举行初始化,初始化容量为2 null -> growBuffer(null, 0, 2) // 假如凌驾了当前缓存数组的最大容量,则举行扩容,新的缓存数组的容量为之前的2倍 // growBuffer方法会把原来缓存数组的数据添补到新的缓存数组中 else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer } // 将要发射的数据添补的缓存数组的head + curSize位置 buffer.setBufferAt(head + curSize, item)}enqueueLocked方法内部将要发射的数据添补的缓存数组的序次位置,终极完成了数据发射的过程。通过分析可以知道,数据发射的实质就是将数据添加到缓存数组中。
2)以挂起的方式发射数据
SharedFlowImpl类中实现了MutableSharedFlow接口中emit方法。在上面分析的emit方法中,起首会实验通过tryEmit方法发射数据,假如发射失败,分析发射过程需要挂起,这时会调用emitSuspend方法,代码如下:
private suspend fun emitSuspend(value: T) = // 直接挂起emit方法地点的协程,获取续体 suspendCancellableCoroutine<Unit> sc@{ cont -> var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES // 加锁 val emitter = synchronized(this) lock@{ // 这里再次实验以tryEmit的方式发射数据 if (tryEmitLocked(value)) { // 假如发射乐成,则规复续体的执行 cont.resume(Unit) // 网络已经挂起的订阅者的续体 resumes = findSlotsToResumeLocked(resumes) // 返回 return@lock null } // 将续体、待发射的数据等封装成Emitter类型的对象 Emitter(this, head + totalSize, value, cont).also { // 加入到缓存数组中 enqueueLocked(it) // queued emitters的数据的数量加1 queueSize++ // 假如buffered values的最大容量为0,即不存在 // 则网络已经挂起的订阅者的续体,生存到局部变量resumes中 if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes) } } // emitter对象监听emit方法地点协程的取消 // 发生取消时会调用emitter对象的dispose方法 emitter?.let { cont.disposeOnCancellation(it) } // 遍历,唤起挂起的订阅者 for (cont in resumes) cont?.resume(Unit)}3)叫醒挂起的订阅者
无论是在tryEmit方法,还是在emit方法,当发射数据乐成后,都会调用findSlotsToResumeLocked方法,获取已经挂起的订阅者的续体,然后规复订阅者地点协程的执行,代码如下:
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> { // 引用参数中的续体数组 var resumes: Array<Continuation<Unit>?> = resumesIn // 用于记录需要规复的续体的数量 var resumeCount = resumesIn.size // 遍历订阅者数组 forEachSlotLocked loop@{ slot -> // 获取续体,假如续体为空,分析对应订阅者的协程没有挂起,本次循环返回 val cont = slot.cont ?: return@loop // 判断slot中index是否符合要求 // 假如不符合要求,则本次循环返回 if (tryPeekLocked(slot) < 0) return@loop // 假如需要规复的续体的数量凌驾续体数组的容量,则举行扩容 // 新的续体数组的容量是之前续体数组容量的2倍 if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size)) // 生存续体到续体数组中 resumes[resumeCount++] = cont // 清空slot中生存的续体 slot.cont = null } // 返回网络完的续体数组 return resumes}5.新订阅者获取缓存数据
SharedFlowImpl类实现了SharedFlow接口,重写了此中的常量replayCache,当有新订阅者出现时,假如replayCache存在,而且有缓存数据,则优先从replayCache中获取,代码如下:
override val replayCache: List<T> // 只能获取,不能设置,加锁 get() = synchronized(this) { // 获取当前replayCache中缓存数据的数量 val replaySize = this.replaySize // 假如数量为0,则返回一个空列表 if (replaySize == 0) return emptyList() // 若数量不为0,则根据容量创建一个列表 val result = ArrayList<T>(replaySize) // 获取缓存数组 val buffer = buffer!! // 遍历replayCache,将数据举行类型转换,并添加到列表中 @Suppress("UNCHECKED_CAST") for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T // 返回列表 result }6.热流的融合
SharedFlowImpl类实现了FusibleFlow接口,重写了此中的fuse方法,代码如下:
// 调用了fuseSharedFlow方法实现override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseSharedFlow(context, capacity, onBufferOverflow)...internal fun <T> SharedFlow<T>.fuseSharedFlow( context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> { // 假如容量为0或默认值,同时溢出计谋为SUSPEND if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) { // 返回自身 return this } // 其他情况,将当前的SharedFlow对象包装成ChannelFlowOperatorImpl类型的对象 return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)}在Kotlin协程:Flow的融合、Channel容量、溢出计谋中提到过,当对类型为SharedFlowImpl的对象利用某些利用符时,会触发fuse方法的执行。fuse方法默认的容量为OPTIONAL_CHANNEL,默认的溢出计谋为SUSPEND,返回自身,因此融合后还是SharedFlowImpl类型的对象。
假如容量为RENDEZVOUS,同时溢出计谋为SUSPEND时,也会返回自身。RENDEZVOUS体现容量为0,无论SharedFlowImpl类型的对象的buffered values最大容量是否为0,在外貌再套一层RENDEZVOUS是没故意义的。
其他情况下,SharedFlowImpl类型的对象会被封装成一个类型为ChannelFlowOperatorImpl的对象,根据Kotlin协程:flowOn与线程切换讲过的,之后向鄙俚发射的数据会通过Channel来发送。
7.只读热流
调用MutableSharedFlow方法,可以得到一个类型为MutableSharedFlow的对象。通过这个对象,我们可以调用它的collect方法来订阅吸收,也可以调用它的emit方法来发射数据。但大多数的时间,我们需要同一数据的发射过程,因此需要对外袒露一个只可以调用collect方法订阅而不能调用emit方法发射的对象,而不是直接袒露MutableSharedFlow类型的对象。
根据上面代码的先容,订阅的过程现实上是从缓存数组中读取数据的,而发射的过程现实上是向缓存数据中写数据,因此假如一个流只能调用collect方法而不能调用emit方法,这种流这是一种只读流。
究竟上,根据在Koltin协程:异步热数据流的计划与利用中对接口的分析可以发现,MutableSharedFlow接口继承了FlowCollector接口和SharedFlow接口,emit方法定义在FlowCollector中。SharedFlow接口继承了Flow接口,collect方法定义在Flow接口中。因此只要将MutableSharedFlow接口指向的对象转换为SharedFlow接口指向的对象就可以将读写流转换为只读流。
在代码中,对MutableSharedFlow类型的对象调用asSharedFlow方法恰好可以实现将读写流转换为只读流,代码如下:
// 该方法调用了ReadonlySharedFlow方法,返回一个类型为SharedFlow的对象public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> = // 传入当前的MutableSharedFlow类型的对象 ReadonlySharedFlow(this)// 实现了FusibleFlow接口,// 实现了SharedFlow接口,而且利用上一步传入的MutableSharedFlow类型的对象作为代理private class ReadonlySharedFlow<T>( flow: SharedFlow<T>) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> { // 用于流融合,也是通过fuseSharedFlow方法实现 override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseSharedFlow(context, capacity, onBufferOverflow)}作者:李萧蝶
链接:https://juejin.cn/post/7144692583535476773
|