DELPHI 2007 下的 PARALLEL FOR 实现


Delphi XE7 之后 语法就加了 Parallel.For 用于多线程编程. 有一个第三方开源的库 OmniThreadLibrary (OTL) 也可以用 但是在 D2007 下由于没有 匿名函数和通用模板 一些OTL的高级语法就都不能用了. The AsyncCall 也是第三方开源的 库 支持 D2006到 XE2 但是也没有 Parallel.For 语法.

下面就简单在 D2007 下实现了 多线程 并行 FOR 语法. 虽然无法向 XE7之后 或者是OTL的语法那么的舒服 但是简单用一下还是挺方便的. PARALLEL FOR 能使多线程的代码变得相对简单直接一些.

Parallel.For一键傻瓜式的对于for循环可以并行/同时多线程处理,在很大程度上简化了代码.

首先 我们需要定义 传递参数 的类 用于保存需要用的的数据.

type
  TParallelParam = class
    public
      Data: Pointer;
      First: integer;
      Last: integer;

      constructor Create; overload;
      constructor Create(_Data: Pointer; _First, _Last: integer); overload;
      procedure SetValues(_First, _Last: integer); overload;
      procedure SetValues(_Data: Pointer; _First, _Last: integer); overload;
      destructor Destroy; override;
  end;

constructor TParallelParam.Create;
begin
  inherited Create;
end;

constructor TParallelParam.Create(_Data: Pointer; _First, _Last: integer);
begin
  inherited Create;
  Self.SetValues(_Data, _First, _Last);
end;

procedure TParallelParam.SetValues(_First, _Last: Integer);
begin
  Self.First := _First;
  Self.Last := _Last;
end;

procedure TParallelParam.SetValues(_Data: Pointer; _First, _Last: Integer);
begin
  Self.Data := _Data;
  Self.First := _First;
  Self.Last := _Last;
end;

destructor TParallelParam.Destroy;
begin
  inherited;
end;

TParallelParam 这个类就是数据的包装. 然后我们可以定义 并行调用过程的 声名.

type
  TParallelForFunc = procedure(Index: integer; const Param: TParallelParam);

对于每个并发的线程来说 index 索引值是不一样的. 所有需要用到的数据都可以 通过 指针(比如指向一个结构体) 来传递.

然后 我们封装一下 TThread 来传递这些数据.

type
  TWorkerThread = class(TThread)
    public
      _Method: TParallelForFunc;
      _Param: TParallelParam;
      _Index: integer;
      Done: boolean;

      constructor Create(CreateSuspended: Boolean); overload;
      destructor Destroy; override;
    protected
      procedure Execute; override;
  end;

procedure TWorkerThread.Execute;
begin
  try
    Self._Method(_Index, Self._Param);
  finally
    Self.Terminate;
    Self.Done := True; // 已经完成
  end;
end;

constructor TWorkerThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  Self.Done := False;
end;

destructor TWorkerThread.Destroy;
begin
  inherited Destroy;
end;

有了这些定义 我们就可以实现这个并发的FOR语法 第一个参数就是并发的过程.

procedure TParallelFor(Method: TParallelForFunc; const Param: TParallelParam; First, Last: integer; ThreadNumber: integer);
var
  flags: array of boolean;
  threads: array of TWorkerThread;
  tnum, len, i, cnt, j: integer;
begin
  len := Last - First + 1;
  if (len <= 0) then
  begin
    Raise Exception.Create('TParallelFor.Len = 0');
  end;
  SetLength(flags, len);
  tnum := ThreadNumber;
  if (tnum < 2) then
  begin
    tnum := 2;
  end;
  SetLength(threads, tnum);
  i := 0;
  cnt := 0; // 完成的数目
  while (cnt < len) do  // 还没完成
  begin
    // 寻找下一个可用的线程
    for j := 0 to tnum - 1 do
    begin
      // 线程还没被创建 或者 已经完成
      if ((threads[j] = nil) or (threads[j].Done) or (threads[j].Terminated)) then
      begin
        // 获得下一个任务
        while (flags[i]) do
        begin                              
          i := (i + 1) mod len;
        end;
        // 标记为已经有线程在做了
        flags[i] := True;
        // 增加任务完成 +1
        Inc(cnt);
        // 开始
        if (threads[j] <> nil) then
        begin
          threads[j].Free;
          threads[j] := nil;
        end;
        if (threads[j] = nil) then
        begin
          threads[j] := TWorkerThread.Create(True);
          threads[j].FreeOnTerminate := False;
        end;
        with threads[j] do
        begin
          _Method := Method;
          _Param := Param;
          _Index := First + i;
          Priority := tpHigher;
          Resume; // 并发
        end;
        if (cnt >= len) then
        begin
          break;
        end;     
      end;
    end;
    Sleep(0); // 主线程 不停的等啊等
  end;
  // 剩余的线程需要完成 
  for i := 0 to tnum - 1 do
  begin
    if (threads[i] <> nil) then
    begin
      WaitForSingleObject(threads[i].Handle, INFINITE);
      threads[i].Free;
    end;
  end;
end;

使用用例

把上面的代码 放在一个独立 的单元中 然后这样使用

uses
  Parallel;

implementation

type
  _PAAB = ^_AAB;

procedure Parallel1(Arg: integer; const obj: TParallelParam);
var
  j, i, k: integer;
  map: _PAAB;
begin
  map := obj.Data;
  i := obj.First;
  j := obj.Last;
  if (i <= j) then
  begin
    for k := i to j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end
  else
  begin
    for k := i downto j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end;
end;

procedure TestParallel;
var
  map: _AAB;
begin
  SetLength(map, 1000, 1000);
  obj := TParallelParam.Create;
  try
    obj.Data := @map;
    obj.SetValues(0, High(map));
    TParallelFor(@Parallel1, obj, 0, High(map[0]), 8); // 8 线程
  finally
    obj.Free;
  end;
end;

end.

局限性

本来可以使用 AsyncCalls 库则可以相当简化线程的调用 AsyncCall(@fun, [parameters]) 但是 AsyncCall 使用了一个全局的线程池来管理线程 当在COM+DLL库中 当 单元被移除的时候 会发生死锁 dead lock.

相关英文技术贴

英文原贴: DELPHI 2007 下的 PARALLEL FOR 实现

聪明的程序员用Delphi

Delphi(VB Killer)是一种编程语言,它是由Borland公司于1995年推出的。Delphi是一种基于Pascal的高级编程语言,它是一种面向对象的编程语言,它是一种结构化编程语言,它是一种事件驱动的编程语言。也被称为Object Pascal。 本文一共 520 个汉字, 你数一下对不对.
DELPHI 2007 下的 PARALLEL FOR 实现. (AMP 移动加速版本)
上一篇: 把年假分成几周请完 - 工作不是全部
下一篇: 软件分享 - 用 SpeedFan 免费软件来查看系统温度

扫描二维码,分享本文到微信朋友圈
7f29ab1ac0fd5b010ad2705314157b41 DELPHI 2007 下的 PARALLEL FOR 实现 Delphi / Turbo Pascal 数据结构与算法 有意思的 程序设计 编程 计算机

评论