【发布时间】:2015-01-21 22:48:47
【问题描述】:
我希望能够在多个内核上处理事件流,但保持一切同步,以便所有订阅者同步处理事件,因此没有一个订阅者会领先于任何其他订阅者。
换句话说,我希望快速订阅者等待所有其他慢速订阅者完成每个事件,然后再进行下一个事件。每个订阅者都有一个过滤器,因此它只处理它感兴趣的事件。
如果可行,我可以轻松利用系统中的所有内核,而不会遇到太多多线程或同步问题。
示例
假设我们有一个在单个线程上生成的 RX 事件流。我们有两个 RX 订阅者,A 和 B。我们有这些限制:
-
每个 RX 事件必须由所有订阅者锁步处理,即事件
j=2将不会被订阅者B处理,直到事件j=1已被所有订阅者A完全处理和B,事件j=3将不会被订阅者B处理,直到事件j=2被所有订阅者A和B完全处理完,等等。 -
每个RX事件的并行处理,即订阅者
A可以并行处理事件j=1,订阅者B处理事件j=1等。 -
顺序不变性,即所有订阅者都按照创建的顺序接收事件,因此事件
j=0将始终进行j=1,事件j=1将始终进行j=2等。如果事件被推送到单个线程上,这会自动发生,所以这个约束已经满足了。
到目前为止我所拥有的
我尝试了很多Synchronize的组合,结合以下代码:
var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
rx.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Fast Subscriber A. Takes 20 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(20));
Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});
rx.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(o =>
{
// Slow Subscriber B. Takes 500 milliseconds.
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});
for (int j = 0; j < 5; j++)
{
int j1 = j;
rx.OnNext(j1);
Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}
程序的当前输出
Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 288 milliseconds.
Subscriber A: j=1 (thread 10). Time: 308 milliseconds.
Subscriber A: j=2 (thread 10). Time: 328 milliseconds.
Subscriber A: j=3 (thread 10). Time: 348 milliseconds.
Subscriber A: j=4 (thread 10). Time: 368 milliseconds.
Subscriber B: j=0 (thread 11). Time: 768 milliseconds.
Subscriber B: j=1 (thread 11). Time: 1268 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1768 milliseconds.
Subscriber B: j=3 (thread 11). Time: 2268 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2768 milliseconds.
程序的期望输出
Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 000 milliseconds.
Subscriber B: j=0 (thread 11). Time: 000 milliseconds.
Subscriber A: j=1 (thread 10). Time: 500 milliseconds.
Subscriber B: j=1 (thread 11). Time: 500 milliseconds.
Subscriber A: j=2 (thread 10). Time: 1000 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1000 milliseconds.
Subscriber A: j=3 (thread 10). Time: 1500 milliseconds.
Subscriber B: j=3 (thread 11). Time: 1500 milliseconds.
Subscriber A: j=4 (thread 10). Time: 2000 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2000 milliseconds.
基本上,我希望所有订阅者并行处理事件j=0,然后所有订阅者并行处理事件j=1,等等,即使某些订阅者比其他订阅者慢。在这种情况下,订阅者 A 很快(20 毫秒)而订阅者 B 很慢(500 毫秒),所以我们需要某种锁或门,以便订阅者 A 等待订阅者 B 完成,然后再进行下一个事件,反之亦然如果订阅者 B 比订阅者 A 快,则反之亦然。
当然,这是在单线程模式下自然发生的事情,但随后就失去了让许多订阅者并行处理同一事件的能力,这意味着我无法轻松利用系统上的所有内核.
更新
感谢@Jonas Chapuis 使用Sort() 回答。
但是,在这种特殊情况下,我的目标是阻止快速订阅者在消费事件时领先于慢速订阅者,即我需要某种锁或门,以便快速订阅者等到所有慢速订阅者订阅者已完成该活动,然后再进行下一个活动。
换句话说,我希望所有订阅者在事件中步调一致,没有单个订阅者领先于其他订阅者。 RX 事件将在单个线程上创建,因此它们永远不会出现故障。
更新
几个月后,我发现我使用了错误的架构,这是一个错误的问题。
我应该观察EventLoopScheduler,而不是观察ThreadPoolScheduler.Instance,它将所有订阅锁定到单个线程。这样可以保留顺序。
为了获得时序数据的并行性,最好将数据处理划分为具有多个阶段的管道,每个线程集中在一个管道阶段。这更容易处理,并且满足上述所有约束。
【问题讨论】:
标签: c# multithreading system.reactive data-synchronization thread-synchronization