【问题标题】:TPL Dataflow duplicate message to all consumersTPL Dataflow 向所有消费者重复消息
【发布时间】:2019-11-21 15:54:32
【问题描述】:

我目前正在使用 WPF 和 TPL 数据流编写一个应用程序,它应该执行以下操作:

  1. 加载目录中的所有文件
  2. 一旦开始处理,就在 ui 中记录一些内容并处理每个文件
  3. 完成后将内容记录到 ui

问题在于 UI 的日志记录需要在 UI 线程中进行,并且仅在它开始处理之前进行记录。

我现在能够做到这一点的唯一方法是从 TPL 转换块内部手动调用调度程序并更新 UI:

Application.Current.Dispatcher.Invoke(new Action(() =>
{
    ProcessedFiles.Add(optimizedFileResult);
}));

我想通过在 UI 线程上运行的 DataFlow 块来执行此操作:

ExecutionDataflowBlockOptions.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

但是,如果我在进行优化的块上设置它,优化也将运行单线程。

另一方面,如果我要在处理块之前创建一个新块并在那里调用它。它会在实际开始之前开始说“处理”。

示例代码

我创建了一些示例代码来重现此问题:

public class TplLoggingToUiIssue
    {
        public TplLoggingToUiIssue()
        {

        }

        public IEnumerable<string> RecurseFiles()
        {
            for (int i = 0; i < 20; i++)
            {
                yield return i.ToString();
            }
        }

        public async Task Go()
        {
            var block1 = new TransformBlock<string, string>(input =>
            {
                Console.WriteLine($"1: {input}");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 10,
                EnsureOrdered = false
            });

            var block2 = new TransformBlock<string, string>(input =>
            {
                Console.WriteLine($"2: {input}\t\t\tStarting {input} now (ui logging)");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                //TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(), (Doesn't work in Console app, but you get the idea)
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
                EnsureOrdered = false
            });


            var block3 = new TransformBlock<string, string>(async input =>
            {
                Console.WriteLine($"3 start: {input}");
                await Task.Delay(5000);
                Console.WriteLine($"3 end: {input}");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 2,
                BoundedCapacity = 10,
                EnsureOrdered = false
            });

            var block4 = new ActionBlock<string>(input =>
            {
                Console.WriteLine($"4: {input}");
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
                EnsureOrdered = false
            });


            block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
            block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
            block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });


            var files = RecurseFiles();
            await Task.Run(async () =>
            {
                foreach (var file in files)
                {
                    Console.WriteLine($"Posting: {file}");
                    var result = await block1.SendAsync(file);

                    if (!result)
                    {
                        Console.WriteLine("Result is false!!!");
                    }
                }
            });

            Console.WriteLine("Completing");
            block1.Complete();
            await block4.Completion;
            Console.WriteLine("Done");
        }
    }

如果您运行此示例(只有 6 个“文件”),您将获得以下输出:

Posting: 0
Posting: 1
Posting: 2
Posting: 3
Posting: 4
Posting: 5
1: 2
1: 1
1: 3
1: 0
1: 4
1: 5
2: 2                    Starting 2 now (ui logging)
Completing
3 start: 2
2: 0                    Starting 0 now (ui logging)
3 start: 0
2: 3                    Starting 3 now (ui logging)
2: 1                    Starting 1 now (ui logging)
2: 4                    Starting 4 now (ui logging)
2: 5                    Starting 5 now (ui logging)
3 end: 2
3 end: 0
3 start: 3
3 start: 1
4: 2
4: 0
3 end: 3
3 end: 1
4: 3
3 start: 4
3 start: 5
4: 1
3 end: 5
3 end: 4
4: 5
4: 4
Done

从这个输出中可以看出,它开始的日志记录发生得太早了。我也尝试使用广播块代替,但这会覆盖值,因此它们会丢失。

理想的情况是让日志记录块等待,直到处理块有容量,然后推送一个项目。

