Читайте также:
|
|
Это очень универсальный и весьма эффективный способ взаимодействия параллельных потоков. Суть его состоит во введении посредника-буфера между параллельными потоками. Иногда этот способ называют взаимодействием с помощью обмена сообщениями. Поток-писатель, желающий послать другому потоку некоторые данные, записывает их в очередь. Поток-читатель ожидает появления данных в очереди, получает их, обрабатывает и, если нужно, посылает результат в очередь потока-писателя. Принципиальный недостаток асинхронного взаимодействия - необходимость буфера неопределенного размера. Этот недостаток можно сгладить, если ввести ограничение на размер очереди - в этом случае поток-писатель должен приостанавливаться, если очередь достигла своего максимального размера.
Важная проблема эффективности многопоточной обработки возникает в том случае, если поток должен обрабатывать несколько очередей входящих сообщений или несколько событий. Приведем цитату из [1]: "пассажиру, ожидающему автобуса номер 127 в общем случае придется ждать дольше, чем тому, кто может воспользоваться как 19-м, так и 127-м маршрутом автобуса, в зависимости от того, который из них раньше придет к остановке. В предположении что автобусы приходят в случайном порядке, у пассажира, имеющего выбор, время ожидания оказывается вдвое короче - парадоксально, но получается, что он ждет вдвое "быстрее"! Единственный способ достигнуть этого - ожидать именно первого из многих возможных событий; приобретение же более быстрого компьютера здесь не поможет".
Учитывая важность выбора первого из многих возможных событий, введем понятие альтернативного ожидания и дадим его практическое воплощение.
TGsvSelectMethod = procedure of object; TGsvSelect = classprivate FEvents: array[0..MAXIMUM_WAIT_OBJECTS - 1] of THandle; FMethods: array[0..MAXIMUM_WAIT_OBJECTS - 1] of TGsvSelectMethod; FCount: Cardinal; public constructor Create(aThread: TGsvThread); procedure Init; procedure Add(aEvent: THandle; aMethod: TGsvSelectMethod); function Wait(aTimeout: Cardinal = INFINITE): Boolean;end;Альтернатива могут быть добавлена к списку возможных альтернатив с помощью метода Add, который принимает 2 аргумента - дескриптор события и метод-обработчик. Альтернативы могут добавляться статически, на этапе создания объекта, а могут динамически, причем список альтернатив может быть различным в зависимости от различных условий. В языке Ada это делает оператор отбора. В Delphi мы можем воспользоваться оператором if или case. Список альтернатив инициализируется вызовом Init, а альтернативное ожидание - вызовом Wait. Если в процессе ожидания возникло событие, то будет вызван метод, связанный с данным событием и Wait вернет True, в противном случае Wait вернет False. Обработчик события - это обычный метод класса параллельного потока. Функция Wait принимает в качестве аргумента значение таймаута (в миллисекундах), если за время таймаута никаких событий не произошло, то Wait вернет False. Отметим важную деталь - в список ожидаемых событий неявно вносится событие FTerminationEvent, которое делает возможным форсированное завершение ожидания при завершении потока - в этом случае Wait также вернет False. Приведем пример кода, в котором параллельный поток создает набор альтернатив и выполняет ожидание. Предполагается, что в конструкторе объекта потока создан объект FSelect класса TGsvSelect:
Теперь рассмотрим более подробно реализацию класса TGsvSelect:
constructor TGsvSelect.Create(aThread: TGsvThread);begin inherited Create; FEvents[0]:= aThread.FTerminationEvent; FMethods[0]:= nil; FCount:= 1;end; procedure TGsvSelect.Init;begin FCount:= 1;end; procedure TGsvSelect.Add(aEvent: THandle; aMethod: TGsvSelectMethod);begin Assert(FCount <= High(FEvents)); FEvents[FCount]:= aEvent; FMethods[FCount]:= aMethod; Inc(FCount);end; function TGsvSelect.Wait(aTimeout: Cardinal): Boolean;var res, i: Cardinal;begin Result:= False; res:= WaitForMultipleObjects(FCount, @FEvents[0], False, aTimeout); if res < (WAIT_OBJECT_0 + FCount) then begin Result:= res > WAIT_OBJECT_0; if Result then begin i:= res - WAIT_OBJECT_0; if Assigned(FMethods[i]) then FMethods[i](); end; end;end;При создании объекта первым событием в списке становится завершающее событие. Процедура Add регистрирует событие - добавляет его в список (в массив) событий и добавляет в список методов связанный с этим событием метод. Функция Wait выполняет альтернативное ожидание по всем зарегистрированным событиям, включая завершающее событие. Следует отметить, что функция Wait может вернуть False не только при таймауте или при завершающем событии. Это может случиться также в том случае, когда в качестве события используется мьютекс, от которого отказался создавший его процесс. Но поскольку мы ограничились случаем взаимодействия потоков в рамках одного приложения (одного процесса операционной системы), то это событие у нас не возникнет никогда.
Теперь пора уточнить термин "событие", который мы использовали до этого момента без особого разъяснения. Под событием мы будем понимать изменение состояния объекта ядра операционной системы. В качестве событий можно использовать:
Кроме перечисленных событий операционная система Windows предоставляет еще несколько событий, но они доступны только в Windows NT. Особо можно еще отметить сокетные события, которые создаются функцией WSACreateEvent - эти события также можно использовать при альтернативных ожиданиях на сокетах.
Рассмотрим более подробно событие "Event":
TGsvEvent = classprivate FHandle: THandle; procedure SetState(aState: Boolean); public constructor Create; destructor Destroy; override; function Wait(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): Boolean; property Handle: THandle read FHandle; property State: Boolean write SetState;end;Событие может находиться в двух состояниях - активном (сигнализирущее состояние) и сброшенном (несигнализирующее состояние). В активное состояние событие переводится установкой свойства State в True, а сбрасывается событие установкой State в False. Эти редкий тип свойства, которое доступно только для установки, но не доступно для получения. Событие автоматически сбрасывается при каком-либо успешном ожидании, после чего его нужно явно перевести в активное состояние. Событие может быть внутренним объектом потока, а может не принадлежать никакому потоку конкретно. Поток может ожидать только данного события с помощью метода Wait, либо использовать его дескриптор Handle для альтернативного ожидания. Приведем реализацию метода Wait:
function TGsvEvent.Wait(aThread: TGsvThread; aTimeout: Cardinal): Boolean;var objs: array[0..1] of THandle; cnt: Integer;begin objs[0]:= FHandle; cnt:= 1; if Assigned(aThread) then begin objs[1]:= aThread.FTerminationEvent; cnt:= 2; end; Result:= WaitForMultipleObjects(cnt, @objs[0], False, aTimeout) = WAIT_OBJECT_0;end;Если событие ожидает поток aThread, то кроме собственно события также будет ожидаться завершающее событие потока, которое необходимо для форсированного прекращения ожидания. Если поток не задан (aThread = nil), то метод Wait будет ожидать только своего события - это может пригодиться основному VCL-потоку.
Чтобы суммировать материал этого раздела, приведем реализацию очереди ограниченной длины, которую можно использовать для взаимодействия обменом сообщениями. Будем предполагать, что все сообщения являются объектами. Сообщения добавляются в конец очереди, а извлекаются из начала очереди. Исключение из правила - высокоприоритетное сообщение, которое помещается в начало очереди без учета ограничения на длину очереди.
Класс очереди:
TGsvQueue = classprivate FGetEvent: TGsvEvent; FPutEvent: TGsvEvent; FLatch: TGsvLatch; FList: TList; FMaxCount: Integer; function GetCount: Integer; procedure SetEvents; public constructor Create(aMaxCount: Integer); destructor Destroy; override; function Get(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): TObject; function Put(aThread: TGsvThread; aMessage: TObject; aTimeout: Cardinal = INFINITE): Boolean; procedure PutOutOfTurn(aMessage: TObject); property GetEvent: TGsvEvent read FGetEvent; property PutEvent: TGsvEvent read FPutEvent; property Count: Integer read GetCount; property MaxCount: Integer read FMaxCount;end;Рассмотрим подробнее метод Get:
function TGsvQueue.Get(aThread: TGsvThread; aTimeout: Cardinal): TObject;begin Result:= nil; if not FGetEvent.Wait(aThread, aTimeout) then Exit; FLatch.Lock; try if FList.Count <> 0 then begin Result:= TObject(FList.Items[0]); FList.Delete(0); SetEvents; end; finally FLatch.Unlock; end;end;Поток, вызвавший метод Get, сразу переходит в состояние ожидания непустой очереди. Если очередь содержит хоть одно сообщение, то поток продолжит свое выполнение без ожидания. Первое сообщение извлекается из списка внутри критической секции, а затем производится переустановка событий:
procedure TGsvQueue.SetEvents;begin FGetEvent.State:= FList.Count <> 0; FPutEvent.State:= FList.Count < FMaxCount;end;Обычно только один поток пишет в очередь и только один поток читает из нее. Если при асинхронном взаимодействии требуется обмен сообщениями, то в структуре данных записываемого сообщения указывается дескриптор потока-отправителя. Кроме взаимодействия один-к-одному, очередь поддерживает достаточно важный случай взаимодействия многие-к-одному, при котором многие потоки посылают данные одному потоку. Можно себе представить также ситуацию множественного чтения-записи, когда многие потоки посылают сообщения в одну очередь, и несколько потоков выбирают сообщения из очереди. Наиболее вероятна такая ситуация при наличии массива идентичных потоков-получателей.
Дата добавления: 2015-07-20; просмотров: 127 | Нарушение авторских прав
<== предыдущая страница | | | следующая страница ==> |
Разделяемые данные | | | Синхронное взаимодействие |