//******************************************************************************
// Thread pool
//******************************************************************************
// Author: Cai
// Date: 2011-3-10
//******************************************************************************
unit ThreadPoolClass;
interface uses Windows, Classes, SyncObjectClass;
type
  // Forward declaration of the pool class; the full declaration follows
  // later in this same type section.
  TThreadPool = class;
// Per-task completion callback. The worker thread invokes it with the task
// itself as Sender once the task's Execute has returned.
TOnTerminateTask = procedure (Sender: TObject) of object;

// Abstract unit of work that can be queued on a TThreadPool.
// Descendants implement Execute; the pool's worker threads call it.
TTaskObject = class(TObject)
private
  // Worker thread currently bound to this task; nil while the task is unbound.
  FOwner: TThread;
  FOnTerminateTask: TOnTerminateTask;
  // FThreadID: id of the worker thread that picked the task up.
  // FTaskID:   sequence number assigned by the pool when the task is queued.
  FThreadID, FTaskID: Cardinal;
  procedure SetOnTerminateTask(const Value: TOnTerminateTask);
protected
  // The actual work; must be overridden.
  procedure Execute(); virtual; abstract;
  // Polls until the owning worker reports tsTerminated or iTimeOut ms elapse.
  procedure WaitFor(iTimeOut: Cardinal); virtual;
  // Flags the owning worker's task status as tsTerminating.
  procedure Terminate;
public
  constructor Create(); virtual;
  destructor Destroy(); override;
  // Runs AMethod through TThread.Synchronize on behalf of the owning worker.
  procedure Synchronize(AMethod: TThreadMethod);
  property ThreadID: Cardinal read FThreadID;
  property TaskID: Cardinal read FTaskID;
  property OnTerminateTask: TOnTerminateTask
    read FOnTerminateTask write SetOnTerminateTask;
end;
// Thread priority expressed as an ordinal. The pool casts this value to
// TThreadPriority when it creates a worker thread.
TThreadPolicyInt = 0..6;
// Pool-level completion callback. The worker thread invokes it with the pool
// as Sender and the finished task as TaskObject, after the task has been
// unbound from the worker.
TOnTerminatingTask = procedure(Sender: TObject; TaskObject: TTaskObject) of object;
// Pool of worker threads that execute queued TTaskObject instances.
// Tasks are owned by the caller; the pool never frees them.
TThreadPool = class(TObject)
private
  // Guards the task list (and thread creation in AddTaskToList).
  FCriticalSectionLocker: TCriticalSectionLocker;
  // FThreadList:     worker threads (TThreadItem) created so far.
  // FTaskObjectList: tasks waiting to be picked up by a worker.
  FThreadList, FTaskObjectList: TList;
  // Upper bound on the number of worker threads (set to 3 by the constructor).
  FThreadMaxNum: Integer;
  FOnTerminatingTask: TOnTerminatingTask;
  // Priority given to newly created workers (set to 4 by the constructor).
  FThreadPriority: TThreadPolicyInt;
  // Id handed to the next queued task; also counts the tasks queued so far.
  FNextTaskID: Cardinal;
  procedure SetThreadMaxNum(const Value: Integer);
  procedure SetOnTerminatingTask(const Value: TOnTerminatingTask);
  procedure SetThreadPriority(const Value: TThreadPolicyInt);
protected
  function GetIdelThreadNum(): Integer;
  function WakeUpThreads(iNum: Integer): Integer;
  procedure GetTaskFromList(var TaskObject: TTaskObject; bPop: Boolean = True);
  procedure AddTaskToList(TaskObject: TTaskObject);
  procedure DeleteTaskFromList(TaskObject: TTaskObject);
  procedure ClearTaskList();
  procedure ClearThreadList();
  procedure ClearList();
