Бакнелл Джулиан М.
Шрифт:
Листинг 12.20. Классы производителя и потребителя
type
TProducer * class(TThread) private
FBuffers : TQueuedBuffers;
FStream : TStream;
FSyncObj : TtdProduceManyConsumeSync;
protected
procedure Execute; override;
public
constructor Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers);
end;
constructor TProducer.Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers);
begin
inherited Create (true);
FStream := aStream;
FSyncObj := aSyncObj;
FBuffers := aBuffers;
end;
procedure TProducer.Execute;
var
Tail : PBuffer;
begin
{выполнять до тех nop, пока поток не будет исчерпан...}
repeat
{передать сигнал о готовности к началу генерации данных}
FSyncObj.StartProducing;
{выполнить считывание блока из потока в конечный буфер очереди}
Tail := FBuffers.Tail;
Tail74.bCount := FStream.Read (Tail^.ЬВ1оск, 1024);
{переместить указатель конца очереди}
FBuffers.AdvanceTail;
{передать сигнал о прекращении генерации данных}
FSyncObj.StopProducing;
until (Tail^.bCount = 0);
end;
type
TConsumer = class (TThread) private
FBuffers : TQueuedBuffers;
FID : integer;
FStream : TStream;
FSyncObj : TtdProduceManyConsumeSync;
protected
procedure Execute; override;
public
constructor Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers;
alD : integer);
end;
constructor TConsumer.Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers;
alD : integer);
begin
inherited Create (true);
FStream := aStream;
FSyncObj := aSyncObj;
FBuffers := aBuffers;
FID := alD;
end;
procedure TConsumer.Execute;
var
Head : PBuffer;
begin
{передать сигнал о готовности к началу потребления данных}
FSyncObj.StartConsuming(FID);
{выполнить считывание начального буфера очереди}
Head := FBuffers.Head[FID];
{до тех пор, пока начальный буфер не пуст...}
while (Head^.bCount <> 0) do
begin
{выполнить запись блока из начального буфера очереди в поток}
FStream.Write(Head^.bBlock, Head^.bCount);
{переместить указатель начала очереди}
FBuffers.AdvanceHead(FID);
{обработка этого буфера завершена}
FSyncObj.StopConsuming(FID);
{передать сигнал о повторной готовности к началу потребления данных}
FSyncObj.StartConsuming(FID);
{выполнить считывание начального буфера очереди}
Head := FBuffers.Head[FID];
end;
{обработка последнего буфера завершена}
FSyncObj.StopConsuming(FID);
end;
И, наконец, рассмотрим подпрограмму копирования потоков, код которой показан в листинге 12.21.
Листинг 12.21. Копирование потоков с применением модели "производитель-потребитель"
procedure ThreadedMultiCopyStream(aSrcStream : TStream;
aDestCount : integer;
aDestStreams : PStreamArray);
var
i : integer;
SyncObj : TtdProduceManyConsumeSync;
Buffers : TQueuedBuffers;
Producer : TProducer;
Consumer : array [0..pred(MaxConsumers) ] of TConsumer;
WaitArray : array [0..MaxConsumers] of THandle;
begin
SyncObj nil;
Buffers nil;
Producer :=nil;
for i := 0 to pred(MaxConsumers) do
Consumer[i] := nil;
for i := 0 to MaxConsumers do
WaitArray[i] := 0;
try
{создать объект синхронизации}
SyncObj : * TtdProduceManyConsumeSync.Create(20, aDestCount);
{создать объект буфера с очередью}
Buffers := TQueuedBuffers.Create(20, aDestCount);
{создать поток производителя и сохранить его дескриптор}
Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);