【问题标题】:Reactive Extension .NetCore Observing on MainThreadReactive Extensions .Net Core 在主线程上观察
【发布时间】:2018-06-25 11:44:43
【问题描述】:

我正在尝试并行执行大量网络操作,并且我想为每个操作设置超时。

由于 Parallel.ForEach 没有简单的超时选项,我使用 System.Reactive。

这是我的代码:

public void networkOps(List<MacCpe> source, Action<List<Router>, List<Exception>> onDone) {

var routers = new List<Router>();
var exceptions = new List<Exception>();

Observable.Defer(() => source.ToObservable()) 
    .ObserveOn(Scheduler.CurrentThread)
            .SubscribeOn(Scheduler.Default)
            .SelectMany(it => 
                Observable.Amb(
                            Observable.Start(() => {
                                switch(it.type) {
                                    case AntennaType.type1: {
                                        //network stuff
                                    }
                                    break;

                                    case AntennaType.type2: {
                                       //network stuff
                                    }
                                    break;

                                    case AntennaType.type3: {
                                       //network stuff
                                    }
                                    break;

                                    case AntennaType.type4: {
                                      //network stuff
                                    }
                                    break;

                                    default: throw new NullReferenceException("Nothing");
                                }
                            }).Select(_ => true),
                            Observable.Timer(TimeSpan.FromSeconds(60)).Select(_ => false)
                        ),
                        (it, result) => new { it, result }
                    )
                    .Subscribe (
                        x => {
                            Console.WriteLine("checked item number " + x.it.Id);
                        },
                        ex => {
                            Console.WriteLine("error string");
                        }, () => {
                            onDone(routers, exceptions);
                        }
                    );
}

我正在使用 Observable.Amb 运算符并行运行一个 60 秒计时器,它作为超时工作。

但是,当我运行此方法时,程序会立即退出,而不会到达回调 onDone。

我在网上看到我可以使用 ObserveOnDispatcher 来观察 Ui 线程,同时在线程池上运行阻塞代码,但我在终端应用程序服务器端的 linux 上的 dotnet core 上使用它。

如何在控制台应用程序的“主线程”上进行观察?

提前感谢您的回复。

【问题讨论】:

  • 您确定这不会仅仅因为您没有将 Console.ReadKey 之类的东西放入应用程序而终止。

标签: linux .net-core system.reactive


【解决方案1】:

当您替换 Parallel.ForEach 时,听起来您很高兴有一个阻塞操作。以您设置的方式使用 Rx,它不是阻塞操作,因此该方法立即结束。

修复起来非常简单。只需将您的 .Subscribe 更改为:

.Do(
    x =>
    {
        Console.WriteLine("checked item number " + x.it.Id);
    },
    ex =>
    {
        Console.WriteLine("error string");
    }, () =>
    {
        onDone(routers, exceptions);
    }
)
.Wait();

我还会删除你的 .ObserveOn(Scheduler.CurrentThread).SubscribeOn(Scheduler.Default),直到你确定你需要它们。

【讨论】:

  • 正是我所需要的,你是一个真正的专业先生
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-08-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多