【问题标题】:Create an interval and countdown using rxpy使用 rxpy 创建间隔和倒计时
【发布时间】:2020-09-08 22:34:22
【问题描述】:

我正在制作一个 Discord 机器人,它允许您创建投票。用户可以将投票的持续时间作为参数。因此,我想每隔 5 秒或 10 秒(或者更多)刷新一次消息,并通过投票编辑用户还剩多少时间。例如,我想从 3600 秒开始倒计时,每 5 秒或 10 秒执行一个函数,该函数将编辑消息,直到时间变​​为 0。机器人端的一切都在我的控制之下,或多或少我知道如何实现它。

所以,我的想法是在当前时间等于开始时间 + 轮询持续时间时进行间隔并停止。所以我可以使用 rx.interval() 来创建 observable 并使用像 .take_while() 这样的运算符。

这是我的代码:

import time
import rx
print(rx.__version__) # 3.1.1

started_at = time.time() # Time in seconds
end_at = started_at + 3600 # One hour after
source = rx.interval(5).take_while(time.time() < end_at)

但我得到AttributeError: 'Observable' object has no attribute 'take_while'

我想我应该把它放在管道或类似的东西中:

from rx import operators as op
sub = source.pipe(op.take_while(time.time() < end_at))

但我得到TypeError: 'bool' object is not callable

如何使用 take_while?谢谢!

【问题讨论】:

    标签: python rx-py


    【解决方案1】:

    您应该通过管道传输源然后订阅。您必须在 Observable 具有的 pipe() 方法中使用运算符 (https://rxpy.readthedocs.io/en/latest/reference_observable.html#rx.Observable.pipe)

    代码应该是这样的

    import time
    import rx
    from rx import operators as op
    
    print(rx.__version__) # 3.1.1
    
    started_at = time.time() # Time in seconds
    end_at = started_at + 3600 # One hour after
    ob = rx.interval(5)
    sub = ob.pipe(op.take_while(lambda _: time.time() < end_at))
    sub.subscribe(lambda i: print(i))
    

    【讨论】:

    • 上面的代码不起作用。我检查了 3.9python 和 3.2.0 rxpy。半年前也没用。这个库中的间隔 nad 计时器似乎有问题
    猜你喜欢
    • 2019-02-02
    • 1970-01-01
    • 1970-01-01
    • 2017-04-23
    • 1970-01-01
    • 1970-01-01
    • 2020-07-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多