【问题标题】:Wait for several Flows to finish before proceeding等待几个流程完成后再继续
【发布时间】:2020-04-15 15:37:49
【问题描述】:

我需要将图像上传到服务器,为此我不能使用其他库,而是将其拆分(base64 编码)并全部上传。

我正在为此使用 Kotlin 协程 Flows,我目前所做的是进行第一次调用(返回一个流)以获取我需要在所有上传请求中附加的图像 ID

这是我用来上传图片的两个函数

fun submitImage(payload: Payload): Flow<String> {
    val request = requestBuilder.buildUploadImageRequest(payload)
    return client.execute(request)
        .serviceFlow({ response ->
            val imageId = response.body.id
            uploadImage(payload.imageBase64, imageId)
            imageId
        }, { response ->
            throw MyServerError("Error ${response.error}")
        })
}

private fun uploadImage(imageBase64: String, imageId: String) {
    val chunks = divideEncodedImageInChunksOfSize(imageBase64)
    var v = 1
    for (chunk in chunks) {
        val payload = generatePayload(imageId, v, chunk, false)
        submitImageChunk(payload)
        v++
    }
    val payload = generatePayload(imageId, v, "", true)
    submitImageChunk(payload)
}

private fun submitImageChunk(payload: JSONObject): Flow<Unit> {
    val request = requestBuilder.buildUploadImageChunkRequest(payload)
    return client.execute(request)
        .serviceFlow({ }, { response ->
            throw MyHttpError(response)
        })
}

我使用了以下实用功能

// Extension function to handle Flows and their activation
internal fun MyHttpClient.execute(request: MyHttpRequest): Flow<MyHttpResponse> {
    return flow {
        val deferred = CompletableDeferred<MyHttpResponse>()
        executeHttp(request, object : MyHttpListener {
            override fun onSuccess(response: MyHttpResponse) {
                deferred.complete(response)
            }

            override fun onFailure(response: MyHttpResponse) {
                deferred.completeExceptionally(MyHttpError(response))
            }
        })
        emit(deferred.await())
    }
}

// Extension function to catch exceptions AND to check if the response body is null
internal fun <T> Flow<MyHttpResponse>.serviceFlow(
    onSuccess: (response: MyHttpResponse) -> T,
    onError: (response: MyHttpResponse) -> Unit
) = flatMapConcat { response ->
    flowOf(response)
        .map { res ->
            res.body?.let { it ->
                onSuccess(res)
            } ?: throw MyParseError("MyHttpResponse has a null body")
        }
        .catchException<JSONException, T> { e ->
            throw MyParseError("Parsing exception $e")
        }
}.catchException<MyHttpError, T> { e ->
    onError(e.response)
}

// Function leveraging OkHttpClient to make a HTTPRequest
internal fun executeHttp { ... }

我认为这个问题是由于函数submitImage 在启动所有子流程以上传图像后返回,但它并没有等待所有子流程完成。 我不确定 Kotlin 协程对于这样的用例有什么构造,有人可以帮助我吗?

【问题讨论】:

  • Flow.zip() 可能是候选人。
  • zip 不会合并两个流程,而是在第一个流程完成后立即终止?加上图像中的块很容易加起来十几个,所以它需要一种递归压缩函数

标签: android kotlin-coroutines


【解决方案1】:

我认为您应该使用WorkManager 并考虑使用chain woker feature

使用 Flow 功能,试试这个:

private suspend fun uploadImage(imageBase64: String, imageId: String) {
withContext(Dispatchers.IO){
  val chunks = divideEncodedImageInChunksOfSize(imageBase64)
  var v = 1
  for (chunk in chunks) {
     val payload = generatePayload(imageId, v, chunk, false)
     submitImageChunk(payload)
     v++
   }
  val payload = generatePayload(imageId, v, "", true)
  submitImageChunk(payload).await();
 }

private suspend fun submitImageChunk(payload: JSONObject): Deferred<Unit> {
 val request = requestBuilder.buildUploadImageChunkRequest(payload)
 return client.execute(request);
}

【讨论】:

  • 不幸的是,我目前无法添加其他依赖项。我希望能够通过 Kotlin 协程库中存在的结构和功能来解决问题
  • uploadImage 不是挂起函数。所以 submitImage 函数应该在图片上传成功之前返回 imageId
  • 这是真的@musafee,但即使让uploadImage 返回一个流也不能让它继续工作(也许它可能是正确的方向)
【解决方案2】:

感谢musafee 让我朝着正确的方向前进。

最后的答案是我在 uploadImage 函数中创建了这些流,但我实际上从未对它们调用 collect,因此它们仍未启动。

我选择的解决方案是将那里创建的流列表返回给调用函数,并从那里将submitImage函数的返回类型从Flow&lt;String&gt;更改为Flow&lt;List&lt;Flow&lt;Unit&gt;&gt;&gt;,然后从那里上层触发他们

【讨论】:

    猜你喜欢
    • 2013-04-25
    • 2019-11-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-06
    • 2021-05-21
    • 2015-10-04
    • 1970-01-01
    相关资源
    最近更新 更多