【问题标题】:Database Record Processing [closed]数据库记录处理[关闭]
【发布时间】:2013-03-10 06:02:55
【问题描述】:

我如何将记录的工作负载拆分到多个线程,特别是 Accuracer DB,它有 169 条记录和 7 个线程。

因为我可以只拆分范围内的记录数并让每个线程处理该范围。但是,如果用户删除或添加记录,它将无法正常工作。

【问题讨论】:

  • 您的问题是“如何在线程中处理它?”或“如何划分工作,使每个线程都有相同的工作要做?”,或两者兼而有之?。
  • 您是说 numberOfRecordsPerThread = totalRecords/7 不起作用,因为可能会出现新记录?
  • @jachguate 如何分配工作,以便每个线程都有相同的工作量 :) 谢谢你提醒我我会编辑..
  • 好吧,在您将初始数量的记录分配给线程之后,如果您有更多记录要处理,您可以将它们随机分配给线程。但是 I 会做的是有一个调度程序类,并让线程请求记录,因为线程用完了要处理的记录。如果记录处理得很快,您可以给线程批次 2 或 3 或 5 或 10 条记录。如果缓慢,则一次分发 1 条记录。
  • 嗯,你的问题很有趣,但很笼统,所以很难写出具体的答案。我的一般建议是查看OmniThreadLibrary,恕我直言,这是最好的选择,你必须毫不费力地写这个。如果您已经完成了某件事,最好展示您必须获得的有用答案。

标签: database delphi delphi-xe2 accuracerdb


【解决方案1】:

您可以使用OmniThreadLibrary 并行处理数据库中的记录,而不会带来太多麻烦。

我使用 Pipeline 抽象编写了一个示例。 3个阶段的管道常量:

  1. 第一阶段从数据库中读取数据,创建一个容器对象的实例来代表管道下一阶段的数据。
  2. 第二阶段处理传入的数据。

    • 调用DoSomethingWith 过程,这只会浪费大约 100 毫秒。模拟数据的处理
    • 释放容器实例的内存。
    • 然后将文字值1 添加到输出队列,以通知最后阶段已处理了另一条记录。

    此阶段配置为在 7 个线程中并行运行。

  3. 最后一个阶段只是统计从前一个阶段完成了多少条记录

该示例是一个控制台应用程序,您只需复制/粘贴即可查看它在您的机器中的实时运行情况。

program Project1;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel,
  System.Diagnostics,
  DB, DBClient;

type
  //auxiliar container, used to copy the database data
  //to avoid synchronization. remember TDataSet "current record"
  //may cause conflicts if changed from different threads.
  TContainer = class
  private
    FName: string;
    FID: Int64;
  public
    property ID: Int64 read FID write FID;
    property Name: string read FName write FName;
  end;

//does nothing, but wastes around 100ms. "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
  Sleep(100);
end;

//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
  I: Integer;
begin
  Result := TClientDataSet.Create(nil);
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'ID';
    DataType := ftLargeint;
  end;
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'NAME';
    DataType := ftString;
  end;
  Result.CreateDataSet;
  for I := 1 to Random(1000) do
    Result.InsertRecord([I, 'Test']);
end;

var
  RecordsProcessed: Integer;
  SW: TStopwatch;
  Data: TDataSet;
begin
  IsMultiThread := True;
  Randomize;
  Writeln('wait while processing...');
  SW := TStopwatch.Create;
  SW.Start;
  try
    Data := CreateDataSet;
    try
      RecordsProcessed := Parallel.Pipeline
        .Stage(
          procedure (const Input, Output: IOmniBlockingCollection)
          var
            RecData: TContainer;
          begin
            Data.First;
            while not Data.Eof do
            begin
              RecData := TContainer.Create;
              RecData.ID := Data.Fields[0].AsLargeInt;
              RecData.Name := Data.Fields[1].AsString;
              Output.Add(RecData);
              Data.Next;
            end;
          end)
        .Stage(
          procedure (const Input: TOmniValue; var Output: TOmniValue)
          begin
            //process the real thing here
            DoSomethingWith(Input);
            Input.AsObject.Free;
            Output := 1; //another record
          end)
        .NumTasks(7) //this stage is processed by 7 parallel tasks
        .Stage(
           procedure (const Input, Output: IOmniBlockingCollection)
           var
             Recs: Integer;
             Value: TOmniValue;
           begin
             Recs := 0;
             for Value in Input do
               Inc(Recs, Value);
             Output.Add(Recs);
           end)
        .Run.Output.Next;
      SW.Stop;
      Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
      Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
    finally
      Data.Free;
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  readln;
end.

恕我直言,这样做的主要优点是:

  • 您有一个灵活的机制在多个工作人员之间分配工作。如果某些记录需要更多时间来处理,图书馆会处理这种情况,您可以合理地期望在更短的时间内完成全部工作。
  • 从数据库中读取第一条记录后,您的第一个处理线程就会立即启动。
  • 如果您必须等待基表中的更多传入记录,您可以轻松调整它。在阶段过程中的代码结束之前,阶段的输出队列不会被标记为完成。如果在某个时候没有更多工作要做,所有即将到来的阶段都会阻止等待更多数据处理。
  • 只需更改参数值即可更改工作线程数!

【讨论】:

  • 一个问题会阻塞主线程的GUI吗?因为 OTL 中的示例阻止了 GUI。 :)
  • 在这个例子中我需要它来block,因为应用程序在实际工作开始之前就结束了。但是你可以改变它,当你知道你在做什么时,改变是微不足道的。
  • 提示:Run 方法立即返回。
  • 数据是直接从Dataset读取还是复制过来的?
  • 嘿,你有(注释的)代码在这里看到它!。你为什么要问这样的问题?你看懂代码了吗?,你看过cmets吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-02-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多