线程等待父

我为我的ubuntu服务器(为我的多客户端匿名聊天程序)实现了一个简单的线程池机制,并且我需要让我的工作线程hibernate,直到需要执行一个工作(以函数指针和参数的forms) 。

我目前的系统正在走出窗口。 我(工作者线程)询问经理是否有工作,如果没有5ms的睡眠。 如果存在,则将该作业添加到工作队列并运行该function。 糟糕的浪费周期。

我想要做的是制作一个简单的类似事件的系统。 我正在考虑有一个互斥体向量(每个工作人员一个),并在创build时拥有作为参数传入的互斥体的句柄。 然后在我的pipe理员类(它持有和分发作业),每当一个线程被创build,locking互斥。 当需要执行任务时,解锁下一个互斥锁,等待它被locking和解锁,然后重新locking。 不过,我想知道是否有更好的方法来达到这个目的。


tldr; 所以我的问题是这个。 什么是最有效,最安全的方法来让一个线程等待一个pipe理类的工作? 是投票技术,我甚至应该考虑(超过1000个客户在同一时间),是互斥锁体面吗? 或者还有其他技术?

Solutions Collecting From Web of "线程等待父"

你需要的是条件变量。
所有的工作线程调用wait()会挂起它们。

父线程然后将一个工作项放在队列上,并调用条件变量上的信号。 这将唤醒一个正在睡觉的线程。 它可以从队列中删除作业执行作业然后调用等待条件变量返回到休眠状态。

尝试:

#include <pthread.h> #include <memory> #include <list> // Use RAII to do the lock/unlock struct MutexLock { MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); } ~MutexLock() { pthread_mutex_unlock(&mutex); } private: pthread_mutex_t& mutex; }; // The base class of all work we want to do. struct Job { virtual void doWork() = 0; }; // pthreads is a C library the call back must be a C function. extern "C" void* threadPoolThreadStart(void*); // The very basre minimal part of a thread pool // It does not create the workers. You need to create the work threads // then make them call workerStart(). I leave that as an exercise for you. class ThreadPool { public: ThreadPool(unsigned int threadCount=1); ~ThreadPool(); void addWork(std::auto_ptr<Job> job); private: friend void* threadPoolThreadStart(void*); void workerStart(); std::auto_ptr<Job> getJob(); bool finished; // Threads will re-wait while this is true. pthread_mutex_t mutex; // A lock so that we can sequence accesses. pthread_cond_t cond; // The condition variable that is used to hold worker threads. std::list<Job*> workQueue; // A queue of jobs. std::vector<pthread_t>threads; }; // Create the thread pool ThreadPool::ThreadPool(int unsigned threadCount) : finished(false) , threads(threadCount) { // If we fail creating either pthread object than throw a fit. if (pthread_mutex_init(&mutex, NULL) != 0) { throw int(1); } if (pthread_cond_init(&cond, NULL) != 0) { pthread_mutex_destroy(&mutex); throw int(2); } for(unsigned int loop=0; loop < threadCount;++loop) { if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0) { // One thread failed: clean up for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill) { pthread_kill(threads[kill], 9); } throw int(3); } } } // Cleanup any left overs. // Note. This does not deal with worker threads. // You need to add a method to flush all worker threads // out of this pobject before you let the destructor destroy it. ThreadPool::~ThreadPool() { finished = true; for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) { // Send enough signals to free all threads. pthread_cond_signal(&cond); } for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) { // Wait for all threads to exit (they will as finished is true and // we sent enough signals to make sure // they are running). void* result; pthread_join(*loop, &result); } // Destroy the pthread objects. pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); // Delete all re-maining jobs. // Notice how we took ownership of the jobs. for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop) { delete *loop; } } // Add a new job to the queue // Signal the condition variable. This will flush a waiting worker // otherwise the job will wait for a worker to finish processing its current job. void ThreadPool::addWork(std::auto_ptr<Job> job) { MutexLock lock(mutex); workQueue.push_back(job.release()); pthread_cond_signal(&cond); } // Start a thread. // Make sure no exceptions escape as that is bad. void* threadPoolThreadStart(void* data) { ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart); try { pool->workerStart(); } catch(...){} return NULL; } // This is the main worker loop. void ThreadPool::workerStart() { while(!finished) { std::auto_ptr<Job> job = getJob(); if (job.get() != NULL) { job->doWork(); } } } // The workers come here to get a job. // If there are non in the queue they are suspended waiting on cond // until a new job is added above. std::auto_ptr<Job> ThreadPool::getJob() { MutexLock lock(mutex); while((workQueue.empty()) && (!finished)) { pthread_cond_wait(&cond, &mutex); // The wait releases the mutex lock and suspends the thread (until a signal). // When a thread wakes up it is help until it can acquire the mutex so when we // get here the mutex is again locked. // // Note: You must use while() here. This is because of the situation. // Two workers: Worker A processing job A. // Worker B suspended on condition variable. // Parent adds a new job and calls signal. // This wakes up thread B. But it is possible for Worker A to finish its // work and lock the mutex before the Worker B is released from the above call. // // If that happens then Worker A will see that the queue is not empty // and grab the work item in the queue and start processing. Worker B will // then lock the mutext and proceed here. If the above is not a while then // it would try and remove an item from an empty queue. With a while it sees // that the queue is empty and re-suspends on the condition variable above. } std::auto_ptr<Job> result; if (!finished) { result.reset(workQueue.front()); workQueue.pop_front(); } return result; } 

