【问题标题】:How would I organize these calls using Reactive Extensions (Rx) in Silverlight?如何在 Silverlight 中使用 Reactive Extensions (Rx) 组织这些调用?
【发布时间】:2010-08-16 22:00:21
【问题描述】:

我有一些必须按顺序执行的调用。考虑一个具有 Query 和 Load 方法的 IService。查询提供了一个小部件列表,加载提供了一个“默认”小部件。因此,我的服务看起来像这样。

void IService.Query(Action<IEnumerable<Widget>,Exception> callback);
void IService.Load(Action<Widget,Exception> callback); 

考虑到这一点,下面是视图模型的粗略草图:

public class ViewModel : BaseViewModel
{
   public ViewModel()
   {
      Widgets = new ObservableCollection<Widget>();

      WidgetService.Query((widgets,exception) =>
      {
          if (exception != null) 
          {
              throw exception;
          }

          Widgets.Clear();

          foreach(var widget in widgets)
          {
             Widgets.Add(widget);
          }

          WidgetService.Load((defaultWidget,ex) =>
          {
             if (ex != null)
             {
                 throw ex;
             }
             if (defaultWidget != null)
             {
                CurrentWidget = defaultWidget;
             }
          }
      });
   }

   public IService WidgetService { get; set; } // assume this is wired up

   public ObservableCollection<Widget> Widgets { get; private set; }

   private Widget _currentWidget; 

   public Widget CurrentWidget 
   {
      get { return _currentWidget; }
      set 
      {
         _currentWidget = value; 
         RaisePropertyChanged(()=>CurrentWidget);
      }
   }
}

我想做的是简化调用查询的顺序工作流程,然后是默认的。也许最好的方法是嵌套 lambda 表达式,正如我所展示的,但我认为 Rx 可能有更优雅的方法。我不想为了 Rx 而使用 Rx,但是如果它可以让我组织上面的逻辑以便在方法中更容易阅读/维护,我会利用它。理想情况下,类似:

Observable.Create(
   ()=>firstAction(), 
   ()=>secondAction())
.Subscribe(action=>action(),error=>{ throw error; }); 

使用电源线程库,我会做类似的事情:

Service.Query(list=>{result=list};
yield return 1;
ProcessList(result);
Service.Query(widget=>{defaultWidget=widget};
yield return 1;
CurrentWidget = defaultWidget;

这使得工作流是顺序的并且消除了嵌套更加明显(收益是异步枚举器的一部分,并且是在结果返回之前阻塞的边界)。

任何类似的东西对我来说都是有意义的。

所以问题的本质是:我是在尝试将方形钉安装到圆孔中,还是有办法使用 Rx 重新定义嵌套的异步调用?

【问题讨论】:

  • 我正在为这个问题寻找类似的东西:stackoverflow.com/questions/3280345/… - 如果您能够以您的经验回答我的问题,我们将不胜感激 =)
  • 我正在研究概念证明,以展示聚合多个(不同)服务调用并按顺序执行它们。准备好后会通知您!

标签: silverlight asynchronous system.reactive


【解决方案1】:

您可以转换服务方法,以便它们返回 IObservable 而不是将回调作为参数。在这种情况下,可以使用 SelectMany 来实现顺序工作流,类似这样...

        WidgetService.Query()
            .SelectMany(
                widgets =>
                {
                    Widgets.Clear();
                    foreach (var w in widgets)
                    {
                        Widgets.Add(w);
                    }

                    return WidgetService.Load();
                }
            )
            .Do(
                defaultWidget =>
                {
                    if (defaultWidget != null)
                        Default = defaultWidget;
                }
            )
            .Subscribe(
                _ => { },
                e => { throw e; }
            );

但是,IMO F# 异步看起来会更加清晰(在示例中,我假设服务方法分别返回 Async> 和 Async)。请注意,示例没有考虑到哪个线程正在修改数据字段,在实际代码中您应该注意这一点:

    let load = async {
            let! widgets = WidgetService.Query()

            Widgets.Clear()
            for w in widgets do
                Widgets.Add(w)

            let! defaultWidget = WidgetService.Load()
            if defaultWidget <> null then
                Default <- defaultWidget

            return ()
        }

    Async.StartWithContinuations(
        load, 
        ignore, // success continuation - ignore result
        raise,  // error continuation - reraise exception
        ignore  // cancellation continuation - ignore
        )

已编辑

事实上,可以将技术与您在问题中提到的迭代器一起使用:

    private IEnumerable<IObservable<object>> Intialize()
    {
        var widgetsList = WidgetService.Query().Start();
        yield return widgetsList;

        Widgets.Clear();
        foreach (var w in widgetsList[0])
        {
            Widgets.Add(w);
        }

        var defaultWidgetList = WidgetService.Load().Start();
        yield return defaultWidgetList;

        if (defaultWidgetList[0] != null)
            Default = defaultWidgetList[0];
    }

    Observable
        .Iterate(Intialize)
        .Subscribe(
        _ => { },
        ex => { throw ex; }
        );

【讨论】:

  • 谢谢 - 正是我想要的!显然,对于两个步骤来说,它似乎并不买太多,但在具有多个必须按顺序执行的异步步骤的工作流中,这是黄金。
【解决方案2】:

您也可以使用ReactiveXaml 来执行此操作,但由于您的 CurrentWidget 和 Widgets 都是可变的,因此您不能让它变得干净(有一个名为 ObservableAsPropertyHelper 的类将根据 IObservable 更新属性并触发RaisePropertyChanged):

public class ViewModel
{
    public ViewModel()
    {
        // These return a Func that wraps an async call in an IObservable<T>
        // that always yields only one item (the result of the call)
        var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery);
        var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad);

        // Create a new command 
        QueryAndLoad = new ReactiveAsyncCommand();

        // QueryAndLoad fires every time someone calls ICommand.Execute
        // The .Do is the hacky part, for sync calls it's hidden by RegisterAsyncFunction
        var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable())
                                        .Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit()));

        // Query up the Widgets 
        async_results.Subscribe(x => x.Run(Widgets.Add));

        // Now execute the Load
        async_results.SelectMany(_ => LoadAsObservable())
                     .Subscribe(x => CurrentWidget = x);

        QueryAndLoad.Execute();
    }

    public ReactiveAsyncCommand QueryAndLoad {get; private set; }

    public ObservableCollection<Widget> Widgets {get; private set; }

    public Widget CurrentWidget {get; set; }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-18
    • 1970-01-01
    • 2011-05-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多