public
  constructor Create(); virtual;
  destructor Destroy(); override;
  // Queues a task and makes sure a worker is available to run it.
  procedure AddTask(TaskObject: TTaskObject);
  // Asks the pool to drop a task from its list.
  procedure KillTask(TaskObject: TTaskObject);
  // Empties the task and thread lists and resets the task id counter.
  procedure Clear();
  // Polls IsThreadDone, bounded by iTimeOut milliseconds.
  procedure WaitFor(iTimeOut: Cardinal); virtual;
  // True when no task is queued and every worker is suspended or terminated.
  function IsThreadDone(): Boolean;
  property ThreadMaxNum: Integer read FThreadMaxNum write SetThreadMaxNum;
  property ThreadPriority: TThreadPolicyInt
    read FThreadPriority write SetThreadPriority;
  property OnTerminatingTask: TOnTerminatingTask
    read FOnTerminatingTask write SetOnTerminatingTask;
end;
implementation
type
  // State a worker thread reports for the task it is bound to.
  //   tsRunning     - set by the worker just before it calls the task's Execute.
  //   tsTerminating - set by TTaskObject.Terminate, and by the worker right
  //                   after the task's Execute returns.
  //   tsTerminated  - initial state of a worker; set again once a task's
  //                   Execute has returned.
  //   tsDestroying  - set by the worker thread's destructor.
  TTaskStatus = (tsRunning, {tsSuspend, tsWillTerminate, }tsTerminating, tsTerminated, tsDestroying);
// Worker thread of a TThreadPool: repeatedly pulls a task from its owning
// pool, runs it, and suspends itself when no task is available.
TThreadItem = class(TThread)
private
  // Serialises writes to FTaskStatus done through SetTaskStatus.
  FCriticalSectionLocker: TCriticalSectionLocker;
  // Pool this worker takes its tasks from.
  FOwner: TThreadPool;
  // Status of the task currently (or last) bound to this worker.
  FTaskStatus: TTaskStatus;
  // FNextTaskStatus: TTaskStatus;   (disabled in the original source)
  // Task being executed right now; nil between tasks.
  FCurTaskObject: TTaskObject;
  procedure SetOwner(const Value: TThreadPool);
protected
  procedure Execute(); override;
  procedure SetTaskStatus(TaskStatus: TTaskStatus);
public
  constructor Create(); overload; virtual;
  destructor Destroy(); override;
  property Owner: TThreadPool read FOwner write SetOwner;
end;
{ TThreadPool }
// Builds an empty pool: at most 3 worker threads, priority ordinal 4,
// no queued tasks and no threads created yet.
constructor TThreadPool.Create;
begin
  // Plain defaults first; they cannot fail.
  FThreadMaxNum := 3;
  FThreadPriority := 4;

  // Helper objects, created in the same order as before.
  FCriticalSectionLocker := TCriticalSectionLocker.Create;
  FThreadList := TList.Create;
  FTaskObjectList := TList.Create;
end;
// Stops and releases everything the pool owns: first the queued tasks are
// detached and the worker threads freed, then the two lists and the lock.
destructor TThreadPool.Destroy;
begin
  // Same work as ClearList(): tasks first, then threads.
  ClearTaskList();
  ClearThreadList();

  FThreadList.Destroy;
  FThreadList := nil;

  FTaskObjectList.Destroy;
  FTaskObjectList := nil;

  FCriticalSectionLocker.Destroy;

  inherited Destroy;
end;
// Public entry point for removing a task from the pool.
// Simply forwards to DeleteTaskFromList.
procedure TThreadPool.KillTask(TaskObject: TTaskObject);
begin
  Self.DeleteTaskFromList(TaskObject);
end;
// Property setter for ThreadMaxNum: stores the new upper bound on the number
// of worker threads. Threads that already exist are not touched.
procedure TThreadPool.SetThreadMaxNum(const Value: Integer);
begin
  Self.FThreadMaxNum := Value;
end;
// Public entry point for queueing a task.
// Simply forwards to AddTaskToList.
procedure TThreadPool.AddTask(TaskObject: TTaskObject);
begin
  Self.AddTaskToList(TaskObject);
end;
// Appends TaskObject to the pending list (ignoring duplicates), stamps it with
// the next task id and makes sure a worker is available: a suspended worker is
// woken, otherwise a new worker is created while the thread limit allows it.
procedure TThreadPool.AddTaskToList(TaskObject: TTaskObject);
var
  NewWorker: TThreadItem;
