【问题标题】:Schedule a redis job that schedules another redis job with python-rq使用 python-rq 调度另一个 redis 作业的 redis 作业
【发布时间】:2019-02-02 08:33:57
【问题描述】:

我有两种工作:一种是我想串行运行,另一种是我想并行运行。但是我希望并行作业以串行方式安排(如果您仍在关注)。那就是:

  1. 做一个。
  2. 等待 A,做 B。
  3. 等待 B,同时执行 2 个以上版本的 C。

我认为它有 2 个 redis 队列,一个只有一个工作人员的 serial_queue。还有一个 parallel_queue,上面有多个工人。

serial_queue.schedule(
    scheduled_time=datetime.utcnow(),
    func=job_a,
     ...)    
serial_queue.schedule(
    scheduled_time=datetime.utcnow(),
    func=job_b,
     ...)

def parallel_c():
    for task in range(args.n_tasks):
        queue_concurrent.schedule(
            scheduled_time=datetime.utcnow(),
            func=job_c,
            ...)

serial_queue.schedule(
    scheduled_time=datetime.utcnow(),
    func=parallel_c,
     ...)

但是目前的这个设置,给出的错误是 AttributeError: module '__main__' has no attribute 'schedule_fetch_tweets' 。如何为python-rq正确打包这个函数?

【问题讨论】:

    标签: python redis python-rq


    【解决方案1】:

    该解决方案需要一些技巧,因为您必须像导入外部模块一样导入当前脚本

    例如。 schedule_twitter_jobs.py 的内容是:

    from redis import Redis
    from rq_scheduler import Scheduler
    import schedule_twitter_jobs
    # we are importing the very module we are executing
    
    def schedule_fetch_tweets(args, queue_name):
        ''' This is the child process to schedule'''
    
        concurrent_queue = Scheduler(queue_name=queue_name+'_concurrent', connection=Redis())
        # this scheduler is created based on a queue_name that will be passed in
        for task in range(args.n_tasks):
            scheduler_concurrent.schedule(
                scheduled_time=datetime.utcnow(),
                func=app.controller.fetch_twitter_tweets,
                args=[args.statuses_backfill, fill_start_time])
    
    serial_queue = Scheduler(queue_name='myqueue', connection=Redis())
    serial_queue.schedule(
    '''This is the first schedule.'''
       scheduled_time=datetime.utcnow(),
       func=schedule_twitter_jobs.schedule_fetch_tweets,
       #now we have a fully-qualified reference to the function we need to schedule.
       args=(args, ttl, timeout, queue_name)
       #pass through the args to the child schedule
       )
    

    【讨论】:

      猜你喜欢
      • 2021-09-16
      • 2012-08-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多