【问题标题】:How to wrap async callback with Observable or async/await?如何用 Observable 或 async/await 包装异步回调?
【发布时间】:2017-10-17 01:54:06
【问题描述】:

我正在使用与回调一起使用的网络 API。所以基本上,我有一堆方法调用需要在这个 3rd 方库中使用,它们看起来像这样:

void SendNetworkRequest(string requestType, Action<Response> callback)

我发现代码有点古怪,因为我的所有方法都依赖于来自这个 3rd 方 API 的网络资源,也需要自己实现回调。例如,在我的主要场景中,我可能想要获取玩家信息,我的代码如下所示:

void InitMainScene()
{
       _networkHelper.SendNetworkRequest("GetPlayersInfo",OnPlayerInfoResponse);
}

void OnPlayerInfoResponse(Response response)
{
     _playerInfo = response.Info;
}

我最近接触了 RX,并在我的代码中大量使用它。我已经阅读了一些关于 async/await 的内容。我已经做了很多实验,尤其是使用 RX,并尝试使用 Observable.FromAsync() 但无法让它工作..

我在这里缺少什么?如何编写更简洁且不需要使用 RX 或 async/await 回调的代码?以下是我正在寻找的伪代码:

        void InitMainScene()
        {
            _playerInfo = Overservable.DoMagicThing<Response>( () => {
_networkHelper.SendNetworkRequest("GetPlayersInfo",(r) =>{return r;}); });
        }

