【问题标题】:Multithreading within a thread lock线程锁中的多线程
【发布时间】:2018-06-20 12:32:13
【问题描述】:

我正在努力加快一些流程的执行速度,这些流程将大量记录(大部分为数百万)发布到 Elasticsearch。在我的 C# 代码中,我已经使用 Dataflow 实现了一个多线程解决方案,如下所示:

var fetchRecords = new TransformBlock<?, ?>(() => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));

fetchRecords.LinkTo(sendRecordsToElastic, { PropogateCompletion = true });

fetchRecords.Post("Start");

然后是我要实现的发送批量请求调用:

public IBulkResponse sendBulkRequest(List<?> records)
{
    lock(SomeStaticObject)
    {
       // Execute several new threads to send records in bulk
    }
}

的问题是关于在作为 Dataflow 管道的一部分存在的锁中执行额外线程的实用性。

这样好吗?我能看到性能、执行、缓存/内存未命中等方面的任何潜在问题吗?

我们很乐意接受任何见解。

【问题讨论】:

  • 不要将lock 与 TPL 数据流一起使用。它违背了目的。 TPL DataFlow 具有内置的并发限制和保护,无需求助于lock
  • @Fabjan 不正确
  • @Stephn_R 我假设您的目标是确保 sendBulkRequest 仅由一个线程执行,但其中的某些操作由多个线程并行执行。对吗?
  • @Stephn_R 你见过BulkAll() 方法,它接受一个延迟构造的IEnumerable&lt;T&gt; 文档并发送带有重试语义的并发批量请求吗? github.com/elastic/elasticsearch-net/blob/… 和测试 github.com/elastic/elasticsearch-net/blob/…
  • @RussCam 您可以提交您的建议作为解决方案吗?这正是我想要的。我的执行时间现在减少了十倍。

标签: c# multithreading elasticsearch .net-core nest


【解决方案1】:

您可能希望在此处使用BulkAll,它实现了可观察模式以向 Elasticsearch 发出并发批量请求。这是一个例子

void Main()
{   
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);

    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    if (client.IndexExists(indexName).Exists)
        client.DeleteIndex(indexName);

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);

    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex => 
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () => 
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable          
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this  
    countdownEvent.Wait();

    if (exception != null) 
    {
        throw exception;
    }
}

// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < DocumentCount; i++)
        yield return new DeviceStatus(i); 
}

private const int DocumentCount = 20000;

public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;
    public int Id {get;set;}
}

如果你不需要在观察者中做任何特殊的事情,你可以在observable上使用.Wait()方法

var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
    .Index(indexName)
    .MaxDegreeOfParallelism(4)
    .RefreshOnCompleted()
    .Size(size)
)
.Wait(
    TimeSpan.FromHours(1), 
    response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
);

BulkAllScrollAllReindex 有一些可观察的方法(尽管有 ReindexOnServer 在 Elasticsearch 中重新索引并映射到 the Reindex API - Reindex 方法早于此)

【讨论】:

  • 有点过头了,不过这样就行了哈哈
  • 会帮助其他人找到它。需要更好地记录这些:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-06-16
  • 1970-01-01
  • 2014-03-20
  • 2011-07-29
  • 1970-01-01
相关资源
最近更新 更多