Share Button

Executar um “select” em paralelo na verdade é bem simples, mas complexo se não deter algumas informações elementares sobre o tratamento que o FireDAC dá ao isolamento de conexão.

Basicamente o processo se dá pelo isolamento da “connection” ao fazer a chamada no banco de dados, talvez aí o maior problema já que é necessário ter uma conexão individual para cada chamada ao banco.

No FireDAC esta disponível na IDE o componente TFDMonitor que é responsável em gerar um novo componente de conexão para cada chamada feita. Este é o caminho mais fácil para utilizar “multithreaded”, assunto que não irei tratar aqui já que o interessante é explorar recursos que entregue mais conteúdo a quem quer entender como o processo acontece.

em construção …. ajude a escrever este texto enviando comentários. Gratidão

Base para o texto:
Unit: Data.FireDAC.Helper
Ver: Exemplo no GIT

Share Button

Congelando a janela com TTask.WaitForAll ???

#compartilhandoconhecimento #wba10anos
Depois que publiquei o vídeo Papo sobre POO (TTask e outros), recebi um comentário que me deixou intrigado.

Porquê o a janela principal trava quando executo   TTask.WaitForAll(  ….  );

Fui dar uma olhando como foi implementado o método – observei que é feito uma chamada para uma camada de TEvent que é implementado nas chamadas internas da rotina. Por traz da mecânica com TEvent é feito uso de  WaitForSingleObject – que é uma camada de acesso a biblioteca do windows.

A alteração não é trivial. O primeiro problema é como reescrever o método considerando que o array passado como parametro é um   .. AArray: array of ITask… qualquer deslize no seu uso vai provocar um incremento no contador RefCount da interface e pode levar a perda de controle no autofree do processo…
Para não causar um incremento do RefCount é preciso fazer uso da instrução  [unsafe] o que foi feito através de um “wrapper” para um record marcado para não incrementar o RefCount.

Contornado a questão de referência, o próximo obstáculo é encontrar um mecanismo que permita parar o processamento sem congelar a janela…

Depois de várias tentativas a solução encontrada foi o “infamous” application.processmessage. Esta não é uma boa opção, já que  mantém o processador em atividade,  quando o ideal seria encontrar um modelo que não fizesse uso do processador enquanto esta atualizando a janela principal.

Primeiramente foi criado um Class Helper para o TTask:

 


Type
  TTaskHelper = class helper for TTask
  private type
    TUnsafeTaskEx = record
    private
      [Unsafe]
      // preciso de um record UNSAFE para nao incrementar o RefCount da Interface
      FTask: TTask;
    public
      property Value: TTask read FTask write FTask;
    end;
  public
    class function WaitForAllEx(AArray: Array of ITask;
    ATimeOut: int64 = INFINITE): boolean;
  end;

Versão 1. Implementando o método:

class function TTaskHelper.WaitForAllEx(AArray: array of ITask;
ATimeOut: int64 = INFINITE): boolean;
var
  task: TUnsafeTaskEx;
  i: integer;
  taskInter: TArray<TUnsafeTaskEx>;
  completou: boolean;
  Canceled, Exceptions: boolean;
begin
  Canceled := false;
  Exceptions := false;
  result := true;
  try
    for i := low(AArray) to High(AArray) do
    begin
      task.Value := TTask(AArray[i]);
      if task.Value = nil then
        raise EArgumentNilException.Create('Wait Nil Task');

      completou := task.Value.IsComplete;
      if not completou then
      begin
        taskInter := taskInter + [task];
      end
      else
      begin
        if task.Value.HasExceptions then
          Exceptions := true
        else if task.Value.IsCanceled then
          Canceled := true;
      end;
    end;

    try
      for task in taskInter do
      begin
        while not task.Value.IsComplete do
        begin
          try
            TThread.Queue(nil,
              procedure
              begin
                application.ProcessMessages;
              end);
          finally
          end;
        end;
        if task.Value.IsComplete then
        begin
          if task.Value.HasExceptions then
            Exceptions := true
          else if task.Value.IsCanceled then
            Canceled := true;
        end;
      end;
    finally
    end;
  except
    result := false;
  end;

  if (not Exceptions and not Canceled) then
    Exit;
  if Exceptions or Canceled then
    raise EOperationCancelled.Create
      ('One Or More Tasks HasExceptions/Canceled');