【问题讨论】:

    标签: c# asynchronous async-await system.reactive reactive-programming


    【解决方案1】:

    以下是使用 Rx 的方法。我将Response 换成了string,这样我就可以编译和测试了,但是换回来很简单。:

    public IObservable<string> SendNetworkRequestObservable(string requestType)
    {
        return Observable.Create<string>(observer =>  
        {
            SendNetworkRequest(requestType, s => observer.OnNext(s));
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }
    
    // Define other methods and classes here
    public void SendNetworkRequest(string requestType, Action<string> callback)
    {
        callback(requestType); // implementation for testing purposes
    }
    

    【讨论】:

    • Urrgh - 如果您曾经返回 Disposable.Empty,那么您可能做错了什么。
    • 感谢@Schlomo!这对我帮助很大!我会玩这个,并尝试更好地理解 Enigmativity 所描述的 Disposable.Empty 的含义
    • @Enigmativity,请随意扩展原因。我不同意:在这种情况下,没有什么可处置的,但订阅需要一次性。这是为什么Disposable.Empty 存在的完美示例。
    • @Shlomo - 我看Disposable.Empty 有点像catch (Exception ex) - 这是一种反模式。只需进行几次.OnNext 调用并返回Disposable.Empty 通常很容易——这总是会导致同步订阅调用。我将尝试找到一种避免使用Disposable.Empty 的方法作为一种好习惯。你是对的,这可能是一个完美的例子来说明它为什么存在。
    • @Shlomo - 我看Disposable.Empty 有点像catch (Exception ex) - 这是一种反模式。只需进行几次.OnNext 调用并返回Disposable.Empty 通常很容易——这总是会导致同步订阅调用。我将尝试找到一种避免使用Disposable.Empty 的方法作为一种好习惯。你是对的,这可能是一个完美的例子来说明它为什么存在。
    【解决方案2】:

    我不确定 RX。但是,您可以将基于回调的 SendNetworkRequest 转换为可等待的任务,如下所示:

    void SendNetworkRequest(string requestType, Action<Response> callback)
            {
                // This code by third party, this is here just for testing
                if (callback != null)
                { 
                    var result = new Response();
                    callback(result);
                }
            }
    
            Task<Response> SendNetworkRequestAsync(string requestType)
            {
                return Task.Run(() =>
                {
                    var t = new TaskCompletionSource<Response>();
    
                    SendNetworkRequest(requestType, s => t.TrySetResult(s));
    
                    return t.Task;
                });
            }
    

    现在您可以更自然地使用带有 async/await 的 SendNetworkRequestAsync

    【讨论】:

    • 非常感谢!这很棒!我今天会玩这个。给你的问题 - 在我的情况下,你是否推荐这种语义而不是 Overservable?顺便说一句 - 我还没有使用 async/await,事实上,我在 Unity3d 中这样做,它只支持 .net 4.6 作为实验性功能。
    • RX 在处理事件流时非常有用。在您的情况下,您调用 SendNetworkRequest 然后等待任务完成,就是这样。所以是的,在这种情况下,async/await 比 Rx 更容易、更合理。干净更方便
    【解决方案3】:

    这是我对 Rx 的看法。

    NetworkHelper类里面添加这个方法:

    public IObservable<Response> SendNetworkRequestObservable(string requestType)
    {
        return
            Observable
                .Create<Response>(observer =>
                {
                    Action<Response> callback = null;
                    var callbackQuery =
                        Observable
                            .FromEvent<Response>(h => callback += h, h => callback -= h)
                            .Take(1);
                    var subscription = callbackQuery.Subscribe(observer);
                    this.SendNetworkRequest(requestType, callback);
                    return subscription;
                });
    }
    

    这将创建一个可观察对象,该可观察对象在内部使用 Observable.FromEvent(...).Take(1) 创建一个可观察对象,该可观察对象基于预期对传递给 SendNetworkRequest 方法的 Action&lt;Response&gt; callback 的单个调用。

    唯一的问题是调用发生在当前线程上直到完成。如果您希望这段代码在后台线程上运行,但将结果返回给当前线程,那么您可以这样做:

    public IObservable<Response> SendNetworkRequestObservable(string requestType)
    {
        return
            Observable
                .Start(() =>
                    Observable
                        .Create<Response>(observer =>
                        {
                            Action<Response> callback = null;
                            var callbackQuery =
                                Observable
                                    .FromEvent<Response>(h => callback += h, h => callback -= h)
                                    .Take(1);
                            var subscription = callbackQuery.Subscribe(observer);
                            this.SendNetworkRequest(requestType, callback);
                            return subscription;
                        }))
                .Merge();
    }
    

    不管怎样,你都可以这样称呼它:

    var _playerInfo =
        _networkHelper
            .SendNetworkRequestObservable("GetPlayersInfo")
            .Wait();
    

    附带说明:您的 DoMagicThing 是一个同步调用。在 Rx 中,这是一个 .Wait() 调用,但最好尽可能使用普通的 .Subscribe


    根据评论,您可以使用async

    async void InitMainScene()
    {
        _playerInfo = await _networkHelper.SendNetworkRequestObservable("GetPlayersInfo");
    }
    

    【讨论】:

    • 非常感谢这个例子。我已经花了一些时间试图让它在我的游戏中工作.. 似乎 .Wait() 方法导致 Unity 冻结然后崩溃。我需要深入了解原因——甚至下面的 async/await 示例也是如此。非常感谢你的帮忙!我想问你——你认为 async/await 是这个问题的更好解决方案,还是你认为你的解决方案更适合这个特定情况?很难知道在哪种情况下使用哪个,因为我现在对这两种情况都缺乏经验。再次感谢!
    • @user3010678 - 你不应该使用.Wait()。您应该使用常规的 .Subscribe(...)` 调用,或者更好的是使用 async 并使调用看起来像这样:var _playerInfo = await _networkHelper.SendNetworkRequestObservable("GetPlayersInfo");。这适用于可观察对象并返回最后产生的值(如果有多个)。
    猜你喜欢
    • 2017-06-13
    • 2019-05-08
    • 1970-01-01
    • 2017-05-13
    • 2021-06-06
    • 2019-07-24
    • 1970-01-01
    • 2023-03-10
    • 2018-02-05
    相关资源
    最近更新 更多