Комментарии к примеру с таймером ожидания

Исходя из типа таймера и используя либо процедуру завершения, либо ожидание перехода дескриптора в сигнальное состояние, можно образовать четыре различных комбинации. Программа 14.3 иллюстрирует использование процедуры завершения и синхронизирующего таймера. Вы сможете тестировать каждую из четырех возможных комбинаций, изменяя комментарии в версии программы TimeBeep.с, доступной на Web-сайте.

Порты завершения ввода/вывода

Порты завершения ввода/вывода, поддерживаемые лишь на NT-платформах, объединяют в себе возможности перекрывающегося ввода/вывода и независимых потоков и используются чаще всего в серверных программах. Чтобы выяснить, какими требованиями это может диктоваться, обратимся к серверам, построенным в главах 11 и 12, где каждый клиент поддерживался отдельным рабочим потоком, связанным с сокетом или экземпляром именованного канала. Это решение хорошо работает лишь в тех случаях, когда число клиентов невелико.

Посмотрим, однако, что произойдет, если число клиентов достигнет 1000. В имеющейся модели для этого потребуется 1000 потоков, для каждого из которых необходимо выделить значительный объем виртуальной памяти. Так, по умолчанию каждому потоку выделяется 1 Мбайт стекового пространства, так что для 1000 потоков потребуется 1 Гбайт, и переключение контекстов потоков может увеличить задержки, обусловленные ошибками из-за отсутствия страниц.[35] Кроме того, потоки будут состязаться между собой за право владения общими ресурсами как на уровне планировщика, так и внутри процесса, и это, как было показано в главе 9, может приводить к снижению производительности. В связи с этим требуется механизм, позволяющий небольшому пулу рабочих потоков обслуживать большое количество клиентов.

Искомое решение обеспечивается портами завершения ввода/вывода, которые предоставляют возможность создавать ограниченное количество серверных потоков в пуле потоков, имея очень большое количество дескрипторов именованных каналов (или сокетов). При этом дескрипторы не соединяются попарно с отдельными рабочими серверными потоками; серверный поток может обслуживать любой дескриптор, данные которого нуждаются в обработке.

Итак, порт завершения ввода/вывода — это набор перекрывающихся дескрипторов, и потоки ожидают перехода порта в сигнальное состояние. Когда завершается операция чтения или записи с участием какого-либо дескриптора, один из потоков пробуждается и принимает данные и результаты выполнения операции ввода/вывода. Далее поток может обработать данные и вновь перейти в состояние ожидания перехода порта в сигнальное состояние.

Прежде всего необходимо создать порт завершения ввода/вывода и присоединить к нему перекрывающиеся дескрипторы.

Управление портами завершения ввода/вывода

Для создания порта и присоединения к нему дескрипторов используется одна и та же функция — CreateCompletionPort. Необходимость выполнения этой функцией двух разных задач соответственно усложняет использование ее параметров.

HANDLE CreateIoCompletionPort(HANDLE FileHandle, HANDLE ExistingCompletionPort, DWORD CompletionKey, DWORD NumberOfConcurrentThreads);

Порт завершения ввода/вывода представляет собой совокупность дескрипторов файлов, открытых в режиме OVERLAPPED. Параметр FileHandle — это перекрывающийся дескриптор, присоединяемый к порту. Если задать его значение равным INVALID_DESCRIPTOR_HANDLE, то функция создаст новый порт завершения ввода/вывода и возвратит его дескриптор. В этом случае следующий параметр, ExistingCompletionPort, должен быть установлен в NULL.

ExistingCompletionPort — порт, созданный при первом вызове функции, к которому должен быть присоединен дескриптор, указанный в первом параметре. В случае успешного выполнения функция возвращает дескриптор порта, иначе — NULL.

CompletionKey — указывает ключ, который будет включен в пакет завершения для дескриптора FileHandle. Обычно в качестве ключа используется значение индекса массива структур данных, содержащих тип операции, дескриптор и указатель на буфер данных.

