【问题标题】:How do I fairly assign tasks to workers in Python? - Splitting iterable into similarly sized chunks如何在 Python 中公平地将任务分配给工作人员? - 将可迭代拆分成类似大小的块
【发布时间】:2012-10-20 03:21:36
【问题描述】:

我有工人和任务要做:

workers = ['peter', 'paul', 'mary']
tasks = range(13)

现在我想将任务拆分成多个工作块或批次,这样每个工人就可以处理一个批次,并且做的工作量与其他人大致相同。在我的现实生活中,我想将批处理作业安排到计算场。批处理作业应该并行运行。实际的调度和调度是通过 lsf 或 grid 等商业级工具完成的。

我所期望的一些例子:

>>> distribute_work(['peter', 'paul', 'mary'], range(3))
[('peter', [0]), ('paul', [1]), ('mary', [2])]
>>> distribute_work(['peter', 'paul', 'mary'], range(6))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])]
>>> distribute_work(['peter', 'paul', 'mary'], range(5))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]

这个问题与hereherehere的问题非常相似

不同之处在于我想要这些功能,按顺序或优先顺序:

  1. 不要使用len,如果可能,不要在内部建立长数据结构
  2. 接受生成器
  3. 返回生成器
  4. 尽可能多地使用 stdlib 组件

关于要求的一些附注:

  • 没有故意的命令:我有同名的工人可以做多个批次(unix 主机名)。如果您的解决方案使用 dicts,那很好,因为我们始终可以通过批量枚举进行工作人员查找。
  • 任意长度:worker 和任务都可以是任何长度 >= 1 的迭代。并且它们不必像上面示例中所示的均分,其中 Mary 只得到一个任务。
  • 命令:对我来说并不重要。我猜其他人可能更喜欢 [0,1]、[2,3]、[5] 之类的顺序,但我不在乎。如果您的解决方案可以保持或切换顺序,也许值得向其他人指出。

我试图围绕itertools 和这个特殊问题展开思考,并想出了以下代码来说明这个问题:

from itertools import *

def distribute_work(workers, tasks):
    batches = range(len(workers))
    return [ ( workers[k],
               [t[1] for t in i]
               )   for (k,i) in groupby(sorted(zip(cycle(batches),
                                                   tasks),
                                               key=lambda t: t[0]),
                                        lambda t: t[0]) ]

这满足 4.,但排序很可能违反 1.. 和 2./3。甚至都没有考虑过。

可能有一些简单的解决方案,以我没有想到的方式组合一些 stdlib 组件。但也许不是。有接盘侠吗?