end;

Versão 2. Revisando o código para um uso mais eficiente com MsgWaitForMultipleObjectsEx:

class function TTaskHelper.WaitForAllEx(AArray: array of ITask;
ATimeOut: int64 = INFINITE): boolean;
var
  FEvent: TEvent;
  task: TUnsafeTaskEx;
  i: integer;
  taskInter: TArray<TUnsafeTaskEx>;
  completou: boolean;
  Canceled, Exceptions: boolean;
  ProcCompleted: TProc<ITask>;
  LHandle: THandle;
  LStop: TStopwatch;
begin
  LStop := TStopwatch.StartNew;
  ProcCompleted := procedure(ATask: ITask)
    begin
      FEvent.SetEvent;
    end;

  Canceled := false;
  Exceptions := false;
  result := true;
  try
    for i := low(AArray) to High(AArray) do
    begin
      task.Value := TTask(AArray[i]);
      if task.Value = nil then
        raise EArgumentNilException.Create('Wait Nil Task');

      completou := task.Value.IsComplete;
      if not completou then
      begin
        taskInter := taskInter + [task];
      end
      else
      begin
        if task.Value.HasExceptions then
          Exceptions := true
        else if task.Value.IsCanceled then
          Canceled := true;
      end;
    end;

    try
      FEvent := TEvent.Create();
      for task in taskInter do
      begin
        try
          FEvent.ResetEvent;
          if LStop.ElapsedMilliseconds > ATimeOut then
            break;
          LHandle := FEvent.Handle;
          task.Value.AddCompleteEvent(ProcCompleted);
          while not task.Value.IsComplete do
          begin
            try
              if LStop.ElapsedMilliseconds > ATimeOut then
                break;
                  if MsgWaitForMultipleObjectsEx(1, LHandle,
                    ATimeOut - LStop.ElapsedMilliseconds, QS_ALLINPUT, 0)
                    = WAIT_OBJECT_0 + 1 then
                    application.ProcessMessages;
            finally
            end;
          end;
          if task.Value.IsComplete then
          begin
            if task.Value.HasExceptions then
              Exceptions := true
            else if task.Value.IsCanceled then
              Canceled := true;
          end;
        finally
          task.Value.removeCompleteEvent(ProcCompleted);

        end;
      end;
    finally
      FEvent.Free;
    end;
  except
    result := false;
  end;

  if (not Exceptions and not Canceled) then
    Exit;
  if Exceptions or Canceled then
    raise EOperationCancelled.Create
      ('One Or More Tasks HasExceptions/Canceled');

end;

Reescrevendo o Exemplo:  Dia11_Threading_TParallel

 

Este é um comportamento quando o SO é windows. Em outras plataformas o resultado poderá ser outro.

 

 

Share Button

[usa LogEvents]
Tenho uma quantidade de produtos relativamente grande que requer processamento de custos de produção envolvendo custo de matérias primas, mão-de-obra e outros custos vinculado a célula de produção.

A modelagem prevê que uma ficha de produção pode conter outras fichas formando uma lista de dependências dos processos o que gera processamento recursivo de dependências.

Como se pode imaginar, não é um processamento sequenciado tão simples e pode ser demorado em face a profundidade da arvore de dependência que um produto pode exigir.

Então repensando os processos, o desafio passou exigir processamento em paralelo das fichas de tal forma que fosse possível processar uma quantidade de produtos ao mesmo tempo e aproveitando melhor os recursos da máquina;

Neste cenário, saber qual o estágio de processamento de cada ficha e o onde se encontra o cálculo passou a ser requisito de interação com usuário;

Para executar vamos utilizar da biblioteca de processamento em paralelo do Delphi (introduzido no XE7, no exemplo usamos Berlin).

Passos:

  • Isolar as conexões de banco de dados para trata-las individualmente por Task;
  • Criar infraestrutura de comunicação entre o processamento e feedback com usuário;
  • Tratar a sincronização de informações geradas pelas várias TTasks em andamento informando a janela de progresso do usuário;
imagem_janela
Tendo em mente que o controle possa ser utilizado em outras aplicações, o uso de um procedimento ANONIMOUS me parece bastante resistente a diversidade de códigos a que poderá vir a ser utilizado.
Veja como ficou o exemplo de execução:

procedure TForm8.Button1Click(Sender: TObject);
var
  LProgr: IProgressEvents;
  i: integer;
begin
  // inicializa a janela de progresso
  LProgr := TProgressEvents.new;
  LProgr.max := 100;  // opcional: marca o número máximo itens
  LProgr.MaxThreads := SpinEdit1.Value ;  // indica o número máximo de threads em paralelo
  LProgr.CanCancel := true;    :// marca se pode cancelar a operação

  for i := 1 to 100 do
  begin   // loop de demonstração - simulando uma lista de processos
    LProgr.Text := 'Produto: ' + intToStr(i);   // texto livre

    // onde as coisas acontecem.....
    // adiciona o processo a ser executado e aponta o método anonimous as ser executado pela TTask
    LProgr.add(i, 'Produto: ' + intToStr(i),    // processo a executar
      procedure(x: integer)
      var
        n: integer;
        msg: string;
      begin
        msg := 'Produto: ' + intToStr(x);    // processo em execução
        LogEvents.DoProgress(self, 0, etStarting, msg);  // notifica que o processo foi iniciado
        n := Random(10000);
        
        sleep(n);
        LogEvents.DoProgress(self, 0, etWorking, msg); // notifica que esta em execução
        // executa o código de calculo ... aqui...
        n := Random(10000);
        if LProgr.Terminated then exit;    // checa se o usuario cancelou a operação
        sleep(n);
      end);
    if LProgr.Terminated then
      break;
  end;
  LogEvents.DoProgress(self, 0, etAllFinished, '');  // sinaliza que todas os processo foram completados.
end;


Código fonte com o Exemplo e classes que implementam a janela de monitoramento do progresso de cada thread.

Share Button

Quando estamos rodando um código em um processo paralelo e internamente a Thread encontra pela frente uma EXCEPTION nada é apresentado para o usuário. Isto ocorre porque a Thread não tem como notificar a Thread Principal (do app) para mostrar a exceção ao usuário. Com isto não há um expediente para mostrar a exceção na thread principal.  escreve sobre o tema em seu blog Rob’s Technology Corner.

Robert propõe a rotina que gera erro para contextuar o problema:


procedure TForm5.Button1Click(Sender: TObject);
begin
  Button1.Enabled := False;
  SlowProc;
end;

procedure TForm5.FormDestroy(Sender: TObject);
begin
  Task.Cancel;
end;

procedure TForm5.SlowProc;
begin
 Task := TTask.Create( procedure
                var
                   I : Integer;
                begin
                  for I := 0 to 9 do
                  begin
                     if TTask.CurrentTask.Status = TTaskStatus.Canceled then
                        exit;
                     Sleep(1000);
                     if I = 2 then
                        raise EProgrammerNotFound.Create('Something bad just happened');
                  end;
                  if TTask.CurrentTask.Status <> TTaskStatus.Canceled then
                  begin
                    TThread.Queue(TThread.CurrentThread,
                    procedure
                    begin
                      if Assigned(ListBox1) then
                      begin
                        Listbox1.Items.Add('10 Seconds');
                        Button1.Enabled := True;
                      end;
                    end);
                 end;
              end);
 Task.Start;
end;

Executando o código é possível constatar que o procedimento levanta uma exceção e o usuário não recebe a informação de erro.

Stefan Glienke observando o que escreve Robert, propõe uma alteração em TTask para permitir tratar as exceções transparentes para o usuário e mais simples na implementação.
Glienke empresta de .NET uma implementação de Task.ContinueWith que permite continuar a execução após a ocorrência da exceção, veja como ficou.

unit ThreadingEx;
 
interface
 
uses
  SysUtils,
  Threading;
 
