【发布时间】:2019-11-21 15:54:32
【问题描述】:
我目前正在使用 WPF 和 TPL 数据流编写一个应用程序,它应该执行以下操作:
- 加载目录中的所有文件
- 一旦开始处理,就在 ui 中记录一些内容并处理每个文件
- 完成后将内容记录到 ui
问题在于 UI 的日志记录需要在 UI 线程中进行,并且仅在它开始处理之前进行记录。
我现在能够做到这一点的唯一方法是从 TPL 转换块内部手动调用调度程序并更新 UI:
Application.Current.Dispatcher.Invoke(new Action(() =>
{
ProcessedFiles.Add(optimizedFileResult);
}));
我想通过在 UI 线程上运行的 DataFlow 块来执行此操作:
ExecutionDataflowBlockOptions.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
但是,如果我在进行优化的块上设置它,优化也将运行单线程。
另一方面,如果我要在处理块之前创建一个新块并在那里调用它。它会在实际开始之前开始说“处理”。
示例代码
我创建了一些示例代码来重现此问题:
public class TplLoggingToUiIssue
{
public TplLoggingToUiIssue()
{
}
public IEnumerable<string> RecurseFiles()
{
for (int i = 0; i < 20; i++)
{
yield return i.ToString();
}
}
public async Task Go()
{
var block1 = new TransformBlock<string, string>(input =>
{
Console.WriteLine($"1: {input}");
return input;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 10,
EnsureOrdered = false
});
var block2 = new TransformBlock<string, string>(input =>
{
Console.WriteLine($"2: {input}\t\t\tStarting {input} now (ui logging)");
return input;
}, new ExecutionDataflowBlockOptions()
{
//TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(), (Doesn't work in Console app, but you get the idea)
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1,
EnsureOrdered = false
});
var block3 = new TransformBlock<string, string>(async input =>
{
Console.WriteLine($"3 start: {input}");
await Task.Delay(5000);
Console.WriteLine($"3 end: {input}");
return input;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 10,
EnsureOrdered = false
});
var block4 = new ActionBlock<string>(input =>
{
Console.WriteLine($"4: {input}");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1,
EnsureOrdered = false
});
block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });
var files = RecurseFiles();
await Task.Run(async () =>
{
foreach (var file in files)
{
Console.WriteLine($"Posting: {file}");
var result = await block1.SendAsync(file);
if (!result)
{
Console.WriteLine("Result is false!!!");
}
}
});
Console.WriteLine("Completing");
block1.Complete();
await block4.Completion;
Console.WriteLine("Done");
}
}
如果您运行此示例(只有 6 个“文件”),您将获得以下输出:
Posting: 0
Posting: 1
Posting: 2
Posting: 3
Posting: 4
Posting: 5
1: 2
1: 1
1: 3
1: 0
1: 4
1: 5
2: 2 Starting 2 now (ui logging)
Completing
3 start: 2
2: 0 Starting 0 now (ui logging)
3 start: 0
2: 3 Starting 3 now (ui logging)
2: 1 Starting 1 now (ui logging)
2: 4 Starting 4 now (ui logging)
2: 5 Starting 5 now (ui logging)
3 end: 2
3 end: 0
3 start: 3
3 start: 1
4: 2
4: 0
3 end: 3
3 end: 1
4: 3
3 start: 4
3 start: 5
4: 1
3 end: 5
3 end: 4
4: 5
4: 4
Done
从这个输出中可以看出,它开始的日志记录发生得太早了。我也尝试使用广播块代替,但这会覆盖值,因此它们会丢失。
理想的情况是让日志记录块等待,直到处理块有容量,然后推送一个项目。
【问题讨论】:
-
您可以创建一个
TransformBlock包装器,它公开两个事件TransformStarted和TransformFinished,以及一个SynchronizingObject属性,类似于System.Timers.Timer类中的属性。然后,您可以在构造块时将此属性设置为当前的Form,并订阅这两个事件以在 UI 中获取通知。但是所有这些都是很多管道,并且会增加一些开销,所以我个人更喜欢使用你最初的想法,以Application.Current.Dispatcher.Invoke为特色。
标签: c# .net task-parallel-library tpl-dataflow