【问题标题】:Firebase realtime snapshot listener using Coroutines使用协程的 Firebase 实时快照监听器
【发布时间】:2019-08-22 21:43:29
【问题描述】:

我希望能够在我的 ViewModel 中使用 Kotlin 协程来收听 Firebase DB 中的实时更新。

问题在于,每当在集合中创建新消息时,我的应用程序都会冻结并且无法从该状态中恢复。我需要杀死它并重新启动应用程序。

这是第一次通过,我可以在 UI 上看到之前的消息。第二次调用SnapshotListener 时会出现此问题。

我的observer() 函数

val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener { data, error ->
    if (error != null) {
        channel.close(error)
    } else {
        if (data != null) {
            val messages = data.toObjects(MessageEntity::class.java)
            //till this point it gets executed^^^^
            channel.sendBlocking(messages)
        } else {
            channel.close(CancellationException("No data received"))
        }
    }
}
return channel

这就是我想要观察消息的方式

launch(Dispatchers.IO) {
        val newMessages =
            messageRepository
                .observer()
                .receive()
    }
}

在我用send() 替换sendBlocking() 之后,我仍然没有在频道中收到任何新消息。 SnapshotListener方被执行

//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) {
    channel.send(messages)
}
//scope is my viewModel

如何使用 Kotlin 协程观察 firestore/realtime-dbs 中的消息?

【问题讨论】:

  • Firebase 回调默认在主线程上执行。我看到你在主线程上调用了一个名为sendBlocking 的方法。阻塞主线程总是一个坏主意。您需要找到另一种使用 Firebase SDK 的方法,而不是像这样阻塞主线程。
  • @DougStevenson 我找到了解决方案

标签: android kotlin google-cloud-firestore kotlin-coroutines


【解决方案1】:

我有这些扩展函数,所以我可以简单地从查询中获取结果作为流。

Flow 是一个完美的 Kotlin 协程结构。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> {
    return callbackFlow {
        val listenerRegistration =
            addSnapshotListener { querySnapshot, firebaseFirestoreException ->
                if (firebaseFirestoreException != null) {
                    cancel(
                        message = "error fetching collection data at path - $path",
                        cause = firebaseFirestoreException
                    )
                    return@addSnapshotListener
                }
                offer(querySnapshot)
            }
        awaitClose {
            Timber.d("cancelling the listener on collection at path - $path")
            listenerRegistration.remove()
        }
    }
}

@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
    return getQuerySnapshotFlow()
        .map {
            return@map mapper(it)
        }
}

以下是如何使用上述功能的示例。

@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> {
    return FirebaseFirestore.getInstance()
        .collection("$COLLECTION_SHOPPING_LIST")
        .getDataFlow { querySnapshot ->
            querySnapshot?.documents?.map {
                getShoppingListItemFromSnapshot(it)
            } ?: listOf()
        }
}

// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem {
        return documentSnapshot.toObject(ShoppingListItem::class.java)!!
    }

在您的 ViewModel 类(或您的 Fragment)中,确保您从正确的范围内调用它,以便在用户离开屏幕时适当地移除侦听器。

viewModelScope.launch {
   getShoppingListItemsFlow().collect{
     // Show on the view.
   }
}

【讨论】:

【解决方案2】:

最后我使用了Flow,它是协同程序的一部分1.2.0-alpha-2

return flowViaChannel { channel ->
   firestore.collection(path).addSnapshotListener { data, error ->
        if (error != null) {
            channel.close(error)
        } else {
            if (data != null) {
                val messages = data.toObjects(MessageEntity::class.java)
                channel.sendBlocking(messages)
            } else {
                channel.close(CancellationException("No data received"))
            }
        }
    }
    channel.invokeOnClose {
        it?.printStackTrace()
    }
} 

这就是我在 ViewModel 中观察它的方式

launch {
    messageRepository.observe().collect {
        //process
    }
}

更多主题https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9

