【发布时间】:2017-04-10 02:51:55
【问题描述】:
有没有办法将IObservable<Task<T>> 解包成IObservable<T>,并保持相同的事件顺序,像这样?
Tasks: ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->
假设我有一个使用消息流的桌面应用程序,其中一些需要大量的后处理:
IObservable<Message> streamOfMessages = ...;
IObservable<Task<Result>> streamOfTasks = streamOfMessages
.Select(async msg => await PostprocessAsync(msg));
IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks
我想有两种处理方法。
首先,我可以使用异步事件处理程序订阅streamOfTasks:
streamOfTasks.Subscribe(async task =>
{
var result = await task;
Display(result);
});
其次,我可以使用Observable.Create转换streamOfTasks,像这样:
var streamOfResults =
from task in streamOfTasks
from value in Observable.Create<T>(async (obs, cancel) =>
{
var v = await task;
obs.OnNext(v);
// TODO: don't know when to call obs.OnComplete()
})
select value;
streamOfResults.Subscribe(result => Display(result));
无论哪种方式,消息的顺序都不会保留:一些后来的消息 不需要任何后处理比之前的消息更快 需要后期处理。我的两个解决方案都处理传入的消息 并行,但我希望它们一个接一个地按顺序处理。
我可以编写一个简单的任务队列来一次只处理一个任务, 但也许这是一个矫枉过正。在我看来,我遗漏了一些明显的东西。
UPD。我编写了一个示例控制台程序来演示我的方法。到目前为止,所有解决方案都不保留事件的原始顺序。这是程序的输出:
Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0
这里是完整的源代码:
// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;
class Program
{
static void Main()
{
Console.WriteLine("Press ENTER to exit.");
// the source stream
var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));
// solution #1: using async event handler
timerEvents.Subscribe(async x =>
{
var result = await PostprocessAsync(x);
Console.WriteLine($"Async handler: {x}");
});
// solution #2: using Observable.Create
var processedEventsV2 =
from task in timerEvents.Select(async x => await PostprocessAsync(x))
from value in Observable.Create<long>(async (obs, cancel) =>
{
var v = await task;
obs.OnNext(v);
})
select value;
processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));
// solution #3: using FromAsync, as answered by @Enigmativity
var processedEventsV3 =
from msg in timerEvents
from result in Observable.FromAsync(() => PostprocessAsync(msg))
select result;
processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));
Console.ReadLine();
}
static async Task<long> PostprocessAsync(long x)
{
// some messages require long post-processing
if (x % 3 == 0)
{
await Task.Delay(TimeSpan.FromSeconds(2.5));
}
// and some don't
return x;
}
}
【问题讨论】:
-
为什么人们甚至不发表评论就投票关闭它?
-
@yallie - 你能解释一下“事件的顺序”是什么意思吗?创建消息但未完成任务的顺序?
-
@Enigmativity 是的,完全正确。无论处理它们需要多少时间,我都需要保持消息到达的顺序。
-
我写了一个扩展来做这件事——我想把源代码和处理后的结果压缩在一起——顺序很重要。我看看能不能找到。
-
@Enigmativity 那太好了!
标签: c# task-parallel-library .net-4.5 system.reactive rx.net