【发布时间】:2013-07-04 08:40:15
【问题描述】:
我有一个async 方法,它是一个长时间运行的方法,它读取流并在发现某些东西时触发一个事件:
public static async void GetStream(int id, CancellationToken token)
它需要一个取消令牌,因为它是在新任务中创建的。它在内部读取流时调用await:
var result = await sr.ReadLineAsync()
现在,我想将其转换为返回 IObservable 的方法,以便我可以将其与反应式扩展一起使用。根据我的阅读,最好的方法是使用Observable.Create,并且由于 RX 2.0 现在也支持异步,所以我可以使用这样的东西:
public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
return Observable.Create<Message>(
async (IObserver<Message> observer) =>
{
里面的其余代码是相同的,但我调用的是observer.OnNext(),而不是触发事件。但是,这感觉不对。一方面,我在其中混合了 CancellationTokens,虽然添加了 async 关键字使它起作用,但这实际上是最好的做法吗?我这样称呼我的 ObservableStream:
Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));
【问题讨论】:
-
你几乎不应该使用
async void,当然不能在库方法中使用。
标签: c# system.reactive async-await