【发布时间】:2018-06-20 12:32:13
【问题描述】:
我正在努力加快一些流程的执行速度,这些流程将大量记录(大部分为数百万)发布到 Elasticsearch。在我的 C# 代码中,我已经使用 Dataflow 实现了一个多线程解决方案,如下所示:
var fetchRecords = new TransformBlock<?, ?>(() => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));
fetchRecords.LinkTo(sendRecordsToElastic, { PropogateCompletion = true });
fetchRecords.Post("Start");
然后是我要实现的发送批量请求调用:
public IBulkResponse sendBulkRequest(List<?> records)
{
lock(SomeStaticObject)
{
// Execute several new threads to send records in bulk
}
}
你的问题是关于在作为 Dataflow 管道的一部分存在的锁中执行额外线程的实用性。
这样好吗?我能看到性能、执行、缓存/内存未命中等方面的任何潜在问题吗?
我们很乐意接受任何见解。
【问题讨论】:
-
不要将
lock与 TPL 数据流一起使用。它违背了目的。 TPL DataFlow 具有内置的并发限制和保护,无需求助于lock -
@Fabjan 不正确
-
@Stephn_R 我假设您的目标是确保 sendBulkRequest 仅由一个线程执行,但其中的某些操作由多个线程并行执行。对吗?
-
@Stephn_R 你见过
BulkAll()方法,它接受一个延迟构造的IEnumerable<T>文档并发送带有重试语义的并发批量请求吗? github.com/elastic/elasticsearch-net/blob/… 和测试 github.com/elastic/elasticsearch-net/blob/… -
@RussCam 您可以提交您的建议作为解决方案吗?这正是我想要的。我的执行时间现在减少了十倍。
标签: c# multithreading elasticsearch .net-core nest