经典的生产者 – 消费者与多个消费者的同步(工作者线程消耗工作请求)。 众所周知的技术是有一个信号量,每个工作线程down() ,每当你有一个工作请求,做up() 。 比从互斥锁的工作队列中选择请求。 由于一个up()只会唤醒一个down() ,所以实际上互斥体上的争用最少。

或者你可以用条件变量做同样的事情,在每个线程中等待,当你有工作时唤醒它。 队列本身仍然与互斥体锁定(condvar无论如何需要一个)。

最后我不完全确定,但我实际上认为你实际上可以使用管道作为包括所有同步的队列(工作线程只是试图“读取(sizeof(request))”)。 有点哈克,但导致更少的上下文切换。

通常的做法是有一个工作正常的队列,一个保护队列的互斥mutex ,以及一个等待队列queue_not_empty 。 然后,每个工作线程执行以下操作(使用伪API):

 while (true) { Work * work = 0; mutex.lock(); while ( queue.empty() ) if ( !queue_not_empty.wait( &mutex, timeout ) ) return; // timeout - exit the worker thread work = queue.front(); queue.pop_front(); mutex.unlock(); work->perform(); } 

wait( &mutex, timeout )呼叫阻塞,直到等待条件发出信号或呼叫超时。 传入的mutexwait()内原子解锁,并在从呼叫返回之前再次锁定,以向所有参与者提供队列的一致视图。 timeout将被选择为相当大的(秒),并且会导致线程退出(如果有更多的工作进入,线程池将开始新的线程)。

与此同时,线程池的工作插入功能是这样做的:

 Work * work = ...; mutex.lock(); queue.push_back( work ); if ( worker.empty() ) start_a_new_worker(); queue_not_empty.wake_one(); mutex.unlock(); 

由于网络聊天程序可能是I / O绑定而不是CPU绑定的,因此您并不需要线程。 您可以使用诸如Boost.Asio或GLib主循环等工具在单个线程中处理所有I / O。 这些是对特定于平台的功能的可移植抽象,允许程序阻止任何 (可能大的)打开的文件或套接字集合上的活动,然后在活动发生时立即唤醒并作出响应。

最简单的方法是semaphores 。 这是一个信号量如何工作:

信号量基本上是一个采用空值/正值的变量。 进程可以通过两种方式与它交互:增加或减少信号量。

增加信号量为这个神奇的变量增加1 ,就是这样。 减少计数,事情变得有趣 :如果计数到零,一个进程再次降低它,因为它不能取负值,它会阻塞,直到变量上升

如果多个进程块正在等待减小信号值,则每个单元只有一个被唤醒,计数增加。

这很容易创建一个工作/任务系统:你的经理进程排队任务,并增加信号量的价值来匹配剩余的项目,你的工作进程试图减少计数和不断获取任务。 当没有可用的任务时,它们将阻塞,并且不消耗CPU时间。 当出现时,只有一个休眠过程会醒来。 Insta-sync魔术。

不幸的是,至少在Unix世界中,信号量API不是很友好,因为它由于某种原因而处理sempahores数组而不是单个数组。 但是,你是一个简单的包装,远离一个漂亮的界面!

干杯!