Использование объектов CRITICAL_SECTION для защиты разделяемыхпеременных

Использование объектов CRITICAL_SECTION не вызывает сложностей, и одним из наиболее распространенных способов их применения является обеспечение доступа потоков к разделяемым глобальным переменным. Рассмотрим, например, многопоточный сервер (аналогичный представленному на рис. 7.1), в котором необходимо вести учет следующих статистических данных:

• Общее количество полученных запросов.

• Общее количество отправленных ответов.

• Количество запросов, обрабатываемых в настоящее время всеми потоками сервера.

Поскольку переменные счетчиков являются глобальными переменными процесса, нельзя допустить того, чтобы одновременно два потока изменяли их значения. Один из методов обеспечения этого, базирующийся на применении объектов CRITICAL_SECTION, иллюстрирует схема, показанная ниже на рис. 8.2. Использование объектов CRITICAL_SECTION демонстрируется на примере программы 8.1, представляющей намного более простую систему, чем серверная.

Объекты CS могут привлекаться для решения задач, аналогичных той, которую иллюстрирует рис. 8.1, где два потока увеличивают значение одной и той же переменной. Приведенный ниже фрагмент кода обеспечивает нечто большее, нежели простое увеличение переменной, поскольку для этого достаточно было бы воспользоваться функциями взаимоблокировки. Обратите внимание на спецификатор volatile, предотвращающий размещение текущего значения переменной оптимизирующим компилятором в регистре, а не в ячейке памяти, отведенной для хранения переменной. Кроме того, в этом примере используется промежуточная переменная; этот необязательный элемент снижает эффективность программы, однако позволяет более отчетливо продемонстрировать, каким образом решается задача, иллюстрируемая рис. 8.1.

CRITICAL_SECTION cs1;

volatile DWORD N = 0, М;

/* N — глобальная переменная, разделяемая всеми потоками. */

InitializeCriticalSection (&cs1);

EnterCriticalSection (&cs1);

if (N < N_MAX) { M = N; M += 1; N = M; }

LeaveCriticalSection (&cs1);

DeleteCriticalSection (&cs1);

На рис. 8.2 представлена одна из возможных последовательностей выполнения программы для случая, изображенного на рис. 8.1, и продемонстрировано, каким образом объекты CS упрощают решение проблемы синхронизации.

Программа 8.1 демонстрирует, насколько полезными могут быть объекты CS.

Пример: простая система "производитель/потребитель"

Программа 8.1 иллюстрирует, насколько полезными могут быть объекты CS. Кроме того, эта программа демонстрирует, как создаются защищенные структуры данных для хранения состояний объектов, и знакомит с понятием инварианта (invariant) — свойства состояния объекта, относительно которого гарантируется (путем соответствующей реализации программы), что оно будет истинным за пределами критического участка кода.

Рис. 8.2. Разделение общей памяти синхронизированными потоками

 

Описание задачи приводится ниже.

• Имеются два потока, производитель (producer) и потребитель (consumer), работающие в полностью асинхронном режиме.

• Производитель периодически создает сообщения, содержащие таблицу чисел, например, таблицу биржевых котировок, которая периодически обновляется.

• По требованию пользователя потребитель отображает текущие данные. Требуется, чтобы отображаемые данные представляли собой самый последний полный набор данных, но никакие данные не должны отображаться дважды.

• Данные не должны отображаться в те промежутки времени, когда они обновляются производителем; устаревшие данные также не должны отображаться. Обратите внимание на то, что многие сообщения вообще никогда не используются и, таким образом, "теряются". Этот пример является частным случаем конвейерной модели, в которой данные передаются из одного потока в другой.

• В качестве средства контроля целостности данных производитель вычисляет простую контрольную сумму[28] данных таблицы, которая далее сравнивается с аналогичной суммой, вычисленной потребителем, дабы удостовериться в том, что данные не были повреждены при их передаче из одного потока в другой. Данные, полученные при обращении к таблице в моменты ее обновления, будут недействительными; использование объектов CS гарантирует, что этого никогда не произойдет. Инвариантом блока сообщения (message block invariant) является корректность контрольной суммы для содержимого текущего сообщения.

