(简而言之,main()的WaitForSingleObject挂在下面的程序中)。
我正在尝试写一段代码来分派线程并等待它们在恢复之前完成。 而不是每次创造线程,这是昂贵的,我把他们睡觉。 主线程以CREATE_SUSPENDED状态创buildX个线程。
同步使用X作为MaximumCount的信号量完成。 信号量的计数器被置零,线程被调度。 他们在进入睡眠之前执行一些愚蠢的循环并调用ReleaseSemaphore。 然后,主线程使用WaitForSingleObject X来确保每个线程完成其工作并正在hibernate。 然后它循环,并再次做到这一切。
有时程序不会退出。 当我喙的程序,我可以看到WaitForSingleObject挂起。 这意味着线程的ReleaseSemaphore不起作用。 没有什么是printf'ed所以没有出错。
也许两个线程不应该在同一时间调用ReleaseSemaphore,但是这将使信号量的作用无效。
我只是不喜欢它…
其他解决scheme同步线程感激地接受!
#define TRY 100 #define LOOP 100 HANDLE *ids; HANDLE semaphore; DWORD WINAPI Count(__in LPVOID lpParameter) { float x = 1.0f; while(1) { for (int i=1 ; i<LOOP ; i++) x = sqrt((float)i*x); while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) printf(" ReleaseSemaphore error : %d ", GetLastError()); SuspendThread(ids[(int) lpParameter]); } return (DWORD)(int)x; } int main() { SYSTEM_INFO sysinfo; GetSystemInfo( &sysinfo ); int numCPU = sysinfo.dwNumberOfProcessors; semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL); ids = new HANDLE[numCPU]; for (int j=0 ; j<numCPU ; j++) ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL); for (int j=0 ; j<TRY ; j++) { for (int i=0 ; i<numCPU ; i++) { if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) printf("Timed out !!!\n"); ResumeThread(ids[i]); } for (int i=0 ; i<numCPU ; i++) WaitForSingleObject(semaphore,INFINITE); ReleaseSemaphore(semaphore,numCPU,NULL); } CloseHandle(semaphore); printf("Done\n"); getc(stdin); }
在以下情况下会发生问题:
主线程恢复工作线程:
for (int i=0 ; i<numCPU ; i++) { if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) printf("Timed out !!!\n"); ResumeThread(ids[i]); }
工作线程完成他们的工作并释放信号量:
for (int i=1 ; i<LOOP ; i++) x = sqrt((float)i*x); while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
主线程等待所有工作线程并重置信号量:
for (int i=0 ; i<numCPU ; i++) WaitForSingleObject(semaphore,INFINITE); ReleaseSemaphore(semaphore,numCPU,NULL);
主线程进入下一轮,试图恢复工作线程(请注意,工作线程没有事件暂停自己呢!这是问题开始的地方…你正在尝试恢复不一定被挂起的线程然而):
for (int i=0 ; i<numCPU ; i++) { if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) printf("Timed out !!!\n"); ResumeThread(ids[i]); }
最后工作者线程暂停(虽然他们应该已经开始下一轮):
SuspendThread(ids[(int) lpParameter]);
主线程因为所有的工作人员都被暂停而永远等待:
for (int i=0 ; i<numCPU ; i++) WaitForSingleObject(semaphore,INFINITE);
这里是一个链接,显示如何正确解决生产者/消费者问题:
http://en.wikipedia.org/wiki/Producer-consumer_problem
我也认为关键部分比信号量和互斥体要快得多。 在大多数情况下,他们也更容易理解(imo)。
而不是使用信号量(至少直接)或主要明确唤醒线程来完成一些工作,我一直使用线程安全队列。 当main想要一个工作线程去做某事的时候,它会把这个工作的描述推到队列上。 工作者线程每个都只是做一个工作,然后尝试从队列中弹出另一个工作,并最终挂起,直到队列中有工作要做:
队列的代码如下所示:
#ifndef QUEUE_H_INCLUDED #define QUEUE_H_INCLUDED #include <windows.h> template<class T, unsigned max = 256> class queue { HANDLE space_avail; // at least one slot empty HANDLE data_avail; // at least one slot full CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos T buffer[max]; long in_pos, out_pos; public: queue() : in_pos(0), out_pos(0) { space_avail = CreateSemaphore(NULL, max, max, NULL); data_avail = CreateSemaphore(NULL, 0, max, NULL); InitializeCriticalSection(&mutex); } void push(T data) { WaitForSingleObject(space_avail, INFINITE); EnterCriticalSection(&mutex); buffer[in_pos] = data; in_pos = (in_pos + 1) % max; LeaveCriticalSection(&mutex); ReleaseSemaphore(data_avail, 1, NULL); } T pop() { WaitForSingleObject(data_avail,INFINITE); EnterCriticalSection(&mutex); T retval = buffer[out_pos]; out_pos = (out_pos + 1) % max; LeaveCriticalSection(&mutex); ReleaseSemaphore(space_avail, 1, NULL); return retval; } ~queue() { DeleteCriticalSection(&mutex); CloseHandle(data_avail); CloseHandle(space_avail); } }; #endif
在线程中使用它的大致相当于你的代码看起来像这样。 我并没有完全弄清楚你的线程函数在做什么,但它是用平方根求和的东西,显然你对线程同步的兴趣比线程实际上做的要好。
编辑:(根据评论):如果你需要main()
等待一些任务完成,做更多的工作,然后分配更多的任务,通常最好的办法是把事件(例如)放入每个任务中,并让你的线程函数设置事件。 修改后的代码看起来像这样(请注意,队列代码不受影响):
#include "queue.hpp" #include <iostream> #include <process.h> #include <math.h> #include <vector> struct task { int val; HANDLE e; task() : e(CreateEvent(NULL, 0, 0, NULL)) { } task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {} }; void process(void *p) { queue<task> &q = *static_cast<queue<task> *>(p); task t; while ( -1 != (t=q.pop()).val) { std::cout << t.val << "\n"; SetEvent(te); } } int main() { queue<task> jobs; enum { thread_count = 4 }; enum { task_count = 10 }; std::vector<HANDLE> threads; std::vector<HANDLE> events; std::cout << "Creating thread pool" << std::endl; for (int t=0; t<thread_count; ++t) threads.push_back((HANDLE)_beginthread(process, 0, &jobs)); std::cout << "Thread pool Waiting" << std::endl; std::cout << "First round of tasks" << std::endl; for (int i=0; i<task_count; ++i) { task t(i+1); events.push_back(te); jobs.push(t); } WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE); events.clear(); std::cout << "Second round of tasks" << std::endl; for (int i=0; i<task_count; ++i) { task t(i+20); events.push_back(te); jobs.push(t); } WaitForMultipleObjects(events.size(), &events[0], true, INFINITE); events.clear(); for (int j=0; j<thread_count; ++j) jobs.push(-1); WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE); return 0; }
我不明白的代码,但线程同步肯定是坏的。 你假设线程将以特定的顺序调用SuspendThread()。 一个成功的WaitForSingleObject()调用不会告诉你哪个线程叫ReleaseSemaphore()。 因此,您将在未挂起的线程上调用ReleaseThread()。 这很快就使程序陷入僵局。
另一个不好的假设是在WFSO返回后,一个已经调用SuspendThread的线程。 通常是的,并不总是。 线程可以在RS调用之后立即被抢占。 您将再次在未挂起的线程上调用ReleaseThread()。 这通常需要一天左右的时间来锁定你的程序。
我觉得有一个ReleaseSemaphore调用太多了。 试图解开它,毫无疑问。
你不能用Suspend / ReleaseThread()来控制线程,不要尝试。
问题在于你等待的信号比你信号更频繁。
for (int j=0 ; j<TRY ; j++)
循环等待信号八次,而四个线程每次只发信号一次,循环本身发信号一次。 第一次循环,这不是一个问题,因为信号量的初始值是四。 第二次和以后的每一次,你都在等待太多的信号。 这可以通过在前四个等待时间限制时间并且不重试错误的事实来减轻。 所以有时候可能会起作用,有时候你会等待。
我认为以下(未经测试)的更改将有所帮助。
初始化信号量为零计数:
semaphore = CreateSemaphore(NULL, 0, numCPU, NULL);
摆脱线程恢复循环中的等待(即删除以下内容):
if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) printf("Timed out !!!\n");
从try循环末尾删除无用信号(即删除以下内容):
ReleaseSemaphore(semaphore,numCPU,NULL);
这是一个实用的解决方案。
我希望我的主程序使用线程(然后使用多个核心)来完成作业,并等待所有线程完成,然后再继续执行其他任务。 我不想让线程死亡并创建新的线程,因为这很慢。 在我的问题中,我试图通过暂停线程来做到这一点,这似乎很自然。 但是正如nobugz所指出的:“你可以用Suspend / ReleaseThread()来控制线程。
解决方案涉及到像我用来控制线程的信号量。 实际上再用一个信号量来控制主线程。 现在我有一个信号每个线程来控制线程和一个信号来控制主。
这是解决方案:
#include <windows.h> #include <stdio.h> #include <math.h> #include <process.h> #define TRY 500000 #define LOOP 100 HANDLE *ids; HANDLE *semaphores; HANDLE allThreadsSemaphore; DWORD WINAPI Count(__in LPVOID lpParameter) { float x = 1.0f; while(1) { WaitForSingleObject(semaphores[(int)lpParameter],INFINITE); for (int i=1 ; i<LOOP ; i++) x = sqrt((float)i*x+rand()); ReleaseSemaphore(allThreadsSemaphore,1,NULL); } return (DWORD)(int)x; } int main() { SYSTEM_INFO sysinfo; GetSystemInfo( &sysinfo ); int numCPU = sysinfo.dwNumberOfProcessors; ids = new HANDLE[numCPU]; semaphores = new HANDLE[numCPU]; for (int j=0 ; j<numCPU ; j++) { ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL); // Threads blocked until main releases them one by one semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL); } // Blocks main until threads finish allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL); for (int j=0 ; j<TRY ; j++) { for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs ReleaseSemaphore(semaphores[i],1,NULL); for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish WaitForSingleObject(allThreadsSemaphore,INFINITE); } for (int j=0 ; j<numCPU ; j++) CloseHandle(semaphores[j]); CloseHandle(allThreadsSemaphore); printf("Done\n"); getc(stdin); }