【问题标题】:Upload files to s3 using celery tasks使用 celery 任务将文件上传到 s3
【发布时间】:2019-03-04 16:53:44
【问题描述】:

我正在尝试将视频文件上传到 s3,但是在使用 celery 放入任务队列之后。在上传视频时,用户可以做其他事情。

我的 views.py 调用 celery 任务

def upload_blob(request, iterator, interview_id, candidate_id, question_id):
    try:
        interview_obj = Interview.objects.get(id=interview_id)
    except ObjectDoesNotExist:
        interview_obj = None
    current_interview = interview_obj
    if request.method == 'POST':
        print("inside POST")
        # newdoc1 = Document(upload=request.FILES['uploaded_video'], name="videos/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        # newdoc1.save()
        save_document_model.delay(request.FILES['uploaded_video'],"videos/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        # newdoc2 = Document(upload=request.FILES['uploaded_audio'], name="audios/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        # newdoc2.save()
        save_document_model.delay(request.FILES['uploaded_audio'],"audios/interview_"+interview_id+"_candidate_"+candidate_id+"_question_"+question_id)
        iterator = str(int(iterator) + 1)

        return HttpResponseRedirect(reverse('candidate:show_question', kwargs={'iterator': iterator,'interview_id':current_interview.id,'question_id':question_id}))
    else:

        return render(request, 'candidate/record_answer.html')

实际的芹菜任务.py

@task(name="save_document_model")
def save_document_model(uploaded_file, file_name):

    newdoc = Document(upload=uploaded_file, name=file_name)
    newdoc.save()

    logger.info("document saved successfully")
    return HttpResponse("document saved successfully")

文档模型

def upload_function(instance, filename):
    getname = instance.name
    customlocation = os.path.join(settings.AWS_S3_CUSTOM_DOMAIN, settings.MEDIAFILES_LOCATION, getname)
    # Add other filename logic here
    return getname # Return the end filename where you want it saved.

class Document(models.Model):
    name = models.CharField(max_length=25)
    uploaded_at = models.DateTimeField(auto_now_add=True)
    upload = models.FileField(upload_to=upload_function)

Settings.py

AWS_ACCESS_KEY_ID = '**********************'
AWS_SECRET_ACCESS_KEY = '**************************'
AWS_STORAGE_BUCKET_NAME = '*********'
AWS_S3_CUSTOM_DOMAIN = '%s.s3.amazonaws.com' % AWS_STORAGE_BUCKET_NAME
AWS_S3_OBJECT_PARAMETERS = {
    'CacheControl': 'max-age=86400',
}
AWS_LOCATION = 'static'
AWS_DEFAULT_ACL = None

MEDIAFILES_LOCATION = 'uploads/'
DEFAULT_FILE_STORAGE = 'watsonproj.storage_backends.MediaStorage'

# CELERY STUFF
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Nairobi'
CELERY_IMPORTS=("candidate.tasks")

直接上传在没有 celery 的情况下工作,但使用 celery 我收到此错误:

“InMemoryUploadedFile”类型的对象不是 JSON 可序列化的

【问题讨论】:

    标签: django amazon-s3 file-upload django-celery celery-task


    【解决方案1】:

    可以使用uppy js直接上传视频文件到s3。以上方法写入缓存仍然使用内存。作为 s3 接受 post 方法。通过 Ajax 将 URL 文件名和大小存储到数据库中

    【讨论】:

      【解决方案2】:

      Celery 提供了配置任务负载如何序列化的选项。

      您的项目设置中配置的任务序列化程序设置为CELERY_TASK_SERIALIZER = json

      request.FILES['<input>']django.core.files.uploaded.files.InMemoryUploadedFile 的一个实例,不能直接使用json 序列化程序(List of supported types) 进行编码。
      虽然有一些方法可以将文件序列化为二进制数据,但如果您的用户上传大文件,您的应用程序就有可能耗尽大量内存

      您可以考虑在任何情况下使用django.core.files.uploadedfile.TemporaryFileUploadHandler,并在任务负载中转发临时文件路径(request.FILES['<input>'] .temporary_file_path())而不是request.FILES['<input>']

      要强制执行此操作,请在您的项目设置中配置 FILE_UPLOAD_MAX_MEMORY_SIZE = 0警告:deactivates the MemoryFileUploadHandler 用于您的整个项目。

      随后在任务定义中,您可以将文件读入内存以保存新的Document

      from django.core.files import File
      from django.conf import DEFAULT_FILE_STORAGE as storage
      
      @task(name="save_document_model")
      def save_document_model(file_path, file_name):
      
          with open(file_path, 'r') as f:
              file = File(f)
      
              newdoc = Document(upload=file, name=file_name)
              newdoc.save()
      
              logger.info("document saved successfully")
      
              storage.delete(file_path) # cleanup temp file
      
          return HttpResponse("document saved successfully")
      

      【讨论】:

      • 有没有其他方法可以先将文件保存到本地服务器,然后再传输到s3?在这种情况下,我可以将文档 ID 传递给 celery 任务。
      • 我的答案解释了先保存到本地服务器,然后传输到 S3 然后清理保存的文件。我不清楚你的评论。
      猜你喜欢
      • 1970-01-01
      • 2021-11-22
      • 2019-12-13
      • 2016-02-06
      • 2016-06-23
      • 2017-10-18
      • 2017-12-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多