【问题标题】:Converting an async method to return IObservable<>将异步方法转换为返回 IObservable<>
【发布时间】:2013-07-04 08:40:15
【问题描述】:

我有一个async 方法,它是一个长时间运行的方法,它读取流并在发现某些东西时触发一个事件:

public static async void GetStream(int id, CancellationToken token)

它需要一个取消令牌,因为它是在新任务中创建的。它在内部读取流时调用await

var result = await sr.ReadLineAsync()

现在,我想将其转换为返回 IObservable 的方法,以便我可以将其与反应式扩展一起使用。根据我的阅读,最好的方法是使用Observable.Create,并且由于 RX 2.0 现在也支持异步,所以我可以使用这样的东西:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
            {

里面的其余代码是相同的,但我调用的是observer.OnNext(),而不是触发事件。但是,这感觉不对。一方面,我在其中混合了 CancellationTokens,虽然添加了 async 关键字使它起作用,但这实际上是最好的做法吗?我这样称呼我的 ObservableStream:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));

【问题讨论】:

  • 你几乎不应该使用async void,当然不能在库方法中使用。

标签: c# system.reactive async-await


【解决方案1】:

你是对的。一旦您通过IObservable 表示您的接口,您应该避免要求调用者提供CancellationToken。这并不意味着您不能在内部使用它们。 Rx 提供了几种机制来生成 CancellationToken 实例,当观察者取消订阅您的 observable 时,这些实例会被取消。

有多种方法可以解决您的问题。最简单的方法几乎不需要更改您的代码。它使用Observable.Create 的重载,它为您提供CancellationToken,如果调用者取消订阅就会触发:

public static IObservable<Message> ObservableStream(int id)
{
    return Observable.Create<Message>(async (observer, token) =>
    {
         // no exception handling required.  If this method throws,
         // Rx will catch it and call observer.OnError() for us.
         using (var stream = /*...open your stream...*/)
         {
             string msg;
             while ((msg = await stream.ReadLineAsync()) != null)
             {
                 if (token.IsCancellationRequested) { return; }
                 observer.OnNext(msg);
             }
             observer.OnCompleted();
         }
    });
}

【讨论】:

  • 不应该 if (token.IsCancellationRequested) { return; }if (token.IsCancellationRequested) { break; } 以便调用 OnCompleted 吗?
  • @TiMoch 没有观察者已经取消订阅,在这种情况下不需要向他们发送任何额外的通知
【解决方案2】:

您应该更改 GetStream 以返回一个 Task,而不是 void(返回 async void 不好,除非绝对需要,正如 svick 评论的那样)。一旦你返回一个任务,你只需调用 .ToObservable() 就可以了。

例如:

public static async Task<int> GetStream(int id, CancellationToken token) { ... }

那么,

GetStream(1, new CancellationToken(false))
   .ToObservable()
   .Subscribe(Console.Write);

【讨论】:

  • 我认为GetStream() 会多次触发一个事件,所以Task&lt;int&gt; 还不够。
  • 是的,签名是一个例子,但在编辑中,我暗示通过直接使用 ToObservable() 来摆脱 GetStream。我正在更新我的答案并进行一些澄清。
  • 嗯,好的,谢谢。是的,GetStream 永远运行并在找到消息时触发一个事件。所以,如果我改用Task&lt;Message&gt;,这有意义吗?我会为了别的事情而放弃那里的事件触发吗?
  • 对不起,不能直接使用 sr.ReadLineAsync().ToObservable() ,否则只会读取一行。我正在删除该建议。
  • @MattRoberts 是的,这是一个很好的可能性。我在想别的东西,但实际上我最终得到了我也不喜欢的东西。我将不得不考虑更多,也许稍后会想出更好的东西,否则我只会得到 return Observable.Create(async o => { string text; while((text = await sr.ReadLineAsync()) != null) { o.OnNext(new Message { Text = text }); } o.OnCompleted(); });
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-11
  • 2021-04-12
相关资源
最近更新 更多