begin
  if not FCriticalSectionLocker.EnterLocker() then
    Exit;
  try
    // A task that is already queued is not queued a second time.
    if FTaskObjectList.IndexOf(TaskObject) < 0 then
    begin
      FTaskObjectList.Add(TaskObject);
      TaskObject.FTaskID := FNextTaskID;
      Inc(FNextTaskID);

      // Check whether there are enough threads: when nothing was woken up,
      // start an additional worker as long as the limit is not reached.
      if WakeUpThreads(1) = 0 then
      begin
        if FThreadList.Count < FThreadMaxNum then
        begin
          NewWorker := TThreadItem.Create();
          NewWorker.Priority := TThreadPriority(FThreadPriority);
          FThreadList.Add(NewWorker);
          NewWorker.FOwner := Self;
          // The worker is created suspended; this resumes it.
          WakeUpThreads(1);
        end;
      end;
    end;
  finally
    FCriticalSectionLocker.LeaveLocker();
  end;
end;
// Hands out the task at the head of the pending list.
//   TaskObject - receives the head task, or nil when the list is empty or the
//                lock could not be entered.
//   bPop       - when True (default) the task is also removed from the list.
procedure TThreadPool.GetTaskFromList(var TaskObject: TTaskObject; bPop: Boolean = True);
begin
  TaskObject := nil;
  if not FCriticalSectionLocker.EnterLocker() then
    Exit;
  try
    if FTaskObjectList.Count <> 0 then
    begin
      TaskObject := TTaskObject(FTaskObjectList[0]);
      if bPop then
        FTaskObjectList.Delete(0);
    end;
  finally
    FCriticalSectionLocker.LeaveLocker();
  end;
end;
// Removes TaskObject from the pending list.
//   - nil, or a task that is not in the list, is ignored.
//   - If the task is bound to a worker whose status is not tsTerminated, the
//     task is asked to terminate and the call blocks until the worker reports
//     completion before the entry is removed.
//   - A queued task that no worker has picked up yet is simply removed.
//
// Fixes over the previous version:
//   * the nil guard was inverted ("if Assigned(...) then Exit"), which turned
//     the routine into a no-op for every real task and let a nil task through
//     to be dereferenced;
//   * queued tasks without an owner were never removed, so a pending task
//     could not be cancelled.
procedure TThreadPool.DeleteTaskFromList(TaskObject: TTaskObject);
var
  iIndex: Integer;
begin
  // Nothing to do for nil; checked before the lock is taken.
  if not Assigned(TaskObject) then
    Exit;

  if not FCriticalSectionLocker.EnterLocker() then
    Exit;
  try
    iIndex := FTaskObjectList.IndexOf(Pointer(TaskObject));
    if iIndex = -1 then
      Exit;

    // Only a task bound to a worker can still be executing.
    if TaskObject.FOwner <> nil then
    begin
      if TThreadItem(TaskObject.FOwner).FTaskStatus <> tsTerminated then
      begin
        TaskObject.Terminate();
        TaskObject.WaitFor(DWORD(-1)); // wait without timeout
      end;
    end;

    FTaskObjectList.Delete(iIndex);
  finally
    FCriticalSectionLocker.LeaveLocker();
  end;
end;
// Property setter for OnTerminatingTask: stores the pool-level callback that
// worker threads invoke after a task has finished.
procedure TThreadPool.SetOnTerminatingTask(const Value: TOnTerminatingTask);
begin
  Self.FOnTerminatingTask := Value;
end;
// Counts the worker threads that are currently suspended (idle).
// Returns 0 when the pool has no threads.
function TThreadPool.GetIdelThreadNum: Integer;
var
  iThread: Integer;
  iIdle: Integer;
begin
  iIdle := 0;
  // The loop body is skipped entirely for an empty list.
  for iThread := 0 to FThreadList.Count - 1 do
    if TThread(FThreadList[iThread]).Suspended then
      Inc(iIdle);
  Result := iIdle;
