【问题标题】:Selecting lots of data async from database with EF6使用 EF6 从数据库中选择大量异步数据
【发布时间】:2015-04-12 22:13:15
【问题描述】:

我要做的是从大约有 400 万行的表中获取行,以针对 ElasticSearch 对其进行索引。

底层索引器将使用 IndexManyAsync 并批量处理提供给它的枚举。

类似:

public void IndexMany(IEnumerable<IIndexModel> indexModels) {
    var client = new ElasticClient(settings);
    var batches = indexModels.Batch(1000);
    var tasks = new List<Task>();
    Parallels.ForEach(partitions, partition =>
    {
        var task = client.IndexManyAsync(partition);
        tasks.Add(task);
    }

    Task.WaitAll(tasks.ToArray());
}

因此,考虑到这一点,我想使用 IndexModels 创建一个可枚举。

IndexModels 将获取一个实体并通过给定实体初始化各种属性。比如:

public class FooModel<T> : IIndexModel
{
    public FooModel(T entity) 
    {
        Name = entity.Name;
    }

    public string Name { get; set; }
}

我有一个包含约 40 万行的表,这显然需要一些时间来查询。所以我想做的是异步执行。

我已经尝试了各种方法来做到这一点。第一种方法是批量查询并对其进行并行处理。这给 ObjectContext 带来了各种并发问题。

public void IndexAllModels() {
    using (var db = new Db()) {
        var batchedEntities = db.BigTable.Select(p => p).Batch(1000);

        Parallels.ForEach(batchedEntities, currentBatch =>
        {
            var indexModels = new List<IIndexModel>();
            foreach (var entity in currentBatch) 
            {
                var indexModel = new FooModel<BigTable>(entity);
                indexModels.Add(indexModel);
            }

            IndexMany(indexModels);
        }
    }
}

我想知道是否有任何方法可以通过使用新的 EF6 异步操作来做到这一点?

【问题讨论】:

  • indexModels 的大小是多少?
  • 可能会有所不同。一些 indexmodel 设置了大约 20 个属性,而一些只有 5 个左右。
  • 多达 400 万个索引模型。
  • 为什么 ElasticSearch 已经通过 JDBC 连接支持 ORM 来爬取数据库?使用 ORM 并没有任何好处(不涉及 个对象),但是通过只传递数据的中介确实会增加大量开销

标签: c# .net entity-framework asynchronous async-await


【解决方案1】:

自然使用async API 的好处是您不需要使用线程来使用它们。一直到 WinAPI 级别,There Is No Thread

您可以创建一个采用 IEnumerable&lt;IndexModel&gt; 的方法并使用 ElasticSearchs 异步 API,如下所示:

public async Task IndexManyAsync(IEnumerable<IIndexModel> indexModels) 
{
    var client = new ElasticClient(settings);

    var taskBatches = indexModels.Batch(1000)
                                 .Select(partition =>
                                         client.IndexManyAsync(partition));

    await Task.WhenAll(taskBatches);
}

假设IndexManyAsync 为每个请求使用单独的DbContext,这应该可以工作。

【讨论】:

  • 谢谢!这解释了一些。但是,这并没有回答有关如何从数据库异步检索大块数据的问题。我的问题可能不是很清楚。但是选择 4 个磨机行也可能是内存问题。
  • @Ekenstein 哦,我以为你已经在使用 Batch 来处理每 1000 个请求。
  • 是的。我使用 morelinq 来批量查询。但是,当尝试使用并行 foreach 获取每个批次时,它会变得很麻烦,因为 objectcontext 似乎不是线程安全的。此外,我还希望为每个并行实体实例化一个索引模型。
  • @Ekenstein 为什么使用 ORM 爬取数据库?你永远无法达到 ElasticSearch 直接抓取它的速度,更不用说 ORM 不适合批量/集合操作。
  • @Ekenstein 您可以通过指定视图而不是表本身来做到这一点。即使您不得不使用中间处理步骤(例如,对于某些无法在 SQL 中表达的复杂计算),使用 SqlReader 作为 firehose 游标和立即或分批将每个已处理的行发送到 ElasticSearch。如果您希望多个这样的通道同时从数据库加载数据,并发将有帮助 - 假设 ElasticSearch 可以处理该负载
猜你喜欢
  • 2010-10-12
  • 1970-01-01
  • 1970-01-01
  • 2014-03-18
  • 2012-06-13
  • 1970-01-01
  • 2016-01-24
  • 2014-05-06
  • 1970-01-01
相关资源
最近更新 更多