【发布时间】:2020-04-15 15:37:49
【问题描述】:
我需要将图像上传到服务器,为此我不能使用其他库,而是将其拆分(base64 编码)并全部上传。
我正在为此使用 Kotlin 协程 Flows,我目前所做的是进行第一次调用(返回一个流)以获取我需要在所有上传请求中附加的图像 ID
这是我用来上传图片的两个函数
fun submitImage(payload: Payload): Flow<String> {
val request = requestBuilder.buildUploadImageRequest(payload)
return client.execute(request)
.serviceFlow({ response ->
val imageId = response.body.id
uploadImage(payload.imageBase64, imageId)
imageId
}, { response ->
throw MyServerError("Error ${response.error}")
})
}
private fun uploadImage(imageBase64: String, imageId: String) {
val chunks = divideEncodedImageInChunksOfSize(imageBase64)
var v = 1
for (chunk in chunks) {
val payload = generatePayload(imageId, v, chunk, false)
submitImageChunk(payload)
v++
}
val payload = generatePayload(imageId, v, "", true)
submitImageChunk(payload)
}
private fun submitImageChunk(payload: JSONObject): Flow<Unit> {
val request = requestBuilder.buildUploadImageChunkRequest(payload)
return client.execute(request)
.serviceFlow({ }, { response ->
throw MyHttpError(response)
})
}
我使用了以下实用功能
// Extension function to handle Flows and their activation
internal fun MyHttpClient.execute(request: MyHttpRequest): Flow<MyHttpResponse> {
return flow {
val deferred = CompletableDeferred<MyHttpResponse>()
executeHttp(request, object : MyHttpListener {
override fun onSuccess(response: MyHttpResponse) {
deferred.complete(response)
}
override fun onFailure(response: MyHttpResponse) {
deferred.completeExceptionally(MyHttpError(response))
}
})
emit(deferred.await())
}
}
// Extension function to catch exceptions AND to check if the response body is null
internal fun <T> Flow<MyHttpResponse>.serviceFlow(
onSuccess: (response: MyHttpResponse) -> T,
onError: (response: MyHttpResponse) -> Unit
) = flatMapConcat { response ->
flowOf(response)
.map { res ->
res.body?.let { it ->
onSuccess(res)
} ?: throw MyParseError("MyHttpResponse has a null body")
}
.catchException<JSONException, T> { e ->
throw MyParseError("Parsing exception $e")
}
}.catchException<MyHttpError, T> { e ->
onError(e.response)
}
// Function leveraging OkHttpClient to make a HTTPRequest
internal fun executeHttp { ... }
我认为这个问题是由于函数submitImage 在启动所有子流程以上传图像后返回,但它并没有等待所有子流程完成。
我不确定 Kotlin 协程对于这样的用例有什么构造,有人可以帮助我吗?
【问题讨论】:
-
Flow.zip()可能是候选人。 -
zip不会合并两个流程,而是在第一个流程完成后立即终止?加上图像中的块很容易加起来十几个,所以它需要一种递归压缩函数