【问题标题】:How to make an insert correctly in parallel loop [closed]如何在并行循环中正确插入 [关闭]
【发布时间】:2021-03-20 15:04:47
【问题描述】:

我有一个文件夹,我将在其中保存文本文件(200-500mb - 不是很大,但它的文本文件很大),我想并行处理这个文件。 该文件将有

"ComnanyTestIsert", "Firs Comment", "LA 132", "222-33-22", 1
"ComnanyTestIsert1", "Seconds Comment", "LA 132", "222-33-22", 1

例如,我使用了 2 个这样的文件。我不太明白何时将 BufferedStream 与并行循环一起使用 如何设置并行操作的数量?以及如何正确插入

static void Main(string[] args)
        {
            //Basic usage to help you get started:
            ProcessFileTaskItem(
                new string[] { "\\Insert.txt"
                                        , "\\Insert1.txt" }
                , "Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=test;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=False;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
                , "test");
        }


        /// This will read an array of input files, process the lines in parallel, and upload
        /// everything into the database.
        public static void ProcessFileTaskItem(string[] SourceFiles, string DatabaseConnectionString, string DestinationTable)
        {
            //Make sure there's files to read
            if (SourceFiles != null && SourceFiles.Length > 0)
            {
                //Loop through the file array
                Parallel.For(0, SourceFiles.Length, x =>
                //for (int x = 0; x < SourceFiles.Length; x++)
                {
                    //Make sure the file exists and if so open it for reading.
                    if (File.Exists(SourceFiles[x]))
                    {
                        using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))
                        {
                            connectionDest.Open();

                            //Configure everything to upload to the database via bulk copy.
                            using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
                            {
                                //Configure the bulk copy settings
                                sbc.DestinationTableName = DestinationTable;
                                sbc.BulkCopyTimeout = 28800; //8 hours

                                //Now read and process the file
                                ProcessAllLinesInInputFile(SourceFiles[x], connectionDest, sbc);
                            }

                            connectionDest.Close();
                        }

                    }
                    
                } //for
                ); //End Parallel reading of files

                //Explicitly clean up before exiting
                Array.Clear(SourceFiles, 0, SourceFiles.Length);
            }
            
        } 

        /// Processes every line in the source input file.
        private static void ProcessAllLinesInInputFile(string SourceFiles, SqlConnection connectionDest, SqlBulkCopy sbc)
        {
            
            //Create a local data table. Should be the same name as the table
            //in the database you'll be uploading everything to.
            DataTable CurrentRecords = new DataTable("test");

            //The column names. They should match what's in the database table.
            string[] ColumnNames = new string[] { "Name", "Comment", "Address", "Phone", "IsActive" };

            
            using (FileStream fs = File.Open(SourceFiles, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
            using (BufferedStream bs = new BufferedStream(fs))
            using (StreamReader sr = new StreamReader(bs))
            {
                string s;
                while ((s = sr.ReadLine()) != null)
                {

                }
            }

            //Create the datatable with the column names.
            for (int x = 0; x < ColumnNames.Length; x++)
                CurrentRecords.Columns.Add(ColumnNames[x], typeof(string));


                //Now process each line in parallel.
                Parallel.For(0, SourceFiles, x =>
                {
                    List<object> values = null; //so each thread gets its own copy. 
                    


                }
        }

【问题讨论】:

  • 你是说不能将文件加载到内存中?
  • 我需要将数据从文本文件插入到 sql。我尝试使用并行文件处理,然后创建本地数据表并批量插入此数据

标签: c# .net linq bulkinsert


【解决方案1】:

Parallel.For 自动调整使用的线程数,但可以在parallelOptions 参数中指定。

您是否有理由相信并行执行此操作会提高性能?多线程不会神奇地让一切变得更快。像这样的 IO 操作通常不会从多线程中受益。尤其是如果您有旋转介质,其中并发 IO 会大幅降低吞吐量,甚至 SSD 通常也会遭受非顺序 IO 的影响。此外,如果您担心性能,您应该进行适当的测量,这样您就可以判断您是否真的在改进。

除非另有说明,非静态框架方法不是线程安全的。所以你不应该尝试从多个线程中读取同一个流。您可以使用多个流,但如果您有足够的内存,我建议您使用File.ReadAllLines 而不是ReadLine,我怀疑这样会更快,因为它可以顺序读取所有内容。

【讨论】:

    【解决方案2】:

    您的过程中较慢的部分是读取文件中的数据。通常,您的程序将不得不等待“硬盘”提供大量数据。您的程序已经可以对已经获取的项目进行一些处理,而不是无所事事地等待。

    当您的程序必须等待某个外部进程时,例如写入磁盘、从数据库管理系统查询数据或从 Internet 获取信息,考虑使用 async-await 是明智的.

    如果您使用 async-await,那么,每当您的进程必须等待某个其他进程完成时,它就不会无所事事地等待,而是会环顾四周,看看它是否可以做其他事情。

    在您的情况下,您可以调用一个异步函数,该函数异步读取一个文件并将读取的数据异步写入数据库。如果您启动其中几个任务,那么每当这些任务之一必须等待文件读取或数据库写入的结果时,它可以环顾四周,看看它是否可以为其他任务做任何事情。因此,当它在任务 A 中等待读取文件 X 的数据块时,它可能已经开始在任务 B 中将数据写入数据库。

    当我们逐行处理文件时,我们需要一个返回 IEnumerable&lt;string&gt; 的函数,或异步等效项:IAsyncEnumable&lt;string&gt;。见iterating with AsyncEnumerable

    public async IAsyncEnumerable<string> ReadLines(string fileName)
    {
        using (StreamReader reader = File.OpenText(fileName)
        {
            while(!reader.EndOfStream)
            {
                yield return await reader.ReadLineAsync().ConfigureAwait(false);
            }
        }
    }
    

    File.OpenText 遗憾的是只允许同步 I/O;在这种情况下,异步 API 的实现很差。要打开真正的异步文件,您需要使用具有布尔参数 isAsync 或 FileOptions.Asynchronous 的 overloads of the FileStream constructors 之一。

    用法:

    async Task DisplayFileContentsAsync(string fileName)
    {
        await foreach(string line in ReadFileAsync(fileName))
        {
            Console.WriteLine(line);
        }
    }
    

    我们还需要一个将读取的数据写入数据库的方法。我在这里一行一行地做,如果你愿意,你可以改变它,一次写几行。

    async Task SaveInDbAsync(string line, string dbConnectionString)
    {
        using (SqlConnection dbConnection = new SqlConnection(dbConnectionString))
        {
            // prepare the SQL command (consider using other methods)
            const string sqlCommandText = @"Insert into ...";
            var dbCommand = dbConnection.CreateCommand();
            dbCommand.CommandText = sqlCommandText;
            dbCommand.Parameters.Add(...)
    
            // async execute the dbCommand
            await dbConnection.OpenAsync();
            await dbCommand.ExecuteNonQueryAsync();
            // TODO: consider to use the return value to detect problems
        }
    }
    

    把它们放在一起:读取一个文件并将行保存在数据库中:

    async Task SaveFileInDbAsync(string fileName, string dbConnectionString)
    {
        await foreach(string line in ReadFileAsync(fileName))
        {
            await SaveInDbAsync(line, dbConnectionString);
        }
    }
    

    保存所有文件:

    async Task SaveFilesInDbAsync(IEnumerable<string> fileNames, string dbConnectionString)
    {
        // start all Tasks, do not await yet:
        List<Task> tasks = new List<Task>();
        foreach (string fileName in fileNames)
        {
            Task task = SaveFileInDbAsync(fileName, dbConnectionString);
            tasks.Add(task);
        }
    
        // now that all Tasks are started and happily reading files
        // and writing the read lines to the  database
        // await until all Tasks are finished.
        await Task.WhenAll(tasks);
    }
    

    或者如果您需要同步版本:

    void SaveFilesInDb(IEnumerable<string> fileNames, string dbConnectionString)
    {
        // start all Tasks, do not await yet:
        List<Task> tasks = new List<Task>();
        foreach (string fileName in fileNames)
        {
            Task task = SaveFileInDbAsync(fileName, dbConnectionString);
            tasks.Add(task);
        }
    
        // Wait until all Tasks are finished.
        Task.WaitAll(tasks);
    }
    

    【讨论】:

    • 谢谢。我使用 dapper(获取模型列表并等待 db.ExecuteAsync(sql,参数)并查看时间读取的结果:00:00:00.0291921 和插入时间:00:03:13.3398096。在文件中只有 20k 行,什么可以你建议?
    • 有时我会想:我是不是必须告诉他们如何调试。你确定读的时间是DateTime,不是TimeSpan吗?调查它是否来自Dapper:如果将读取的数据写入文件会发生什么?如果您使用 SQL 而不是 Dapper 将其写入数据库会怎样?用小程序测试一下,就知道跟async await部分没有关系了。
    • “遗憾的是,File.OpenText 只允许同步 I/O;在这种情况下异步 API 的实现很差。”
    【解决方案3】:

    同时对同一个表进行 >1 次批量插入不会给您带来很好的加速,尤其是如果您放置 SqlBulkCopyOptions.TableLock

    一个更好的加速策略是把你的文件组合成更大的批量插入批次,甚至只是 1 个批次。如果您可以将整个批次放入内存中,则并行读取所有文件(首选使用异步方法,而不是并行处理),将每个文件组合到您的批次中,然后批量插入单个批次。

    您可以使其并行上传,但您确实需要将其分开表以使其快速工作。

    【讨论】:

    • 在这种情况下您将如何实现?
    • 不要用 parallel.for 想太多。让它在没有第一个的情况下工作。第 1 步:遍历所有文件,并为每一行添加到您的批处理中。第二步:上传批次
    猜你喜欢
    • 2013-09-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-08
    • 2018-11-14
    • 2016-05-03
    • 2015-08-13
    相关资源
    最近更新 更多