【发布时间】:2017-01-30 11:35:27
【问题描述】:
是否有类似zip 的操作,但它不等待整个元组聚集,而是在每次更改时发出元组。
例如,如果它只是发射器 1A 和 B 出现在第二个 observable 上,它会立即发射 1B,这是“最新的”元组。
一开始,此操作应等到所有N 元素聚集。
【问题讨论】:
标签: reactive-programming reactivex
是否有类似zip 的操作,但它不等待整个元组聚集,而是在每次更改时发出元组。
例如,如果它只是发射器 1A 和 B 出现在第二个 observable 上,它会立即发射 1B,这是“最新的”元组。
一开始,此操作应等到所有N 元素聚集。
【问题讨论】:
标签: reactive-programming reactivex
您要查找的内容通常称为zipLatest。
以下是 Python 中的示例实现:
from typing import *
import rx
import rx.operators as ops
def zip_latest(*xss: rx.Observable) -> rx.Observable:
helper = ZipLatestHelper(len(xss))
return mux(*xss).pipe(
ops.map(helper.process),
ops.filter(lambda x: x is not None),
)
def mux(*xss: rx.Observable) -> rx.Observable:
def pair_index(i: int) -> Callable[[Any], Tuple[int, Any]]:
def inner(x: Any) -> Tuple[int, Any]:
return i, x
return inner
paired = [xs.pipe(ops.map(pair_index(i))) for i, xs in enumerate(xss)]
return rx.from_iterable(paired).pipe(ops.merge_all())
class ZipLatestHelper:
def __init__(self, num_streams):
self.latest = [None for _ in range(num_streams)]
self.ready = set()
def process(self, pair: Tuple[int, Any]) -> Optional[Tuple[Any, ...]]:
i, x = pair
self.latest[i] = x
self.ready.add(i)
return (
tuple(self.latest) if len(self.ready) == len(self.latest) else None
)
及用法:
from time import sleep
zipped = zip_latest(
rx.interval(0.5).pipe(ops.map(lambda i: f"A{i}")),
rx.interval(0.3).pipe(ops.map(lambda i: f"B{i}")),
)
zipped.subscribe(print)
sleep(10)
有输出:
('A0', 'B0')
('A0', 'B1')
('A0', 'B2')
('A1', 'B2')
('A1', 'B3')
('A2', 'B3')
('A2', 'B4')
('A2', 'B5')
注意事项:
OnCompleted,应该怎么办?压缩后的流应该继续发送项目,还是应该发出OnCompleted?OnError,应该怎么办?【讨论】: