【问题标题】:Altering python-gearman worker tasks during job processing在作业处理期间更改 python-gearman 工作人员任务
【发布时间】:2011-03-19 14:54:53
【问题描述】:

我正在尝试更改 python-gearman 工人在其工作周期内可用的任务。我这样做的原因是允许我对我的工作进程进行一点控制,并允许它们从数据库中重新加载。我需要每个工作人员定期重新加载,但我不想简单地终止进程,我希望服务始终可用,这意味着我必须分批重新加载。所以我会让 4 名工人重新加载,而另外 4 名工人可以处理,然后重新加载接下来的 4 名工人。

流程:

  1. 开始重新加载过程 4 次。
    1. 注销reload 进程
    2. 重新加载数据集
    3. 注册一个finishReload 任务
    4. 返回
  2. 重复第 1 步,直到没有注册 reload 任务的工作人员。
  3. 启动finishReload(1) 任务,直到没有具有finishReload 任务的工作人员可用。

(1)finishReload 任务注销finishReload 任务并注册reload 任务然后返回。

现在,我遇到的问题是,当我更改工作进程可用的任务时,作业会失败。没有错误消息或异常,gearmand 日志中只有一个“错误”。这是一个复制问题的快速程序。

工人

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

客户

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

如果有什么我可以解释的,请告诉我。

编辑:我知道有人会要求查看我提到的日志。我也将这个问题发布到 Google 上的 gearman 小组,log is available there

【问题讨论】:

    标签: python gearman python-gearman


    【解决方案1】:

    看起来像子类化 GearmanWorker 类并添加一些标志可以解决这个问题。在开始从工作人员向服务器发出新命令之前,我需要让作业完成,这似乎会中断当前作业。因此,如果我们覆盖on_job_complete 函数,我们可以检查启用/禁用标志并在我们发出send_job_complete 命令后对它们进行操作。新的工人计划如下:

    工人

    import gearman
    
    def reversify(gmWorker, gmJob):
            return "".join(gmJob.data[::-1])
    
    def enable_reversify(gmWorker, gmJob):
            myWorker.enableReversify = 1
            return 'OK'
    
    def strcount(gmWorker, gmJob):
            myWorker.enableReversify = -1
            return str(len(gmJob.data))
    
    class myWorker(gearman.GearmanWorker):
    
            enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on
    
            def on_job_complete(self, current_job, job_result):
                    self.send_job_complete(current_job, job_result)
                    ### check the flag here and enable or disable tasks ###
                    if myWorker.enableReversify == -1:
                            self.unregister_task('reversify')
                    if myWorker.enableReversify == 1:
                            self.register_task('reversify', reversify)
                    myWorker.enableReversify = 0 # reset the flag
                    return True
    
    worker = myWorker(['localhost:4730']) 
    worker.register_task('reversify', reversify)
    worker.register_task('strcount', strcount)
    worker.register_task('enableReversify', enable_reversify)
    
    while True:
            worker.work() 
    

    【讨论】:

      【解决方案2】:

      乍一看,问题似乎是您正在开始一项工作,然后在工作完成之前从工作服务器取消注册工作人员执行该工作的能力。

      【讨论】:

      • 找到了解决方案。不是特别优雅,但它可以解决问题。我的第一个倾向和你的一样,虽然我很久以前就排除了。注册新任务时会出现相同的症状。我似乎中断与客户端的对话以与服务器交谈是不行的。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-02-05
      • 2014-04-13
      • 2016-01-15
      • 1970-01-01
      • 2011-10-28
      相关资源
      最近更新 更多