Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
946 views
in Technique[技术] by (71.8m points)

multithreading - How to use Pipeline pattern in Delphi

I am trying to implement a Pipeline pattern in my test project (How to make a Mutlithreded idhttp calls to do work on a StringList), but am having a struggle adapting TThread code to Pipeline pattern code. There are not many resources about how to use it.

I tried my best below, please DO NOT downvote, I know my code is messy but I'll edit my question if needed.

type
  TForm2 = class(TForm)
    ...
  private
    procedure Retriever(const input: TOmniValue; var output: TOmniValue);
    procedure Inserter(const input, output: IOmniBlockingCollection);
    function HttpGet(url: string; var page: string): boolean;
  end;

procedure TForm2.startButton1Click(Sender: TObject);
var
  pipeline: IOmniPipeline;
  i       : Integer;
  v       : TOmniValue;
  s       : string;
  urlList : TStringList;
begin
  pipeline := Parallel.Pipeline;
  pipeline.Stage(Retriever);
  pipeline.Stage(Inserter).NumTasks(10);
  pipeline.Run;
  for s in urlList do
    pipeline.Input.Add(s);
  pipeline.Input.CompleteAdding;
  // wait for pipeline to complete
  pipeline.WaitFor(INFINITE);
end;

function TForm2.HttpGet(url: string; var page: string): boolean;
var
  lHTTP: TIdHTTP;
  i : integer;
  X : Tstrings;
  S,M,fPath : String;
begin
  lHTTP := TIdHTTP.Create(nil);
  X := TStringList.Create;
  try
    X.Text := lHTTP.Get('https://instagram.com/'+fPath);
    S:= ExtractDelimitedString(X.Text);
    X.Clear;
    Memo2.Lines.Add(fPath+ ' :     '+ M ); //how to pass the result to Inserter
  finally
    lHttp.Free;
  end;
end;

procedure TForm2.Inserter(const input, output: IOmniBlockingCollection);
var
  result   : TOmniValue;
  lpage     : string;
begin
  for result in input do begin
    Memo2.Lines.Add(lpage);
    FreeAndNil(lpage);
  end;
  // correect?
end;

procedure TForm2.Retriever(const input: TOmniValue; var output: TOmniValue);
var
  pageContents: string;
begin
  if HttpGet(input.AsString, pageContents) then
    output := //???
end;
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

First of all - describe what is your specific problem. No one can stand behind your back and look at your computer and see what you are doing. http://www.catb.org/esr/faqs/smart-questions.html#beprecise

You do imply your program misbehaves. But you do not describe how and why. And we do not know it.

As general remarks, you overuse the pipeline a bit.

  1. all the worker procedures you pass to OTL - in your case those are Inserter and Retriever work in random threads. That means none of them should touch GUI without synchronizing - VCL is not multithreaded. Also using TThread.Synchronize is a poor choice as I explained to you in the linked question. It makes program slow and it makes forms unreadable. To update your form use polling with fixed framerate. Do not update your form from inside OTL workers.

In other words, Inserter is not what you need. All you need from the pipeline here is its Input collection, a downloader procedure and the Output collection. Yes it is very simple task for the complex things pipelines are, that is why I mentioned two other simpler patterns before it.

You need TTimer on your form that would poll the Output collection at fixed framerate 2-3 times per second, and check that the collection is not finalized yet ( if it is - the pipeline got stopped ) and that should update GUI from a main thread.

  1. You should not wait for a pipeline to finish inside your main VCL thread. Instead You should detach the pipeleine and let it run totally in background. Save the reference to the created pipeline into the Form's member variable so you could access its Output collection from the TTimer event and also can free the pipeline after its process run over.

You should keep that variable linked to the pipeline object until the downloading is over and set to nil (Free the objects) after that, but not before. You know about interfaces and reference-counting in Delphi, right?

For other OTL patterns like parallel-FOR read OTL docs about their .NoWait() calls.

  1. You should make this Your form bi-modal, to have different set of enabled controls when downloading is running and when it is not. I usually do it with special Boolean property like I shown to you in the topic you linked. Your user is not supposed to change the lists and settings while the pipeline is in progress (unless you would implement that realtime task changing, but you did not yet). This mode switcher would also be a good place to free the finished pipeline object when the switching is going from working mode to idle mode.

  2. If you would want to play with the pipeline workers chaining, then you can put into the Input Collection not the URL strings themselves, but the array of those - the Memo1.Lines.ToArray(), then you can start with Unpacker stage that gets string arrays from the input collection (there would be only one, actually) and enumerate it and put the strings into stage-output collection. This however has little practical value, it would even slow your program down a tiny bit, as the Memo1.Lines.ToArray() function would still work in the main VCL thread. But just to experiment with the pipelines this might be funny.

