【问题标题】:Report when input to first dataflow block finishes all linked blocks当输入到第一个数据流块完成所有链接块时报告
【发布时间】:2022-01-12 22:14:49
【问题描述】:

我正在使用 TPL Dataflow 从票务系统下载数据。

系统将票号作为输入,调用 API 并接收包含各种信息的嵌套 JSON 响应。收到后,一组块处理嵌套结构的每个级别并将其写入关系数据库。例如对话、对话附件、用户、用户照片、用户标签等

Json

{
    "conversations":[
        {
            "id":12345,
            "author_id":23456,
            "body":"First Conversation"
        },
        {
            "id":98765,
            "authorid":34567,
            "body":"Second Conversation",
            "attachments":[
            {
                "attachmentid":12345
                "attachment_name":"Test.jpg"
            }
        }
    ],
    "users":[
        {
            "userid":12345
            "user_name":"John Smith"
        },
        {
            "userid":34567
            "user_name":"Joe Bloggs"
            "user_photo":
            {
                "photoid":44556,
                "photo_name":"headshot.jpg"
            },
            tags:[
                "development",
                "vip"
            ]
        }
    ]

代码

一些块需要广播,以便更深的嵌套仍然可以访问数据。例如UserModelJson 被广播,因此 1 个块可以处理写入用户,1 个块可以处理写入用户标签,1 个块可以处理写入用户照片。

var loadTicketsBlock = new TransformBlock<int, ConversationsModelJson>(async ticketNumber => await p.GetConversationObjectFromTicket(ticketNumber));
var broadcastConversationsObjectBlock = new BroadcastBlock<ConversationsModelJson>(conversations => conversations);

//Conversation
var getConversationsFromConversationObjectBlock = new TransformManyBlock<ConversationsModelJson, ConversationModelJson>(conversation => ModelConverter.ConvertConversationsObjectJsonToConversationJson(conversation));
var convertConversationsBlock = new TransformBlock<ConversationModelJson, ConversationModel>(conversation => ModelConverter.ConvertConversationJsonToConversation(conversation));
var batchConversionBlock = new BatchBlock<ConversationModel>(batchBlockSize);
var convertConversationsToDTBlock = new TransformBlock<IEnumerable<ConversationModel>, DataTable>(conversations => ModelConverter.ConvertConversationModelToConversationDT(conversations));
var writeConversationsBlock = new ActionBlock<DataTable>(async conversations => await p.ProcessConversationsAsync(conversations));

var getUsersFromConversationsBlock = new TransformManyBlock<ConversationsModelJson, UserModelJson>(conversations => ModelConverter.ConvertConversationsJsonToUsersJson(conversations));
var broadcastUserBlock = new BroadcastBlock<UserModelJson>(userModelJson => userModelJson);

//User
var convertUsersBlock = new TransformBlock<UserModelJson, UserModel>(user => ModelConverter.ConvertUserJsonToUser(user));
var batchUsersBlock = new BatchBlock<UserModel>(batchBlockSize);
var convertUsersToDTBlock = new TransformBlock<IEnumerable<UserModel>, DataTable>(users => ModelConverter.ConvertUserModelToUserDT(users));
var writeUsersBlock = new ActionBlock<DataTable>(async users => await p.ProcessUsersAsync(users));

//UserTag
var getUserTagsFromUserBlock = new TransformBlock<UserModelJson, UserTagModel>(user => ModelConverter.ConvertUserJsonToUserTag(user));
var batchTagsBlock = new BatchBlock<UserTagModel>(batchBlockSize);
var convertTagsToDTBlock = new TransformBlock<IEnumerable<UserTagModel>, DataTable>(tags => ModelConverter.ConvertUserTagModelToUserTagDT(tags));
var writeTagsBlock = new ActionBlock<DataTable>(async tags => await p.ProcessUserTagsAsync(tags));


DataflowLinkOptions linkOptions = new DataflowLinkOptions()
{
    PropagateCompletion = true
};

loadTicketsBlock.LinkTo(broadcastConversationsObjectBlock, linkOptions);

//Conversation
broadcastConversationsObjectBlock.LinkTo(getConversationsFromConversationObjectBlock, linkOptions);
getConversationsFromConversationObjectBlock.LinkTo(convertConversationsBlock, linkOptions);
convertConversationsBlock.LinkTo(batchConversionBlock, linkOptions);
batchConversionBlock.LinkTo(convertConversationsToDTBlock, linkOptions);
convertConversationsToDTBlock.LinkTo(writeConversationsBlock, linkOptions);         

var tickets = await provider.GetAllTicketsAsync();

foreach (var ticket in tickets)
{
    cts.Token.ThrowIfCancellationRequested();
    await loadTicketsBlock.SendAsync(ticket.TicketID);
}

loadTicketsBlock.Complete();

LinkTo 块针对要写入的每种数据类型重复。

我知道整个管道何时完成使用

await Task.WhenAll(<Last block of each branch>.Completion);

但如果我将 1 号票证传递到 loadTicketsBlock 块,那么我如何知道该特定票证何时已通过管道中的所有块并因此完成?

我想知道这一点的原因是我可以向 UI 报告 100 票中的第 1 票已完成。

【问题讨论】:

  • 您是将所有这些异构数据都保存在同一个数据库中,还是保存在多个数据库中?如果您使用单个数据库,我觉得您将数据持久性的责任分配给了这么多数据流块,这对我来说似乎很奇怪。如果一个块失败会发生什么?数据库中存储的数据的一致性难道不重要吗?
  • 是的,所有数据都写入同一个数据库,但写入不同的表,是的,数据的一致性很重要。对我来说,这似乎是一个合乎逻辑的流程,但如果你认为这太多了,你会建议什么?
  • Ninja 我的第一个想法是使用两个数据流块。一种用于调用 API 并接收 JSON 数据,另一种用于将 JSON 数据持久化到数据库中。这样我就可以根据远程服务器和本地数据库的能力来优化配置每个块的最大并行度。
  • Theodor,感谢您的建议,但如果我将所有处理/写入操作限制为 1 个块(将有 30 个不同的表要写入),1. 我如何从以前的在写入 SQL 和 2 之前进行批处理。这不是破坏了数据流的要点并基本上使它成为一个巨大的 foreach 循环吗?
  • 啊,这就是拥有所有这些块的原因。您是否使用SqlBulkCopy 将批量记录插入数据库?

标签: c# tpl-dataflow


【解决方案1】:

您可以考虑使用TaskCompletionSource 作为所有子实体的基类。例如:

class Attachment : TaskCompletionSource
{
}

class Conversation : TaskCompletionSource
{
}

然后每次你在数据库中插入一个子实体,你就将它标记为完成:

attachment.SetResult();

...或者如果插入失败,则将其标记为错误:

attachment.SetException(ex);

最后你可以将所有的异步完成合二为一,使用方法Task.WhenAll

Task ticketCompletion = Task.WhenAll(Enumerable.Empty<Task>()
    .Append(ticket.Task)
    .Concat(attachments.Select(e => e.Task))
    .Concat(conversations.Select(e => e.Task)));

【讨论】:

    【解决方案2】:

    如果我在 Dataflow 中跟踪进度,通常我会将最后一个块设置为通知 UI 进度类型块。为了能够跟踪输入的进度,您需要在传递的所有对象中保留原始输入的上下文,因此在这种情况下,您需要能够知道您正在处理工单 1 all通过管道的方式,并且如果其中一个转换删除了它正在处理票证 1 的上下文,那么您将需要重新考虑通过管道传递的对象类型,以便保留该上下文。

    我正在谈论的一个简单示例如下所示,一个广播块进入三个变换块,然后所有三个变换块都返回一个通知管道进度的动作块。

    当组合回单个动作块时,您需要确保此时不要传播完成,因为一旦一个块将完成传播到动作块,该动作块将停止接受输入,因此您仍将等待每个管道的最后一个块要完成,然后手动完成最终通知 UI 操作块。

    using System;
    using System.Threading.Tasks.Dataflow;
    using System.Threading.Tasks;
    using System.Collections.Generic;
    
    public class Program
    {
        public static void Main()
        {
            var broadcastBlock = new BroadcastBlock<string>(x => x);
            
            var transformBlockA = new TransformBlock<string, string>(x =>
            {
                return x + "A";
            });
            
            var transformBlockB = new TransformBlock<string, string>(x =>
            {
                return x + "B";
            });
            
            var transformBlockC = new TransformBlock<string, string>(x =>
            {
                return x + "C";
            });
            
            var ticketTracking = new Dictionary<int, List<string>>();
            var notifyUiBlock = new ActionBlock<string>(x =>
            {
                var ticketNumber = int.Parse(x.Substring(5,1));
                var taskLetter = x.Substring(7,1);
                var success = ticketTracking.TryGetValue(ticketNumber, out var tasksComplete);
                if (!success)
                {
                    tasksComplete = new List<string>();
                    ticketTracking[ticketNumber] = tasksComplete;
                }
                tasksComplete.Add(taskLetter);
                
                if (tasksComplete.Count == 3)
                {
                    Console.WriteLine($"Ticket {ticketNumber} is complete");
                }
            });
            
            DataflowLinkOptions linkOptions = new DataflowLinkOptions() {PropagateCompletion = true};
            
            broadcastBlock.LinkTo(transformBlockA, linkOptions);
            broadcastBlock.LinkTo(transformBlockB, linkOptions);
            broadcastBlock.LinkTo(transformBlockC, linkOptions);
            transformBlockA.LinkTo(notifyUiBlock);
            transformBlockB.LinkTo(notifyUiBlock);
            transformBlockC.LinkTo(notifyUiBlock);
            
            for(var i = 0; i < 5; i++)
            {
                broadcastBlock.Post($"Task {i} ");
            }
            
            broadcastBlock.Complete();
            
            Task.WhenAll(transformBlockA.Completion, transformBlockB.Completion, transformBlockC.Completion).Wait();
            notifyUiBlock.Complete();
            notifyUiBlock.Completion.Wait();
            
            Console.WriteLine("Done");
        }
    }
    

    这将给出与此类似的输出

    票 0 已完成
    票 1 已完成
    票 2 已完成
    票 3 已完成
    票 4 已完成
    完成

    【讨论】:

    • 感谢 TJ 洛克菲勒。虽然这允许我在每次“工单 1”完成其中一个区块时进行报告,但它仍然不允许我报告工单 1 已完成其所有区块。
    • @Ninja 如果您知道您的所有区块是什么,那么如果您在上下文中包含该信息,那么您的报告区块应该很容易跟踪给定工单的哪些区块已完成,并且然后当所有块都完成后,您可以将该票标记为完成。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多