【问题标题】:Unwrapping IObservable<Task<T>> into IObservable<T> with order preservation将 IObservable<Task<T>> 展开为 IObservable<T> 并保留订单
【发布时间】:2017-04-10 02:51:55
【问题描述】:

有没有办法将IObservable&lt;Task&lt;T&gt;&gt; 解包成IObservable&lt;T&gt;,并保持相同的事件顺序,像这样?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->

假设我有一个使用消息流的桌面应用程序,其中一些需要大量的后处理:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks

我想有两种处理方法。

首先,我可以使用异步事件处理程序订阅streamOfTasks

streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});

其次,我可以使用Observable.Create转换streamOfTasks,像这样:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<T>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnComplete()
    })
    select value;

streamOfResults.Subscribe(result => Display(result));

无论哪种方式,消息的顺序都不会保留:一些后来的消息 不需要任何后处理比之前的消息更快 需要后期处理。我的两个解决方案都处理传入的消息 并行,但我希望它们一个接一个地按顺序处理。

我可以编写一个简单的任务队列来一次只处理一个任务, 但也许这是一个矫枉过正。在我看来,我遗漏了一些明显的东西。


UPD。我编写了一个示例控制台程序来演示我的方法。到目前为止,所有解决方案都不保留事件的原始顺序。这是程序的输出:

Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0

这里是完整的源代码:

// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll

using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Console.WriteLine("Press ENTER to exit.");

        // the source stream
        var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));

        // solution #1: using async event handler
        timerEvents.Subscribe(async x =>
        {
            var result = await PostprocessAsync(x);
            Console.WriteLine($"Async handler: {x}");
        });

        // solution #2: using Observable.Create
        var processedEventsV2 =
            from task in timerEvents.Select(async x => await PostprocessAsync(x))
            from value in Observable.Create<long>(async (obs, cancel) =>
            {
                var v = await task;
                obs.OnNext(v);
            })
            select value;
        processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));

        // solution #3: using FromAsync, as answered by @Enigmativity
        var processedEventsV3 =
            from msg in timerEvents
            from result in Observable.FromAsync(() => PostprocessAsync(msg))
            select result;

        processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));

        Console.ReadLine();
    }

    static async Task<long> PostprocessAsync(long x)
    {
        // some messages require long post-processing
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        // and some don't
        return x;
    }
}

【问题讨论】:

  • 为什么人们甚至不发表评论就投票关闭它?
  • @yallie - 你能解释一下“事件的顺序”是什么意思吗?创建消息但未完成任务的顺序?
  • @Enigmativity 是的,完全正确。无论处理它们需要多少时间,我都需要保持消息到达的顺序。
  • 我写了一个扩展来做这件事——我想把源代码和处理后的结果压缩在一起——顺序很重要。我看看能不能找到。
  • @Enigmativity 那太好了!

标签: c# task-parallel-library .net-4.5 system.reactive rx.net


【解决方案1】:

结合@Enigmativity 的simple approach 与@VMAtm 的attaching the counter 的想法和this SO question 的一些代码sn-ps,我想出了这个解决方案:

// usage
var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));

processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));

// my sample console program prints the events ordered properly:
Timer: 0
Timer: 1
Timer: 2
Processed: 0
Processed: 1
Processed: 2
Timer: 3
Timer: 4
Timer: 5
Processed: 3
Processed: 4
Processed: 5
....

这是我将IObservable&lt;Task&lt;TSource&gt;&gt; 转换为IObservable&lt;TResult&gt;SelectAsync 扩展方法,保持原来的事件顺序:

public static IObservable<TResult> SelectAsync<TSource, TResult>(
    this IObservable<TSource> src,
    Func<TSource, Task<TResult>> selectorAsync)
{
    // using local variable for counter is easier than src.Scan(...)
    var counter = 0;
    var streamOfTasks =
        from source in src
        from result in Observable.FromAsync(async () => new
        {
            Index = Interlocked.Increment(ref counter) - 1,
            Result = await selectorAsync(source)
        })
        select result;

    // buffer the results coming out of order
    return Observable.Create<TResult>(observer =>
    {
        var index = 0;
        var buffer = new Dictionary<int, TResult>();

        return streamOfTasks.Subscribe(item =>
        {
            buffer.Add(item.Index, item.Result);

            TResult result;
            while (buffer.TryGetValue(index, out result))
            {
                buffer.Remove(index);
                observer.OnNext(result);
                index++;
            }
        });
    });
}

