【问题标题】:How can I implement an exhaustMap handler in Rx.Net?如何在 Rx.Net 中实现排气映射处理程序?
【发布时间】:2020-10-14 13:03:27
【问题描述】:

我正在寻找类似于rxjs 中的exhaustMap 运算符的东西,但RX.NET 似乎没有这样的运算符。

我需要实现的是,在源流的每个元素上,我需要启动一个 async 处理程序,直到它完成,我想从源中删除任何元素。处理程序完成后,立即继续获取元素。

我不希望在 每个 元素上启动异步处理程序 - 当处理程序运行时,我想删除源元素。

我还怀疑我需要在这里巧妙地使用 defer 运算符?

谢谢!

【问题讨论】:

    标签: c# .net system.reactive rx.net


    【解决方案1】:

    这是ExhaustMap 运算符的一个实现。源 observable 投影到 IObservable<Task<TResult>>,其中每个后续任务要么是前一个任务(如果它仍在运行),要么是与当前项目关联的新任务。然后使用 DistinctUntilChanged 运算符删除重复出现的相同任务,最后使用 Concat 运算符将 observable 展平。

    /// <summary>Invokes an asynchronous function for each element of an observable
    /// sequence, ignoring elements that are emitted before the completion of an
    /// asynchronous function of a preceding element.</summary>
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> function)
    {
        return source
            .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
            {
                return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
            })
            .DistinctUntilChanged()
            .Concat();
    
        async Task<TResult> HideIdentity(Task<TResult> task) => await task;
    }
    

    function 返回的任务不能保证是不同的,因此需要 HideIdentity 本地函数来返回不同的任务包装。

    使用示例:

    Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(x => (int)x + 1)
        .Take(10)
        .Do(x => Console.WriteLine($"Input: {x}"))
        .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
        .Do(x => Console.WriteLine($"Result: {x}"))
        .Wait();
    

    输出:

    Input: 1
    Result: 1
    Input: 2
    Result: 2
    Input: 3
    Input: 4
    Input: 5
    Result: 3
    Input: 6
    Input: 7
    Input: 8
    Result: 6
    Input: 9
    Input: 10
    Result: 9
    

    更新:这是一个替代实现,其中function 生成IObservable&lt;TResult&gt; 而不是Task&lt;TResult&gt;

    /// <summary>Projects each element to an observable sequence, which is merged
    /// in the output observable sequence only if the previous projected observable
    /// sequence has completed.</summary>
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> function)
    {
        return Observable.Defer(() =>
        {
            int mutex = 0; // 0: not acquired, 1: acquired
            return source.SelectMany(item =>
            {
                // Attempt to acquire the mutex immediately. If successful, return
                // a sequence that releases the mutex when terminated. Otherwise,
                // return immediately an empty sequence.
                if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                    return function(item).Finally(() => Volatile.Write(ref mutex, 0));
                return Observable.Empty<TResult>();
            });
        });
    }
    

    【讨论】:

    • 不错,西奥多!我已经离开 rx 标签有一段时间了 - 很高兴看到你在坚守堡垒。
    • 嗨@Asti!是的,这是一个很好的问题,适合优雅的答案,是 StackOverflow 的理想问题。正如您在revisions 中看到的那样,早期存在一些摩擦,问题被无缘无故关闭。但现在一切正常。 ?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-10-23
    • 1970-01-01
    • 1970-01-01
    • 2014-12-21
    • 2014-11-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多