【问题标题】:RX stream of start and stop events with late subscription延迟订阅的开始和停止事件的 RX 流
【发布时间】:2017-10-27 10:23:32
【问题描述】:

我正在尝试创建一个开始和停止事件的主题,其中迟到的订阅者只收到未完成的开始事件。 IE。那些还没有相应的停止事件。

这是一些 RxPY 代码:

from rx.subjects import ReplaySubject

start = ReplaySubject()

start.subscribe(lambda x: print("subscriber1: " + str(x)))

start.on_next(("a", "start"))
start.on_next(("b", "start"))
start.on_next(("b", "stop"))

start.subscribe(lambda x: print("subscriber2: " + str(x)))

start.on_next(("c", "start"))

这给出了输出:

subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber2: ('b', 'start')
subscriber2: ('b', 'stop')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')

而我想:

subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')

我认为像扫描运算符这样的东西是必需的,但不能完全放在一起。任何想法都非常感谢:)

【问题讨论】:

    标签: rxjs reactive-programming rx-py


    【解决方案1】:

    最干净的解决方案是使用主流之外的副作用来更新字典并将未完成的事件合并到新订阅者。

    class EventObserver(Observer):
      def __init__(self):
        self.cached_events = set()
        self.mirror = Subject() # re-emits all values
    
      on_next(self, value):
        self.mirror.next(value) # stream to late observers
        if(value[1] == 'stop'):
          try:
            self.cached_events.remove(value[0])
          except KeyError:
            pass
        else:
          self.cached_events.add(value[0])
    
      on_error(self, e):
        self.mirror.error(e) # + other error logic
    
      on_completed(self):
        self.mirror.complete() # + other completion logic
    
      late_subscribe(self, subscriber):
        return Observable.merge(
          Observable.from(list(self.cached_events)),
          self.mirror
        ).subscribe(subscriber)
    

    如下使用:

    event_observer = EventObserver()
    events$.subscribe(event_observer)
    
    # late subscription:
    event_observer.late_subscribe(...)
    

    答案的其余部分解释了为什么您可能更喜欢这种方法而不是被动方法。


    反应式方法:

    这是我能想到的最简单的解决方案,如果您不介意迟到的订阅者等到下一个活动。如您所见,它不是最漂亮的。

    pub_events$ = events$.publish(); # in case your events$ aren't hot
    replay_events$ = pub_events$.replay();
    
    # late subscription:
    replay_events$.window(events$.take(1))
                  .scan(lambda is_first, o: 
                          o.reduce(lambda D, x: D.update({ x[0]: x[1] == 'stop' }) or D, {})
                           .flatMap(lambda D: Observable.from([ k for k, v in D.items() if v == False ]))
                          if is_first == True else o,
                        True)
                  .flatMap(lambda o: o)
    

    目标是使用从所有先前事件的缓存中构建的未完成事件的过滤列表开始延迟订阅。最大的障碍是ReplaySubject 没有将这些缓存事件与新事件区分开来。解决上述问题的第一步是在下一个事件上 window,期望 ReplaySubject 在此之前发出缓存的事件。由于您的要求听起来像是优化而不是正确性,因此这里的竞争条件可能没什么大不了的。

    最多有两个窗口:一个缓存事件,一个新事件(如果有的话),所以scan 稍微利用 Python 类型的弱点来检查我们在哪个窗口。如果是对于缓存的事件,我们构建一个事件键字典→该事件是否“停止”。最后一步是使用flatMap 将未停止的值注入回流中。

    【讨论】:

      猜你喜欢
      • 2017-11-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多