【问题标题】:Complete Observable Sequence After "long" interval“长”间隔后的完整可观察序列
【发布时间】:2019-02-01 20:10:22
【问题描述】:

以下可观察序列将每个元素添加到 ReplaySubject,以便我可以稍后访问任何元素,甚至等待 ReplaySubject 的完成。它在达到时间跨度后完成 ReaplySubject。

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

我希望序列一直运行到自上次“OnNext”调用以来的给定时间,然后完成。这在各种蓝牙通信场景中非常有用,蓝牙设备会给我一个数据序列,然后在没有任何类型的完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。因此,如果自上次蓝牙通​​知以来已经“太长”了,我想完成 ReplaySubject。

我可以通过创建一个计时器,在收到每个元素时重置它,然后在计时器达到“太长”时完成 ReplaySubject 来做到这一点,但我听说创建一个对象并从可观察对象中操作它订阅不是线程安全的。

关于如何在“太长”间隔后完成序列的任何建议?

据我所知,这是一个不是线程安全的版本,但应该可以按预期工作:

bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    reading = false;
};

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeWhile(x => reading)
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            timer.Stop();
            timer.Start();

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

根据 Simonare 的第一个答案,这似乎令人满意:

                characteristic.WhenNotificationReceived()
                .Timeout(TimeSpan.FromSeconds(1))
                .Subscribe(
                    onNext: result =>
                    {
                        string nextTag = BitConverter.ToString(result.Data);
                        nextTag = nextTag.Replace("-", "");

                        rfidPlayer.OnNext(nextTag);
                    },
                    onError: error =>
                    {
                        rfidPlayer.OnCompleted();
                    });

【问题讨论】:

  • 请不要使用外部状态(即bool reading = true;),也不要使用计时器。您正在使用一个非常强大的工具,不需要任何这些。当你正确使用 Rx 时,它的效果非常好。
  • 也尽量不要在您的.Subscribe 中工作。试试这个:characteristic.WhenNotificationReceived().TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1))).Select(result =&gt; BitConverter.ToString(result.Data).Replace("-", "")).Subscribe(rfidPlayer);.
  • TakeUntil 是设置整体定时器还是间隔定时器?
  • @Enigmativity 感谢您的 cmets,我的 Rx 有很长的路要走,但在很大程度上取决于您的反馈。
  • 不用担心。我添加了一个答案,我认为您可能会从中受益匪浅。它使用了两个我认为是最重要但经常未被充分利用的运算符——GenerateSwitch。如果你觉得有用,请告诉我。

标签: c# asynchronous system.reactive


【解决方案1】:

您可以考虑使用Timeout Operator 。唯一的缺点是它以错误信号终止。您可能需要处理错误错误

如果 Observable 在指定的时间范围内未能发出任何项目,Timeout 运算符允许您使用 onError 终止来中止 Observable。

如果你使用下面的方法你可以超越错误

 .Timeout(200, Promise.resolve(42));

另一个变体允许您指示超时切换到您指定的备份 Observable,而不是在触发超时条件时以错误终止。


characteristic.WhenNotificationReceived()
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: result =>
        {
            ....
            rfidPlayer.OnNext(....);
        },
        onError: error =>
        {
            rfidPlayer.OnCompleted();
        });

【讨论】:

  • 超时运算符的工作方式与 TakeUntil 运算符相同,只是它会抛出异常而不是完成序列。
  • 你可以坚持使用 TakeUntil,我认为你不会遇到任何问题。
  • TakeUntil 在总时间过去后完成。我正在寻找一种解决方案,该解决方案在自最后一个元素到达后​​的给定间隔过去时完成。我已经发布了示例代码,其行为方式符合我的要求,但不是线程安全的。
  • 超时这样做
  • 看看我上面的编辑。我已将您的答案标记为答案,但如果您更新答案以包含 onError 块会很好。
【解决方案2】:

因为有例外,我觉得使用Timeout 很讨厌。

我更喜欢在序列中注入一个值,我可以用它来终止序列。例如,如果我的序列产生非负数,那么如果我注入 -1,我就知道结束序列。

这是一个例子:

从这个 observable 开始,它生成从 1 开始的 2 的幂,它还会将每个值的生成延迟挂起值的毫秒数。

Observable
    .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))

所以1、2、4、8等,越来越慢。

如果3.0 秒没有值,现在我想停止这个序列,然后我可以这样做:

    .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
    .Switch()
    .TakeWhile(x => x >= 0)

如果我运行这个序列,我会得到这个输出:

1 2 4 8 16 32 64 128 256 512 1024 2048

序列即将产生4096,但它首先等待4096 毫秒以产生该值 - 同时Observable.Timer(TimeSpan.FromSeconds(3.0)) 触发并输出-1 从而停止序列。

这个查询的关键部分是使用Switch。它接受IObservable&lt;IObservable&lt;T&gt;&gt; 并通过仅订阅最新的外部可观察对象并取消订阅前一个对象来生成IObservable&lt;T&gt;

因此,在我的查询中,序列生成的每个新值都会停止并重新启动 Timer

在你的情况下,你的 observable 看起来像这样:

characteristic
    .WhenNotificationReceived()
    .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
    .Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
    .Switch()
    .TakeWhile(x => x != null)
    .Subscribe(rfidPlayer);

【讨论】:

    【解决方案3】:

    这是您可以使用的自定义运算符 TakeUntilTimeout,它是内置 Timeout 运算符之上的薄层。

    /// <summary>
    /// Applies a timeout policy for each element in the observable sequence.
    /// If the next element isn't received within the specified timeout duration
    /// starting from its predecessor, the sequence terminates.
    /// </summary>
    public static IObservable<T> TakeUntilTimeout<T>(
        this IObservable<T> source,
        TimeSpan timeout)
    {
        return source.Timeout(timeout, Observable.Empty<T>());
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-08-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多