Пример ниже демонстрирует использование ниже перечисленных функций с портами завершения ввода-вывода данных (I/O):
Поскольку этот пример - законченное приложение, оно также демонстрирует и использование нескольких других функций.
Типовой код демонстрирует многопоточное приложение, разработанное, чтобы читать и анализировать большой файл. Асинхронный ввод - вывод (I/O) используется, чтобы читать порцию файла за один прием из пула рабочих потоков. Завершенный ввод - вывод (I/O) переправляется в порт завершения ввода-вывода данных (I/O). Рабочий поток перехватывает завершенный ввод - вывод (I/O), анализирует его, сообщает о результатах и начинает другую асинхронную операцию чтения. Процесс продолжается до тех пор, пока не будет прочитан весь файл.
Число потоков базируется на числе процессоров в главном компьютере. В особенности, если это число кратно двум. Дополнительные подробности смотри в статье Порты завершения ввода-вывода данных (I/O).
Затеянный "анализ" - это подсчет числа идентичных последовательных пар байтов в каждом куске программы.
Чтобы проанализировать файл, он задается в командной строке. Пример становится более содержательным, если заданный файл является по величине 1 МБ или больший.
Код состоит из одного потока, запущенного функцией main и нескольких идентичных рабочих потоков запущенных, функцией называемой readAndAnalyzeChunk. Эти функции описаны ниже.
функция main
Создает отдельный порт завершения ввода-вывода данных (I/O) в котором завершается чтение записанной единицы данных.
функция
readAndAnalyzeChunk
[C++]
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <conio.h>
// Назначения для этого примера, в который мы имеем максимум 16 процессоров.
#define MAX_THREADS 32
#define BUFFER_SIZE (64*1024)
DWORD dwNumProcessors;
// Дескриптор анализируемого источникового файла
HANDLE hSourceFile=NULL;
// Дескриптор порта завершения ввода-вывода данных (I/O)
HANDLE hIOCompletionPort=NULL;
// Позиция чтения
ULARGE_INTEGER readPointer;
// Критическая секция
CRITICAL_SECTION critSec;
// Функция ThreadProc
DWORD WINAPI readAndAnalyzeChunk(LPVOID lpParam);
// Коды, которые управляют характером работы потока.
#define KICKOFFKEY 99 // Начало чтения и анализа файла
#define KEY 1 // Чтение следующей порции файла и анализ ее
#define EXITKEY 86 // Выход из работы(файл был прочитан и проанализирован)
// Структурв, отслеживающая ожидающие обработки операции I/O
typedef struct _CHUNK {
OVERLAPPED overlapped;
LPVOID buffer;
} CHUNK, *PCHUNK;
//////////////////////////////////////////////////////////////////////
int main(
int argc,
char *argv[],
char *envp)
{
HANDLE hThread[MAX_THREADS];
DWORD dwThreadId,
dwStatus,
dwStartTime,
dwEndTime,
dwExitStatus = ERROR_SUCCESS;
ULARGE_INTEGER fileSize,
readPointer;
SYSTEM_INFO systemInfo;
UINT i;
BOOL bInit=FALSE;
OVERLAPPED kickoffOverlapped,
dieOverlapped;
// Убедимся, что источниковый файл определяется в командной строке.
if (argc != 2)
{
fprintf(stderr, "Используется: %s <source file>\n", argv[0]);
dwExitStatus=1;
goto EXIT;
}
// Получим число процессоров в системе.
GetSystemInfo(&systemInfo);
dwNumProcessors = systemInfo.dwNumberOfProcessors;
// Откроем источниковый файл.
hSourceFile = CreateFile(argv[1],
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED,
NULL);
if (hSourceFile == INVALID_HANDLE_VALUE)
{
fprintf(stderr, "%s: Открыть не удалось %s, ошибка %d\n",
argv[0],
argv[1],
dwExitStatus = GetLastError());
goto EXIT;
}
fileSize.LowPart = GetFileSize(hSourceFile, &fileSize.HighPart);
if ((fileSize.LowPart==0xffffffff) && (GetLastError()!= NO_ERROR))
{
fprintf(stderr, "%s: GetFileSize завершилась ошибкой, ошибка %d\n",
argv[0],
dwExitStatus = GetLastError());
goto EXIT;
}
// Используем критическую секцию, чтобы преобразовать в последовательную
// форму доступ к нескольким потокам.
InitializeCriticalSection(&critSec);
bInit=TRUE;
// Создаем порт завершения I/O.
hIOCompletionPort = CreateIoCompletionPort(hSourceFile,
NULL, // нет существующих портов завершения I/O
KEY, // принимаемый код в пакете завершения I/O
dwNumProcessors); // максимум рабочих потоков
if (hIOCompletionPort == NULL)
{
fprintf(stderr,
"%s: Порт заверешения I/O создать не удалось (ошибка %d)\n",
argv[0],
dwExitStatus = GetLastError());
goto EXIT;
}
// Инициализируем указатель файла на асинхронную структуру
readPointer.LowPart = readPointer.HighPart = 0;
dwStartTime = GetTickCount();
// Запускаем рабочие потоки.
for (i=0; i<2*dwNumProcessors; i++)
{
hThread[i]=CreateThread(NULL,
0, // размер стека по умолчанию
(LPTHREAD_START_ROUTINE) readAndAnalyzeChunk,
(LPVOID) &fileSize,
0, // выполнение немедленно
&dwThreadId);
if (hThread[i] == NULL)
{
fprintf(stderr, "%s: Поток создать не удалось #%d (ошибка %d)\n",
argv[0], i, dwExitStatus=GetLastError());
goto EXIT;
}
}
// Помещаем в очередь начальное событие.
PostQueuedCompletionStatus(hIOCompletionPort,
0,
KICKOFFKEY,
&kickoffOverlapped);
// Ждем, когда рабочий поток завершит работу.
dwStatus = WaitForMultipleObjects(2*dwNumProcessors,
hThread,
FALSE,
INFINITE);
if (dwStatus == WAIT_FAILED)
{
fprintf(stderr, "%s: Ожидание прервано (ошибка %d)\n", argv[0],
dwExitStatus=GetLastError());
goto EXIT;
}
// Рабочий поток возвратил значение; отправим сообщение,
// чтобы поток вышел из программы.
for (i=0; i<2*dwNumProcessors-1; i++)
{
PostQueuedCompletionStatus(hIOCompletionPort,
0,
EXITKEY,
&dieOverlapped);
}
// Ждем, когда поток закончит свою работу и завершит всякую деятельность.
dwStatus = WaitForMultipleObjects(2*dwNumProcessors,
hThread,
TRUE,
INFINITE);
if (dwStatus == WAIT_FAILED)
{
fprintf(stderr, "%s: Ожидание прервано (ошибка %d)\n",
argv[0], dwExitStatus=GetLastError());
goto EXIT;
}
dwEndTime = GetTickCount();
printf( "\n\n%d байтов проанализировано за %.3f секунд\n", fileSize.LowPart,
(float)(dwEndTime-dwStartTime)/1000.0);
printf( "%.2f Mб/сек\n",
((LONGLONG)fileSize.QuadPart/(1024.0*1024.0))/
(((float)(dwEndTime-dwStartTime))/1000.0));
EXIT:
(void) _getch();
if (bInit)
DeleteCriticalSection(&critSec);
if (hThread[i])
CloseHandle(hThread[i]);
if (hIOCompletionPort)
CloseHandle(hIOCompletionPort);
if (hSourceFile)
CloseHandle(hSourceFile);
exit(dwExitStatus);
}
//////////////////////////////////////////////////////////////////////
DWORD WINAPI readAndAnalyzeChunk(LPVOID lpParam)
{
BOOL bSuccess,
bMoreToRead;
DWORD dwNumBytes,
dwKey,
dwSuccess,
i,
repeatCnt,
dwThreadId;
LPOVERLAPPED completedOverlapped;
PCHUNK completedChunk;
CHUNK chunk;
printf("Поток (%d) запущен\n", dwThreadId=GetCurrentThreadId());
chunk.buffer=VirtualAlloc(NULL, BUFFER_SIZE, MEM_COMMIT, PAGE_READWRITE);
if (chunk.buffer==NULL)
{
fprintf(stderr, "Функция VirtualAlloc завершилась ошибкой (ошибка %d)\
n", GetLastError());
exit(1);
}
// Начало асинхронного чтения. Ждем завершения чтения, затем читаем дальше.
while(1){
bSuccess=GetQueuedCompletionStatus(hIOCompletionPort,
&dwNumBytes,
&dwKey,
&completedOverlapped,
INFINITE);
if (!bSuccess && (completedOverlapped==NULL))
{
fprintf(stderr, "GetQueuedCompletionStatus завершилась ошибкой
(ошибка %d)\n", GetLastError());
exit(1);
}
if (dwKey==EXITKEY)
{
VirtualFree((LPVOID) chunk.buffer,
0,
MEM_RELEASE);
ExitThread(0);
}
if (!bSuccess)
{
fprintf(stderr, "GetQueuedCompletionStatus переместила
испорченный пакет I/O (ошибка %d)\n", GetLastError());
// Не смотря на то, что вы можете здесь завершить работу
// по ошибке этот пример продолжается.
}
if (dwKey != KICKOFFKEY)
{
// Анализируем данные. Анализ в этом примере выявляет количество
// пар последовательных байтов, которые являются равны друг другу.
printf("Проанализирована %d часть байтов\n", dwNumBytes);
completedChunk = (PCHUNK)completedOverlapped;
repeatCnt = 0;
for (i = 1; i < dwNumBytes; i++)
if ((((PBYTE)completedChunk->buffer)[i - 1] ^
((PBYTE)completedChunk->buffer)[i]) == 0)
repeatCnt++;
printf("Подсчет повторов %d (поток #%d)\n", repeatCnt, dwThreadId);
// Если число возвращенных байтов меньше, чем BUFFER_SIZE,
// которое было прочитано последний раз. Выход потока из программы.
if (dwNumBytes < BUFFER_SIZE)
ExitThread(0);
}
// Настроим структуру OVERLAPPED для последующего чтения.
EnterCriticalSection(&critSec);
if (readPointer.QuadPart < ((ULARGE_INTEGER *)lpParam)->QuadPart)
{
bMoreToRead = TRUE;
chunk.overlapped.Offset = readPointer.LowPart;
chunk.overlapped.OffsetHigh = readPointer.HighPart;
chunk.overlapped.hEvent = NULL; // не нужен
// Установим указатель позиции в файле для следующего чтения
readPointer.QuadPart += BUFFER_SIZE;
}
else bMoreToRead = FALSE;
LeaveCriticalSection(&critSec);
// Происходит последующее чтение.
if (bMoreToRead)
{
dwSuccess = ReadFile(hSourceFile,
chunk.buffer,
BUFFER_SIZE,
&dwNumBytes,
&chunk.overlapped);
if (!dwSuccess && (GetLastError() == ERROR_HANDLE_EOF))
printf( "Конец файла\n" );
if (!dwSuccess && (GetLastError() != ERROR_IO_PENDING))
fprintf (stderr, "ReadFile при %lx завершилась ошибкой (ошибка %d)
\n",
chunk.overlapped.Offset,
GetLastError());
}
} // пока конец
}
|