【问题讨论】:

    标签: python python-3.x split itertools iterable


    【解决方案1】:

    这是我喜欢的一种方法:

    parallelism = os.cpu_count()
    num_todos = len(todos)
    
    # this zip fanciness makes each chunk stripe through the data sequentially overall so that the
    # first items still get done first across all the workers
    chunksize = math.ceil(num_todos / parallelism)
    chunks = list(itertools.zip_longest(*[todos[i:i+chunksize] for i in range(0, num_todos, chunksize)]))
    chunks = [[c for c in chunk if c is not None] for chunk in chunks]
    
    with Pool(processes=parallelism) as pool:
        tasks = [pool.apply_async(my_function, args=(chunk)) for chunk in chunks]
        [task.get() for task in tasks]
    

    根据您是否需要累积结果,您可以进行调整,但对我来说有趣的部分是让工作人员协作以全局顺序完成工作(在我的情况下,处理图像的连续帧,以便我了解如何看起来所有的 CPU 都在启动)。

    【讨论】:

      【解决方案2】:

      好的,说完不可能,这里有一个想法。也许这是我应该转移到 codereview 的东西——我对 cme​​ts 非常感兴趣,因为这会在内存中产生多少开销。换句话说,我不知道这是否真的解决了任务列表很长且大小未知的问题。 作为Blckknght mentioned multiprocessing might be the better alternative

      代码:

      import itertools
      
      def distribute_work(workers, tasks):
          """Return one generator per worker with a fair share of tasks
      
          Task may be an arbitrary length generator.
          Workers should be an iterable.
          """
          worker_count = len(workers)
          worker_ids = range(worker_count)
          all_tasks_for_all_workers = itertools.tee(tasks, worker_count)
          assignments = [ (workers[id], itertools.islice(i, id, None, worker_count))
                          for (id,i) in enumerate(all_tasks_for_all_workers) ]    
          return(assignments)
      

      算法是

      1. 为每个工作人员复制一次原始任务列表。由于这只是复制生成器对象,因此它应该与内存中任务列表的大小无关。即使这是一项相对昂贵的操作,它也只是一次启动成本,对于非常大的任务列表来说在内存中是微不足道的。
      2. 要将任务分配给一个工作人员,每个工作人员必须获取任务列表的一部分。如果#W是worker的数量,第一个worker接任务0#W2*#W3*#W等。第二个worker接0+1#W+12*#W+1、@ 987654332@等。每个工人的拼接可以用itertools.islice完成

      对于纯粹的任务拆分/分配,此功能并不真正需要工作人员的姓名。但是工人的数量是。改变它会使函数更加通用和有用,并使返回值更容易理解。为了回答我自己的问题,我将保留该功能。

      用法和结果:

      >>> for (worker,tasks) in distribute_work(['peter', 'paul', 'mary'], range(5)):
      ...   print(worker, list(tasks))
      ... 
      peter [0, 3]
      paul [1, 4]
      mary [2]
      

      它还处理工人具有相同名称但不同实体的情况:

      >>> for (worker,tasks) in distribute_work(['p', 'p', 'mary'], range(5)): 
      ...   print(worker, list(tasks))
      ... 
      p [0, 3]
      p [1, 4]
      mary [2]
      

      【讨论】:

        【解决方案3】:

        我认为您想使用multiprocessing.Pool.imap 来处理您的员工并分配他们的工作。我相信它可以满足您的所有需求。

        jobs = (some generator)                   # can consume jobs from a generator
        pool = multiprocessing.Pool(3)            # set number of workers here
        results = pool.imap(process_job, jobs)    # returns a generator
        
        for r in results:                         # loop will block until results arrive
            do_something(r)
        

        如果结果的顺序对您的应用程序无关紧要,您也可以使用imap_unordered

        【讨论】:

        • 嗯。如果我在 Python 中执行任务调度,我猜就可以了。它并没有真正回答这个问题,但它可以很好地解决我的整体问题。我还没有计划在同一个 python 脚本中编写实际的作业调度,并且可能不想这样做。整个系统更加复杂,实际的作业调度当前发生在所有配置都已经可用的 shell 脚本中。我只是想在 Python 中破解一个 10 班轮来处理排序/分配子问题。如果我让def process_job 只创建/打印 shell 命令,这可能仍然有效。
        • @cfi:嗯,我不确定我是否理解。如果您不想从一开始就完全使用 jobs 生成器(就像您在问题中的代码中所做的那样),您需要让 Python 对工作人员和工作供应之间的同步进行一些控制(至少类似于multiprocessing.Queue)。但是,如果您要走这条路,我会让multiprocessing 模块尽可能多地处理它,而不是自己重新发明它的Pool 类。但是,如果您详细解释一下您的系统架构,或许我们可以提出其他建议?
        【解决方案4】:

        关注Tyler's answer

        def doleOut(queue, workers):
            for worker,task in itertools.izip(itertools.cycle(workers),queue):
                yield worker,task
        

        只要有队列,这将继续返回 (worker, task) 元组。所以如果你有一个阻塞waitForMoreWork 你可以这样做:

        queue = []
        doler = distribute_work(workers, queue)
        while 1:
            queue.append(waitForMoreWork)
            currentqueuelen = len(queue)
            for i in range(0,queuelen):
                worker,item = doler.next()
                worker.passitem(item)
        

        这样它会阻塞直到有更多队列项目,然后分发这些,然后再次阻塞。您可以将 waitForMoreWork 表达式设置为一次分发尽可能多的项目。

        【讨论】:

        • 对于 Python 3,请将 itertools.izip 替换为 zip
        • 不幸的是,这是我所拥有的解决方案的一半——尽管它更简单、更好。这只会产生一台发电机,我每个工人都需要一台。
        • @cfi: 除非你有一个分配负载的方法,否则你将如何正确分配它们?唯一的方法是枚举它们并使用模数或类似的值,但是你必须在某个地方提供枚举......
        • 正确。坚持使用生成器,我们必须首先创建类似于doleOut 的输出的东西,然后为每个工人创建itertools.tee 它,然后使用仅在每个nth 步骤返回的生成器。这对我来说似乎有点矫枉过正。
        【解决方案5】:

        你需要预批处理吗?

        为什么不只是有一个队列,并让每个工作人员在完成一个工作单元时退出队列?

        【讨论】:

        • 好点。必须澄清这是关于为应该并行运行的机器安排作业。重点是并行化工作负载以减少从开始到结果的延迟。
        猜你喜欢
        • 1970-01-01
        • 2014-06-28
        • 2022-08-22
        • 1970-01-01
        • 2019-12-25
        • 2021-02-26
        • 2018-06-04
        • 1970-01-01
        • 2012-01-07
        相关资源
        最近更新 更多