• Обоими потоками поддерживается статистика суммарного количества отправленных, полученных и утерянных сообщений.

Программа 8.1.simplePC: простая система "производитель/потребитель"

/* Глава 8. simplePC.с */

/* Поддерживает два потока — производителя и потребителя. */

/* Производитель периодически создает буферные данные с контрольными */

/* суммами, или "блоки сообщений", отображаемые потребителем по запросу */

/* пользователя. */

 

#include "EvryThng.h"

#include <time.h>

#define DATA_SIZE 256

 

typedef struct msg_block_tag { /* Блок сообщения. */

volatile DWORD f_ready, f_stop; /* Флаги готовности и прекращения сообщений. */

volatile DWORD sequence; /* Порядковый номер блока сообщения. */

volatile DWORD nCons, nLost;

time_t timestamp;

CRITICAL_SECTION mguard; /* Структура защиты блока сообщения. */

DWORD checksum; /* Контрольная сумма содержимого сообщения. */

DWORD data[DATA_SIZE]; /* Содержимое сообщения. */

} MSG_BLOCK;

 

/* Одиночный блок, подготовленный к заполнению новым сообщением. */

MSG_BLOCK mblock = { 0, 0, 0, 0, 0 };

 

DWORD WINAPI produce(void*);

DWORD WINAPI consume(void*);

void MessageFill(MSG_BLOCK*);

void MessageDisplay(MSG_BLOCK*);

 

DWORD _tmain(DWORD argc, LPTSTR argv[]) {

DWORD Status, ThId;

HANDLE produce h, consume_h;

/* Инициализировать критический участок блока сообщения. */

InitializeCriticalSection (&mblock.mguard);

/* Создать два потока. */

produce_h = (HANDLE)_beginthreadex(NULL, 0, produce, NULL, 0, &ThId);

consume_h = (HANDLE)_beginthreadex (NULL, 0, consume, NULL, 0, &ThId);

/* Ожидать завершения потоков производителя и потребителя. */

WaitForSingleObject(consume_h, INFINITE);

WaitForSingleObject(produce_h, INFINITE);

DeleteCriticalSection(&mblock.mguard);

_tprintf(_T("Потоки производителя и потребителя завершили выполнение\n"));

_tprintf(_T("Отправлено: %d, Получено: %d, Известные потери: %d\n"), mblock.sequence, mblock.nCons, mblock.nLost);

return 0;

}

 

DWORD WINAPI produce(void *arg)

/* Поток производителя — создание новых сообщений через случайные */

/* интервалы времени. */

{

srand((DWORD)time(NULL)); /* Создать начальное число для генератора случайных чисел. */

while (!mblock.f_stop) {

/* Случайная задержка. */

Sleep(rand() / 100);

/* Получить и заполнить буфер. */

EnterCriticalSection(&mblock.mguard);

__try {

if (!mblock.f_stop) {

mblock.f_ready = 0;

MessageFill(&mblock);

mblock.f_ready = 1;

mblock.sequence++;

}

} __finally { LeaveCriticalSection (&mblock.mguard); }

}

return 0;

}

 

DWORD WINAPI consume (void *arg) {

DWORD ShutDown = 0;

CHAR command, extra;

/* Принять ОЧЕРЕДНОЕ сообщение по запросу пользователя. */

while (!ShutDown) { /* Единственный поток, получающий доступ к стандартным устройствам ввода/вывода. */

_tprintf(_T("\n**Введите 'с' для приема; 's' для прекращения работы: "));

_tscanf("%c%c", &command, &extra);

if (command == 's') {

EnterCriticalSection(&mblock.mguard);

ShutDown = mblock.f_stop = 1;

LeaveCriticalSection(&mblock.mguard);

} else if (command == 'c') { /* Получить новый буфер для принимаемых сообщений. */

EnterCriticalSection(&mblock.mguard);

__try {

if (mblock.f_ready == 0) _tprintf(_T("Новые сообщения отсутствуют. Повторите попытку.\n"));

else {

MessageDisplay(&mblock);

mblock.nCons++;

mblock.nLost = mblock.sequence – mblock.nCons;

mblock.f_ready = 0; /* Новые сообщения отсутствуют. */

}

} __finally { LeaveCriticalSection (&mblock.mguard); }

} else {

tprintf(_T("Такая команда отсутствует. Повторите попытку.\n"));

}

}

return 0;

}

 

