【问题标题】:Concurrent S3 File Upload via Kotlin Coroutines通过 Kotlin 协程并发 S3 文件上传
【发布时间】:2017-11-26 00:59:25
【问题描述】:

我需要将许多文件上传到 S3,按顺序完成这项工作需要几个小时。这正是 Kotlin 的新协程擅长的地方,所以我想先尝试一下,而不是再摆弄一些基于线程的执行服务。

这是我的(简化的)代码:

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking {
    val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build()
    for ((x, ys) in superTiles) {
        val jobs = mutableListOf<Deferred<Any>>()
        for ((y, superTile) in ys) {
            val job = async(CommonPool) {
                uploadTile(s3, x, y, superTile)
            }
            jobs.add(job)
        }
        jobs.map { it.await() }
    }
}

suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
}

问题:代码仍然很慢,日志显示请求仍然按顺序执行:一个作业在创建下一个作业之前完成。只有在极少数情况下(十分之一)我会看到作业同时运行。

为什么代码不能更快/同时运行?我该怎么办?

【问题讨论】:

  • 未受教育的猜测:将val s3 = AmazonS3ClientBuilder... 移动到异步部分,以便您拥有多个客户端?
  • 那也没用。我现在没有受过教育的猜测是 putObject 正在阻止请求,协程无法更改的东西
  • 没错。 S3 SDK 似乎不支持非阻塞 IO(通过 NIO),因此每次上传需要一个线程。您仍然可以并行运行多个,但可能不建议将它们 all 并行运行。在某些时候,您也会受到网络带宽的限制。

标签: amazon-s3 kotlin upload kotlin-coroutines


【解决方案1】:

当您使用 异步 API 时,Kotlin 协程表现出色,而您使用的 AmazonS3.putObject API 是老式的阻塞同步 API,因此您只能获得与数量一样多的并发上传您正在使用的CommonPool 中的线程数。将 uploadTile 函数标记为已修改的 suspend 没有任何价值,因为它在其主体中不使用任何挂起函数。

在上传任务中获得更多吞吐量的第一步是为此开始使用异步 API。我建议查看Amazon S3 TransferManager 的钱包。看看能不能先解决你的问题。

Kotlin 协程旨在帮助您将异步 API 组合成易于使用的逻辑工作流。例如,通过编写以下扩展函数,可以直接将 TransferManager 的异步 API 用于协程:

suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont ->
    addProgressListener {
        if (isDone) {
            // we know it should not actually wait when done
            try { cont.resume(waitForUploadResult()) }
            catch (e: Throwable) { cont.resumeWithException(e) }
        }
    }
    cont.invokeOnCompletion { abort() }
}

此扩展使您可以编写与TransferManager 一起使用的非常流畅的代码,并且您可以重写您的uploadTile 函数以与TransferManager 一起使用,而不是使用阻塞AmazonS3 接口:

suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
        .await()
}

注意,这个新版本的uploadTile 如何使用上面定义的挂起函数await

【讨论】:

    猜你喜欢
    • 2011-12-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-04
    • 1970-01-01
    • 1970-01-01
    • 2020-04-16
    • 2017-11-21
    相关资源
    最近更新 更多