我对我的解决方案不是特别满意,因为它看起来对我来说太复杂了,但至少它不需要任何外部依赖项。我在这里使用一个简单的字典来缓冲和重新排序任务结果,因为订阅者need not to be thread-safe(订阅不需要同时调用)。

欢迎任何 cmets 或建议。我仍然希望在没有自定义缓冲扩展方法的情况下找到本机 RX 方式。

【讨论】:

    【解决方案2】:

    以下简单的方法适合您吗?

    IObservable<Result> streamOfResults =
        from msg in streamOfMessages
        from result in Observable.FromAsync(() => PostprocessAsync(msg))
        select result;
    

    【讨论】:

    • 不,我已经尝试过这种方法,但它对我不起作用。它在需要后处理的第一条消息上停止(即,在未同步完成的第一个任务上)。
    • 我将代码复制到控制台应用程序,现在它不仅仅停止了,所以我猜这是一个 UI 线程问题。但是代码的行为方式与我的两个解决方案相同,即没有保留正确的事件顺序。
    【解决方案3】:

    为了保持事件的顺序,您可以将您的信息流从TPL Dataflow 汇集到TransformBlockTransformBlock 将执行您的后处理逻辑,并默认保持其输出的顺序。

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using NUnit.Framework;
    
    namespace HandlingStreamInOrder {
    
        [TestFixture]
        public class ItemHandlerTests {
    
            [Test]
            public async Task Items_Are_Output_In_The_Same_Order_As_They_Are_Input() {
                var itemHandler = new ItemHandler();
                var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
                timerEvents.Subscribe(async x => {
                    var data = (int)x;
                    Console.WriteLine($"Value Produced: {x}");                
                    var dataAccepted = await itemHandler.SendAsync((int)data);
                    if (dataAccepted) {
                        InputItems.Add(data);
                    }                
                });
    
                await Task.Delay(5000);
                itemHandler.Complete();
                await itemHandler.Completion;
    
                CollectionAssert.AreEqual(InputItems, itemHandler.OutputValues);
            }
    
            private IList<int> InputItems {
                get;
            } = new List<int>();
        }
    
        public class ItemHandler {
    
    
            public ItemHandler() {            
                var options = new ExecutionDataflowBlockOptions() {
                    BoundedCapacity = DataflowBlockOptions.Unbounded,
                    MaxDegreeOfParallelism = Environment.ProcessorCount,
                    EnsureOrdered = true
                };
                PostProcessBlock = new TransformBlock<int, int>((Func<int, Task<int>>)PostProcess, options);
    
                var output = PostProcessBlock.AsObservable().Subscribe(x => {
                    Console.WriteLine($"Value Output: {x}");
                    OutputValues.Add(x);
                });
            }
    
            public async Task<bool> SendAsync(int data) {
                return await PostProcessBlock.SendAsync(data);
            }
    
            public void Complete() {
                PostProcessBlock.Complete();
            }
    
            public Task Completion {
                get { return PostProcessBlock.Completion; }
            }
    
            public IList<int> OutputValues {
                get;
            } = new List<int>();
    
            private IPropagatorBlock<int, int> PostProcessBlock {
                get;
            }
    
            private async Task<int> PostProcess(int data) {
                if (data % 3 == 0) {
                    await Task.Delay(TimeSpan.FromSeconds(2));
                }            
                return data;
            }
        }
    }
    

    【讨论】:

    • 感谢您的建议!这种方法确实很好用。我从未使用过 TPL DataFlow,因此需要一些时间来消化这里发生的事情。我很惊讶,一个看似基本的任务除了 RX 之外还需要一个依赖项。
    • BoundedCapacity 默认等于DataflowBlockOptions.UnboundedEnsureOrdered 默认等于true。在Microsoft.Tpl.Dataflow的最新版本中也没有EnsureOrdered
    【解决方案4】:

    Rx and TPL can be easily combined 在这里,TPL 默认保存事件的顺序,所以你的代码可能是这样的:

    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    static async Task<long> PostprocessAsync(long x) { ... }
    
    IObservable<Message> streamOfMessages = ...;
    var streamOfTasks = new TransformBlock<long, long>(async msg => 
        await PostprocessAsync(msg)
        // set the concurrency level for messages to handle
        , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });
    // easily convert block into observable
    IObservable<long> streamOfResults = streamOfTasks.AsObservable();
    

    编辑Rx 扩展旨在成为 UI 事件的反应式管道。由于这种类型的应用程序通常是单线程的,因此正在处理消息并保存订单。但是一般events in C# aren't thread safe,所以你必须提供一些额外的逻辑来相同的顺序。

    如果你不喜欢引入另一个依赖的想法,你需要用Interlocked类存储操作号,像这样:

    // counter for operations get started
    int operationNumber = 0;
    // counter for operations get done
    int doneNumber = 0;
    ...
    var currentOperationNumber = Interlocked.Increment(ref operationNumber);
    ...
    while (Interlocked.CompareExchange(ref doneNumber, currentOperationNumber + 1, currentOperationNumber) != currentOperationNumber)
    {
        // spin once here
    }
    // handle event
    Interlocked.Increment(ref doneNumber);
    

    【讨论】:

    • 非常感谢!实际上,我很惊讶它不能使用 RX 本身提供的工具来完成。我想知道保留事件的顺序对于 RX 来说是否不习惯?
    • @yallie 更新了答案。 C# 中的事件默认情况下不是线程安全的,因为它们由委托列表表示,因此您需要一些额外的同步逻辑。
    • 我不太明白事件线程安全与可观察序列产生的事件顺序之间的联系是什么。 John Skeet 的文章是关于可以围绕订阅和调用发生的竞争条件。就我而言,没有竞争条件,因为所有订阅都是在程序开始时完成的。我喜欢计算源事件以将源事件(任务)与其后处理结果相匹配的想法,但我不确定是否有 RX 原语可以进行这种匹配。
    • 这篇文章也是关于thread-safe,用于事件。带有事件的订阅会导致竞态条件,就像 处理 一次性处理所有事件一样,并且在将结果合并到一行期间会发生竞态条件。
    • 不太对。多播委托在调用委托的线程上依次调用。如果事件处理程序抛出异常,则整个调用链将中止。所以在调用过程中没有竞争条件。这是演示:gist.github.com/yallie/fd1105ba98919006d3e93d956f705ef8
    【解决方案5】:

    RX 库包含三个可以解包可观察任务序列的运算符,ConcatMergeSwitch。这三个都接受IObservable&lt;Task&lt;T&gt;&gt; 类型的单个source 参数,并返回IObservable&lt;T&gt;。以下是文档中的描述:

    Concat

    连接所有任务结果,只要前一个任务成功终止。

    Merge

    将所有源任务的结果合并到一个可观察的序列中。

    Switch

    将可观察的任务序列转换为仅从最近的可观察序列产生值的可观察序列。每次收到新任务时,都会忽略前一个任务的结果。

    换句话说,Concat 按原始顺序返回结果,Merge 按完成顺序返回结果,Switch 过滤掉下一个任务之前未完成的任务的任何结果被发射了。因此,只需使用内置的Concat 运算符即可解决您的问题。不需要自定义运算符。

    var streamOfResults = streamOfTasks
        .Select(async task =>
        {
            var result1 = await task;
            var result2 = await PostprocessAsync(result1);
            return result2;
        })
        .Concat();
    

    任务在streamOfTasks 发出之前已经启动。换句话说,它们以"hot" 状态出现。因此,Concat 运算符一个接一个地等待它们这一事实对操作的并发性没有任何影响。它只影响结果的顺序。如果你有冷的可观察对象而不是热任务,这将是一个考虑因素,比如由Observable.FromAsyncObservable.Create 方法创建的这些,在这种情况下Concat 将按顺序执行操作。

    【讨论】:

      猜你喜欢
      • 2013-06-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-03-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多