【问题标题】:An observable to emit tuple of latest values of N other observables?一个 observable 发出 N 个其他 observables 的最新值的元组?
【发布时间】:2017-01-30 11:35:27
【问题描述】:

是否有类似zip 的操作,但它不等待整个元组聚集,而是在每次更改时发出元组。

例如,如果它只是发射器 1AB 出现在第二个 observable 上,它会立即发射 1B,这是“最新的”元组。

一开始,此操作应等到所有N 元素聚集。

【问题讨论】:

    标签: reactive-programming reactivex


    【解决方案1】:

    您要查找的内容通常称为zipLatest

    以下是 Python 中的示例实现:

    from typing import *
    import rx
    import rx.operators as ops
    
    def zip_latest(*xss: rx.Observable) -> rx.Observable:
        helper = ZipLatestHelper(len(xss))
        return mux(*xss).pipe(
            ops.map(helper.process),
            ops.filter(lambda x: x is not None),
        )
    
    def mux(*xss: rx.Observable) -> rx.Observable:
        def pair_index(i: int) -> Callable[[Any], Tuple[int, Any]]:
            def inner(x: Any) -> Tuple[int, Any]:
                return i, x
            return inner
    
        paired = [xs.pipe(ops.map(pair_index(i))) for i, xs in enumerate(xss)]
        return rx.from_iterable(paired).pipe(ops.merge_all())
    
    class ZipLatestHelper:
        def __init__(self, num_streams):
            self.latest = [None for _ in range(num_streams)]
            self.ready = set()
    
        def process(self, pair: Tuple[int, Any]) -> Optional[Tuple[Any, ...]]:
            i, x = pair
            self.latest[i] = x
            self.ready.add(i)
            return (
                tuple(self.latest) if len(self.ready) == len(self.latest) else None
            )
    

    及用法:

    from time import sleep
    
    zipped = zip_latest(
        rx.interval(0.5).pipe(ops.map(lambda i: f"A{i}")),
        rx.interval(0.3).pipe(ops.map(lambda i: f"B{i}")),
    )
    zipped.subscribe(print)
    sleep(10)
    

    有输出:

    ('A0', 'B0')
    ('A0', 'B1')
    ('A0', 'B2')
    ('A1', 'B2')
    ('A1', 'B3')
    ('A2', 'B3')
    ('A2', 'B4')
    ('A2', 'B5')
    

    注意事项:

    • 可能不是线程安全的。
    • 如果流收到OnCompleted,应该怎么办?压缩后的流应该继续发送项目,还是应该发出OnCompleted
    • 如果流收到OnError,应该怎么办?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多