【问题标题】:How to dispose of an observable on completion of another observable?如何在完成另一个可观察对象时处置可观察对象?
【发布时间】:2016-04-19 22:29:26
【问题描述】:

我有一个 source observable,我订阅了一个 logger 观察者用于日志记录。

我还订阅了source,所以我可以执行计算。当我的计算完成后,我已经完成了source,我想处置logger

             +-------------------+
             |                   |
   +---------+ source observable +--------+
   |         |                   |        |
   |         +-------------------+        |
   |                                      |
   |                                      |
+--v---------------+         +------------v--------+
|                  |         |                     |
|     logger       |         |    computations     |
|    (observer)    |         |    (observable)     |
+-------^----------+         +-----------+---------+
        |                                |
        |                                |
        |        dispose logger          |
        +--------------------------------+
            when computations completed

但是,logger 并没有在正确的时间完全释放——通常会发生一两个额外的滴答声:

MWE

from rx import Observable

# Some source
source = Observable.interval(1)

# Create logger for source
logged = []
logger = source.subscribe(logged.append)

# Now do stuff/computations with source
calculated = source.map(lambda x: x**2).take_while(lambda x: x < 20)

# Output computed values and stop logging when we're done with our computation
calculated.subscribe(print, print, logger.dispose)

# I expect only values that passed through our computation to have been logged
# The last value should be 5 because 5**2 = 25 which is larger than 20
# which in turn causes our computation to terminate
assert logged == [0, 1, 2, 3, 4, 5], logged

但我明白了:

Traceback (most recent call last):
  File "C:\Program Files (x86)\Python27\lib\site-packages\IPython\core\interactiveshell.py", line 3035, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-54-e8cb1fb583bf>", line 1, in <module>
    assert logged == [0, 1, 2, 3, 4, 5], logged
AssertionError: [0, 1, 2, 3, 4, 5, 6, 7]

7 是如何被记录的?我们的计算应该在 source 发出 5 后终止,此时 logger 被释放。

我做错了什么?

【问题讨论】:

    标签: python reactive-programming rx-py


    【解决方案1】:

    这是线程同步问题。 interval() 运算符启动新线程以在指定的时间间隔内调用 on_next()。处理订阅后,其他线程检测到此信号并停止工作需要时间。一毫秒接近它所花费的时间。

    为了记录通过反应链传递的消息,将记录功能直接插入该链会更可靠:

    logged = []
    def logger(x):
        logged.append(x)
        return x
    
    calculated = source \
        .map(logger) \
        .map(lambda x: x**2) \
        .take_while(lambda x: x < 20) \
        .subscribe(print, print)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-08-19
      • 2019-04-23
      • 2019-02-18
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多