end;
// Resumes up to iNum suspended worker threads.
//   iNum    - maximum number of threads to wake; values <= 0 wake none.
//   Returns - the number of threads that were actually resumed.
//
// Fixes over the previous version: the result was never incremented, so the
// function always returned 0 and callers could not tell whether an idle
// worker had been found, and iNum was ignored, so every suspended thread was
// resumed regardless of the request.
function TThreadPool.WakeUpThreads(iNum: Integer): Integer;
var
  I: Integer;
  Worker: TThread;
begin
  Result := 0;
  for I := 0 to FThreadList.Count - 1 do
  begin
    // Stop as soon as the requested number of workers has been woken.
    if Result >= iNum then
      Break;
    Worker := TThread(FThreadList.Items[I]);
    if Worker.Suspended then
    begin
      Worker.Resume;
      Inc(Result);
    end;
  end;
end;
// Empties both internal lists: pending tasks first, worker threads second.
procedure TThreadPool.ClearList;
begin
  Self.ClearTaskList();
  Self.ClearThreadList();
end;
// Removes every entry from the pending task list, walking it back to front.
// A task that is still bound to a worker is asked to terminate, waited for
// without timeout, and then unbound. The task objects themselves are NOT
// freed: they were not created by the pool.
// Note: this routine runs without taking the pool lock.
procedure TThreadPool.ClearTaskList;
var
  iTask: Integer;
  Task: TTaskObject;
begin
  for iTask := FTaskObjectList.Count - 1 downto 0 do
  begin
    Task := TTaskObject(FTaskObjectList.Items[iTask]);
    if (Task <> nil) and (Task.FOwner <> nil) then
    begin
      Task.Terminate();
      Task.WaitFor(DWORD(-1));
      // Detach the task from its worker.
      Task.FOwner := nil;
    end;
    // Must not free the task here, the pool does not own it.
    FTaskObjectList.Delete(iTask);
  end;
end;
// Frees every worker thread and empties the thread list, back to front.
// If a worker is still bound to a task and has not reported tsTerminated, the
// task is asked to terminate and waited for (no timeout) before the worker is
// freed; a task that is still bound afterwards is detached from the worker.
procedure TThreadPool.ClearThreadList;
var
  iThread: Integer;
  Worker: TThreadItem;
begin
  for iThread := FThreadList.Count - 1 downto 0 do
  begin
    Worker := TThreadItem(FThreadList.Items[iThread]);
    if Assigned(Worker) then
    begin
      // FCurTaskObject is deliberately re-read on every access: the worker
      // thread clears it on its own once the task has finished.
      if (Worker.FCurTaskObject <> nil) and (Worker.FTaskStatus <> tsTerminated) then
      begin
        Worker.FCurTaskObject.Terminate;
        Worker.FCurTaskObject.WaitFor(DWORD(-1));
        if Worker.FCurTaskObject <> nil then
          Worker.FCurTaskObject.FOwner := nil;
      end;
      Worker.Free;
    end;
    FThreadList.Delete(iThread);
  end;
end;
// Blocks the caller until the pool is idle (see IsThreadDone) or the timeout
// has elapsed, polling every 10 ms.
//   iTimeOut - maximum time to wait in milliseconds; DWORD(-1) waits forever.
//
// Fix over the previous version: the elapsed-time test was inverted (">="
// instead of "<"), so a finite timeout returned immediately without waiting
// (and a timeout of 0 waited forever).
procedure TThreadPool.WaitFor(iTimeOut: Cardinal);
var
  iFirst: Cardinal;
begin
  iFirst := GetTickCount();
  // Unsigned subtraction keeps the elapsed time correct across tick wrap-around.
  while (iTimeOut = DWORD(-1)) or ((GetTickCount() - iFirst) < iTimeOut) do
  begin
    if IsThreadDone() then
      Break;
    Sleep(10);
  end;
end;
// Reports whether the pool has finished all its work.
// Returns True only when the pending task list exists and is empty and every
// worker thread is either suspended or terminated. Returns False when the
// lock cannot be entered.
function TThreadPool.IsThreadDone: Boolean;
var
  iThread: Integer;
  Worker: TThreadItem;