So the draft becomes like that,

 TfrmMain = class(TForm)
  private
    var pipeline: IOmniPipeline;

    property inProcess: Boolean read ... write SetInProcess;
...
  end.

procedure Retriever(const input: TOmniValue; var output: TOmniValue);
var
  pageContents, URL: string;
  lHTTP: TIdHTTP;
begin
  URL := input.AsString;

  lHTTP := TIdHTTP.Create(nil);
  try
    lHTTP.ReadTimeout := 30000;
    lHTTP.HandleRedirects := True;

    pageContents := ExtractDelimitedString( lHTTP.Get('https://instagram.com/' + URL) );

    if pageContents > '' then
       Output := pageContents;
  finally
    lHTTP.Destroy;
  end;
end;

procedure TfrmMain.FormCloseQuery(Sender: TObject; var CanClose: Boolean);
begin
  if InProgress then begin
     CanClose := False;
     ShowMessage( 'You cannot close this window now.'^M^J+
                  'Wait for downloads to complete first.' ); 
  end;
end;

procedure TfrmMain.SetInProcess(const Value: Boolean);
begin
  if Value = InProcess then exit; // form already is in this mode

  FInProcess := Value;

  memo1.ReadOnly := Value;
  StartButton.Enabled := not Value;
  if Value then 
     Memo2.Lines.Clear;

  Timer1.Delay := 500; // twice per second
  Timer1.Enabled := Value;

  If not Value then  // for future optimisation - make immediate mode change 
     FlushData;      // when last worker thread quits, no waiting for timer event

  If not Value then
     pipeline := nil; // free the pipeline object

  If not Value then
     ShowMessage('Work complete');
end;

procedure TfrmMain.Timer1Timer(const Sender: TObject);
begin
  If not InProcess then exit;

  FlushData;

  if Pipeline.Output.IsFinalized then
     InProcess := False;
end;

procedure TForm2.startButton1Click(Sender: TObject);
var
  s       : string;
  urlList : TStringList;
begin
  urlList := Memo1.Lines;

  pipeline := Parallel.Pipeline;

  pipeline.Stage(Retriever).NumTasks(10).Run;

  InProcess := True; // Lock the input data GUI - user no more can edit it
  for s in urlList do
    pipeline.Input.Add(s);
  pipeline.Input.CompleteAdding;
end;

procedure TfrmMain.FlushData;
var v: TOmniValue;
begin
  if pipeline = nil then exit;
  if pipeline.Output = nil then exit;
  if pipeline.Output.IsFinalized then
  begin
    InProcess := False;  
    exit;
  end;

  Memo2.Lines.BeginUpdate;
  try
    while pipeline.Output.TryTake(v) do
      Memo2.Lines.Add( v.AsString );
  finally
    Memo2.Lines.EndUpdate;
  end;

  // optionally - scroll output memo2 to the last line 
end;

Note few details, think about them and understand the essence of those:

  1. Only FlushData is updating the output memo. FlushData is called from the TTimer event or from the form mode property setter. Both of them only are ever called from the main VCL thread. Thus FlushData is NEVER called form background threads.

  2. Retriever is a free standalone function, it is not a member of the form and it knows nothing about the form and has no reference to your form instance(s). That way you achieve both goals: you avoid "tight coupling" and you avoid a chance of mistakingly access the form's controls from a background thread, which is not allowed in VCL. Retriever functions work in background threads, they do load the data, they do store the data, but they never touch the GUI. That is the idea.

Rule of thumb - all methods of the form are only called from the main VCL thread. All pipeline stage subroutines - bodies of the background threads - are declared and work outside of any VCL forms and have no access to none of those. There should be no mix between those realms.

  1. you throttle GUI update to a fixed refresh rate. And that rate should be not too frequent. Windows GUI and user eyes should have time to catch up.

  2. Your form operates in two clearly delineated modes - InProcess and not InProcess. In those modes different sets of functions and controls are available to the user. It also manages mode-to-mode transitions like clearing output-memo text, alerting user of status changes, freeing memory of used threads-managing objects (here: pipelines), etc. Consequently, this property only is changed (setter is called) from main VCL thread, never from background workers. And #2 helps with that too.

  3. The possible future enhancement would be to use pipeline.OnStop event to issue a PostMessage with a custom Windows Message to your form, so it would switch the mode immediately as the work is done, not waiting for the next timer olling event. This might be the ONLY place where pipeline knows anything about the form and has any references to it. But this open the can of Windows messaging, HWND recreation and other subtle things that I do not want to put here.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...