unit BambooThread.PoolLite;
interface
uses
System.SysUtils;
type
TInterface_BambooThreadPoolLite = interface
function Runing: Boolean;
procedure ClearProc;
procedure ThreadStart(aProc: TProc);
end;
function Create_BambooThreadPool(aMaxThreadCount: Byte): TInterface_BambooThreadPoolLite;
implementation
uses
System.Classes,
System.Generics.Collections;
type
TBambooThreadPoolLite = class;
TCacheThread = class(TThread)
private
FThreadPool: TBambooThreadPoolLite;
FProc: TProc;
protected
procedure Execute; override;
procedure Do_Resume;
public
constructor Create(aThreadPool: TBambooThreadPoolLite);
end;
TBambooThreadPoolLite = class(TInterfacedObject, TInterface_BambooThreadPoolLite)
private type
TItem_Proc = record
FProc: TProc;
end;
private
FLock: TObject;
FQueue_Thread: TQueue<TCacheThread>;
FQueue_Proc: TQueue<TItem_Proc>;
FUsedCount: Integer;
FMaxThreadCount: Byte;
FCreateCount: Integer;
private
function TryDequeueProc(var aProc: TProc): Boolean;
function TryDequeueThread(var aThread: TCacheThread): Boolean;
procedure Do_Proc_Queue;
function Internal_Get_Thread: TCacheThread;
function Internal_Do_Start_Queue(aForceThread: Boolean): Boolean;
private
function Runing: Boolean;
procedure ClearProc;
procedure ThreadStart(aProc: TProc);
public
constructor Create(aMaxThreadCount: Byte);
destructor Destroy; override;
end;
function Create_BambooThreadPool(aMaxThreadCount: Byte): TInterface_BambooThreadPoolLite;
begin
Result := TBambooThreadPoolLite.Create(aMaxThreadCount);
end;
{ TCacheThread }
constructor TCacheThread.Create(aThreadPool: TBambooThreadPoolLite);
begin
inherited Create(True);
FThreadPool := aThreadPool;
FreeOnTerminate := False;
end;
procedure TCacheThread.Do_Resume;
begin
while Suspended do
Suspended := False;
end;
procedure TCacheThread.Execute;
begin
while not Terminated do
begin
try
if Assigned(FProc) then
FProc;
except
end;
FProc := nil;
with FThreadPool do
begin
TMonitor.Enter(FLock);
Internal_Do_Start_Queue(True);
FQueue_Thread.Enqueue(Self);
Dec(FUsedCount);
TMonitor.Exit(FLock);
end;
Suspended := True;
end;
end;
{ TBambooThreadPoolLite }
procedure TBambooThreadPoolLite.ClearProc;
begin
TMonitor.Enter(FLock);
FQueue_Proc.Clear;
TMonitor.Exit(FLock);
end;
constructor TBambooThreadPoolLite.Create(aMaxThreadCount: Byte);
begin
inherited Create;
FMaxThreadCount := aMaxThreadCount;
FLock := TObject.Create;
FQueue_Thread := TQueue<TCacheThread>.Create;
FQueue_Proc := TQueue<TItem_Proc>.Create;
FMaxThreadCount := aMaxThreadCount;
if FMaxThreadCount = 0 then
FMaxThreadCount := 1;
end;
destructor TBambooThreadPoolLite.Destroy;
var
aThread: TCacheThread;
begin
while Runing do
Sleep(10);
TMonitor.Enter(FLock);
try
while TryDequeueThread(aThread) do
begin
while not aThread.Suspended do
Sleep(0);
aThread.Terminate;
aThread.Do_Resume;
aThread.WaitFor;
aThread.Free;
end;
FQueue_Thread.Free;
FQueue_Proc.Free;
inherited Destroy;
finally
TMonitor.Exit(FLock);
FLock.Free;
end;
end;
procedure TBambooThreadPoolLite.Do_Proc_Queue;
var
aHaveNew: Boolean;
aProc: TProc;
begin
repeat
TMonitor.Enter(FLock);
aHaveNew := TryDequeueProc(aProc);
TMonitor.Exit(FLock);
if aHaveNew then
try
aProc;
except
end;
until not aHaveNew;
end;
function TBambooThreadPoolLite.Internal_Do_Start_Queue(aForceThread: Boolean): Boolean;
var
aThread: TCacheThread;
begin
Result := (FQueue_Proc.Count > 0) and (aForceThread or (FUsedCount < FMaxThreadCount));
if Result then
begin
Inc(FUsedCount);
aThread := Internal_Get_Thread;
aThread.FProc := Do_Proc_Queue;
aThread.Do_Resume;
end;
end;
function TBambooThreadPoolLite.Internal_Get_Thread: TCacheThread;
begin
if not TryDequeueThread(Result) then
begin
Result := TCacheThread.Create(Self);
Inc(FCreateCount);
end;
while not Result.Suspended do
Sleep(0);
end;
function TBambooThreadPoolLite.Runing: Boolean;
begin
TMonitor.Enter(FLock);
Result := FCreateCount > FQueue_Thread.Count;
TMonitor.Exit(FLock);
end;
procedure TBambooThreadPoolLite.ThreadStart(aProc: TProc);
var
aItem_Proc: TItem_Proc;
begin
if not Assigned(aProc) then
Exit;
TMonitor.Enter(FLock);
aItem_Proc.FProc := aProc;
FQueue_Proc.Enqueue(aItem_Proc);
Internal_Do_Start_Queue(False);
TMonitor.Exit(FLock);
end;
function TBambooThreadPoolLite.TryDequeueProc(var aProc: TProc): Boolean;
var
aItem_Proc: TItem_Proc;
begin
Result := FQueue_Proc.Count > 0;
if Result then
begin
aItem_Proc := FQueue_Proc.Dequeue;
aProc := aItem_Proc.FProc;
end;
end;
function TBambooThreadPoolLite.TryDequeueThread(var aThread: TCacheThread): Boolean;
begin
Result := FQueue_Thread.Count > 0;
if Result then
aThread := FQueue_Thread.Dequeue;
end;
end.
interface
uses
System.SysUtils;
type
TInterface_BambooThreadPoolLite = interface
function Runing: Boolean;
procedure ClearProc;
procedure ThreadStart(aProc: TProc);
end;
function Create_BambooThreadPool(aMaxThreadCount: Byte): TInterface_BambooThreadPoolLite;
implementation
uses
System.Classes,
System.Generics.Collections;
type
TBambooThreadPoolLite = class;
TCacheThread = class(TThread)
private
FThreadPool: TBambooThreadPoolLite;
FProc: TProc;
protected
procedure Execute; override;
procedure Do_Resume;
public
constructor Create(aThreadPool: TBambooThreadPoolLite);
end;
TBambooThreadPoolLite = class(TInterfacedObject, TInterface_BambooThreadPoolLite)
private type
TItem_Proc = record
FProc: TProc;
end;
private
FLock: TObject;
FQueue_Thread: TQueue<TCacheThread>;
FQueue_Proc: TQueue<TItem_Proc>;
FUsedCount: Integer;
FMaxThreadCount: Byte;
FCreateCount: Integer;
private
function TryDequeueProc(var aProc: TProc): Boolean;
function TryDequeueThread(var aThread: TCacheThread): Boolean;
procedure Do_Proc_Queue;
function Internal_Get_Thread: TCacheThread;
function Internal_Do_Start_Queue(aForceThread: Boolean): Boolean;
private
function Runing: Boolean;
procedure ClearProc;
procedure ThreadStart(aProc: TProc);
public
constructor Create(aMaxThreadCount: Byte);
destructor Destroy; override;
end;
function Create_BambooThreadPool(aMaxThreadCount: Byte): TInterface_BambooThreadPoolLite;
begin
Result := TBambooThreadPoolLite.Create(aMaxThreadCount);
end;
{ TCacheThread }
constructor TCacheThread.Create(aThreadPool: TBambooThreadPoolLite);
begin
inherited Create(True);
FThreadPool := aThreadPool;
FreeOnTerminate := False;
end;
procedure TCacheThread.Do_Resume;
begin
while Suspended do
Suspended := False;
end;
procedure TCacheThread.Execute;
begin
while not Terminated do
begin
try
if Assigned(FProc) then
FProc;
except
end;
FProc := nil;
with FThreadPool do
begin
TMonitor.Enter(FLock);
Internal_Do_Start_Queue(True);
FQueue_Thread.Enqueue(Self);
Dec(FUsedCount);
TMonitor.Exit(FLock);
end;
Suspended := True;
end;
end;
{ TBambooThreadPoolLite }
procedure TBambooThreadPoolLite.ClearProc;
begin
TMonitor.Enter(FLock);
FQueue_Proc.Clear;
TMonitor.Exit(FLock);
end;
constructor TBambooThreadPoolLite.Create(aMaxThreadCount: Byte);
begin
inherited Create;
FMaxThreadCount := aMaxThreadCount;
FLock := TObject.Create;
FQueue_Thread := TQueue<TCacheThread>.Create;
FQueue_Proc := TQueue<TItem_Proc>.Create;
FMaxThreadCount := aMaxThreadCount;
if FMaxThreadCount = 0 then
FMaxThreadCount := 1;
end;
destructor TBambooThreadPoolLite.Destroy;
var
aThread: TCacheThread;
begin
while Runing do
Sleep(10);
TMonitor.Enter(FLock);
try
while TryDequeueThread(aThread) do
begin
while not aThread.Suspended do
Sleep(0);
aThread.Terminate;
aThread.Do_Resume;
aThread.WaitFor;
aThread.Free;
end;
FQueue_Thread.Free;
FQueue_Proc.Free;
inherited Destroy;
finally
TMonitor.Exit(FLock);
FLock.Free;
end;
end;
procedure TBambooThreadPoolLite.Do_Proc_Queue;
var
aHaveNew: Boolean;
aProc: TProc;
begin
repeat
TMonitor.Enter(FLock);
aHaveNew := TryDequeueProc(aProc);
TMonitor.Exit(FLock);
if aHaveNew then
try
aProc;
except
end;
until not aHaveNew;
end;
function TBambooThreadPoolLite.Internal_Do_Start_Queue(aForceThread: Boolean): Boolean;
var
aThread: TCacheThread;
begin
Result := (FQueue_Proc.Count > 0) and (aForceThread or (FUsedCount < FMaxThreadCount));
if Result then
begin
Inc(FUsedCount);
aThread := Internal_Get_Thread;
aThread.FProc := Do_Proc_Queue;
aThread.Do_Resume;
end;
end;
function TBambooThreadPoolLite.Internal_Get_Thread: TCacheThread;
begin
if not TryDequeueThread(Result) then
begin
Result := TCacheThread.Create(Self);
Inc(FCreateCount);
end;
while not Result.Suspended do
Sleep(0);
end;
function TBambooThreadPoolLite.Runing: Boolean;
begin
TMonitor.Enter(FLock);
Result := FCreateCount > FQueue_Thread.Count;
TMonitor.Exit(FLock);
end;
procedure TBambooThreadPoolLite.ThreadStart(aProc: TProc);
var
aItem_Proc: TItem_Proc;
begin
if not Assigned(aProc) then
Exit;
TMonitor.Enter(FLock);
aItem_Proc.FProc := aProc;
FQueue_Proc.Enqueue(aItem_Proc);
Internal_Do_Start_Queue(False);
TMonitor.Exit(FLock);
end;
function TBambooThreadPoolLite.TryDequeueProc(var aProc: TProc): Boolean;
var
aItem_Proc: TItem_Proc;
begin
Result := FQueue_Proc.Count > 0;
if Result then
begin
aItem_Proc := FQueue_Proc.Dequeue;
aProc := aItem_Proc.FProc;
end;
end;
function TBambooThreadPoolLite.TryDequeueThread(var aThread: TCacheThread): Boolean;
begin
Result := FQueue_Thread.Count > 0;
if Result then
aThread := FQueue_Thread.Dequeue;
end;
end.