NumberOfConcurrentThreads — предельно допустимое количество потоков, которым разрешено параллельное выполнение. При наличии других потоков сверх этого количества, ожидающих перехода порта в сигнальное состояние, они будут оставаться блокированными, даже если существует дескриптор с доступными данными. Если этот параметр установлен равным 0, то в качестве предела используется количество процессоров, установленных в системе.

Количество перекрывающихся дескрипторов, которые могут быть связаны с одним портом завершения ввода/вывода, ничем не ограничивается. Первоначальный вызов функции CreateCompletionPort используется для создания порта и указания максимального количества потоков. Для каждого очередного перекрывающегося дескриптора, подлежащего связыванию с данным портом, следует повторно вызывать эту же функцию. К сожалению, способов, позволяющих удалить дескриптор из порта завершения, не существует, и это упущение значительно ограничивает гибкость программ.

Дескрипторы, связанные с портом не должны использоваться совместно с функциями ReadFileEx и WriteFileEx. В документации Microsoft не рекомендуется разделять файлы и объекты иного типа, используя другие открытые дескрипторы.

Ожидание порта завершения ввода/вывода

Для выполнения ввода/вывода с участием дескрипторов, связанных с портом, используются функции ReadFile и WriteFile со структурами OVERLAPPED (дескрипторы событий не требуются). Далее операция ввода/вывода помещается в очередь порта завершения.

Поток ожидает завершения перекрывающейся операции ввода/вывода, находящейся в очереди, не путем ожидания события, а путем вызова функции GetQueueCompletionStatus с указанием порта завершения (completion port). Когда вызывающий поток пробуждается, функция возвращает ключ, который был указан при первоначальном присоединении к порту дескриптора, чья операция завершилась, и этот ключ может указывать количество переданных байтов и идентификационные данные фактического дескриптора, связанного с завершившейся операцией.

Следует отметить, что уведомление о завершении операции получит не обязательно тот же поток, который инициировал чтение или запись. Уведомление о завершении операции может быть получено любым ожидающим потоком. Поэтому необходимо, чтобы ключ мог идентифицировать дескриптор, связанный с завершившейся операцией.

Имеется также возможность использовать конечный интервал ожидания (time-out).

BOOL GetQueuedCompletionStatus(HANDLE CompletionPort, LPDWORD lpNumberOfBytesTransferred, LPDWORD lpCompletionKey, LPOVERLAPPED *lpOverlapped, DWORD dwMilliseconds);

Иногда может оказаться удобным, чтобы операция не помещалась в очередь порта завершения ввода/вывода. В этом случае поток может ожидать наступления перекрывающегося события, как показано в программе 14.4 и дополнительном примере, atouMTCP, который находится на Web-сайте. Для указания того, что перекрывающаяся операция не должна помещаться в очередь порта завершения, вы должны установить младший бит дескриптора события (hEvent) в структуре OVERLAPPED; тогда вы получите возможность ожидать наступления события для данной конкретной операции. Такое решение является довольно странным, однако оно документировано, хотя особо и не подчеркивается.

Отправка уведомления порту завершения ввода/вывода

Поток может отправить в порт событие завершения вместе с ключом, чтобы завершить остающийся невыполненным вызов функции GetQueueCompletionStatus. Вся необходимая для этого информация предоставляется функцией PostQueueCompletionStatus.

BOOL PostQueuedCompletionStatus(HANDLE CompletionPort, DWORD dwNumberOfBytesTransferred, DWORD dwCompletionKey, LPOVERLAPPED lpOverlapped);

Для пробуждения ожидающих потоков даже в условиях отсутствия завершившихся операций иногда используют метод, суть которого заключается в предоставлении фиктивного значения ключа, например, –1. Ожидающие потоки должны проверять значения ключей, и эта методика может использоваться, например, для того, чтобы сигнализировать потоку о необходимости завершить работу.

Альтернативы портам завершенияввода/вывода

В главе 9 было показано, как использовать семафор для ограничения количества готовых к выполнению потоков, и этот же метод можно эффективно применять для регулирования пропускной способности в условиях, когда множество потоков соревнуются между собой за право владения ограниченными ресурсами.

Эту же методику мы могли бы применить и в серверах serverSK (программа 12.2) и serverNP (программа 11.3). Все, что для этого требуется — это организовать ожидание перехода семафора в сигнальное состояние после завершения запроса на чтение, выполнение этого запроса, создание ответа и освобождение семафора перед тем, как записать ответ. Такое решение гораздо проще того, которое реализовано в примере с портом завершения ввода/вывода, приведенном в следующем разделе. Единственная проблема состоит в том, что потоков может оказаться очень много, и для каждой из них требуется собственное стековое пространство, что приведет к большому расходу виртуальной памяти. Остроту этой проблемы можно несколько ослабить, тщательно распределяя необходимые объемы стекового пространства. Упражнение 14.6 включает в себя выполнение экспериментов с альтернативным решением подобного рода, а реализация соответствующего примера находится на Web-сайте.

Существует еще одна возможность, которую можно использовать при создании масштабируемых серверов. Выборка пакетов рабочих заготовок (work items) из очереди (см. главу 10) может осуществляться с использованием ограниченного количества потоков. Поступающие рабочие заготовки могут помещаться в очередь одной или несколькими главными потоками, как показано в программе 10.5.

Пример: сервер, использующий порты завершения ввода/вывода

Программа 14.4 представляет видоизмененный вариант программы serverNP (программа 11.3), в котором используются порты завершения ввода/вывода. Этот сервер создает небольшой пул серверных потоков и больший пул дескрипторов перекрывающихся каналов, а также ключей завершения, по одному для каждого дескриптора. Перекрывающиеся дескрипторы присоединяются к порту завершения, а затем вызывается функция ConnectNamedPipe. Серверные потоки ожидают сигналов завершения, связанных как с подключениями клиентов, так и с операциями чтения. Когда регистрируется операция чтения, обрабатывается соответствующий клиентский запрос, и результаты возвращаются без использования порта завершения. Вместо этого серверный поток ожидает наступления события после выполнения операции записи, причем младший бит дескриптора события в структуре OVERLAPPED устанавливается в 1.

В другом возможном варианте решения, отличающемся большей гибкостью, можно было бы закрывать дескриптор при каждом отсоединении клиента и создавать новый дескриптор для каждого нового подключения. Этот способ аналогичен тому, который использовался в случае сокетов в главе 12. Вместе с тем, имеется одна трудность, обусловленная невозможностью удаления дескрипторов из порта завершения, в результате чего использование короткоживущих дескрипторов подобного рода будет приводить к утечке ресурсов.

Поскольку с большей частью кода вы уже знакомы по предыдущим примерам, она здесь не приводится.

Программа 14.4. serverCP: сервер, использующий порт завершения

/* Глава 14. ServerCP. Многопоточный сервер.

Версия на основе именованного канала, пример ПОРТА ЗАВЕРШЕНИЯ.

Использование: Server [ИмяПользователя ИмяГруппы]. */

 

#include "EvryThng.h"

#include "ClntSrvr.h"

 

/* Здесь определяются сообщения запроса и ответа. */

typedef struct { /*Структуры, на которые указывают ключи портов завершения*/

HANDLE hNp; /* и которые представляют еще не выполненные операции */

REQUEST Req; /* ReadFile и ConnectNamedPipe. */

DWORD Type; /* 0 – ConnectNamedPipe; 1 – ReadFile. */

OVERLAPPED Ov;

} CP_KEY;

 

static CP_KEY Key[MAX_CLIENTS_CP]; /* Доступно всем потокам. */

/* … */

 

_tmain(int argc, LPTSTR argv[]) {

HANDLE hCp, hMonitor, hSrvrThread[MAXCLIENTS];

DWORD iNp, iTh, MonitorId, ThreadId;

THREAD_ARG ThArgs[MAX_SERVER_TH];

/*…*/

hCp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, MAX_SERVER_TH);

/* Создать перекрывающийся именованный канал для каждого потенциального */

/* клиента, добавить порт завершения и ожидать соединения. */

/* Предполагается, что максимальное количество клиентов намного */

/* превышает количество серверных потоков. */

for (iNp = 0; iNp < MAX_CLIENTS_CP; iNp++) {

memset(&Key[iNp], 0, sizeof(CP_KEY));

Key[iNp].hNp = CreateNamedPipe(SERVER_PIPE, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_READMODE_MESSAGE | PIPE_TYPE_MESSAGE | PIPE_WAIT, MAX_CLIENTS_CP, 0, 0, INFINITE, pNPSA);

CreateIoCompletionPort(Key[iNp].hNp, hCp, iNp, MAX_SERVER_TH + 2);

Key[iNp].Ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

ConnectNamedPipe(Key[iNp].hNp, &Key[iNp].Ov);

}

/* Создать рабочие серверные потоки и имя временного файла для каждой из них.*/

for (iTh = 0; iTh < MAX_SERVER_TH; iTh++) {

ThArgs[iTh].hCompPort = hCp;

ThArgs[iTh].ThreadNo = iTh;

GetTempFileName(_T("."), _T("CLP"), 0, ThArgs[iTh].TmpFileName);

hSrvrThread[iTh] = (HANDLE)_beginthreadex (NULL, 0, Server, &ThArgs[iTh], 0, &ThreadId);

}

/* Дождаться завершения всех потоков и "убрать мусор". */

/* … */

return 0;

}

 

static DWORD WINAPI Server(LPTHREAD_ARG pThArg)

/* Функция потока сервера.

Имеется по одному потоку для каждого потенциального клиента. */

{

HANDLE hCp, hTmpFile = INVALID_HANDLE_VALUE;

HANDLE hWrEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

DWORD nXfer, KeyIndex, ServerNumber;

/* … */

BOOL Success, Disconnect, Exit = FALSE;

LPOVERLAPPED pOv;

OVERLAPPED ovResp = {0, 0, 0, 0, hWrEvent}; /*Для ответных сообщений.*/

/* Чтобы избежать помещения перекрывающейся операции в очередь порта завершения, должен быть установлен младший бит события. Несмотря на всю странность этого способа, он документирован. */

ovResp.hEvent = (HANDLE)((DWORD)hWrEvent | 0x1);

GetStartupInfo(&StartInfoCh);

hCp = pThArg->hCompPort;

ServerNumber = pThArg->ThreadNo;

while(!ShutDown && !Exit) __try {

Success = FALSE; /* Устанавливается только в случае успешного завершения всех операций. */

Disconnect = FALSE;

GetQueuedCompletionStatus(hCp, &nXfer, &KeyIndex, &pOv, INFINITE);

if (Key [KeyIndex].Type == 0) { /* Соединение установлено. */

/* Открыть временный файл с результатами для этого соединения. */

hTmpFile = CreateFile(pThArg->TmpFileName, /* … */);

Key[KeyIndex].Type = 1;

Disconnect = !ReadFile(Key[KeyIndex].hNp, &Key[KeyIndex].Req, RQ_SIZE, &nXfer, &Key[KeyIndex].Ov) && GetLastError () == ERROR_HANDLE_EOF; /* Первая операция чтения. */

if (Disconnect) continue;

Success = TRUE;

} else {

/* Чтение завершилось. Обработать запрос. */

ShutDown = ShutDown || (_tcscmp (Key[KeyIndex].Req.Record, ShutRqst) == 0);

if (ShutDown) continue;

/* Создать процесс для выполнения команды. */

/* … */

/* Отвечать по одной строке за один раз. На данном этапе удобно использовать функции библиотеки С для работы со строками. */

fp = _tfopen(pThArg->TmpFileName, _T("r"));

Response.Status = 0;

/* Поскольку младший бит события установлен, ответные сообщения в очередь порта завершения не помещаются. */

while(_fgetts(Response.Record, MAX_RQRS_LEN, fp) != NULL) {

WriteFile(Key [KeyIndex].hNp, &Response, RS_SIZE, &nXfer, &ovResp);

WaitForSingleObject(hWrEvent, INFINITE);

}

fclose(fp);

/* Уничтожить содержимое временного файла. */

SetFilePointer(hTmpFile, 0, NULL, FILE_BEGIN);

SetEndOfFile(hTmpFile);

/* Отправить признак конца ответа. */

Response.Status = 1;

strcpy(Response.Record, "");

WriteFile(Key[KeyIndex].hNp, &Response, RS_SIZE, &nXfer, &ovResp);

WaitForSingleObject(hWrEvent, INFINITE);

/* Конец основного командного цикла. Получить следующую команду.*/

Disconnect = !ReadFile(Key[KeyIndex].hNp, &Key[KeyIndex].Req, RQ_SIZE, &nXfer, &Key[KeyIndex].Ov) && GetLastError() == ERROR_HANDLE_EOF; /* Следующее чтение */

if (Disconnect) continue;

Success = TRUE;

}

} __finally {

if (Disconnect) {

/* Создать еще одно соединение по этому каналу. */

Key[KeyIndex].Type = 0;

DisconnectNamedPipe(Key[KeyIndex].hNp);

ConnectNamedPipe(Key[KeyIndex].hNp, &Key[KeyIndex].Ov);

}

if (!Success) {

ReportError(_T("Ошибка сервера"), 0, TRUE);

Exit = TRUE;

}

}

FlushFileBuffers(Key[KeyIndex].hNp);

DisconnectNamedPipe(Key[KeyIndex].hNp);

CloseHandle(hTmpFile);

/* … */

_endthreadex(0);

return 0;

/* Подавление предупреждающих сообщений компилятора. */

}

