引子
Android应用步调寿命与其扩展的机动性有关,由于它必要一个结实的根本,这就是为什么对于每个项目来说,最紧张的步调是创建应用步调架构,在与技能团队就界说系统中包罗的元素、每个元素的功能以及它们将怎样相互通讯举行长时间的讨论后,我们必须对团体架构举行清楚的设计。
Android应用步调有不同的架构,与我客岁的履历不同,MVVM和MVI架构是用于大型应用步调的最常见架构,即使每个应用步调都没有一种实现方式,这取决于其应用步调需求,以及处置惩罚它的开发职员风格,由于我信赖,独立于Android框架,每个开发职员都有他们独特的软件开发履历,他们带来的不但仅是他们的知识,尚有他们独特的头脑方式、办理标题和设计代码的方式。
业务场景是如许的:从网络拉取 Feeds 流并长期化在数据库中,以便下次启动时可先展示本地数据,待哀求返回后再革新 Feeds。
现援引上一篇的办理方案:
// 实现访问网络和数据库的细节class NewsRepository(context: Context) { // 使用 Retrofit 构建哀求访问网络 private val retrofit = Retrofit.Builder() .baseUrl("https://api.apiopen.top") .addConverterFactory(MoshiConverterFactory.create()) // 将返回数据构造成 LiveData .addCallAdapterFactory(LiveDataCallAdapterFactory()) .client(OkHttpClient.Builder().build()) .build() private val newsApi = retrofit.create(NewsApi::class.java) private var executor = Executors.newSingleThreadExecutor() // 使用 room 访问数据库 private var newsDatabase = NewsDatabase.getInstance(context) private var newsDao = newsDatabase.newsDao() // 用于将消息流转达给上层的 LiveData private var newsLiveData = MediatorLiveData<List<News>>() fun fetchNewsLiveData(): LiveData<List<News>?> { // 从数据库获取消息 val localNews = newsDao.queryNews() // 从网络获取消息 val remoteNews = newsApi.fetchNewsLiveData( mapOf("page" to "1", "count" to "4") ) .let { Transformations.map(it) { response: ApiResponse<NewsBean>? -> when (response) { is ApiSuccessResponse -> { val news = response.body.result // 将网络消息入库 news?.let {executor.submit { newsDao.insertAll(it) }} news } else -> null } } } // 将数据库和网络相应的 LiveData 归并 newsLiveData.addSource(localNews) {newsLiveData.value = it} newsLiveData.addSource(remoteNews) {newsLiveData.value = it} return newsLiveData }}这是 Clean Architecture 中的 Repository,它提供数据访问本领,隐蔽了访问网络和数据库的细节。
关于 Clean Architecture 的具体表明可以点击[我是怎么把业务代码越写越复杂的 | MVP - MVVM - Clean Architecture
为了使用 LiveData 承载整个数据链路,Retrofit 增长了 LiveDataCallAdapterFactory,它使得接口能直接返回 LiveData:
interface NewsApi { @POST("/getWangYiNews") fun fetchNewsLiveData( @FieldMap map:Map<String,String> )iveData<ApiResponse<NewsBean>>}Room 也支持将数据库查询内容 LiveData 化:
@Daointerface NewsDao { @Query("select * from news") fun queryNews(): LiveData<List<News>?>}网络 & 数据库 Flow 化
数据链路 Flow 化从链路源头开始。
Room 支持以 Flow 形式返回查询结果:
@Daointerface NewsDao { @Query("select * from news") fun queryNewsFlow(): Flow<List<News>?>}Retrofit 并未支持 Flow 形式的接口返回值,于是在 GitHub 上找了一遍,有是有,但 star 数都很少,不太敢用。正在夷由之际,看到了下面 retrofit 官方的复兴:[[Feature Request] Support adapter for Kotlin Coroutine Flow · Issue #3497 · square/retrofit (github.com)
有人提 issue 盼望 retrofit 官方支持接口 Flow 化,但作者复兴说网络哀求返回的是“一个异步结果”而不是“一串异步结果”,所以suspend就够用了。假如想要将接口 Flow 化,可以如许做:
flow { emit(getPosts())}作者接着说:“假如有机会重写 RxJava 的 call adapter,大概也不会支持接口 Observable 化。”
醍醐灌顶,立马照做:
interface NewsApi { @POST("/getWangYiNews") suspend fun fetchNews(@FieldMap map:Map<String,String>): NewsBean}将接口界说为suspend方法。查询数据库内容也应该这么改:
@Daointerface NewsDao { @Query("select * from news") suspend fun queryNewsSuspend(): List<News>}着实若将查询数据库的结果界说为 Flow 的话,每当数据库内容发生增删,Flow 的订阅者都会收到关照。相较于“多个异步结果”,当前场景使用“单个异步结果”更符合。
将访问数据库及哀求网络在 Repository 中转化成流:
class NewsRepo() { // 访问网络的 Flow(冷流:此时并未发生网络哀求) fun remoteNewsFlow(page: Int, count: Int) = suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) } .asFlow() // 将 suspend 代码块转换成流 .map { newsBean -> if (newsBean.code == 200) { // 哀求乐成,更新缓存 if (!newsBean.result.isNullOrEmpty()) { newsDao.deleteAllNews() newsDao.insertAll(newsBean.result.map { it.toNews() }) newsBean.result } else { emptyList() } } else { throw Exception(newsBean.message) } } // 访问数据库的 Flow(冷流:此时并未发生数据库查询) val localNewsOneShotFlow = flow { val news = newsDao.queryNewsSuspend() val newsList = news.map { it.convert() }// 将数据库数据同一为网络数据 emit(newsList) }}在 Flow 数据链路的场景下,Repository 作为数据链路的出发点,提供给上层的是“原始的冷流”。
代码中固然调用了访问网络和查询数据库的方法,但是它们是被界说在“冷流”中的,若未发生订阅活动,就不会实验。订阅活动通常是在界面中举行。
变换 & 合流
当链路用 LiveData 表达时,访问数据库和网络的操纵被界说在一个 Repository 的方法中:
class NewsRepository(context: Context) { fun fetchNewsLiveData(): LiveData<List<News>?> { // 1.从数据库获取消息 val localNews = newsDao.queryNews() // 2.从网络获取消息 val remoteNews = newsApi.fetchNewsLiveData(mapOf("page" to "1", "count" to "4")) // 3.将数据库和网络相应的 LiveData 归并 newsLiveData.addSource(localNews) {newsLiveData.value = it} newsLiveData.addSource(remoteNews) {newsLiveData.value = it} return newsLiveData }}并且它们是串行的,即只有当数据库访问竣过后才开始网络哀求,末了再将它们通过 MediatorLiveData 合流。
而使用流时,数据库和网络操纵被界说在不同的流中,这为它们提供了更机动的合流方式。
串行合流
串行合流的思绪是将多个流构造成“嵌套流”,然后将它们“展平”。
拿 List 举例,List.flat()提供了在列表上的展平操纵,flat 即展平,为啥要展平?由于有嵌套,比如List<List<Int>>,即 List 中每个元素照旧 List:
val lists = listOf( listOf(1,2,3), listOf(4,5,6))Log.v("ttaylor","${lists.flatten()}") //[1, 2, 3, 4, 5, 6]Log.v("ttaylor","${lists.flatMap { it.map { it+1 } }}") //[2, 3, 4, 5, 6, 7]List.flat() 将两层嵌套布局酿成单层布局,而List.flatMap()在展平的同时提供了变换内部 List 的机会。
流也提供了雷同的展平方法flattenConcat():
flowOf( flow { emit(1) emit(2) }, flow { emit(3) }, flow { emit(4) },).flattenConcat().collect { Log.v("ttaylor", "${it}") // 1,2,3,4}flattenConcat() 的合流是串行的,即只有消耗了前一个流中全部的数据后才会消耗后一个流。
在 ViewModel 层对原始数据流举行合流:
// 消息 ViewModel 持有 repoclass NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { fun newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenConcat() // 串行合流 .map { NewsModel(it, false) }}// 通过 ViewModelProvider.Factory 界说构建 ViewModel 的细节(注入Repository)class NewsViewModelFactory(private val newsRepo: NewsRepo) : ViewModelProvider.Factory { override fun <T : ViewModel> create(modelClass: Class<T>): T { return NewsViewModel(newsRepo) as T }}在 Repository + Flow 的加持下,ViewModel 变得非常简单,它持有原始数据流并对其举行合流以及变换。
两个原始数据流分别是数据库流和网络流,使用flowOf()将它们构造成Flow<Flow<News>>嵌套布局,然后调用 flattenConcat() 将它们串行合流并展平酿成一个流,即先查询数据库,待查询完毕后才哀求网络。合流之后还举行了数据变换,以将网络数据转换为界面数据 NewsModel:
data class NewsModel( val news: List<News>, // 消息列表 val loading: Boolean, // 是否正在加载 val errorMessage: String = "" // 错误信息)将消息列表举行如许包装的目标是实现“唯一可信数据源”,这是 MVI 的关键词之一。关于它的具体先容可以点击Android 架构最新盼望 | MVI = 相应式编程 + 单向数据流 + 唯一可信数据源(该篇和本文同时发布,若链接无法跳转,大概是还未过审,请稍等~)
并行合流
串行合流中网络哀求必须期待数据库查询,若两者能并行,则性能就会更好一点。
flattenMerge()方法就用于多流并发的场景:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { fun newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() // 并行合流 .map { NewsModel(it, false) }}此时数据库和网络流会并发启动,性能是好了,但也产生了新标题。
每个流生成的数据会合成到一个流中并关照界面革新。若数据库流老师成数据,让用户先看到缓存消息,然后网络流再生成数据,用新数据把老数据刷掉。这个流程是符合预期的。但万一数据库抽风了,比网络还慢咋办?这就会发生老数据刷掉新数据的 bug。
办理方案是:当吸收到网络流的数据时,就丢弃流上背面的数据。
在 RxJava 中有一个操纵符叫takeUntil()就是用来形貌这个场景的。
但 Kotlin Flow 并未提供这个方法。。。于是我开始在网上找。。。直到我发现了这个官方复兴:Flow.transformWhile operator · Issue #2065 · Kotlin/kotlinx.cor…
官方说不会提供 takeUntil() 方法。由于 Kotlin Flow 的设计原则是“简单”,只提供须要的和高度机动性的方法,以便自界说。Kotlin Flow 中以transform开头的方法都是高度机动的,它们通常用来界说其他操纵符。在Kotlin 异步 | Flow 应用场景及原理中分析过Flow.transform()方法的机动性。现在来看下transformWhile():
public fun <T, R> Flow<T>.transformWhile( transform: suspend FlowCollector<R>.(value: T) -> Boolean // 这 lambda 带有数据发射本领): Flow<R> = safeFlow { // 举行有条件的转发流数据,条件便是 transform return@safeFlow collectWhile { value -> transform(value) } }// 有条件的网络流数据internal suspend inline fun <T> Flow<T>.collectWhile( crossinline predicate: suspend (value: T) -> Boolean) { // 自界说流网络器,形貌怎样发射数据 val collector = object : FlowCollector<T> { override suspend fun emit(value: T) { // 当满意条件时才发射数据,否则丢弃流今后的数据 if (!predicate(value)) { throw AbortFlowException(this) } } } try { collect(collector)// 网络上游流并通过自界说的方式转发给卑鄙 } catch (e: AbortFlowException) { e.checkOwnership(collector) }}transformWhile() 的套路依然是拦截转发机制,即新建卑鄙流,它生产数据的方式是通过网络上游数据,并将数据转发到一个带有发射数据本领的 lambda 中,当前这个 lambda 必要有一个返回值,该值决定了是否要制止上游流数据的生产。
现在的标题转化为,怎样让网络流告诉数据库流“我已经生成数据了你歇菜吧~”
“流的通讯”,听上去有点高大上,但转念一想,是我把标题想复杂了。由于网络和数据库流已经在 ViewModel 层合流了,它们并成一个流了,活动的是List<News>,在这个数据布局上套一层就能实现所谓的“流通讯”:
// 消息流包装类data class NewsFlowWrapper( val news: List<News>,// 消息列表 val abort: Boolean // 是否停止流)用 NewsFlowWrapper 改造下 NewsRepo:
class NewsRepo(context: Context) { val localNewsFlow = flow { val news = newsDao.queryNewsSuspend() val newsList = news.map { it.convert() } // 使用 NewsFlowWrapper 包装数据库流 emit(NewsFlowWrapper(newsList, false)) } fun remoteNewsFlow(page: Int, count: Int) = suspend { newApi.fetchNews(mapOf("page" to page, "count" to count)) } .asFlow() .map { newsBean -> if (newsBean.code == 200) { if (!newsBean.result.isNullOrEmpty()) { newsDao.deleteAllNews() newsDao.insertAll(newsBean.result.map { it.toNews() }) // 网络哀求乐成时,停止流 NewsFlowWrapper(newsBean.result, true) } else { NewsFlowWrapper(emptyList(), false) } } else { throw Exception(newsBean.message) } }}接着用 transformWhile() 改造一下 ViewModel 层的合流:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { fun newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news)// 总是直接转发上游数据 ,直到 abort 为 true !it.abort } .map { NewsModel(it, false) }}就如许自界说了一个新操纵符用于流通讯。
在讨论到用 Kotlin Flow 取代 RxJava 的时间,有一种声音说“相比 RxJava,Kotlin Flow 的操纵符还很匮乏,有待丰富~”。我倒是以为这是 RxJava 的劣势,Kotlin Flow 的上风。RxJava 让人最望而却步的正是由于复杂性,品种繁多的“流”、琳琅满目标操纵符、以及 Rx 版的回调地狱。Kotlin Flow 的战略是简单 + 高机动性。
如许一来,用 Flow 重构的数据链路上,Repository 和 ViewModel 的边界就很清楚了:Repository 提供原始的数据流,以供 ViewModel 用各种本身喜好的方式举行合流及变换。
异步化
若直接在界面中网络上述消息流的话,步调会 crash,提示不能在主线程操纵数据库。
全部在流中的操纵,默认环境下都是实验在主线程的。
将流中的操纵异步化也很简单:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { fun newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) // 将全部上游操纵都分发到 IO 线程实验}在 LiveData 承载数据链路的版本中,需自行启动线程池实验数据库操纵(网络操纵的异步化由OkHttp实现)。
当用 Flow 构造数据库流和网络流时,只需一个方法就能实现异步化,无疑大大地低落了复杂度。
捕获非常
使用catch()可以捕获全部上游抛出的非常:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { fun newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsOneShotFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .catch { // 捕获自界说非常并向流发送消息 if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) }机动的是,在捕获非常之后还可以继续向流发送数据。比如当网络非常时,向界面发送一个带有 errorMessage 的 Model,界面根据此字段决定是否展示错误 toast。也可以在这里处置惩罚和服务端约定的特别错误码。
感知生命周期
流预备地差不多了,下一步就是让界面网络流并革新:
class NewsActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 网络消息流并展示 lifecycleScope.launch { newsViewModel.newsFlow(1, 8).collect { showNews(it) } } } // 如许革新界面是 MVI 提倡的 private fun showNews(newsModel: NewsModel) { when { // 展示 loading newsModel.loading -> { showLoading() } newsModel.errorMessage.isEmpty() -> { dismissLoading() // 将消息展示在 RecyclerView 上 } // 展示错误提示 else -> { dismissLoading() showErrorMessage(newsModel.errorMessage) } } }}此中展示/遣散 loading 的方法界说如下:
// 展示 loadingfun Activity.showLoading() { contentView()?.apply { ProgressBar { layout_id = "pb" layout_width = 50 layout_height = 50 layout_gravity = gravity_center } }}// 遣散 loadingfun Activity.dismissLoading() { val pb = contentView()?.find<rogressBar>("pb") pb?.let { contentView()?.removeView(it) }}// 获取 Activity 的 content viewfun Activity.contentView(): FrameLayout? = takeIf { !isFinishing && !isDestroyed }?.window?.decorView?.findViewById(android.R.id.content)展示 loading 即向当前 Activity 的 contentView 添加一个子 View,遣散 loading 便是移除该子 View。此中使用了 DSL 声明式地构建了界面,具体先容可以点击Android性能优化 | 把构建布局用时紧缩 20 倍(下)。
如许写会有一个坑,若消息流由于各种缘故因由迟迟未生成消息列表,此时用户切换到另一个页面,不久后消息流有数据了,数据被推到界面,就发生了 crash,由于要革新的界面已不再前台。
lifecycleScope
刚才是在lifecycleScope网络消息流的,它是一个和生命周期对象绑定的协程域:
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope get() = lifecycle.coroutineScopepublic val Lifecycle.coroutineScope: LifecycleCoroutineScope get() { while (true) { // 获取现有 lifecycleScope val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl? if (existing != null) { return existing } // 若没有现成的,则构建 val newScope = LifecycleCoroutineScopeImpl( this, SupervisorJob() + Dispatchers.Main.immediate ) // 并通过 cas + 自旋的方式包管存入 mInternalScopeRef if (mInternalScopeRef.compareAndSet(null, newScope)) { // 开始观察生命周期变革 newScope.register() return newScope } } }lifecycleScope 是一个LifecycleCoroutineScope实例,并以 Lifecycle 对象的扩展属性存在。之所以能如许做是由于 Lifecycle 开了后门:
public abstract class Lifecycle { // 后门,方便在类的外存取“附加值” AtomicReference<Object> mInternalScopeRef = new AtomicReference<>();}这种动态为类新增属性的方法,在 Kotlin 源码中很常见。
新建 LifecycleCoroutineScope 实例后,会就地调用 register() 方法观察生命周期变革:
internal class LifecycleCoroutineScopeImpl( override val lifecycle: Lifecycle, override val coroutineContext: CoroutineContext) : LifecycleCoroutineScope(), LifecycleEventObserver { fun register() { launch(Dispatchers.Main.immediate) { // 开始观察生命周期 if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) { lifecycle.addObserver(this@LifecycleCoroutineScopeImpl) } else { coroutineContext.cancel() } } } override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { // 当生命周期为 DESTROYED 时,取消观察并取消协程中 job 的实验 if (lifecycle.currentState <= Lifecycle.State.DESTROYED) { lifecycle.removeObserver(this) coroutineContext.cancel() } }}lifecycleScope.launch() 会立即启动协程,并在生命周期 DESTROYED 时取消协程。
当 Activity 被另一个 Activity 遮挡时并不会 DESTROYED,所以此时如有流数据推过来照旧可以更新到界面,并导致 crash。
flowWithLifecycle()
为此官方提供了flowWithLifecycle():
public fun <T> Flow<T>.flowWithLifecycle( lifecycle: Lifecycle, minActiveState: Lifecycle.State = Lifecycle.State.STARTED): Flow<T> = callbackFlow { lifecycle.repeatOnLifecycle(minActiveState) { this@flowWithLifecycle.collect { send(it) } } close()}flowWithLifecycle() 内部生成了一个中心消耗者callbackFlow,中心消耗者会将上游数据转发给卑鄙,不外是有条件的,只有当生命周期满意要求时才会转发。
此中的 repeatOnLifecycle() 是 Lifecycle 的扩展方法:
public suspend fun Lifecycle.repeatOnLifecycle( state: Lifecycle.State, block: suspend CoroutineScope.() -> Unit) { ... }repeatOnLifecycle() 会在新的协程实验 block,当且仅当生命周期至少到达 state 状态,若生命周期未达标,则会取消 block 实验,若再次达标,则再次实验。
让 Flow 感知生命周期的写法如下:只有当生命周期满意要求时,才网络上游并转发给卑鄙,否则取消网络:
class NewsActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 以感知生命周期的方式网络消息流 lifecycleScope.launch { repeatOnLifecycle(Lifecycle.State.STARTED) { newsViewModel.newsFlow(1, 8).collect { showNews(it) } } } }}嵌套回调出现了,看上去有点复杂。 还好有扩展方法,可以把这些细节隐蔽起来:
// 用感知生命周期的方式网络流fun <T> Flow<T>.collectIn( lifecycleOwner: LifecycleOwner, minActiveState: Lifecycle.State = Lifecycle.State.STARTED, action: (T) -> Unit): Job = lifecycleOwner.lifecycleScope.launch { flowWithLifecycle(lifecycleOwner.lifecycle, minActiveState).collect(action)}然后就可以像如许在界面中网络消息流:
class NewsActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) newsViewModel.newsFlow(1, 8).collectIn(this) { showNews(it) } }}超轻便,把 LiveData 又比下去了~
这个方法需留意调用次序,当不满意生命周期时,它只会取消订阅上游的数据,若卑鄙尚有另一流在生成数据,则无法感知生命周期。(封装的collectIn()包管了它是网络数据前的末了一个操纵符)
避免重复触发冷流
按照上面的写法,照旧有标题。当重消息界面跳转到另一个界面再返回时,会重新查数据库,重新哀求网络。。。
由于 Repository 提供的数据库和网络流都是“冷流”。冷流只有被网络之后才会生产数据,且冷流是没有地方存数据的,当数据从上游颠末多少个中心消耗者末了转达给订阅者,数据被展示在界面上,但整个数据链路上没有一个地方把数据存了下了。
又由于使用了repeatOnLifecycle(Lifecycle.State.STARTED),所以从另一个界面返回时,重新订阅了冷流,那它就绝不包涵地开始重新生产数据。
SharedFlow
对于这种场景,办理方案是:让冷流共享,即多个订阅也不会触发冷流重新生产数据,最好能让冷流的数据被缓存,如许就能将最新的数据粘性地转达给新订阅者。
SharedFlow由此而生:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { val newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .catch { if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) } // 将流转换为 SharedFlow .shareIn(viewModelScope, SharingStarted.Lazily)}使用shareIn()将冷流转换成共享热流:
public fun <T> Flow<T>.shareIn( scope: CoroutineScope, started: SharingStarted,// 启动战略 replay: Int = 0 // 缓存巨细,默认不缓存(非粘性)): SharedFlow<T> {...}shareIn 是 Flow 的扩展方法:
- started参数是启动战略,它决定了上游流的生命周期,SharingStarted.Lazily实用于当前的场景,即当共享热流有订阅者时才启动上游流,上游流将不停存在世。
- replay参数决定了缓存的巨细,若为1,表现会缓存最新的1个值,当有新订阅者,会将缓存值分发给它,实现粘性结果(同 LiveData)。默以为0不缓存。
可以把 SharedFlow 想象成一个中心消耗者,它网络上游流的数据并将其推送到热流中,然后将这些数据缓存并分享给全部的卑鄙订阅者。
StateFlow
StateFlow 是一个特别的 SharedFlow,它是 Kotlin Flow 中更像 LiveData 的存在。由于:
- StateFlow 总是会缓存1个最新的数据,上游流产生新数据后就会覆盖旧值(LiveData 也是)。
- StateFlow 持有一个 value 字段,可通过stateFlow.value读取最新值(LiveData 也是)。
- StateFlow 是粘性的,会将缓存的最新值分发给新订阅者(LiveData 也是)。
- StateFlow 必须有一个初始值(LiveData 不是)。
- StateFlow 会过滤重复值,即新值和旧值雷同时不更新。(LiveData 不是)。
可以使用stateIn()重写消息流:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { val newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .catch { if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) } // 将流转换为 StateFlow .stateIn(viewModelScope, SharingStarted.Lazily, NewsModel(emptyList(), true))}stateIn() 中的第三个参数就是必须有的初始值,当 Repository 的原始数据流未生成数据时,初始值就已经推送给了订阅者,界面可以借此展示 loading。
若使用 shareIn(),则可以如许展示 loading:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { val newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .onStart { emit(NewsModel(emptyList(), true)) }// 展示loading .catch { if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) } // 将流转换为 SharedFlow .shareIn(viewModelScope, SharingStarted.Lazily)}使用onStart(),它会在流被网络时立即发生一个数据。
到底使用 StateFlow 照旧 SharedFlow?得看场景:
- 当需在流以外的地方访问流的最新值,则用 StateFlow。
- 当需过滤重复值,则用 StateFlow(在 SharedFlow 上用 distinctUntilChanged() 结果雷同)。
- 在需粘性的场景下,则用 StateFlow(将 SharedFlow 的 replay 置为1结果雷同)。
我试图找到更多使用 StateFlow 的来由,但就像你看到的那样,大部分来由都不充实。只有第一个场景下,必用 StateFlow 不可。其他都可用 SharedFlow 取代,而且后者提供了更大的机动性。
MVI 化
上面的代码已经比较接近 MVI 的容貌了。
MVI 有三个关键词:相应式编程 + 单向数据流 + 唯一可信数据源。
现援引“单向数据流”图片如下:
界面产生的数据叫变乱(意图)Intent,它流向 ViewModel,经加工后转换成 状态State供界面革新。
sealed class FeedsIntent { // Feeds 初始化 data class InitIntent(val type: Int, val count: Int) : FeedsIntent() // Feeds 加载更多 data class MorePageIntent(val timestamp: Long, val count: Int) : FeedsIntent() // 删除某个帖子 data class RemoveIntent(val id: Long) : FeedsIntent()}原来界面发起的变乱是通过 ViewModel 的一个方法调用转达的。为了使用相应式编程形成数据流,得把函数调用用“数据”的形式包装起来。
变乱产生自界面,所以变乱流理所固然在界面构造:
class StateFlowActivity : AppCompatActivity() { private val refreshLayout: RefreshLayout // 在界面层构造变乱流 private val intents by lazy { merge( // 加载 Feeds 首页变乱 flowOf(FeedsIntent.InitIntent(1, 5)) // 加载更多 Feeds 变乱 loadMoreFeedsFlow() ) } private fun loadMoreFeedsFlow(): Flow<FeedsIntent> = callbackFlow { refreshLayout.setOnRefreshListener { trySend(FeedsIntent.MorePageIntent) } awaitClose() }}上述代码包罗了两个变乱,分别是加载首页和加载更多,它俩都被构造成流,并使用 merge 举行合流,merge 会将每个 Flow 中的数据合起来并发地转发到一个新的流上。
当流被订阅后,加载首页的变乱会立即产生并无条件的分发给卑鄙,而加载更多变乱需期待上拉动作发生时才会生成。
class StateFlowActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } private val intents by lazy { merge( flowOf(FeedsIntent.InitIntent(1, 5)) loadMoreFeedsFlow() ) } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 订阅变乱流,将变乱转达给 ViewModel intents .onEach(newsViewModel::send) // .onEach { newsViewModel.send(it) } 结果一样 .launchIn(lifecycleScope) }}在 onCreate() 订阅变乱流,每产生一个变乱都会调用 NewsViewModel.send() 方法将变乱转达给 ViewModel。此中::用于将一个方法变为 lambda,方法就可以作为参数传给另一个方法,以简化代码。
NewsViewModel.send() 方法界说如下:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { // 用于吸收界面变乱的共享流 private val _feedsIntent = MutableSharedFlow<FeedsIntent>() // 界面变乱唯一入口,向流中发送变乱 fun send(intent: FeedsIntent) { viewModelScope.launch { _feedsIntent.emit(intent) } }}现在界面变乱已经以数据流Flow<FeedsIntent>的方式流入了 ViewModel,下一步就是在流上举行数据变换,即流入的是 Intent,流出的是 State。遂界说一个将Flow<FeedsIntent>转化成Flow<NewsState>的扩展方法:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { // 将变乱转换成状态(NewsState便是上面的NewsModel,换了个名字而已) private fun Flow<FeedsIntent>.toNewsStateFlow(): Flow<NewsState> = merge( // 加载首页变乱处置惩罚 filterIsInstance<FeedsIntent.InitIntent>() .flatMapConcat { it.toFetchInitPageFlow() }, // 删除帖子变乱处置惩罚 filterIsInstance<FeedsIntent.RemoveIntent>() .flatMapConcat { ... }, // 加载更多变乱处置惩罚 filterIsInstance<FeedsIntent.MorePageIntent>() .flatMapConcat { ... } )}每一个上游的FeedsIntent都会在这里被转换成一个Flow<NewsState>,就形成了Flow<Flow<NewsState>>如许的布局,然后用 flatMapConcat() 将其展平酿成Flow<NewsState>。
由于有多种变乱,遂使用 filterIsInstance() 按变乱范例过滤,实现了变乱分流,便是用流的方式写 if-else。
此中toFetchInitPageFlow()形貌了怎样将加载首页变乱转换成Flow<NewsState>:
// NewsViewModel.ktprivate fun FeedsIntent.InitIntent.toFetchInitPageFlow() = flowOf( newsRepo.localNewsOneShotFlow, newsRepo.remoteNewsFlow(this.type, this.count) ) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsState(it, false) } .onStart { emit(NewsState(emptyList(), true)) } .catch { if (it is SSLHandshakeException) emit( NewsState( emptyList(), false, "network error,show old news" ) ) }转化的方法便是拉取数据库以及网络(就是把之前界说好的数据库网络合流拿过来)。
是时间把变乱流以及它的变换操纵合起来了:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { // 变乱流 private val _feedsIntent = MutableSharedFlow<FeedsIntent>() // 状态流 val newsState = _feedsIntent .toNewsStateFlow() // 将变乱流转换成状态流 .flowOn(Dispatchers.IO) // 异步地举行变换操纵 .shareIn(viewModelScope, SharingStarted.Eagerly) // 将流转换成共享流以供界面订阅}末了界面观察状态流:
class StateFlowActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } // 构造界面变乱 private val intents by lazy { merge( flowOf(FeedsIntent.InitIntent(1, 5)) loadMoreFeedsFlow() ) } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 数据流出发点:发送变乱 intents .onEach(newsViewModel::send) .launchIn(lifecycleScope) // 数据流尽头:消耗状态 newsViewModel.newsState .collectIn(this) { showNews(it) } } // 渲染界面 private fun showNews(newsModel: NewsState) { when { newsModel.loading -> { showLoading() } newsModel.errorMessage.isEmpty() -> { dismissLoading() newsAdapter.news = newsModel.news rvNews.adapter = newsAdapter } else -> { dismissLoading() tv.text = newsModel.errorMessage } } }}(这里的 MVI 是一个半制品,比如该代码布局就无法实现“上拉加载更多”这个需求,后续文章会在此根本上做重构,欢迎连续关注~)
LiveData vs Flow
对于承载数据来说,Kotlin Flow 相较于 LiveData 只能说有过之而无不及:
- LiveData 不能方便地支持异步化。
- LiveData 粘性标题标办理方案固然很多,但用起来都很变扭。
- LiveData 大概发生数据丢失的环境。
- LiveData 的数据变换本领远远不如 Flow。
- LiveData 多数据源的合流本领远远不如 Flow。
除此之外,Flow 尚有一点非常吸引人,那就是 轻便,Flow 可以用及其轻松简单的方式实现复杂的结果,代码的复杂度斗降,可读性斗升。更紧张的是,这是局面所趋,还在夷由什么~
作者:唐子玄
链接:https://juejin.cn/post/7087718088681979934
|