begin
  Result := False;
  if not FCriticalSectionLocker.EnterLocker() then
    Exit;
  try
    // With tasks still queued the pool cannot be done: answer False at once.
    if (FTaskObjectList = nil) or (FTaskObjectList.Count <> 0) then
      Exit;

    for iThread := 0 to FThreadList.Count - 1 do
    begin
      Worker := TThreadItem(FThreadList.Items[iThread]);
      // A worker that is neither suspended nor terminated is still busy.
      if (not Worker.Suspended) and (not Worker.Terminated) then
        Exit;
    end;

    Result := True;
  finally
    FCriticalSectionLocker.LeaveLocker();
  end;
end;
// Property setter for ThreadPriority: stores the priority ordinal that is
// applied to worker threads created from now on.
procedure TThreadPool.SetThreadPriority(const Value: TThreadPolicyInt);
begin
  Self.FThreadPriority := Value;
end;
// Resets the pool: empties the task and thread lists and restarts task id
// numbering at 0.
procedure TThreadPool.Clear();
begin
  Self.ClearList();
  Self.FNextTaskID := 0;
end;
{ TThreadItem }
// Creates a worker thread in the suspended state with its own status lock.
// The initial task status is tsTerminated, i.e. no task is being executed.
constructor TThreadItem.Create();
begin
  // The lock is created before the thread is set up.
  Self.FCriticalSectionLocker := TCriticalSectionLocker.Create;
  // Resolves to the TThread constructor overload taking CreateSuspended.
  Create(True);
  Self.FTaskStatus := tsTerminated;
end;
// Shuts the worker down and releases its status lock.
//
// Fix over the previous version: the lock object was destroyed BEFORE the
// inherited destructor ran. The inherited TThread destructor is what stops and
// waits for the thread, so a thread that was still executing could call
// SetTaskStatus on an already destroyed lock. The lock is now released only
// after the inherited destructor has returned, and the reference is cleared
// so the Assigned() guard in SetTaskStatus can never see a dangling pointer.
destructor TThreadItem.Destroy;
begin
  // Tell Execute that the thread is going away, so that it does not suspend
  // itself again when it leaves its loop.
  FTaskStatus := tsDestroying;

  // Stops the thread (see the TThread.Destroy documentation).
  inherited Destroy;

  // Safe now: the thread no longer runs. Fields remain valid until the
  // instance memory is released after this destructor returns.
  FCriticalSectionLocker.Free;
  FCriticalSectionLocker := nil;
end;
// Worker loop. Until the thread is terminated (or has no owning pool) it:
//   1. asks the pool for the next task,
//   2. suspends itself when there is none and retries after being resumed,
//   3. binds the task to this thread and runs it,
//   4. reports completion, fires the task-level callback, unbinds the task
//      and finally fires the pool-level callback.
// When the loop ends and the thread is not being destroyed, the thread
// suspends itself so that its resources are kept.
//
// Fix over the previous version: if the task's Execute raised an exception the
// status stayed tsRunning and the task stayed bound to this thread, so every
// later WaitFor(DWORD(-1)) on that task (for instance from the task's own
// destructor) would block forever. The status is now set to tsTerminated and
// the task is unbound before the exception is re-raised. The normal
// (no exception) path is unchanged.
procedure TThreadItem.Execute;
var
  TaskObject: TTaskObject;