Резюме

Для выполнения асинхронных операций ввода/вывода в Windows предусмотрены три метода. Самой распространенной и наиболее простой является методика, основанная на использовании потоков, которая, в отличие от двух остальных, способна работать даже под управлением Windows 9x. Каждый из потоков отвечает за выполнение определенной последовательности действий, состоящей из одной или нескольких последовательно выполняющихся, блокирующихся операций ввода/вывода. Кроме того, каждый поток должен располагать собственным дескриптором файла или канала.

Перекрывающийся ввод/вывод обеспечивает возможность выполнения асинхронных операций одним потоком с использованием одного дескриптора файла, но каждой отдельной операции вместо пары "поток—дескриптор файла" должен предоставляться дескриптор события. При этом требуется организовать ожидание завершения выполнения каждой конкретной операции ввода/вывода по отдельности, а затем очищать системные ресурсы или выполнять любые другие действия, необходимые для управления последовательностью выполнения операций.

С другой стороны, расширенный ввод/вывод автоматически вызывает код завершения и не требует использования дополнительных событий.

Одним неоспоримым преимуществом перекрывающегося ввода/вывода является то, что он предоставляет возможность создания портов завершения ввода/вывода, однако, о чем ранее уже говорилось и что иллюстрируется программой atouMTCP, которая находится на Web-сайте, но ценность и этого преимущества несколько снижается из-за того, что для ограничения количества активных потоков в пуле рабочих потоков могут быть использованы семафоры. Дополнительным недостатком портов завершения является то, что они не допускают удаления присоединенных к ним дескрипторов.

UNIX обеспечивает поддержку потоков средствами Pthreads, что ранее уже обсуждалось.

В System V UNIX асинхронный ввод/вывод ограничивается потоками и не может использоваться для выполнения операций с файлами и каналами.

В версии BSD 4.3 для указания события, связанного с дескриптором файла, и выбора функции с целью определения состояния готовности дескрипторов файлов используется комбинация сигналов (SIGIO). Для дескрипторов файлов должен устанавливаться режим O_ASYNC. Такой подход может использоваться только с терминалами и в сетевых коммуникациях.