void MessageFill(MSG_BLOCK *mblock) {

/* Заполнить буфер сообщения содержимым, включая контрольную сумму и отметку времени. */

DWORD i;

mblock->checksum = 0;

for (i = 0; i < DATA_SIZE; i++) {

mblock->data[i] = rand();

mblock->checksum ^= mblock->data[i];

}

mblock->timestamp = time(NULL);

return;

}

 

void MessageDisplay(MSG_BLOCK *mblock) {

/* Отобразить буфер сообщения, отметку времени и контрольную сумму. */

DWORD i, tcheck = 0;

for (i = 0; i < DATA_SIZE; i++) tcheck ^= mblock->data[i];

_tprintf(_T("\nВремя генерации сообщения № %d: %s"), mblock->sequence, _tctime(&(mblock->timestamp)));

_tprintf(_T("Первая и последняя записи: %х %х\n"), mblock->data[0], mblock->data[DATA_SIZE – 1]);

if (tcheck == mblock->checksum) _tprintf(_T("УСПЕШНАЯ ОБРАБОТКА –>Контрольная сумма совпадает.\n"));

else tprintf(_T("СБОЙ –>Несовпадение контрольной суммы. Сообщение запорчено.\n"));

return;

}

Комментарии к примеру простой системы "производитель/потребитель"

Этот пример иллюстрирует некоторые моменты и соглашения, касающиеся программирования, которые будут важны для нас на протяжении этой и последующих глав.

• Объект CRITICAL_SECTION является частью объекта (блока сообщения), защиту которого он обеспечивает.

• Каждый доступ к сообщению осуществляется на критическом участке кода.

• Типом переменных, доступ к которым осуществляется разными потоками, является volatile.

• Использование обработчиков завершения гарантирует, что объекты CS будут обязательно освобождены. Хотя в данном случае эта методика и не является для нас существенной, она дополнительно гарантирует, что вызов функции LeaveCriticalSection не будет случайно опущен впоследствии при изменении кода программы. Имейте также в виду, что обработчик завершения ограничен использованием средств С, и его не следует использовать совместно с C++.

• Функции MessageFill и MessageDisplay вызываются лишь на критических участках кода и используют для нужд своих вычислений не глобальную, а локальную память. Кстати, обе они будут применяться и в последующих примерах, но их листинги больше приводиться не будут.

• Не существует удобного способа, при помощи которого поток производителя мог бы известить поток потребителя о наличии нового сообщения, и поэтому поток потребителя должен просто ожидать, пока не будет установлен флаг готовности, который используется для индикации появления нового сообщения. Устранить этот недостаток нам помогут объекты событий ядра.

• Одним из инвариантных свойств, которые гарантируются этой программой, является то, что контрольная сумма блока сообщения будет всегда корректной вне критических участков кода. Другим инвариантным свойством является следующее:

0 <= nLost + nCons <= sequence

Об этом важном свойстве далее еще будет идти речь.

• О необходимости прекращения передачи поток производителя узнает лишь после проверки флага, устанавливаемого в блоке сообщения потока потребителя. Поскольку потоки не могут обмениваться между собой никакими сигналами, а вызов функции TerminateThread чреват нежелательными побочными эффектами, эта методика является простейшим способом остановки другого потока. Разумеется, чтобы эта методика была эффективной, работа потоков должна быть скоординированной. В то же время, подобное решение требует, чтобы поток не блокировался, иначе он не сможет тестировать флаг; способы решения проблемы блокированных потоков обсуждаются в главе 10.

Объекты CRITICAL_SECTION предоставляют в наше распоряжение мощный механизм синхронизации, но, тем не менее, они не в состоянии обеспечить всю полноту необходимых функциональных возможностей. О невозможности отправки сигналов одним потоком другому уже говорилось, кроме того, эти объекты не позволяют воспользоваться конечными интервалами ожидания (time-out). Объекты синхронизации ядра Windows позволяют снизить остроту не только этих, но и других ограничений.