【发布时间】:2022-01-12 22:14:49
【问题描述】:
我正在使用 TPL Dataflow 从票务系统下载数据。
系统将票号作为输入,调用 API 并接收包含各种信息的嵌套 JSON 响应。收到后,一组块处理嵌套结构的每个级别并将其写入关系数据库。例如对话、对话附件、用户、用户照片、用户标签等
Json
{
"conversations":[
{
"id":12345,
"author_id":23456,
"body":"First Conversation"
},
{
"id":98765,
"authorid":34567,
"body":"Second Conversation",
"attachments":[
{
"attachmentid":12345
"attachment_name":"Test.jpg"
}
}
],
"users":[
{
"userid":12345
"user_name":"John Smith"
},
{
"userid":34567
"user_name":"Joe Bloggs"
"user_photo":
{
"photoid":44556,
"photo_name":"headshot.jpg"
},
tags:[
"development",
"vip"
]
}
]
代码
一些块需要广播,以便更深的嵌套仍然可以访问数据。例如UserModelJson 被广播,因此 1 个块可以处理写入用户,1 个块可以处理写入用户标签,1 个块可以处理写入用户照片。
var loadTicketsBlock = new TransformBlock<int, ConversationsModelJson>(async ticketNumber => await p.GetConversationObjectFromTicket(ticketNumber));
var broadcastConversationsObjectBlock = new BroadcastBlock<ConversationsModelJson>(conversations => conversations);
//Conversation
var getConversationsFromConversationObjectBlock = new TransformManyBlock<ConversationsModelJson, ConversationModelJson>(conversation => ModelConverter.ConvertConversationsObjectJsonToConversationJson(conversation));
var convertConversationsBlock = new TransformBlock<ConversationModelJson, ConversationModel>(conversation => ModelConverter.ConvertConversationJsonToConversation(conversation));
var batchConversionBlock = new BatchBlock<ConversationModel>(batchBlockSize);
var convertConversationsToDTBlock = new TransformBlock<IEnumerable<ConversationModel>, DataTable>(conversations => ModelConverter.ConvertConversationModelToConversationDT(conversations));
var writeConversationsBlock = new ActionBlock<DataTable>(async conversations => await p.ProcessConversationsAsync(conversations));
var getUsersFromConversationsBlock = new TransformManyBlock<ConversationsModelJson, UserModelJson>(conversations => ModelConverter.ConvertConversationsJsonToUsersJson(conversations));
var broadcastUserBlock = new BroadcastBlock<UserModelJson>(userModelJson => userModelJson);
//User
var convertUsersBlock = new TransformBlock<UserModelJson, UserModel>(user => ModelConverter.ConvertUserJsonToUser(user));
var batchUsersBlock = new BatchBlock<UserModel>(batchBlockSize);
var convertUsersToDTBlock = new TransformBlock<IEnumerable<UserModel>, DataTable>(users => ModelConverter.ConvertUserModelToUserDT(users));
var writeUsersBlock = new ActionBlock<DataTable>(async users => await p.ProcessUsersAsync(users));
//UserTag
var getUserTagsFromUserBlock = new TransformBlock<UserModelJson, UserTagModel>(user => ModelConverter.ConvertUserJsonToUserTag(user));
var batchTagsBlock = new BatchBlock<UserTagModel>(batchBlockSize);
var convertTagsToDTBlock = new TransformBlock<IEnumerable<UserTagModel>, DataTable>(tags => ModelConverter.ConvertUserTagModelToUserTagDT(tags));
var writeTagsBlock = new ActionBlock<DataTable>(async tags => await p.ProcessUserTagsAsync(tags));
DataflowLinkOptions linkOptions = new DataflowLinkOptions()
{
PropagateCompletion = true
};
loadTicketsBlock.LinkTo(broadcastConversationsObjectBlock, linkOptions);
//Conversation
broadcastConversationsObjectBlock.LinkTo(getConversationsFromConversationObjectBlock, linkOptions);
getConversationsFromConversationObjectBlock.LinkTo(convertConversationsBlock, linkOptions);
convertConversationsBlock.LinkTo(batchConversionBlock, linkOptions);
batchConversionBlock.LinkTo(convertConversationsToDTBlock, linkOptions);
convertConversationsToDTBlock.LinkTo(writeConversationsBlock, linkOptions);
var tickets = await provider.GetAllTicketsAsync();
foreach (var ticket in tickets)
{
cts.Token.ThrowIfCancellationRequested();
await loadTicketsBlock.SendAsync(ticket.TicketID);
}
loadTicketsBlock.Complete();
LinkTo 块针对要写入的每种数据类型重复。
我知道整个管道何时完成使用
await Task.WhenAll(<Last block of each branch>.Completion);
但如果我将 1 号票证传递到 loadTicketsBlock 块,那么我如何知道该特定票证何时已通过管道中的所有块并因此完成?
我想知道这一点的原因是我可以向 UI 报告 100 票中的第 1 票已完成。
【问题讨论】:
-
您是将所有这些异构数据都保存在同一个数据库中,还是保存在多个数据库中?如果您使用单个数据库,我觉得您将数据持久性的责任分配给了这么多数据流块,这对我来说似乎很奇怪。如果一个块失败会发生什么?数据库中存储的数据的一致性难道不重要吗?
-
是的,所有数据都写入同一个数据库,但写入不同的表,是的,数据的一致性很重要。对我来说,这似乎是一个合乎逻辑的流程,但如果你认为这太多了,你会建议什么?
-
Ninja 我的第一个想法是使用两个数据流块。一种用于调用 API 并接收 JSON 数据,另一种用于将 JSON 数据持久化到数据库中。这样我就可以根据远程服务器和本地数据库的能力来优化配置每个块的最大并行度。
-
Theodor,感谢您的建议,但如果我将所有处理/写入操作限制为 1 个块(将有 30 个不同的表要写入),1. 我如何从以前的在写入 SQL 和 2 之前进行批处理。这不是破坏了数据流的要点并基本上使它成为一个巨大的 foreach 循环吗?
-
啊,这就是拥有所有这些块的原因。您是否使用
SqlBulkCopy将批量记录插入数据库?
标签: c# tpl-dataflow