【发布时间】:2017-08-02 11:01:50
【问题描述】:
我对 Rx 非常陌生,我正在尝试使用它来处理多达 X 个并发订阅任务。数据源实际上来自数据库,所以我不得不轮询数据库。我意识到 Rx 背后的想法是它是 push 而不是 pull - 所以民意调查不太适合,但从概念上讲,进入数据库的数据是我想订阅并做某事的事件流。
我遇到的主要问题是LimitedConcurrencyLevelTaskScheduler 似乎没有成功限制指定数量的任务。它比我指定的 8 个并发运行更多。
我也不确定以下两个解决方案中哪一个更好(或者可能都是错误的?!)。
这是我尝试过的一种方法,它使用 Observable.Timer ...
public static void Main()
{
var taskFactory = new TaskFactory (new LimitedConcurrencyLevelTaskScheduler (8));
var scheduler = new TaskPoolScheduler (taskFactory);
Observable.Timer (TimeSpan.FromMilliseconds (10), scheduler)
.SelectMany (x => Observable.FromAsync (GetItemsSource))
.Repeat ()
.ObserveOn (scheduler)
.Subscribe (x => Observable.FromAsync(y => DoSomethingAsync (x.ToList())));
Console.ReadKey ();
}
private static async Task<IEnumerable<Guid>> GetItemsSource()
{
return await _myRepo.GetMoreAsync(10);
}
private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
// Do something with the data
}
我也尝试过这样做......
public static void Main()
{
GetItemsSource()
.ObserveOn(scheduler)
.Select (async x => await DoSomethingAsync(x))
.Subscribe();
Console.ReadKey ();
}
public static IObservable<Guid> GetItemsSource()
{
return Observable.Create<Guid>(
async obs =>
{
while (true)
{
var item = (await _myRepo.GetMoreAsync(1)).FirstOrDefault();
if(item != null)
{
obs.OnNext(item);
}
await Task.Delay(TimeSpan.FromMilliseconds(10))
}
});
}
private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
// Do something with the data
}
显然非常简单的示例,没有错误处理或取消支持。
两者似乎都有效,但都不限于 8 个并发任务。
正如我所说,我对 Rx 很陌生,可能缺少很多基本的东西。我当然打算做大量阅读以完全理解 Rx,因为它看起来非常强大,但现在我想让一些东西快速工作。
更新
从 Enigmativity 的答案和 cmets 开始,这里有一些记录并发计数的代码......
void Main()
{
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
var scheduler = new TaskPoolScheduler(taskFactory);
using (
(
from n in Observable.Interval(TimeSpan.FromMilliseconds(10), scheduler)
from g in Observable.FromAsync(GetItemsSource, scheduler)
from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
select u)
.ObserveOn(scheduler)
.Subscribe())
{
Console.ReadLine();
}
}
private static volatile int _numIn = 0;
private static volatile int _numOut = 0;
public static async Task<IEnumerable<Guid>> GetItemsSource()
{
try
{
_numIn++;
$"Concurrent tasks (in): {_numIn}".Dump();
// Simulate async API call
await Task.Delay(TimeSpan.FromMilliseconds(10));
return new List<Guid> { Guid.NewGuid() };
}
finally
{
_numIn--;
}
}
private static async Task DoSomethingAsync(IEnumerable<Guid> deliveryIds)
{
try
{
_numOut++;
// Simulate async calls required to process the event
await Task.Delay(TimeSpan.FromMilliseconds(1000));
$"Concurrent tasks (out): {_numOut}".Dump();
}
finally
{
_numOut--;
}
}
这显示了大约 64 个并发任务正在运行。
更新 2
看起来确实是因为订阅者是异步的。如果我使用非异步订阅者进行测试,它可以正常工作。不幸的是,我需要一个异步订阅者,因为它需要调用其他异步方法。
看起来我可以通过这样做来做类似的事情......
GetItemsSource2()
.Select(x => Observable.FromAsync(() => DoSomethingAsync(x)))
.Merge(64)
.Subscribe();
所以使用Merge 而不是LimitedConcurrencyLevelTaskScheduler。
【问题讨论】:
-
你怎么知道它不限于8个并发任务?你的 CPU 有多少个内核?
-
你真的在这里混合了 Rx、TPL 和 Linq-for-objects。你不应该那样做。这里的大部分事情都可以用 Rx 来完成。保持纯洁。这要容易得多。
-
我不认为您的更新显示了有多少任务正在运行 - 它显示了在调用
GetItemsSource和DoSomethingAsync期间运行了多少任务。这就像一条只能容纳 8 辆汽车的道路,但您计算的是前面的汽车和紧随其后的 7 辆汽车。 -
我不太明白。
_numOut仅在DoSomethingAsync方法中使用。开始时递增,结束时递减。这肯定表明其中有多少是同时运行的? -
看起来它与异步订阅者有关。我已经通过使用
Merge来解决它(请参阅我的Update 2。感谢@Enigmativity 的帮助!:)
标签: c# .net concurrency task-parallel-library system.reactive