【问题讨论】:

  • 您可以创建一个TransformBlock 包装器,它公开两个事件TransformStartedTransformFinished,以及一个SynchronizingObject 属性,类似于System.Timers.Timer 类中的属性。然后,您可以在构造块时将此属性设置为当前的Form,并订阅这两个事件以在 UI 中获取通知。但是所有这些都是很多管道,并且会增加一些开销,所以我个人更喜欢使用你最初的想法,以 Application.Current.Dispatcher.Invoke 为特色。

标签: c# .net task-parallel-library tpl-dataflow


【解决方案1】:

如其他答案所示,有几种方法可以解决此问题。我想指出一个替代方案:为此使用Progress&lt;T&gt;。尽管它被设计为最适合与任务一起使用,但它也适用于数据流,如下所示:

        private void Form1_Load(object sender, EventArgs e)
        {
            var progressReporter = new Progress<string>();
            progressReporter.ProgressChanged += (reporter, message) => label1.Text = message;

            var b1 = new ActionBlock<string>((input) =>
            {
                ((IProgress<string>)progressReporter).Report(input);
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 10
            }); 

            b1.Post("a");
            b1.Post("b");
            b1.Post("c");
            b1.Post("d");
        }

对我而言,总体而言,这看起来是一个干净的替代方案,无需为各个块添加一些管道。

更多信息可以在这个优秀的blogpost中找到

【讨论】:

  • 这是一个比我更好的解决方案。每个块可以有两个Progress 对象,一个用于报告开始,另一个用于报告项目处理的完成。第二个不仅可以报告正在处理的项目,还可以报告计算结果。
  • 谢谢大家,我已经在 Github 上创建了一个示例,该示例可以重现此问题,您的修复程序现在包括:github.com/devedse/TPLBeestWPF/tree/Reporter 在我完成此操作之前,您是否知道调用 ot IProgress 是否真的发生同步还是异步? (例如,如果我多次调用 Report ,由于 ProgressChanged 事件中发生的工作,它实际上会显着减慢我的程序吗?)
  • @Devedse 事件处理程序中的工作是使用 UI 线程完成的,而不是由 TPL 数据流完成的工作(在此示例中)。所以不,它不会对块的性能产生负面影响。但是 UI 可能会变得无响应,但是在 UI 线程上运行的任何长时间运行的方法都会导致这种情况,除非可以应用 async / await。
【解决方案2】:

这是一种有点做作的方法,它通过启动完成的事件来增强异步 lambda 作为参数传递给 ActionBlock

public static Func<TInput, Task> Enhance<TInput>(
    Func<TInput, Task> action,
    Action<TInput> onActionStarted = null,
    Action<TInput> onActionFinished = null,
    ISynchronizeInvoke synchronizingObject = null)
{
    return async (item) =>
    {
        RaiseEvent(onActionStarted, item, synchronizingObject);
        await action(item).ConfigureAwait(false);
        RaiseEvent(onActionFinished, item, synchronizingObject);
    };
}

private static void RaiseEvent<T>(Action<T> onEvent, T arg1,
    ISynchronizeInvoke synchronizingObject)
{
    if (onEvent == null) return;
    if (synchronizingObject != null && synchronizingObject.InvokeRequired)
    {
        synchronizingObject.Invoke(onEvent, new object[] { arg1 });
    }
    else
    {
        onEvent(arg1);
    }
}

使用示例:

private void Form_Load(object sender, EventArgs e)
{
    var block = new ActionBlock<string>(Enhance<string>(async item =>
    {
        await Task.Delay(5000); // Simulate some lengthy asynchronous job
    }, onActionStarted: item =>
    {
        this.Text = $"{item} started";
    }, onActionFinished: item =>
    {
        ListBoxCompleted.Items.Add(item);
    }, synchronizingObject: this), new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2,
        BoundedCapacity = 10,
        EnsureOrdered = false
    });
}

onActionStartedonActionFinished 回调将在 UI 线程中为每个已处理的项目调用一次。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-19
    • 1970-01-01
    • 1970-01-01
    • 2017-12-21
    • 2017-11-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多