Database Record Processing


How can I split the processing of records across multiple threads? Specifically, I have an Accuracer DB table with 169 records and want to use, for example, 7 threads.

I could just split the records into ranges and let each thread process one range, but that will not work well if a user deletes or adds records.


You can use OmniThreadLibrary to process records from a database in parallel without much hassle.

I wrote an example using the Pipeline abstraction. The pipeline consists of 3 stages:

  1. The first stage reads data from the database and creates an instance of the container object to represent that data for the next stage of the pipeline.
  2. The second stage processes the incoming data. It:

    • calls the DoSomethingWith procedure, which simply wastes around 100 ms to simulate the processing of the data;
    • frees the memory of the container instance;
    • adds the literal value 1 to the output queue to inform the final stage that another record has been processed.

    This stage is configured to run in parallel in 7 threads.

  3. The last stage just counts how many records have been completed by the previous stage.

The example is a console application, so you can just copy/paste it and see it working live on your machine.

program Project1;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel,
  System.Diagnostics,
  DB, DBClient;

type
  //auxiliary container, used to copy the database data
  //to avoid synchronization. Remember that the TDataSet "current record"
  //may cause conflicts if changed from different threads.
  TContainer = class
  private
    FName: string;
    FID: Int64;
  public
    property ID: Int64 read FID write FID;
    property Name: string read FName write FName;
  end;

//does nothing, but wastes around 100ms. "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
  Sleep(100);
end;

//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
  I: Integer;
begin
  Result := TClientDataSet.Create(nil);
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'ID';
    DataType := ftLargeint;
  end;
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'NAME';
    DataType := ftString;
  end;
  Result.CreateDataSet;
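  //Random(1000) may return 0; add 1 so there is always at least one
  //record and the average computed later never divides by zero
  for I := 1 to Random(1000) + 1 do
    Result.InsertRecord([I, 'Test']);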
end;

var
  RecordsProcessed: Integer;
  SW: TStopwatch;
  Data: TDataSet;
begin
  IsMultiThread := True;
  Randomize;
  Writeln('wait while processing...');
  SW := TStopwatch.Create;
  SW.Start;
  try
    Data := CreateDataSet;
    try
      RecordsProcessed := Parallel.Pipeline
        .Stage(
          procedure (const Input, Output: IOmniBlockingCollection)
          var
            RecData: TContainer;
          begin
            Data.First;
            while not Data.Eof do
            begin
              RecData := TContainer.Create;
              RecData.ID := Data.Fields[0].AsLargeInt;
              RecData.Name := Data.Fields[1].AsString;
              Output.Add(RecData);
              Data.Next;
            end;
          end)
        .Stage(
          procedure (const Input: TOmniValue; var Output: TOmniValue)
          begin
            //process the real thing here
            //TOmniValue carries a plain TObject, so cast it back explicitly
            DoSomethingWith(Input.AsObject as TContainer);
            Input.AsObject.Free;
            Output := 1; //another record
          end)
        .NumTasks(7) //this stage is processed by 7 parallel tasks
        .Stage(
           procedure (const Input, Output: IOmniBlockingCollection)
           var
             Recs: Integer;
             Value: TOmniValue;
           begin
             Recs := 0;
             for Value in Input do
               Inc(Recs, Value.AsInteger);
             Output.Add(Recs);
           end)
        .Run.Output.Next;
      SW.Stop;
      Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
      Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
    finally
      Data.Free;
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  readln;
end.

The main advantages of doing it this way, IMHO, are:

  • You have a flexible mechanism to distribute the work among multiple workers. If some records take longer to process, the library takes care of it, and you can reasonably expect the total work to finish in the shortest possible time.
  • Your first processing thread starts as soon as you finish reading the first record from the database.
  • You can easily adapt it if you have to wait for more incoming records in the base table. The output queue of a stage is not marked as finished until the code in the stage procedure ends. If at some point there is no more work to do, all the downstream stages just block, waiting for more data to process.
  • You change the number of worker threads just by changing a parameter value!
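
The last point can be sketched as follows. This is only a minimal illustration, not part of the original example: it reuses just the OmniThreadLibrary calls already shown (Parallel.Pipeline, Stage, NumTasks, Run.Output.Next) and feeds the pipeline plain integers instead of database records. RunPipeline, WorkerCounts and the item count of 20 are hypothetical names and values chosen for the demo.

```pascal
program WorkerCountDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.Diagnostics,
  OtlCommon,
  OtlCollections,
  OtlParallel;

//hypothetical helper: pushes ItemCount dummy items through a pipeline
//whose middle stage runs on WorkerCount parallel tasks, and returns
//the number of items that were processed
function RunPipeline(ItemCount, WorkerCount: Integer): Integer;
begin
  Result := Parallel.Pipeline
    .Stage(
      procedure (const Input, Output: IOmniBlockingCollection)
      var
        I: Integer;
      begin
        //producer: stands in for the loop that reads the dataset
        for I := 1 to ItemCount do
          Output.Add(I);
      end)
    .Stage(
      procedure (const Input: TOmniValue; var Output: TOmniValue)
      begin
        Sleep(100); //simulated work, same idea as DoSomethingWith
        Output := 1;
      end)
    .NumTasks(WorkerCount) //the only value that changes between runs
    .Stage(
      procedure (const Input, Output: IOmniBlockingCollection)
      var
        Count: Integer;
        Value: TOmniValue;
      begin
        Count := 0;
        for Value in Input do
          Inc(Count, Value.AsInteger);
        Output.Add(Count);
      end)
    .Run.Output.Next;
end;

const
  WorkerCounts: array[0..1] of Integer = (1, 7);

var
  Workers, Processed: Integer;
  SW: TStopwatch;
begin
  for Workers in WorkerCounts do
  begin
    SW := TStopwatch.StartNew;
    Processed := RunPipeline(20, Workers);
    SW.Stop;
    Writeln(Workers, ' worker(s): ', Processed, ' items in ',
      SW.ElapsedMilliseconds, ' ms');
  end;
  Readln;
end.
```

Since every item sleeps for 100 ms, the run with 7 workers should finish in a fraction of the time taken by the single-worker run; the exact timings depend on the machine.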