【Question title】: How to wait for RxPy parallel threads to complete
【Posted】: 2017-05-15 21:33:02
【Question】:

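Based on this excellent SO answer, I can get multiple tasks running in parallel in RxPy. My question is: how do I wait for all of them to complete? I know that with threads I could call .join(), but Rx schedulers don't seem to offer anything like that. .to_blocking() doesn't help either: the main thread finishes before all the notifications fire and the completed handler is called. Here is an example: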

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    printthread("\nAll done")
    # time.sleep(2)

Expected output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread

Actual output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread

Actual output with the sleep call uncommented

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
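The truncated output suggests the scheduler threads are daemon threads, which the interpreter does not wait for when the main thread exits. For comparison, the `.join()` approach mentioned above looks like this with plain `threading` and no Rx: the main thread blocks on each worker before printing its final line. The names `worker`, `results` and `threads` are illustrative only, and the delays are shortened.

```python
import random
import threading
import time

def intense_calculation(value):
    # Simulated slow task (much shorter than in the question)
    time.sleep(random.randint(1, 5) * 0.01)
    return value

results = []
lock = threading.Lock()

def worker(value):
    result = intense_calculation(value)
    with lock:  # protect the shared list across threads
        results.append(result)

# daemon=True mimics threads the interpreter would not wait for on its own
threads = [threading.Thread(target=worker, args=(i,), daemon=True) for i in range(1, 4)]
for t in threads:
    t.start()

# join() blocks the main thread until each worker has finished
for t in threads:
    t.join()

print("All done, results:", sorted(results))
```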

【Question discussion】:

    Tags: python multithreading python-multithreading reactivex rx-py


    【Solution 1】:

    Posting the full solution here:

    from __future__ import print_function
    import os, sys
    import time
    import random
    from rx import Observable
    from rx.core import Scheduler
    from threading import current_thread
    from rx.concurrency import ThreadPoolScheduler
    
    def printthread(val):
        print("{}, thread: {}".format(val, current_thread().name))
    
    def intense_calculation(value):
        printthread("calc {}".format(value))
        time.sleep(random.randint(5, 20) * .1)
        return value
    
    if __name__ == "__main__":
        scheduler = ThreadPoolScheduler(4)
    
        Observable.range(1, 3) \
            .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
            .observe_on(Scheduler.event_loop) \
            .subscribe(
                on_next=lambda x: printthread("on_next: {}".format(x)),
                on_completed=lambda: printthread("on_completed"),
                on_error=lambda err: printthread("on_error: {}".format(err)))
    
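        # shutdown() defaults to wait=True, so it blocks until every calculation
        # submitted to the pool has finished. on_next/on_completed are delivered
        # on the event-loop thread, so they may still race with the print below.
        scheduler.executor.shutdown()
        printthread("\nAll done")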
    

    【Discussion】:

    • Some operators don't work in RxPY version 3, and select_many no longer seems to exist.
    【Solution 2】:

    With a ThreadPoolScheduler you can:

    1. scheduler = ThreadPoolScheduler(pool_size)
    2. Make the parallel calls.
    3. scheduler.executor.shutdown()

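    By default shutdown() blocks until all pending tasks have finished, so once it returns all the results are available.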
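The steps above depend on `concurrent.futures.ThreadPoolExecutor.shutdown()`, whose default `wait=True` returns only after every submitted task has completed. A stdlib-only sketch of that behavior, without Rx; `intense_calculation` here is a simplified stand-in for the question's function.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def intense_calculation(value):
    time.sleep(0.05)  # stand-in for real work
    return value

executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(intense_calculation, i) for i in range(1, 4)]

# wait=True is the default: this call returns only when all pending futures are done
executor.shutdown()

print("All done:", [f.result() for f in futures])
```

Because `shutdown()` has already waited, the `f.result()` calls return immediately instead of blocking.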

    【Discussion】:

    • Perfect! Thanks Kaka, scheduler.executor.shutdown() did it.
    【Solution 3】:

    Use run() to wait for the RxPy parallel threads to complete.

    BlockingObservables have been removed from RxPY v3.

    from threading import current_thread
    import random
    import time
    
    import rx
    from rx import operators as ops
    from rx.scheduler import NewThreadScheduler
    
    def intense_calculation(value):
        # value is a (key, url) pair taken from my_dict.items()
        delay = random.randint(5, 20) * 0.2
        time.sleep(delay)
        print("From intense_calculation: {0} Value : {1} {2}".format(current_thread(), value, delay))
        return (value[0], value[1] + " processed")
    
    # Each inner subscription is started on a new thread
    new_thread_scheduler = NewThreadScheduler()
    
    my_dict = {'A': 'url1', 'B': 'url2', 'C': 'url3'}
    
    new_dict = rx.from_iterable(my_dict.items()).pipe(
        ops.flat_map(lambda item: rx.of(item).pipe(
            ops.map(intense_calculation),
            ops.subscribe_on(new_thread_scheduler)
        )),
        ops.to_dict(lambda x: x[0], lambda x: x[1])
    ).run()  # blocks until the pipeline completes and returns the resulting dict
    
    print("From main: {0}".format(current_thread()))
    print(str(new_dict))
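`run()` subscribes to the source and blocks the calling thread until the source completes, then returns the last emitted value (here the single dict produced by `to_dict`). The sketch below imitates that pattern using only the standard library; `parallel_source` and `run_blocking` are hypothetical helpers, not RxPY API, a `threading.Event` plays the role of the completion signal, and error handling is omitted.

```python
import threading
import time

def parallel_source(values, on_next, on_completed):
    """Hypothetical stand-in for an observable: each value is processed on its
    own daemon thread, and on_completed fires after the last emission."""
    if not values:
        on_completed()
        return
    remaining = len(values)
    lock = threading.Lock()

    def work(v):
        nonlocal remaining
        time.sleep(0.02 * v)  # simulated work
        with lock:  # serialize notifications, as an Rx source would
            on_next(v * 10)
            remaining -= 1
            if remaining == 0:
                on_completed()

    for v in values:
        threading.Thread(target=work, args=(v,), daemon=True).start()

def run_blocking(subscribe):
    """Block until the source completes and return its last value, like run()."""
    done = threading.Event()
    last = None

    def on_next(x):
        nonlocal last
        last = x

    subscribe(on_next, done.set)
    done.wait()  # the main thread waits here instead of exiting early
    return last

result = run_blocking(lambda on_next, on_completed: parallel_source([1, 2, 3], on_next, on_completed))
print("From main:", result)
```

The key design point is that the main thread waits on a completion signal raised from `on_completed`, rather than on the individual threads, so it does not need to know how many threads the scheduler created.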
    

    【Discussion】:
