【问题标题】:ActionBlock Framework 4 rx alternativeActionBlock 框架 4 RX 替代方案
【发布时间】:2015-02-24 23:26:28
【问题描述】:

我对框架 4.0 的 ActionBlock 实现感兴趣,因为框架 4.0 似乎不支持 TPL.Dataflow。 更具体地说,我对接收 Func 委托和 MaxDegreeOfParallism = 1 案例的构造函数的情况感兴趣。

我考虑过使用响应式扩展来实现它,但我不知道该怎么做。考虑过创建 Subject 并在 Post 上调用 OnNext,并使用 SelectMany 和任务 ToObservable 的东西,但我不确定如何处理调度程序。这是我的想法的草稿。

public class ActionBlock<TInput>
{
    private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
    private readonly Subject<TInput> mQueue = new Subject<TInput>();

    public ActionBlock(Func<TInput, Task> action)
    {
        var observable =
            from item in mQueue
            from _ in action(item).ToObservable()
            select _;

        observable.Subscribe(x => { },
            OnComplete);
    }

    private void OnComplete()
    {
        mCompletion.SetResult(null);
    }

    public void Post(TInput input)
    {
        mQueue.OnNext(input);
    }

    public Task Completion
    {
        get
        {
            return mCompletion.Task;
        }
    }

    public void Complete()
    {
        mQueue.OnCompleted();
    }
}

我想也许可以使用 EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。

有什么想法吗?

【问题讨论】:

    标签: c# system.reactive tpl-dataflow


    【解决方案1】:
    mQueue
        .Select(input => Observable.FromAsync(() => action(input))
        .Merge(maxDegreeOfParallelism)
        .Subscribe(...);
    

    如果确实maxDegreeOfParallelism 始终为1,那么只需使用Concat 而不是Merge

    mQueue
        .Select(input => Observable.FromAsync(() => action(input))
        .Concat()
        .Subscribe(...);
    

    这是有效的,因为FromAsync 只是创建了一个冷的可观察对象,在订阅之前不会运行异步操作。然后我们使用Merge(或只是Concat)的maxConcurrency 参数来限制并发订阅的数量(从而限制运行的异步操作的数量)。

    编辑:

    由于您的目标是只拥有一个代表流完成的Task,因此您可以使用ToTask 而不是直接订阅。 ToTask 将订阅并返回带有最终值的Task。因为 ToTask 会在 observable 不产生值时抛出,我们将使用 Count 来保证它产生值:

    // task to mark completion
    private readonly Task mCompletion;
    
    // ...
    
    this.mCompletion = mQueue
        .Select(input => Observable.FromAsync(() => action(input))
        .Concat()
        .Count()
        .ToTask();
    

    【讨论】:

      猜你喜欢
      • 2010-11-09
      • 2014-08-14
      • 2022-01-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-12-28
      • 2014-12-02
      相关资源
      最近更新 更多