begin
  inherited;
  while not Self.Terminated do
  begin
    // Request a task.
    if FOwner = nil then
      Break;
    FOwner.GetTaskFromList(TaskObject);

    // No task: suspend and wait for the pool to wake this thread up.
    if TaskObject = nil then
    begin
      Self.Suspend;
      Continue; // make sure a task is requested again after waking up
    end;

    // Bind the task to the current thread.
    TaskObject.FOwner := Self;
    TaskObject.FThreadID := Self.ThreadID;
    FCurTaskObject := TaskObject;
    Self.SetTaskStatus(tsRunning);

    // Run the task.
    try
      TaskObject.Execute();
    except
      // Leave a consistent state behind so that nobody waits forever on a
      // task that will never report completion, then let the exception
      // propagate exactly as before.
      Self.SetTaskStatus(tsTerminated);
      TaskObject.FOwner := nil;
      FCurTaskObject := nil;
      raise;
    end;

    Self.SetTaskStatus(tsTerminating);
    Self.SetTaskStatus(tsTerminated);
    if Assigned(TaskObject.FOnTerminateTask) then
      TaskObject.FOnTerminateTask(TaskObject);

    // Release the current binding.
    FOwner.DeleteTaskFromList(TaskObject);
    TaskObject.FOwner := nil;
    FCurTaskObject := nil;
    if Assigned(FOwner.FOnTerminatingTask) then
      FOwner.FOnTerminatingTask(FOwner, TaskObject);
  end;

  // When the thread is not being freed, suspend it to keep its resources.
  if FTaskStatus <> tsDestroying then
    Self.Suspended := True;
end;
// Property setter for Owner: stores the pool this worker takes its tasks from.
procedure TThreadItem.SetOwner(const Value: TThreadPool);
begin
  Self.FOwner := Value;
end;
// Stores a new task status under the worker's own lock.
// The call is silently ignored when the instance or its lock is not assigned,
// or when the lock cannot be entered.
procedure TThreadItem.SetTaskStatus(TaskStatus: TTaskStatus);
begin
  // Defensive guards: tolerate being called on a nil instance or before the
  // lock exists.
  if not Assigned(Self) then
    Exit;
  if not Assigned(FCriticalSectionLocker) then
    Exit;

  if FCriticalSectionLocker.EnterLocker() then
  begin
    try
      FTaskStatus := TaskStatus;
    finally
      FCriticalSectionLocker.LeaveLocker;
    end;
  end;
end;
{ TTaskObject }
// Creates an unbound task. There is nothing to initialise: all fields start
// out zeroed / nil. The constructor is virtual so descendants can extend it.
constructor TTaskObject.Create;
begin
  inherited Create;
end;
// Before the task goes away, flag its owning worker (if any) as terminating
// and wait, without timeout, until that worker reports tsTerminated or the
// task is no longer bound to it.
destructor TTaskObject.Destroy;
begin
  Self.Terminate();
  Self.WaitFor(DWORD(-1));
  inherited Destroy;
end;
// Property setter for OnTerminateTask: stores the callback that the worker
// thread invokes once this task's Execute has returned.
procedure TTaskObject.SetOnTerminateTask(const Value: TOnTerminateTask);
begin
  Self.FOnTerminateTask := Value;
end;
// Lets task code run AMethod through TThread.Synchronize, passing the worker
// thread this task is currently bound to as the calling thread.
procedure TTaskObject.Synchronize(AMethod: TThreadMethod);
begin
  TThread.Synchronize(FOwner, AMethod);
end;
// Marks the owning worker's task status as tsTerminating.
// Does nothing when the task is not bound to a worker. This call does not
// wait for anything; use WaitFor for that.
procedure TTaskObject.Terminate;
begin
  if FOwner = nil then
    Exit;
  TThreadItem(FOwner).SetTaskStatus(tsTerminating);
end;
// Polls every 5 ms until the owning worker reports tsTerminated, the task is
// no longer bound to a worker, or iTimeOut milliseconds have elapsed.
// Returns immediately for a nil instance or an unbound task. Any exception
// raised while polling is swallowed and simply ends the wait.
procedure TTaskObject.WaitFor(iTimeOut: Cardinal);
var
  iStart: Cardinal;
begin
  iStart := GetTickCount();
  if (Self = nil) then
    Exit;
  if (FOwner = nil) then
    Exit;
  try
    // Conditions are evaluated left to right; the elapsed-time test comes
    // last, exactly as the in-loop Break did.
    while (Self <> nil) and (FOwner <> nil)
      and (TThreadItem(FOwner).FTaskStatus <> tsTerminated)
      and ((GetTickCount() - iStart) < iTimeOut) do
      Sleep(5);
  except
    // Best effort: the owner may disappear while it is being polled.
  end;
end;
end.
|
请发表评论