【问题标题】:Golang AWS S3manager multipartreader w/ GoroutinesGolang AWS S3manager multipartreader w/ Goroutines
【发布时间】:2018-07-02 11:36:54
【问题描述】:

我正在创建一个端点,允许用户同时上传多个文件并将它们存储在 S3 中。目前我可以使用 MultipartReader 和 s3manager 实现这一点,但只能以非同步方式。

我正在尝试实施 Go 例程以加快此功能并将多个文件同时上传到 S3,但数据竞争错误正在引起麻烦。我认为 *s3manager 可能不像文档所说的那样是 goroutine 安全的。 (如果 go-statement 被函数代码替换,代码会同步工作)。

实施互斥锁能否解决我的错误?

func uploadHandler(w http.ResponseWriter, r *http.Request) {
    counter := 0
    switch r.Method {
    // GET to display the upload form.
    case "GET":
        err := templates.Execute(w, nil)
        if err != nil {
            log.Print(err)
        }
        // POST uploads each file and sends them to S3
    case "POST":
        c := make(chan string)
        // grab the request.MultipartReader
        reader, err := r.MultipartReader()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // copy each part to destination.
        for {
            part, err := reader.NextPart()
            if err == io.EOF {
                break
            }
            // if part.FileName() is empty, skip this iteration.
            if part.FileName() == "" {
                continue
            }
            counter++
            go S3Upload(c, part)
        }
        for i := 0; i < counter; i++ {
          fmt.Println(<-c)
         }
        // displaying a success message.
        err = templates.Execute(w, "Upload successful.")
        if err != nil {
            log.Print(err)
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}  

func S3Upload(c chan string, part *multipart.Part) {
    bucket := os.Getenv("BUCKET")
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String(os.Getenv("REGION"))},
    )
    if err != nil {
        c <- "error occured creating session"
        return
    }
    uploader := s3manager.NewUploader(sess)
    _, err = uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(part.FileName()),
        Body:   part,
    })
    if err != nil {
        c <- "Error occurred attempting to upload to S3"
        return
    }
    // successful upload
    c <- "successful upload"
}

【问题讨论】:

  • 你的问题让我想起了这篇文章。 marcio.io/2015/07/… 作者必须想办法大规模上传到 s3。对您来说可能值得一读。
  • 另外,您可能不想为每个 POST 创建一个新的 s3 会话,这将无法很好地扩展。您可以创建会话的单个实例和 s3manager 的单个实例,然后您可以将它们用于服务器的生命周期。它将减少对象/内存流失,减少 GC,因此您的程序会更快。
  • 为避免竞争条件,您可能需要考虑使用docs.aws.amazon.com/sdk-for-go/api/service/s3/s3manager/…。同时使用 Upload 来上传整个文件是安全的。它会为您将这些文件分成几部分。您不是发送整个文件,而是发送同一文件的一部分。你期望它如何维持秩序?因此竞争条件。如果您想使用 Upload 读取整个文件并将其发送到 AWS。
  • 最后,如果你使用upload,UploadInput的Body是一个io.Reader,你可以将Request的body直接传给upload,让它处理部分消费body,推迟关闭Body到确保你释放你的记忆

标签: go amazon-s3 goroutine


【解决方案1】:

^ 看到上面所有的cmets,

这里是一些修改过的代码示例,通道在这里没有用。

package main

import (
    "bytes"
    "io"
    "log"
    "net/http"
    "os"
    "strings"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

var (
    setupUploaderOnce sync.Once
    uploader          *s3manager.Uploader
    bucket            string
    region            string
)
// ensure sessions and uploader are setup only once using a Singleton pattern
func setupUploader() {
    setupUploaderOnce.Do(func() {
        bucket = os.Getenv("BUCKET")
        region = os.Getenv("REGION")
        sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
        if err != nil {
            log.Fatal(err)
        }
        uploader := s3manager.NewUploader(sess)
    })
}

// normally singleton stuff is packaged out and called before starting the server, but to keep the example a single file, load it up here
func init() {
    setupUploader()
}

func uploadHandler(w http.ResponseWriter, r *http.Request) {
    counter := 0
    switch r.Method {
    // GET to display the upload form.
    case "GET":
        err := templates.Execute(w, nil)
        if err != nil {
            log.Print(err)
        }
        // POST uploads each file and sends them to S3
    case "POST":
        var buf bytes.Buffer
        // "file" is defined by the form field, change it to whatever your form sets it too
        file, header, err := r.FormFile("file")
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // close the file
        defer file.Close()
        fileName := strings.Split(header.Filename, ".")
        // load the entire file data to the buffer
        _, err = io.Copy(&buf, file)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        // copy each part to destination.
        go S3Upload(buf, fileName[0])
        // displaying a success message.
        err = templates.Execute(w, "Upload successful.")
        if err != nil {
            log.Print(err)
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}

// keeping this simple, do something with the err, like log
// if the uploader fails in the goroutine, there is potential
// for false positive uploads... channels are not really good here
// either, for that, bubble the error up,
// and don't spin up a goroutine.. same thing as waiting for the channel to return.
func S3Upload(body bytes.Buffer, fileName string) {
    _, err := uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(fileName),
        Body:   bytes.NewReader(body.Bytes()),
    })
}

【讨论】:

  • 非常感谢您提供的有用提示!您提供的示例非常适合单个文件上传,因此我自己进行了一些编辑以同时适用于多个文件。
猜你喜欢
  • 1970-01-01
  • 2014-02-01
  • 2015-04-03
  • 2016-02-19
  • 2020-07-16
  • 1970-01-01
  • 2014-04-10
  • 2016-04-25
  • 1970-01-01
相关资源
最近更新 更多