【问题标题】:Celery Async get timeout never times out芹菜异步获取超时永远不会超时
【发布时间】:2020-09-22 11:00:27
【问题描述】:

我的 celery 应用程序如下。我在 Python 2.7.5 和 celery 4.4.2 上,操作系统是 CentOs 7.4。这个想法是使用 celery 通过 java_zip_sign_exe 变量中定义的 shell 脚本对大量文件进行签名(当文件大小以 GB 为单位时,每个文件有时需要长达 30 分钟),分布在运行在不同主机上的多个工作人员中。 sign_heavy_java_zip_files() 对 sign_heavy_java_zip_file() 进行异步调用,并为其提供所有文件名。

sign_celery_app.py

app = Celery('tasks', broker='redis://:<hostname>:6379/0',backend='redis://:<hostname>:6379/0')
app.conf['worker_prefetch_multiplier'] = 1
app.conf['task_acks_late'] = True
app.conf.task_default_queue = 'default'
app.conf.tasks_queues = (
    Queue('default', exchange='default', routing_key='default'),
    Queue('heavy_java_zip', exchange='heavy_java_zip', routing_key='heavy_java_zip'),

@app.task

def sign_java_zip_file(filename,User,MaxPerJavaZipFileTime,Site):
  print ("Started Signing " + filename + java_zip_sign_exe )
  process=subprocess.Popen([java_zip_sign_exe,filename,User,str(MaxPerJavaZipFileTime),Site],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
  out,err = process.communicate()
  print ("Finished Signing " + filename )
  return (process.returncode,out,err)

@app.task(queue='heavy_java_zip')
def sign_heavy_java_zip_files(filenames,User,MaxPerJavaZipFileTime,Site):
  job = group(sign_java_zip_file.s(filename,User,MaxPerJavaZipFileTime,Site) for filename in filenames )
  job_results=job.apply_async(queue='heavy_java_zip')
  return job_results

Celery worker 开始如下

celery -A sign_celery_app  worker --loglevel=info --concurrency=2  -O fair -Q "heavy_java_zip"

我的 main() python 调用文件如下

results_heavy_java_zip=sign_heavy_java_zip_files.delay(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)

results_heavy_java_zip.get(timeout=(MaxTotalJavaZipTime*60))

当没有工人时,如预期的那样,超时会引发超时异常。但是如果有 workers ,并且一旦任务开始异步工作,那么超时就没有任何影响。我预计即使它们正在运行,它们也会在超时结束时被中断。是不是我理解错了?

【问题讨论】:

    标签: python redis celery django-celery


    【解决方案1】:

    我相信是错误的方式,函数调用是在主程序中完成的。

    results_heavy_java_zip=sign_heavy_java_zip_files.delay(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)
    

    必须是(没有延迟 - 程序需要异步调用,因为它在任务调用中得到了照顾)

    results_heavy_java_zip=sign_heavy_java_zip_files(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)
    

    【讨论】:

      猜你喜欢
      • 2017-01-21
      • 1970-01-01
      • 1970-01-01
      • 2018-05-11
      • 1970-01-01
      • 1970-01-01
      • 2019-03-14
      • 2015-10-26
      • 1970-01-01
      相关资源
      最近更新 更多