Kotlin协程:MutableSharedFlow的实现原理

手机游戏开发者 2024-10-2 01:50:43 5 0 来自 中国
一.MutableSharedFlow接口的实现

1.MutableSharedFlow方法

在Koltin协程:异步热数据流的计划与利用中,提到了可以通过MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:
@Suppress("FunctionName", "UNCHECKED_CAST")public fun <T> MutableSharedFlow(    replay: Int = 0,    extraBufferCapacity: Int = 0,    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): MutableSharedFlow<T> {    // 参数查抄    require(replay >= 0) { "replay cannot be negative, but was $replay" }    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"    }    // 相加盘算缓存容量    val bufferCapacity0 = replay + extraBufferCapacity    // 假如缓存容量小于0,则设置缓存容量为Int类型的最大值    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0    // 创建一个SharedFlowImpl类型的对象并返回    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)}在MutableSharedFlow方法中,起首将参数replay与参数extraBufferCapacity相加,盘算缓存容量,这与上一篇里提到的“参数replay与参数extraBufferCapacity共同决定缓存最大容量”的计划头脑一致。接着判断缓存容量是否溢出,即小于0,假如小于零则将缓存容量设置为Int类型的最大值。最后根据参数,创建并返回一个SharedFlowImpl类型的对象。
二.SharedFlowImpl类

SharedFlowImpl类是MutableSharedFlow接口的焦点实现,它的继承关系如下图所示:


  • AbstractSharedFlow类:提供了对订阅者举行管理的方法。
  • CancellableFlow接口:用于标记SharedFlowImpl类型的Flow对象是可取消的。
  • MutableSharedFlow接口:体现SharedFlowImpl类型的Flow对象是一个热流。
  • FusibleFlow接口:体现SharedFlowImpl类型的Flow对象是可融合的。
1.发射数据的管理

在SharedFlowImpl类中,维护了一个缓存数组,用于生存emit方法发射数据,数据缓存数组分成了buffered values和queued emitters两部分,它的结构如下所示:
2.png

  • buffered values:体现当前缓存数据的巨细,最大容量为SharedFlowImp类构造方法中bufferCapacity。buffered values由extraBuffer和replayCache两部分构成:

    • replayCache的最大容量由MutableSharedFlow方法中参数replay决定。
    • extraBuffer的最大容量由MutableSharedFlow方法中参数extraBufferCapacity决定。

  • queued emitters:通常情况下,当调用emit方法发射数据时,假如缓存数组的buffered values未到达最大容量,则发射的数据将生存到缓存中,并立即返回emit方法。假如缓存数组的buffered values已到达最大容量,则调用emit方法的协程会被立即挂起,而且它的续体和数据会被封装成一个Emitter类型的对象,生存到缓存数组的queued emitters中。
  • 数据缓存的移动:假设上图中,当buffered values中位置为0的数据被全部的订阅者都处置处罚后,buffered values会前移动一位。这时,queued emitters中位置为7的Emitter类型的对象就会被“拆箱”,将此中生存的数据存放到位置7,同时规复此中生存的emit方法地点续体的执行。之后,位置7将作为buffered values的一部分。
    为了实现上述模子的运行,在SharedFlowImpl类中利用了许多的全局变量,代码如下:
private class SharedFlowImpl<T>(    private val replay: Int, // replayCache的最大容量    private val bufferCapacity: Int, // buffered values的最大容量    private val onBufferOverflow: BufferOverflow // 溢出计谋) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {    // 缓存数组,用于生存emit方法发射的数据,在需要时举行初始化    private var buffer: Array<Any?>? = null    // 新的订阅者从replayCache中获取数据的起始位置    private var replayIndex = 0L    // 当前全部的订阅者从缓存数组中获取的数据中,对应位置最小的索引    // 假如没有订阅者,则minCollectorIndex的值便是replayIndex    private var minCollectorIndex = 0L    // 缓存数组中buffered values缓存数据的数量    private var bufferSize = 0    // 缓存数组中queued emitters缓存数据的数量    private var queueSize = 0    // 当前缓存数组的起始位置    private val head: Long get() = minOf(minCollectorIndex, replayIndex)    // 当前缓存数组中replayCache缓存数据的数量    private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()    // 当前缓存数组中已经缓存的数据的数量    private val totalSize: Int get() = bufferSize + queueSize    // 当前缓存数组中buffered values的最末了位置索引的后一位    private val bufferEndIndex: Long get() = head + bufferSize    // 当前数组中queued emitters的最末了位置索引的后一位    private val queueEndIndex: Long get() = head + bufferSize + queueSize    ...}上面代码中的全局变量对应到数组中的位置如下图所示:
2.订阅者的管理

在SharedFlowImpl中,AbstractSharedFlow类与AbstractSharedFlowSlot类实现了对订阅者的管理,这两个类都是抽象类。在AbstractSharedFlow类中维护了一个订阅者数组,数组中每一个元素都是一个AbstractSharedFlowSlot类型的对象。
1)AbstractSharedFlowSlot类与SharedFlowSlot类

在AbstractSharedFlowSlot类中,定义了allocateLocked方法与freeLocked方法,用于实现订阅者数组中AbstractSharedFlowSlot类型对象的复用,代码如下:
internal abstract class AbstractSharedFlowSlot<F> {    // 用于新订阅者申请利用当前AbstractSharedFlowSlot类型的对象    // 返回true代表申请乐成,返回false代表申请失败    abstract fun allocateLocked(flow: F): Boolean    // 用于订阅者开释当前利用的AbstractSharedFlowSlot类型得到对象,    // 并以数组的形式返回待规复的续体    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>}在SharedFlowImpl中,当有新的订阅者出现时,会为它在订阅者数组中分配一个类型为SharedFlowSlot的对象。SharedFlowSlot类继承自AbstractSharedFlowSlot类,代码如下:
private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {    // 体现将要在处置处罚的数据在数组中的索引    // 假如为-1,体现当前可用    @JvmField    var index = -1L    // 用来生存等候新数据发送的订阅者的续体    @JvmField    var cont: Continuation<Unit>? = null    // 重写    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {        // 假如已经被其他订阅者利用,则返回false        if (index >= 0) return false        // 走到这里分析没有被其他订阅者利用,分配乐成        // 获取当前的新订阅者应该从缓存数组获取数据的初始位置        index = flow.updateNewCollectorIndexLocked()        return true    }    // 重写    override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {        assert { index >= 0 }        val oldIndex = index        // 打扫索引,体现可用        index = -1L        // 打扫续体        cont = null        return flow.updateCollectorIndexLocked(oldIndex)    }}SharedFlowSlot类在AbstractSharedFlowSlot类的基础上,加入了全局变量index和cont。
当index大于便是0时,体现订阅者应该从缓存数组index对应的位置中获取数据,而当index小于0时,则体现当前SharedFlowSlot类型的对象没有被任何订阅者利用。
当订阅者处置处罚完应该处置处罚的全部数据时,订阅者地点的协程会被挂起,它的续体就会被生存在全局变量cont中。
2)AbstractSharedFlow类

AbstractSharedFlow类内部维护了一个订阅者数组,并管理订阅者数组中AbstractSharedFlowSlot类型的对象。AbstractSharedFlow类定义了两个抽象方法:createSlot方法与createSlotArray方法,createSlot方法用于创建一个类型为AbstractSharedFlowSlot的对象,createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组,代码如下:internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {    @Suppress("UNCHECKED_CAST")    // 存放订阅者的数组,在须要时举行初始化    protected var slots: Array<S?>? = null        private set    // 用于记录订阅者的数量    protected var nCollectors = 0        private set    // 用于生存在订阅者数组中查找空位时,下一次要查找的位置    private var nextIndex = 0    // 订阅者数量的状态流,当订阅者数量发生变革时,会举行回调    private var _subscriptionCount: MutableStateFlow<Int>? = null        val subscriptionCount: StateFlow<Int>        // 加锁        get() = synchronized(this) {            // 对_subscriptionCount初始化,nCollectors为初始值            _subscriptionCount ?: MutableStateFlow(nCollectors).also {                _subscriptionCount = it            }        }    ...    // 创建一个类型为AbstractSharedFlowSlot的对象    protected abstract fun createSlot(): S    // createSlotArray方法用于创建一个泛型AbstractSharedFlowSlot的数组    protected abstract fun createSlotArray(size: Int): Array<S?>    ...    }AbstractSharedFlow类继承自SynchronizedObject类,SynchronizedObject类现实是Any类的别名,代码如下:
@InternalCoroutinesApipublic actual typealias SynchronizedObject = Anya)allocateSlot方法

allocateSlot方法用于为新的订阅者在订阅者数组中分配一个可利用的AbstractSharedFlowSlot类型对象,代码如下:
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {    ...    @Suppress("UNCHECKED_CAST")    protected fun allocateSlot(): S {        // 在锁外创建一个状态流        var subscriptionCount: MutableStateFlow<Int>? = null        // 加锁        val slot = synchronized(this) {            // 对订阅者数组举行判断处置处罚            val slots = when (val curSlots = slots) {                // 为空则初始化,初始化巨细为2,并生存到全局变量                null -> createSlotArray(2).also { slots = it }                // 假如容量已满,则举行扩容,扩容前巨细为扩容后巨细的2倍,并生存到全局变量                else -> if (nCollectors >= curSlots.size) {                    curSlots.copyOf(2 * curSlots.size).also { slots = it }                } else {                    curSlots                }            }            // 获取全局变量            var index = nextIndex            var slot: S            // 遍历            while (true) {                // 获取index位置的AbstractSharedFlowSlot类型的对象,                // 假如为空,则调用createSlot方法创建一个,并生存到订阅者数组中                slot = slots[index] ?: createSlot().also { slots[index] = it }                // 自增                index++                // 假如遍历到数组的最后一个元素,则重新开始                if (index >= slots.size) index = 0                // 实验对AbstractSharedFlowSlot类型的对象分配订阅者,                // 分配乐成则跳出循环                if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break            }            // 走到这里分析已经分配乐成            // 将下一次要遍历的位置生存到全局变量            nextIndex = index            // 订阅者数量自增            nCollectors++            // 获取全局变量            subscriptionCount = _subscriptionCount            // 返回分配的AbstractSharedFlowSlot类型的对象            slot        }        // 订阅者状态流增长1,此方法会触发回调关照        subscriptionCount?.increment(1)        // 返回        return slot    }    ...}b)freeSlot方法

freeSlot方法用于开释已分配给订阅者的AbstractSharedFlowSlot类型的对象,代码如下:
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {    ...    @Suppress("UNCHECKED_CAST")    protected fun freeSlot(slot: S) {        // 在锁外创建一个状态流        var subscriptionCount: MutableStateFlow<Int>? = null        // 加锁        val resumes = synchronized(this) {            // 订阅者数量自减            nCollectors--            // 获取全局变量            subscriptionCount = _subscriptionCount            // 假如订阅者为0,分析订阅者数组里没有订阅者,则下一次重新开始            if (nCollectors == 0) nextIndex = 0            // 开释已分配的AbstractSharedFlowSlot类型对象            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)        }        // 对开释后返回的续体举行遍历,规复续体        for (cont in resumes) cont?.resume(Unit)        // 订阅者状态流减1,此方法会触发回调关照        subscriptionCount?.increment(-1)    }    // 用于遍历订阅者数组    protected inline fun forEachSlotLocked(block: (S) -> Unit) {        // 假如没有订阅者,则直接返回        if (nCollectors == 0) return        // 遍历订阅者数组        slots?.forEach { slot ->            if (slot != null) block(slot)        }    }    ...}3.数据的吸收

当调用SharedFlow类型对象的collect方法,会触发订阅过程,吸收emit方法发送的数据,这部分在 SharedFlowImpl中实现,代码如下:
@Suppress("UNCHECKED_CAST")override suspend fun collect(collector: FlowCollector<T>) {    // 为当前的订阅者分配一个SharedFlowSlot类型的对象    val slot = allocateSlot()    try {        // 假如collector类型为SubscribedFlowCollector,        // 分析订阅者监听了订阅过程的启动,则先回调        if (collector is SubscribedFlowCollector) collector.onSubscription()        // 获取订阅者地点的协程        val collectorJob = currentCoroutineContext()[Job]        // 死循环        while (true) {            var newValue: Any?            // 死循环            while (true) {                // 从缓存数组中获取数据                newValue = tryTakeValue(slot)                // 假如获取数据乐成,则跳出循环                if (newValue !== NO_VALUE) break                // 走到这里,分析获取数据失败,                // 挂起订阅者地点协程,等候新数据的到来                awaitValue(slot)            }            // 走到这里,分析已经获取到了数据            // 判断订阅者地点协程是否是存活的,假如不是则抛出异常            collectorJob?.ensureActive()            // 举行类型转换,并向鄙俚发射数据            collector.emit(newValue as T)        }    } finally {        // 开释已分配的SharedFlowSlot类型的对象        freeSlot(slot)    }}@SharedImmutable@JvmFieldinternal val NO_VALUE = Symbol("NO_VALUE")1)数据的获取

在collect方法中,通过tryTakeValue方法获取数据,代码如下:
private fun tryTakeValue(slot: SharedFlowSlot): Any? {    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES    // 加锁    val value = synchronized(this) {        // 从slot中获取index        // index体现当前应该从缓存数组的index位置中获取数据        val index = tryPeekLocked(slot)        // 假如index小于0,分析没有数据        if (index < 0) {            // 返回空数据标识            NO_VALUE        } else { // 假如有数据            // 获取当前的slot的index            val oldIndex = slot.index            // 从缓存数组的index处获取数据            val newValue = getPeekedValueLockedAt(index)            // 盘算下一次获取数据的位置,并生存到slot中            slot.index = index + 1            // 更新缓存数组的位置,并获取缓存数组与订阅者数组中可规复的续体            resumes = updateCollectorIndexLocked(oldIndex)            // 返回获取的数据            newValue        }    }    // 遍历,规复续体    for (resume in resumes) resume?.resume(Unit)    // 返回获取的数据    return value}@JvmField@SharedImmutableinternal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)a)数据获取计谋

在tryTakeValue方法,获取数据之前,起首会调用tryPeekLocked方法,判断数据地点的位置是否符合要求,代码如下:
private fun tryPeekLocked(slot: SharedFlowSlot): Long {    // 从slot中获取index    val index = slot.index    // 假如是在buffered values中获取,则直接返回    if (index < bufferEndIndex) return index    // 走到这里分析是要在queued emitters中获取,    // 假如buffered values的最大容量大于0,则返回-1    // 在buffered values可以存在的情况下,克制发射者和订阅者接触    if (bufferCapacity > 0) return -1L    // 走到这里分析要在queued emitters中获取,同时buffered values的最大容量为0    // 这种情况缓存数组只能有queued emitters,    // 因此,只能处置处罚queued emitters中的第一个Emitter类型的对象    // 假如当前订阅者想要处置处罚下一个Emitter类型的对象,则返回-1    if (index > head) return -1L    // 走到这里分析要在queued emitters中获取,同时buffered values的最大容量为0    // 而且要获取当前的正在处置处罚的Emmiter类型的对象    // 假如queued emitters为空,分析当前没有Emmiter类型的对象,则返回-1    if (queueSize == 0) return -1L    // 满意上述要求,返回index    return index}在允许bufferd values存在的情况下,只能从bufferd values获取数据。在不允许bufferd values存在的情况下,只能处置处罚queued emitters的第一个Emitter类型的对象。
b)获取数据

假如数据地点的位置符合要求,则会调用getPeekedValueLockedAt方法获取数据,代码如下:
private fun getPeekedValueLockedAt(index: Long): Any? =    // 从缓存数组中index位置获取数据    when (val item = buffer!!.getBufferAt(index)) {        // 假如是Emitter类型的,则举行拆箱,获取数据        is Emitter -> item.value        // 直接返回        else -> item    }Emitter类是SharedFlowImpl类的内部类,用于在挂起调用emit方法地点的协程后,对emit方法发射的数据及挂起后的续体举行封装,代码如下:
private class Emitter(    @JvmField val flow: SharedFlowImpl<*>,    @JvmField var index: Long, // 当前对象在缓存数组中的位置    @JvmField val value: Any?,// emit方法发射的数据    @JvmField val cont: Continuation<Unit> // 挂起的续体) : DisposableHandle {    override fun dispose() = flow.cancelEmitter(this)}2)订阅者协程的挂起

在collect方法中,当订阅者无数据可获取时,则会调用awaitValue方法,挂起订阅者地点的协程,代码如下:
private suspend fun awaitValue(slot: SharedFlowSlot): Unit =   // 直接挂起订阅者地点的协程  suspendCancellableCoroutine { cont ->    // 加锁    synchronized(this) lock@{        // 再次查抄当前的index是否满意要求        val index = tryPeekLocked(slot)        // 假如确实不满意要求        if (index < 0) {            // 生存续体到slot中            slot.cont = cont        } else { // 假如再次查抄发现index这时满意要求            // 则规复挂起,并返回            cont.resume(Unit)            return@lock        }        // 生存续体到slot中        slot.cont = cont    }}4.数据的发射

当需要发射数据时,会调用SharedFlowImpl类的emit方法,代码如下:
override suspend fun emit(value: T) {    // 起首实验调用不需要挂起的tryEmit方法,假如发射乐成,则返回    if (tryEmit(value)) return    // 走到这里分析需要挂起,则调用emitSuspend方法    emitSuspend(value)}1)以不挂起的方式发射数据

SharedFlowImpl类中实现了MutableSharedFlow接口中tryEmit方法,用于以不挂起的方式发射数据,代码如下:
override fun tryEmit(value: T): Boolean {    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES    // 加锁    val emitted = synchronized(this) {        // 实验发射数据,假如发射乐成        if (tryEmitLocked(value)) {            // 网络已经挂起的订阅者的续体            resumes = findSlotsToResumeLocked(resumes)            // 返回true            true        } else { // 发射失败            // 返回false            false        }    }    // 唤起挂起的订阅者    for (cont in resumes) cont?.resume(Unit)    // 返回效果    return emitted}tryEmit方法中通过tryEmitLocked方法实验对数据举行发射,假如发射乐成,会调用findSlotsToResumeLocked方法网络已经挂起的订阅者的续体,并叫醒订阅者去吸收消费数据。
tryEmitLocked方法代码如下:
@Suppress("UNCHECKED_CAST")private fun tryEmitLocked(value: T): Boolean {    // 假如当前没有订阅者,则调用tryEmitNoCollectorsLocked处置处罚,并返回    // 该方法永世返回true    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)    // 假如当前有订阅者,同时buffered values已到达最大容量    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {        // 根据溢出计谋举行判断        when (onBufferOverflow) {            // 假如是挂起,则返回fasle            BufferOverflow.SUSPEND -> return false            // 假如是丢掉最新的数据,则返回true            BufferOverflow.DROP_LATEST -> return true            // 假如是丢掉最旧的数据,则暂不作处置处罚            BufferOverflow.DROP_OLDEST -> {}        }    }    // 走到这里,有两种情况:    // 情况1:buffered values还可以继承添加数据    // 情况2:buffered values已到达最大容量,同时溢出计谋为DROP_OLDEST    // 将数据加入到缓存数组中    // 这里由于tryEmit方法不会挂起emit方法地点的协程,    // 以是value没有被封装成Emitter类型的对象    enqueueLocked(value)    // buffered values的数据数量加1    bufferSize++    // 假如buffered values的数据数量凌驾最大容量的限定,    // 分析此时为情况2,则调用dropOldestLocked方法,扬弃最旧的数据    if (bufferSize > bufferCapacity) dropOldestLocked()    // 假如replayCache中数据的数量凌驾了最大容量    if (replaySize > replay) {        // 更新replayIndex的值,replayIndex向前移动一位        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)    }    // 返回true    return true}**buffered values已到达最大容量,同时溢出计谋为DROP_OLDEST情况下数据发射图解**: 在tryEmitLocked方法中,假如当前没有订阅者时,会调用tryEmitNoCollectorsLocked方法,代码如下:
// 假如当前没有订阅者,会调用tryEmitNoCollectorsLocked方法private fun tryEmitNoCollectorsLocked(value: T): Boolean {    assert { nCollectors == 0 }    // 假如不允许有replayCache,则不处置处罚,直接返回true    if (replay == 0) return true    // 走到这里分析可以有replayCache    // 加入到缓存数组中    enqueueLocked(value)    // buffered values的数据数量加1    bufferSize++    // 假如buffered values的数据数量凌驾了replayCache的最大容量    // 则扬弃最旧的数据    // 由于新订阅者只会从replayCache中取数据,    // 假如没有订阅者,buffered values的数据数量凌驾replayCache的最大容量没故意义    if (bufferSize > replay) dropOldestLocked()    // 重新盘算minCollectorIndex    minCollectorIndex = head + bufferSize    // 返回true    return true}在tryEmitNoCollectorsLocked方法中,假如发现当前buffered values的数据数量凌驾了replayCache的最大容量,则会扬弃最旧的数据,保持buffered values中数据的数量最大为replay。由于当有新的订阅者出现时,起首会从replayCache中获取数据,因此在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存凌驾replayCache最大容量的数据只会占用更多内存,是没故意义的。
通过对tryEmitLocked方法与tryEmitNoCollectorsLocked方法的分析,可以知道数据的发射终极都调用了enqueueLocked方法,代码如下:
private fun enqueueLocked(item: Any?) {    // 获取当前缓存数组中缓存的数量    val curSize = totalSize    // 判断    val buffer = when (val curBuffer = buffer) {        // 缓存数组为空,则举行初始化,初始化容量为2        null -> growBuffer(null, 0, 2)        // 假如凌驾了当前缓存数组的最大容量,则举行扩容,新的缓存数组的容量为之前的2倍        // growBuffer方法会把原来缓存数组的数据添补到新的缓存数组中        else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer    }    // 将要发射的数据添补的缓存数组的head + curSize位置    buffer.setBufferAt(head + curSize, item)}enqueueLocked方法内部将要发射的数据添补的缓存数组的序次位置,终极完成了数据发射的过程。通过分析可以知道,数据发射的实质就是将数据添加到缓存数组中。
2)以挂起的方式发射数据

SharedFlowImpl类中实现了MutableSharedFlow接口中emit方法。在上面分析的emit方法中,起首会实验通过tryEmit方法发射数据,假如发射失败,分析发射过程需要挂起,这时会调用emitSuspend方法,代码如下:
private suspend fun emitSuspend(value: T) =   // 直接挂起emit方法地点的协程,获取续体  suspendCancellableCoroutine<Unit> sc@{ cont ->    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES    // 加锁    val emitter = synchronized(this) lock@{        // 这里再次实验以tryEmit的方式发射数据        if (tryEmitLocked(value)) {            // 假如发射乐成,则规复续体的执行            cont.resume(Unit)            // 网络已经挂起的订阅者的续体            resumes = findSlotsToResumeLocked(resumes)            // 返回            return@lock null        }        // 将续体、待发射的数据等封装成Emitter类型的对象        Emitter(this, head + totalSize, value, cont).also {            // 加入到缓存数组中            enqueueLocked(it)            // queued emitters的数据的数量加1            queueSize++            // 假如buffered values的最大容量为0,即不存在            // 则网络已经挂起的订阅者的续体,生存到局部变量resumes中            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)        }    }    // emitter对象监听emit方法地点协程的取消    // 发生取消时会调用emitter对象的dispose方法    emitter?.let { cont.disposeOnCancellation(it) }    // 遍历,唤起挂起的订阅者    for (cont in resumes) cont?.resume(Unit)}3)叫醒挂起的订阅者

无论是在tryEmit方法,还是在emit方法,当发射数据乐成后,都会调用findSlotsToResumeLocked方法,获取已经挂起的订阅者的续体,然后规复订阅者地点协程的执行,代码如下:
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {    // 引用参数中的续体数组    var resumes: Array<Continuation<Unit>?> = resumesIn    // 用于记录需要规复的续体的数量    var resumeCount = resumesIn.size    // 遍历订阅者数组    forEachSlotLocked loop@{ slot ->        // 获取续体,假如续体为空,分析对应订阅者的协程没有挂起,本次循环返回        val cont = slot.cont ?: return@loop        // 判断slot中index是否符合要求        // 假如不符合要求,则本次循环返回        if (tryPeekLocked(slot) < 0) return@loop        // 假如需要规复的续体的数量凌驾续体数组的容量,则举行扩容        // 新的续体数组的容量是之前续体数组容量的2倍        if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))        // 生存续体到续体数组中        resumes[resumeCount++] = cont        // 清空slot中生存的续体        slot.cont = null    }    // 返回网络完的续体数组    return resumes}5.新订阅者获取缓存数据

SharedFlowImpl类实现了SharedFlow接口,重写了此中的常量replayCache,当有新订阅者出现时,假如replayCache存在,而且有缓存数据,则优先从replayCache中获取,代码如下:
override val replayCache: List<T>    // 只能获取,不能设置,加锁    get() = synchronized(this) {        // 获取当前replayCache中缓存数据的数量        val replaySize = this.replaySize        // 假如数量为0,则返回一个空列表        if (replaySize == 0) return emptyList()        // 若数量不为0,则根据容量创建一个列表        val result = ArrayList<T>(replaySize)        // 获取缓存数组        val buffer = buffer!!        // 遍历replayCache,将数据举行类型转换,并添加到列表中        @Suppress("UNCHECKED_CAST")        for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T        // 返回列表        result    }6.热流的融合

SharedFlowImpl类实现了FusibleFlow接口,重写了此中的fuse方法,代码如下:
// 调用了fuseSharedFlow方法实现override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =    fuseSharedFlow(context, capacity, onBufferOverflow)...internal fun <T>  SharedFlow<T>.fuseSharedFlow(    context: CoroutineContext,    capacity: Int,    onBufferOverflow: BufferOverflow): Flow<T> {    // 假如容量为0或默认值,同时溢出计谋为SUSPEND    if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {        // 返回自身        return this    }    // 其他情况,将当前的SharedFlow对象包装成ChannelFlowOperatorImpl类型的对象    return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)}在Kotlin协程:Flow的融合、Channel容量、溢出计谋中提到过,当对类型为SharedFlowImpl的对象利用某些利用符时,会触发fuse方法的执行。fuse方法默认的容量为OPTIONAL_CHANNEL,默认的溢出计谋为SUSPEND,返回自身,因此融合后还是SharedFlowImpl类型的对象。
假如容量为RENDEZVOUS,同时溢出计谋为SUSPEND时,也会返回自身。RENDEZVOUS体现容量为0,无论SharedFlowImpl类型的对象的buffered values最大容量是否为0,在外貌再套一层RENDEZVOUS是没故意义的。
其他情况下,SharedFlowImpl类型的对象会被封装成一个类型为ChannelFlowOperatorImpl的对象,根据Kotlin协程:flowOn与线程切换讲过的,之后向鄙俚发射的数据会通过Channel来发送。
7.只读热流

调用MutableSharedFlow方法,可以得到一个类型为MutableSharedFlow的对象。通过这个对象,我们可以调用它的collect方法来订阅吸收,也可以调用它的emit方法来发射数据。但大多数的时间,我们需要同一数据的发射过程,因此需要对外袒露一个只可以调用collect方法订阅而不能调用emit方法发射的对象,而不是直接袒露MutableSharedFlow类型的对象。
根据上面代码的先容,订阅的过程现实上是从缓存数组中读取数据的,而发射的过程现实上是向缓存数据中写数据,因此假如一个流只能调用collect方法而不能调用emit方法,这种流这是一种只读流。
究竟上,根据在Koltin协程:异步热数据流的计划与利用中对接口的分析可以发现,MutableSharedFlow接口继承了FlowCollector接口和SharedFlow接口,emit方法定义在FlowCollector中。SharedFlow接口继承了Flow接口,collect方法定义在Flow接口中。因此只要将MutableSharedFlow接口指向的对象转换为SharedFlow接口指向的对象就可以将读写流转换为只读流。
在代码中,对MutableSharedFlow类型的对象调用asSharedFlow方法恰好可以实现将读写流转换为只读流,代码如下:
// 该方法调用了ReadonlySharedFlow方法,返回一个类型为SharedFlow的对象public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =    // 传入当前的MutableSharedFlow类型的对象    ReadonlySharedFlow(this)// 实现了FusibleFlow接口,// 实现了SharedFlow接口,而且利用上一步传入的MutableSharedFlow类型的对象作为代理private class ReadonlySharedFlow<T>(    flow: SharedFlow<T>) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {    // 用于流融合,也是通过fuseSharedFlow方法实现    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =        fuseSharedFlow(context, capacity, onBufferOverflow)}
作者:李萧蝶
链接:https://juejin.cn/post/7144692583535476773
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-10-18 16:44, Processed in 0.166678 second(s), 35 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表