【问题标题】:How to insert 4 million records from Oracle to Elasticsearch table faster using C#?如何使用 C# 更快地将 400 万条记录从 Oracle 插入到 Elasticsearch 表中?
【发布时间】:2015-09-10 18:19:11
【问题描述】:

我有以下用 C# 编写的代码,但据此,将数据从 Oracle 数据库迁移到 Elasticsearch 需要 4-5 天。我分批插入 100 条记录。有没有其他方法可以更快地迁移 400 万条记录(如果可能的话,可能在不到一天的时间内)?

   public static void Selection()
        {
            for(int i = 1; i < 4000000; i += 1000)
            {
                for(int j = i; j < (i+1000); j += 100)
                {
                    OracleCommand cmd = new OracleCommand(BuildQuery(j), 
                                                     oracle_connection);
                    OracleDataReader reader = cmd.ExecuteReader();
                    List<Record> list=CreateRecordList(reader);
                    insert(list);
                }
            }
        }

   private static List<Record> CreateRecordList(OracleDataReader reader)
        {
            List<Record> l = new List<Record>();
            string[] str = new string[7];
            try
            {
                while (reader.Read())
                {
                    for (int i = 0; i < 7; i++)
                    {
                        str[i] = reader[i].ToString();
                    }

                    Record r = new Record(str[0], str[1], str[2], str[3],                              
                                str[4], str[5], str[6]);
                    l.Add(r);
                }
            }
            catch (Exception er)
            {
                string msg = er.Message;
            }
            return l;
        }

   private static string BuildQuery(int from)
        {
            int to = from + change - 1;
            StringBuilder builder = new StringBuilder();
            builder.AppendLine(@"select * from");
            builder.AppendLine("(");
            builder.AppendLine("select FIELD_1, FIELD_2, 
            FIELD_3, FIELD_4, FIELD_5, FIELD_6, 
            FIELD_7, ");
            builder.Append(" row_number() over(order by FIELD_1) 
             rn");
            builder.AppendLine("   from tablename");
            builder.AppendLine(")");
            builder.AppendLine(string.Format("where rn between {0} and {1}", 
            from, to));
            builder.AppendLine("order by rn");
            return builder.ToString();
        }

   public static void insert(List<Record> l)
        {
            try
            {
                foreach(Record r in l)
                    client.Index<Record>(r, "index", "type");
            }
            catch (Exception er)
            {
                string msg = er.Message;
            }
        }

【问题讨论】:

  • client.Index 替换为client.IndexMany(..) 并尝试找出批量插入的最佳块大小elastic.co/guide/en/elasticsearch/guide/current/…
  • 我需要 4-5 天 .. 你跑过看看是否真的需要 4/5 天才能迁移 4M 行?
  • ROW_NUMBER() 函数会对性能产生负面影响,而且您正在运行它数千次。你已经在使用OracleDataReader——它不会一次将所有四百万行拉到你的机器上,它基本上是一次流式传输一个或几个。相反,您应该有一个查询,并且在构建 Record 对象时,每隔 100 或 500 或 1000 个(例如,保留一个递增每个循环的 count),提交它们(例如在count % 500 == 0)。这必须在几分钟或几小时内完成,而不是几天。
  • 您确定要插入任何行吗?我不认为语法执行和插入。

标签: c# .net oracle elasticsearch nest


【解决方案1】:

ROW_NUMBER() 函数会对性能产生负面影响,并且您正在运行它数千次。你已经在使用OracleDataReader——它不会一次将所有四百万行拉到你的机器上,它基本上是一次流式传输一个或几个。

这必须在几分钟或几小时内完成,而不是几天——我们有几个进程以类似的方式在 Sybase 和 SQL 服务器之间移动数百万条记录,而且只需不到五分钟。

不妨试一试:

OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection);
int batchSize = 500;    
using (OracleDataReader reader = cmd.ExecuteReader())
{
    List<Record> l = new List<Record>(batchSize);
    string[] str = new string[7];
    int currentRow = 0;

    while (reader.Read())
    {
        for (int i = 0; i < 7; i++)
        {
            str[i] = reader[i].ToString();
        }

        l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6]));

        // Commit every time batchSize records have been read
        if (++currentRow == batchSize)
        {
            Commit(l);
            l.Clear();
            currentRow = 0;
        }
    }

    // commit remaining records
    Commit(l);
}

Commit 可能是这样的:

public void Commit(IEnumerable<Record> records)
{
    // TODO: Use ES's BULK features, I don't know the exact syntax

    client.IndexMany<Record>(records, "index", "type");
    // client.Bulk(b => b.IndexMany(records))... something like this
}

【讨论】:

  • 非常感谢您的帮助!正如您提到的,我使用了 ES Bulk 功能。它开始以非常高的速度插入记录,但过了一段时间,它给出了 Argument cannot be empty 错误。我已经相应地编辑了问题。
  • @AakritiMittal:你不应该覆盖你的问题——如果你有一个新问题,请提出一个新问题(如果你愿意,可以参考这个问题)。这是一个问答网站,通过更改您的问题,您会使其他答案无效。我已回滚您的更改。
  • 抱歉覆盖,但我已经解决了这个错误。我还想问您,您的应用程序是如何在不到 5 分钟的时间内在 Sybase 和 SQL 服务器之间移动数百万条记录的?我的新应用程序仍然需要 2 天时间才能将 400 万从 Oracle 迁移到 Elasticsearch,因为它的表中有大量列。
【解决方案2】:

但您不是分批插入 100 个
最后你一次插入一个
(这甚至可能不是插入的正确代码)

foreach(Record r in l)
  client.Index<Record>(r, "index", "type");

如果一次插入一行,则所有读取时的旋转都不起作用
您只是在获得下一批
时引入了延迟 读(几乎)总是比写快

OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection);
OracleDataReader reader = cmd.ExecuteReader();
while (reader.Read())
{
   client.Index<Record>(new Record(reader.GetSting(0),   
                        reader.GetSting(1), reader.GetSting(2), reader.GetSting(3),    
                        reader.GetSting(4), reader.GetSting(5), reader.GetSting(6),  
                        "index", "type");
}
reader.Close();

如果您想并行读写,可以使用 BlockingCollection
但是使用最大大小读取不会比写入提前太多

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-11-10
    • 2011-02-22
    • 2015-01-29
    • 2016-01-28
    • 2015-05-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多