如何在c / c ++中asynchronous运行线程的特定函数?

性能调优:将数据写入多个pipe道

现在我正在做一个单一的线程:

for(unsigned int i = 0; i < myvector.size();) { tmp_pipe = myvector[i]; fSuccess = WriteFile( tmp_pipe, &Time, sizeof(double), &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } fSuccess = WriteFile( tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } fSuccess = WriteFile( tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } i++; } 

结果是第一个pipe获得的数据最快,最后一个pipe最慢。

我正在考虑在不同的线程中这样做,所以每个pipe被平等地处理。

但是如何在c / c ++中asynchronous运行线程的特定函数(主线程应该立即返回)呢?

Solutions Collecting From Web of "如何在c / c ++中asynchronous运行线程的特定函数?"

您可以使用CreateThread函数创建一个新的线程,并将管道句柄作为参数传递给线程函数:

 DWORD PipeThread(LPVOID param) { HANDLE hPipe = (HANDLE)param; // Do the WriteFile operations here return 0; } for(unsigned int i = 0; i < myvector.size(); i++) CreateThread(NULL, 0, PipeThread, myvector[i], 0, NULL); 

请注意,vector类不是线程安全的,所以如果你不同步访问它们,你会面临myvector.erase问题。 使用关键部分。


更新:由于您提到的频率很高,因此可以使用I / O完成端口,而不是每个管道的单独线程。 然后,您可以使用WriteFile重叠I / O来异步执行写入,并且可以只有一个额外的线程来侦听写入的完成:

 // Initial setup: add pipe handles to a completion port HANDLE hPort = CreateCompletionPort(myvector[0], NULL, 0, 1); for (unsigned int i = 1; i < myvector.size(); i++) CreateCompletionPort(myvector[i], hPort, 0, 0); // Start thread CreateThread(NULL, 0, PipeThread, hPort, 0, NULL); // Do this as many times as you want for(unsigned int i = 0; i < myvector.size(); i++) { OVERLAPPED *ov = new OVERLAPPED; ZeroMemory(ov, sizeof ov); WriteFile(myvector[i], buffer, size, NULL, ov); // If pipe handle was closed, WriteFile will fail immediately // Otherwise I/O is performed asynchronousously } // Close the completion port at the end // This should automatically free the thread CloseHandle(hPort); --- DWRD PipeThread(LPVOID param) { HANDLE hPort = (HANDLE)param; DWORD nBytes; ULONG_PTR key; LPOVERLAPPED ov; // Continuously loop for I/O completion while (GetQueuedCompletionStatus(hPort, &nBytes, &key, &ov, INFINITE)) { if (ov != NULL) { delete ov; // Do anything else you may want to do here } } return 0; } 

你有writev()可用吗? 如果是这样,您可以将三个写入操作减少到每个管道一个,这将更有效率。 它也稍微简化了错误处理,但你可以折叠你必须:

 for (unsigned int i = 0; i < myvector.size(); i++) { tmp_pipe = myvector[i]; if (!WriteFile(tmp_pipe, &Time, sizeof(double), &dwWritten, NULL) || !WriteFile(tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL) || !WriteFile(tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL)) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); } } 

这在很多方面比较简单,因为错误处理有1/3,所以操作代码隐藏得更少。

当然,你仍然想把这段代码封装成线程代码,所以写操作将由不同的线程处理。 你会安排每个线程获得自己的管道; 他们会共享对时间和缓冲区的只读访问权限。 每个线程将完成其写入,并在完成时返回状态。 父线程将等待每个子线程,如果一个子线程报告失败,相应的客户端管道将从该向量中移除。 由于只有父线程会操纵向量,因此没有线程问题需要担心。

概述:

  for (i = 0; i < myvector.size(); i++) tid[i] = thread create (myvector[i], write_to_pipe); for (i = 0; i < myvector.size(); i++) { status = wait for thread(tid[i]); if (status != success) myvector.erase(...); } 

数组(或向量) tid保存线程标识。 write_to_pipe()函数是线程的主函数, 它会在传递的管道上进行书写,并以适当的状态退出。

这些命名管道? 如果是这样,那么在创建它们时可以使用FILE_FLAG_OVERLAPPED ,这允许您在不处理线程的情况下执行异步写入。 这里是MSDN的一个例子 。

如果这些是匿名管道, 则不支持重叠的I / O,所以这可能是切换到命名管道的一个很好的理由。

另一个选项可能是为每个写入排队一个工作项目 ,但这并不能保证所有三个写入将同时执行。

我会考虑以减少I / O调用数量的方式来准备数据。 在知道数据正在以尽可能少的I / O调用的有效方式写入的情况下,我会考虑使用异步I / O。 如果性能还不够好,那么考虑在设计中增加额外的线程。

您可以减少写入次数的一种方法是使用一种结合所有数据的结构,以便可以使用一次写入而不是三次。 编译器可能需要去除编译器可能添加的任何额外的填充/对齐。

 #pragma pack(push,1) struct PipeData { double _time; long _buffer_len; char* _buffer; }; #pragma pack(pop) PipeData data; int data_len = sizeof(double) + sizeof(long) + <yourbufferlen>; for(unsigned int i = 0; i < myvector.size();) { tmp_pipe = myvector[i]; fSuccess = WriteFile( tmp_pipe, &data, data_len, &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } i++; } 

要特别回答你的问题: “我如何在c / c ++中异步运行线程的特定函数(主线程应该立即返回)?”

你可以通过解散这两个行为很容易做到这一点。 创建一个线程的工作者池,由它需要与之通信的管道初始化,并写一个函数来安排一个新的工作到这些线程。 与管道写入相比,这将是瞬间的,所有的线程都会得到它的工作,并开始写入它控制的管道,而主线程可以继续工作。

如果您的客户数量不多,这将是一个简单的解决方案。 如果没有,为每个客户端创建一个线程将不会在一个点之后扩展很多,并且您的服务器将被大量的上下文切换和线程争用所压倒。

在这种情况下,对于一个非常的服务器设计,你应该认真思考卡萨布兰卡的解决方案。 它只创建一个线程来侦听完成通知,并且是在Windows 2003中创建服务器的最有效的服务器设计。