Flow 操作符 shareIn 和 stateIn 使用须知

Flow.shareInFlow.stateIn 操作符可以将冷流转换为热流: 它们可以将来自上游冷数据流的信息广播给多个收集者。这两个操作符通常用于提升性能: 在没有收集者时加入缓冲;或者干脆作为一种缓存机制使用。

注意 : 冷流 是按需创建的,并且会在它们被观察时发送数据;热流 则总是活跃,无论是否被观察,它们都能发送数据。

本文将会通过示例帮您熟悉 shareIn 与 stateIn 操作符。您将学到如何针对特定用例配置它们,并避免可能遇到的常见陷阱。

底层数据流生产者

继续使用我 之前文章 中使用过的例子——使用底层数据流生产者发出位置更新。它是一个使用 callbackFlow 实现的 冷流。每个新的收集者都会触发数据流的生产者代码块,同时也会将新的回调加入到 FusedLocationProviderClient

class LocationDataSource(
    private val locationClient: FusedLocationProviderClient
) {
    val locationsSource: Flow<Location> = callbackFlow<Location> {
        val callback = object : LocationCallback() {
            override fun onLocationResult(result: LocationResult?) {
                result ?: return
                try { offer(result.lastLocation) } catch(e: Exception) {}
            }
        }
        requestLocationUpdates(createLocationRequest(), callback, Looper.getMainLooper())
            .addOnFailureListener { e ->
                close(e) // in case of exception, close the Flow
            }
        // 在 Flow 结束收集时进行清理
        awaitClose {
            removeLocationUpdates(callback)
        }
    }
}

让我们看看在不同的用例下如何使用 shareIn 与 stateIn 优化 locationsSource 数据流。

shareIn 还是 stateIn?

我们要讨论的第一个话题是 shareInstateIn 之间的区别。shareIn 操作符返回的是 SharedFlowstateIn 返回的是 StateFlow

注意 : 要了解有关 StateFlowSharedFlow 的更多信息,可以查看 我们的文档

StateFlow 是 SharedFlow 的一种特殊配置,旨在优化分享状态: 最后被发送的项目会重新发送给新的收集者,并且这些项目会使用 Any.equals 进行合并。您可以在 StateFlow 文档 中查看更多相关信息。

两者之间的最主要区别,在于 StateFlow 接口允许您通过读取 value 属性同步访问其最后发出的值。而这不是 SharedFlow 的使用方式。

提升性能

通过共享所有收集者要观察的同一数据流实例 (而不是按需创建同一个数据流的新实例),这些 API 可以为我们提升性能。

在下面的例子中,LocationRepository 消费了 LocationDataSource 暴露的 locationsSource 数据流,同时使用了 shareIn 操作符,从而让每个对用户位置信息感兴趣的收集者都从同一数据流实例中收集数据。这里只创建了一个 locationsSource 数据流实例并由所有收集者共享:

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource.shareIn(externalScope, WhileSubscribed())
}

WhileSubscribed 共享策略用于在没有收集者时取消上游数据流。这样一来,我们便能在没有程序对位置更新感兴趣时避免资源的浪费。

Android 应用小提醒! 在大部分情况下,您可以使用 WhileSubscribed(5000),当最后一个收集者消失后再保持上游数据流活跃状态 5 秒钟。这样在某些特定情况 (如配置改变) 下可以避免重启上游数据流。当上游数据流的创建成本很高,或者在 ViewModel 中使用这些操作符时,这一技巧尤其有用。

缓冲事件

在下面的例子中,我们的需求有所改变。现在要求我们保持监听位置更新,同时要在应用从后台返回前台时在屏幕上显示最后的 10 个位置:

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource
            .shareIn(externalScope, SharingStarted.Eagerly, replay = 10)
}

我们将参数 replay 的值设置为 10,来让最后发出的 10 个项目保持在内存中,同时在每次有收集者观察数据流时重新发送这些项目。为了保持内部数据流始终处于活跃状态并发送位置更新,我们使用了共享策略 SharingStarted.Eagerly,这样就算没有收集者,也能一直监听更新。

缓存数据

我们的需求再次发生变化,这次我们不再需要应用处于后台时 持续 监听位置更新。不过,我们需要缓存最后发送的项目,让用户在获取当前位置时能在屏幕上看到一些数据 (即使数据是旧的)。针对这种情况,我们可以使用 stateIn 操作符。

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource.stateIn(externalScope, WhileSubscribed(), EmptyLocation)
}

Flow.stateIn 可以缓存最后发送的项目,并重放给新的收集者。

注意!不要在每个函数调用时创建新的实例

切勿 在调用某个函数调用返回时,使用 shareIn 或 stateIn 创建新的数据流。这样会在每次函数调用时创建一个新的 SharedFlow 或 StateFlow,而它们将会一直保持在内存中,直到作用域被取消或者在没有任何引用时被垃圾回收。

class UserRepository(
    private val userLocalDataSource: UserLocalDataSource,
    private val externalScope: CoroutineScope
) {
    // 不要像这样在函数中使用 shareIn 或 stateIn 
    // 这将在每次调用时创建新的 SharedFlow 或 StateFlow,而它们将不会被复用。
    fun getUser(): Flow<User> =
        userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    

    // 可以在属性中使用 shareIn 或 stateIn 
    val user: Flow<User> = 
        userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
}

需要入参的数据流

需要入参 (如 userId) 的数据流无法简单地使用 shareInstateIn 共享。以开源项目——Google I/O 的 Android 应用 iosched 为例,您可以在 源码中 看到,从 Firestore 获取用户事件的数据流是通过 callbackFlow 实现的。由于其接收 userId 作为参数,因此无法简单使用 shareInstateIn 操作符对其进行复用。

class UserRepository(
    private val userEventsDataSource: FirestoreUserEventDataSource
) {
    // 新的收集者会在 Firestore 中注册为新的回调。
    // 由于这一函数依赖一个 `userId`,所以在这个函数中
    // 数据流无法通过调用 shareIn 或 stateIn 进行复用.
    // 这样会导致每次调用函数时,都会创建新的  SharedFlow 或 StateFlow
    fun getUserEvents(userId: String): Flow<UserEventsResult> =
        userLocalDataSource.getObservableUserEvents(userId)
}

如何优化这一用例取决于您应用的需求:

  • 您是否允许同时从多个用户接收事件?如果答案是肯定的,您可能需要为 SharedFlowStateFlow 实例创建一个 map,并在 subscriptionCount 为 0 时移除引用并退出上游数据流。
  • 如果您只允许一个用户,并且收集者需要更新为观察新的用户,您可以向一个所有收集者共用的 SharedFlowStateFlow 发送事件更新,并将公共数据流作为类中的变量。

shareInstateIn 操作符可以与冷流一同使用来提升性能,您可以使用它们在没有收集者时添加缓冲,或者直接将其作为缓存机制使用。小心使用它们,不要在每次函数调用时都创建新的数据流实例——这样会导致资源的浪费及预料之外的问题!