【问题标题】:How is an observable subscription gracefully terminated?可观察订阅如何优雅地终止?
【发布时间】:2017-07-19 14:37:09
【问题描述】:

我正在尝试使用响应式扩展 (Rx) 来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我使用了CancellationToken,它有效地停止了订阅。

当请求取消时,我如何优雅地完成当前工作并正确终止而不丢失任何数据?

示例

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

输出

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

从输出中可以看出,“作业终止”这行不是最后一行,这意味着清理在应用程序终止之前没有足够的时间完成。

预期输出

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

“作业终止”行是要打印的最后一行。 “取消”和“清理”行已被允许慢慢来。

(编辑:添加预期输出)

【问题讨论】:

  • 您希望输出是什么?
  • 在这个例子中,应用线程在订阅之前退出。我找不到任何方法来等待订阅的 OnNext 处理程序完成其工作。我不希望应用程序在处理程序完成之前退出,所以“作业终止”应该是最后一次打印。

标签: c# system.reactive cancellation


【解决方案1】:

如果我正确理解了这个问题,这不是 Rx 问题,这是一个“无论你在订阅中做什么”问题。您的订阅操作需要半秒,清理可能需要半秒,您的工作终止需要微秒。您希望在取消和终止之间插入什么?

我能给你的最好建议是让订阅操作比 Thread.Sleep 调用更好地兑现取消令牌。

【讨论】:

  • 我希望有某种机制允许应用程序等待订阅的 OnNext 处理程序完成其工作。
  • @Reyhn - 使用 1WaitHandle`。
  • @Reyhn - 使用 1WaitHandle`。
【解决方案2】:

使用similar question 的答案和a question about waiting before terminating 的答案,我想出了一个可以满足我需求的解决方案。

我最初的问题是我找不到等待订阅线程的方法。上面链接的答案引导我以三种方式重构代码:

  1. 我将取消逻辑从订阅移到可观察对象中。

  2. 订阅包含在自己的Task 中(因此可以继续执行ReadLine 语句)。

  3. 引入了ManualResetEvent 来控制应用程序退出策略。

解决方案:

var reset = new ManualResetEvent(false);

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250))
    .TakeWhile(x => !cts.Token.IsCancellationRequested)
    .Finally(
        () =>
            {
                Console.WriteLine("Finally: Beginning finalization.");
                Thread.Sleep(500);
                Console.WriteLine("Finally: Done with finalization.");
                reset.Set();
            });

await Task.Factory.StartNew(
    () => observable
        .Subscribe(
            value =>
                {
                    Console.WriteLine("Begin: {0}", value);
                    Thread.Sleep(2000);
                    Console.WriteLine("End: {0}", value);
                },
            () => Console.WriteLine("Completed: Subscription completed.")),
    TaskCreationOptions.LongRunning);

Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");

输出:

Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.

对 Reactive Extensions 很陌生,我不知道这是否是解决我的问题的最佳方法。但这对问题中发布的示例来说是一个很大的改进,因为它满足了我的要求:

  • 每个 OnNext 操作都允许运行到完成。
  • 应用程序等待流处理完成(由ManualResetEvent 发出信号)。
  • 流取消逻辑在TakeWhile-方法中移至生产者(而不是消费者)。
  • 应用程序终止逻辑是对生产者Finally-方法中的流取消的反应。

这是一个更好的解决方案。

【讨论】:

  • 这个解决方案多年来一直在生产中运行良好。但是,@Theodor-Zoulias 的答案完全依赖于 Reactive,这是一个更好的解决方案。
【解决方案3】:

Observable 是 (a) 可等待的。订阅 observables 是不可等待的。因此,如果您想等待订阅代码完成,而不是求助于使用ManualResetEvents 之类的人工解决方案,您应该使您的订阅代码成为派生可观察对象的副作用,并(a)等待该可观察对象。您的问题中提供的示例有额外的要求,这使事情变得有点复杂,但没那么复杂:

  1. 您想在订阅 observable 和等待它完成之间做其他事情(Console.ReadLine() 等)。

  2. 你想在 CancellationToken 被取消时终止 observable。

以下是如何满足这些要求的示例。它仅显示了解决此问题的众多可用方法之一:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

var withCancellation = observable
    .TakeUntil(Observable.Create<Unit>(observer =>
        cts.Token.Register(() => observer.OnNext(default))));

var withSideEffectsAndCancellation = withCancellation
    .Do(value =>
    {
        Console.WriteLine(value);
        Thread.Sleep(500);

        if (cts.Token.IsCancellationRequested)
        {
            Console.WriteLine("Cancellation detected on {0}.", value);
            Thread.Sleep(500);
            Console.WriteLine("Cleaning up done for {0}.", value);
        }
    }, () => Console.WriteLine("Completed"));

var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
    .Publish()
    .AutoConnect(0);

Console.ReadLine();
cts.Cancel();

hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");

解释:

  1. .TakeUntil...cts.Token.Register... 是当cts.Token 被取消时立即取消订阅Interval observable 的惯用方式。它是从relevant question 复制粘贴的。您也可以使用更简单的.TakeWhile(x =&gt; !cts.Token.IsCancellationRequested),前提是您可以接受稍微不那么响应的取消。

  2. Do 运算符是执行订阅副作用的一种自然方式,因为它与 Subscribe 方法具有相同的参数。

  3. .Publish().AutoConnect(0); 立即使序列变得热门。 AutoConnect 运算符无法断开与底层 observable 的连接(与 RefCount 运算符相反),但在这种特殊情况下不需要断开功能。底层 observable 的生命周期已经由我们之前附加的 CancellationToken 控制。

  4. .Wait() 之前的.DefaultIfEmpty() 是必需的,以防止在生成任何元素之前取消序列的边缘情况下出现InvalidOperationException。如果你 await 异步序列,它也是必需的。这些等待 observable 的机制(以及其他类似 RunAsyncToTask 运算符的机制)正在返回 observable 发出的最后一个值,当不存在这样的值时它们会感到沮丧。

【讨论】:

  • 感谢您提供详细解释的答案!感谢您尝试留在 Reactive 中解决问题,但此解决方案仍然不允许 Do-方法在终止之前完成其工作。
  • @Reyhn 是的,你是对的。我变得过于热情,错过了这个重要的细节。我编辑了代码,以便在执行副作用之前观察到取消。我认为现在应该按预期工作。 Wait 将阻塞当前线程,直到所有处理完成。
  • 如果您将Console.WriteLine("Job terminated."); 下移到最后,您的答案就完美了!我将您的答案标记为已接受,因为它保留在 Reactive 中,我认为这比在我的原始解决方案中使用 ManualResetEvent 更好。感谢您的帮助!
猜你喜欢
  • 2017-04-04
  • 1970-01-01
  • 1970-01-01
  • 2018-06-23
  • 2019-04-14
  • 2018-07-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多