【问题标题】:Downloading multiple files from S3 concurrently and consolidated them同时从 S3 下载多个文件并合并它们
【发布时间】:2017-06-27 01:18:05
【问题描述】:

我正在尝试同时从 S3 下载多个文件,并将它们的内容合并到一个字节缓冲区中。这些文件是 csv 格式的。我的代码似乎在大部分时间工作(10 次尝试中的 8 次)。但在某些情况下,在我检查合并缓冲区后,我得到的结果少于我应该得到的(通常不超过缺少 100 行)。预期的记录总数为 4802。 如果按顺序运行我的代码,则不会出现此问题。但是我需要使用 goroutines 来提高速度。这是我尝试做的主要要求。我运行了 go data race 检查器,没有出现数据竞争,并且错误我打印的语句永远不会打印出来。

这是我使用的代码:

    var pingsBuffer = aws.NewWriteAtBuffer([]byte{}) 
        //range over the contents of the index file
    for _, file := range indexList {
        wg.Add(1)
        go download(key + string(file), pingsBuffer, &wg)
    }
    wg.Wait()

和下载功能(也整合下载的文件)

func download(key string, buffer *aws.WriteAtBuffer, wg *sync.WaitGroup)  {

defer wg.Done()

awsBuffer := aws.NewWriteAtBuffer([]byte{})

input := &s3.GetObjectInput {
    Bucket: aws.String(defaultLocationRootBucket),
    Key:    aws.String(key),
}

n1, downloadError := downloader.Download(awsBuffer, input)
if downloadError != nil {
    loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to download from S3, file(%v) with error : %v.", key, downloadError))
    return
}


lenghts3:= int64(len(buffer.Bytes()))

n2, bufferError := buffer.WriteAt(awsBuffer.Bytes(), lenghts3 )
if bufferError != nil {
    loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to write to buffer, the file(%v) downloaded from S3  with error : %v.", key, bufferError))
}

【问题讨论】:

    标签: go aws-sdk aws-sdk-go


    【解决方案1】:

    这段代码:

    lenghts3:= int64(len(buffer.Bytes()))
    

    是并发问题:两个例程可能同时获取长度,获取相同的起始位置,并且都以相同的起始位置继续写入缓冲区,互相踩脚。

    由于您已经在内存中检索整个对象而不是流式传输到组合缓冲区,因此您也可以将每个文件的全部内容发送到通道上,并让该通道上的接收器将每个结果附加到共享字节缓冲区,因为它们同步进入。

    【讨论】:

      猜你喜欢
      • 2021-12-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-03
      • 1970-01-01
      • 1970-01-01
      • 2022-09-23
      相关资源
      最近更新 更多