type
  TAction<T> = reference to procedure(const arg: T);
 
  TTaskContinuationOptions = (
    NotOnCompleted,
    NotOnFaulted,
    NotOnCanceled,
    OnlyOnCompleted,
    OnlyOnFaulted,
    OnlyOnCanceled
  );
 
  ITaskEx = interface(ITask)
    ['{3AE1A614-27AA-4B5A-BC50-42483650E20D}']
    function GetExceptObj: Exception;
    function GetStatus: TTaskStatus;
    function ContinueWith(const continuationAction: TAction<ITaskEx>;
      continuationOptions: TTaskContinuationOptions): ITaskEx;
 
    property ExceptObj: Exception read GetExceptObj;
    property Status: TTaskStatus read GetStatus;
  end;
 
  TTaskEx = class(TTask, ITaskEx)
  private
    fExceptObj: Exception;
    function GetExceptObj: Exception;
  protected
    function ContinueWith(const continuationAction: TAction<ITaskEx>;
      continuationOptions: TTaskContinuationOptions): ITaskEx;
  public
    destructor Destroy; override;
 
    class function Run(const action: TProc): ITaskEx; static;
  end;
 
implementation
 
uses
  Classes;
 
{ TTaskEx }
 
function TTaskEx.ContinueWith(const continuationAction: TAction<ITaskEx>;
  continuationOptions: TTaskContinuationOptions): ITaskEx;
begin
  Result := TTaskEx.Run(
    procedure
    var
      task: ITaskEx;
      doContinue: Boolean;
    begin
      task := Self;
      if not IsComplete then
        DoneEvent.WaitFor;
      fExceptObj := GetExceptionObject;
      case continuationOptions of
        NotOnCompleted:  doContinue := GetStatus <> TTaskStatus.Completed;
        NotOnFaulted:    doContinue := GetStatus <> TTaskStatus.Exception;
        NotOnCanceled:   doContinue := GetStatus <> TTaskStatus.Canceled;
        OnlyOnCompleted: doContinue := GetStatus = TTaskStatus.Completed;
        OnlyOnFaulted:   doContinue := GetStatus = TTaskStatus.Exception;
        OnlyOnCanceled:  doContinue := GetStatus = TTaskStatus.Canceled;
      else
        doContinue := False;
      end;
      if doContinue then
        continuationAction(task);
    end);
end;
 
destructor TTaskEx.Destroy;
begin
  fExceptObj.Free;
  inherited;
end;
 
function TTaskEx.GetExceptObj: Exception;
begin
  Result := fExceptObj;
end;
 
class function TTaskEx.Run(const action: TProc): ITaskEx;
var
  task: TTaskEx;
begin
  task := TTaskEx.Create(nil, TNotifyEvent(nil), action, TThreadPool.Default, nil);
  Result := task.Start as ITaskEx;
end;
 
end.

Como usar a nova implementação de TTask…

TTaskEx.Run(
  procedure
  begin
    Sleep(2000);
    raise EProgrammerNotFound.Create('whoops')
  end)
  .ContinueWith(
  procedure(const t: ITaskEx)
  begin
    TThread.Queue(nil,
      procedure
      begin
        ShowMessage(t.ExceptObj.Message);
      end);
  end, OnlyOnFaulted);

 

Ver PPL – TTask an example in how not to use

Share Button

Executar uma query em segundo plano (em paralelo) não …é difícil de fazer, o seu controle é que
pode ser mais complexo.
Para executar em segundo plano basta:

TThread.CreateAnonymousThread(procedure
begin
   ALQuery1.sql.Text := '....';
   ALQuery1.Open;
end).Start;

ou

TThread.CreateAnonymousThread(
   procedure
   var i:integer;
   begin
      for I := 0 to 10 do
      begin
         // faz alguma coisa...
         AlQuery1.execSQL;
      end;
    end).Start;

Um pensamento simplista é você criar uma conexão para cada QUERY em separado. Se você tem
um CONNECTION isolado para UMA QUERY, então é possível executá-la em paralelo dentro de
uma nova Thread;

Algumas idéias onde pode utilizar o processo em paralelo:
– quando precisa registrar um log em uma tabela;
– se for possível adiantar um SELECT que será utilizado mais a frente;
– se precisa rodar um loop que não tem dependência com os próximos passos da sequencia do
código;
– quando precisa fazer um somatório de dados na tabela para mostrar o seu valor na janela…. e
liberar o usuário para continuar fazendo outras coisas.
As vezes você pode por um SELECT em paralelo e disparar outra sequencia de código… e mais
adiante aguardar o primeiro SELECT concluir… para depois então continuar…. Este é assunto para
outro POST para tratar de TTASK.