【讨论】:

    【解决方案3】:

    移除回调的扩展函数

    对于 Firebase 的 Firestore 数据库,有两种调用类型。

    1. 一次性请求 - addOnCompleteListener
    2. 实时更新 - addSnapshotListener

    一次性请求

    对于一次性请求,库org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X 提供了一个await 扩展函数。该函数从addOnCompleteListener返回结果。

    有关最新版本,请参阅 Maven 存储库,kotlinx-coroutines-play-services

    资源

    实时更新

    扩展函数awaitRealtime 进行检查,包括验证continuation 的状态,以查看它是否处于isActive 状态。这一点很重要,因为当用户的主要内容提要通过生命周期事件更新、手动刷新提要或从提要中删除内容时,会调用该函数。没有这个检查就会崩溃。

    ExtensionFuction.kt

    data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)
    
    suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
        addSnapshotListener({ value, error ->
            if (error == null && continuation.isActive)
                continuation.resume(QueryResponse(value, null))
            else if (error != null && continuation.isActive)
                continuation.resume(QueryResponse(null, error))
        })
    }
    

    为了处理错误,使用了try/catch 模式。

    Repository.kt

    object ContentRepository {
        fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
            emit(Loading())
            val labeledSet = HashSet<String>()
            val user = usersDocument.collection(getInstance().currentUser!!.uid)
            syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
            getLoggedInNonRealtimeContent(timeframe, labeledSet, this)        
        }
        // Realtime updates with 'awaitRealtime' used
        private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
                                           labeledSet: HashSet<String>, collection: String,
                                           lce: FlowCollector<Lce<PagedListResult>>) {
            val response = user.document(COLLECTIONS_DOCUMENT)
                .collection(collection)
                .orderBy(TIMESTAMP, DESCENDING)
                .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
                .awaitRealtime()
            if (response.error == null) {
                val contentList = response.packet?.documentChanges?.map { doc ->
                    doc.document.toObject(Content::class.java).also { content ->
                        labeledSet.add(content.id)
                    }
                }
                database.contentDao().insertContentList(contentList)
            } else lce.emit(Error(PagedListResult(null,
                "Error retrieving user save_collection: ${response.error?.localizedMessage}")))
        }
        // One time updates with 'await' used
        private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
                                                          labeledSet: HashSet<String>,
                                                          lce: FlowCollector<Lce<PagedListResult>>) =
                try {
                    database.contentDao().insertContentList(
                            contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                                    .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
                                    .documentChanges
                                    ?.map { change -> change.document.toObject(Content::class.java) }
                                    ?.filter { content -> !labeledSet.contains(content.id) })
                    lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
                } catch (error: FirebaseFirestoreException) {
                    lce.emit(Error(PagedListResult(
                            null,
                            CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
                }
    }
    

    【讨论】:

    • 我尝试了您的 awaitRealtime() 扩展功能,但它对我不起作用。它只在 Firestore 中与实际模型一起发射一次。在发出 continuation.isActive() 之后总是返回 false。你知道为什么/如何让它保持活跃吗?
    • 感谢@elementstyle 的反馈!我昨晚注意到了这个问题,现在正在努力解决它。
    • 我已删除条件!value!!.isEmpty 已解决问题。我将全天测试以确保它正常工作。
    • 嗨@elementstyle,修复在24小时测试后工作。如果它也适用于您自己,请投票赞成答案。谢谢!
    • 感谢您让我知道@Andrew。如果有机会,我将重新查看开源 Coinverse 应用程序中的代码的当前实现,并在必要时在此处更新。
    【解决方案4】:

    这对我有用:

    suspend fun DocumentReference.observe(block: suspend (getNextSnapshot: suspend ()->DocumentSnapshot?)->Unit) {
        val channel = Channel<Pair<DocumentSnapshot?, FirebaseFirestoreException?>>(Channel.UNLIMITED)
    
        val listenerRegistration = this.addSnapshotListener { value, error ->
            channel.sendBlocking(Pair(value, error))
        }
    
        try {
            block {
                val (value, error) = channel.receive()
    
                if (error != null) {
                    throw error
                }
                value
            }
        }
        finally {
            channel.close()
            listenerRegistration.remove()
        }
    }
    

    然后你可以像这样使用它:

    docRef.observe { getNextSnapshot ->
        while (true) {
             val value = getNextSnapshot() ?: continue
             // do whatever you like with the database snapshot
        }
    }
    

    如果观察者块抛出错误,或者块结束,或者你的协程被取消,监听器会被自动移除。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-09
      • 2022-12-17
      • 2019-03-26
      • 1970-01-01
      相关资源
      最近更新 更多