Шрифт:
//...
pthread_mutex_lock(&Mutex)
AvailableThreads—
pthread_mutex_unlock(&Mutex)
}
int threadAvailability(void)
{
//...
pthread_mutex_lock(&Mutex)
if(AvailableThreads > 1)
return 1
else
return 0
pthread_mutex_unlock(&Mutex)
}
Ф ункция threadAvailability возвратит число 1, если максимально допустимое количество потоков для процесса еще не достигнуто. Эта функция опрашивает глобальную переменную ThreadAvailability, в которой хранится число потоков, еще доступных для процесса. Управляющий поток вызывает функцию decrementThreadAvailability, которая декрементирует эту глобальную переменную до создания им рабочего потока. Каждый рабочий поток вызывает функцию incrementThreadAvailability, которая инкрементирует глобальную переменную ThreadAvailability до начала его выполнения. Обе функции содержат обращение к функции pthread_mutex_lock до получения доступа к этой глобальной переменной и обращение к функции pthread_mutex_unlock после него. Если максимально допустимое количество потоков превышено, управляющий поток может отменить создание потока, если это возможно, или породить другой процесс, если это необходимо. Функции taskX, taskY и taskZ выполняют код, предназначенный для обработки запроса соответствующего типа.
Другой подход к реализации модели делегирования состоит в создании управляющим потоком пула потоков, которым (вместо создания под каждый новый запрос нового потока) переназначаются новые запросы. Управляющий поток во время инициализации создает некоторое количество потоков, а затем каждый созданный поток приостанавливается до тех пор, пока в очередь не будет добавлен новый запрос. Управляющий поток для выделения запросов из очереди по-прежнему использует цикл событий. Но вместо создания нового потока для обслуживания очередного запроса, управляющий поток уведомляет уже существующий поток о необходимости обработки запроса. Этот подход к реализации модели делегирования представлен в листинге 4.7.
// Листинг 4.7. Подход 2: скелет программы реализации . модели управляющего и рабочих потоков
pthread_t Thread[N]
// boss thread
{
pthread_create(&(Thread[1]...taskX...);
pthread_create(&(Thread[2]...taskY...);
pthread_create(&(Thread[3]...taskZ...);
//...
loop while(Request Queue is not empty
get request
classify request
switch(request type)
{
case X :
enqueue request to XQueue
signal Thread[1]
case Y :
enqueue request to YQueue
signal Thread[2]
case Z :
enqueue request to ZQueue
signal Thread[3]
//...
}
end loop
}
void *taskX(void *X)
{
loop
suspend until awaken by boss
loop while XQueue is not empty
dequeue request
process request
end loop
until done
{
void *taskY(void *Y)
{
loop
suspend until awaken by boss
loop while YQueue is not empty
dequeue request
process request
end loop
until done
}
void *taskZ(void *Z)
{
loop
suspend until awaken by boss
loop while (ZQueue is not empty)
dequeue request
process request
end loop
until done
} //.. .
В листинге 4.7 управляющий поток создает N рабочих потоков (по одному для каждого типа задачи). Каждая задача связана с обработкой запросов некоторого типа В цикле событий управляющий поток извлекает запрос из очереди запросов, определяет его тип, ставит его в очередь запросов, соответствующую типу, а затем оправляет сигнал потоку, который обрабатывает запросы из этой очереди. Функции потоков также содержат циклы событий. Поток приостанавливается до тех пор, пока не получит сигнал от управляющего потока о существовании запроса в его очереди. После «пробуждения» (уже во внутреннем цикле) поток обрабатывает все запросы до тех пор, пока его очередь не опустеет.
Использование модели сети с равноправными узлами
В модели равноправных узлов один поток сначала создает все потоки, необходимые выполнения всех задач. Каждый из равноправных потоков обрабатывает запросы, поступающие из собственного входного потока данных. В листинге 4.8 представлен скелет программы, реализующий при разделении программы на потоки метод равноправных узлов
Листинг 4.8. Скелет программы реализации модели равноправных потоков
pthread_t Thread[N]
// initial thread
{
pthread_create(&(Thread[1]...taskX...);
pthread_create(&(Thread[2]...taskY...);
pthread_create(&(Thread[3]...taskZ...);
//...
}
void *taskX(void *X)
{
loop while (Type XRequests are available)
extract Request
process request
end loop
return(NULL)
}
В модели равноправных потоков каждый поток отвечает за собственный входной поток данных. Входные данные могут быть выделены из базы данных, файла и т.п.