【发布时间】:2017-07-19 14:37:09
【问题描述】:
我正在尝试使用响应式扩展 (Rx) 来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我使用了CancellationToken,它有效地停止了订阅。
当请求取消时,我如何优雅地完成当前工作并正确终止而不丢失任何数据?
示例
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
输出
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
从输出中可以看出,“作业终止”这行不是最后一行,这意味着清理在应用程序终止之前没有足够的时间完成。
预期输出
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
“作业终止”行是要打印的最后一行。 “取消”和“清理”行已被允许慢慢来。
(编辑:添加预期输出)
【问题讨论】:
-
您希望输出是什么?
-
在这个例子中,应用线程在订阅之前退出。我找不到任何方法来等待订阅的 OnNext 处理程序完成其工作。我不希望应用程序在处理程序完成之前退出,所以“作业终止”应该是最后一次打印。
标签: c# system.reactive cancellation