我想产生线程来执行某些任务,并使用线程安全队列与他们进行通信。 在等待的时候,我也想对各种文件描述符进行IO操作。
推荐的方法是什么? 当队列从没有元素到某些元素时,我是否必须创build一个线程间pipe道并写入它? 没有更好的方法吗?
如果我必须创build线程间pipe道,为什么没有实现共享队列的库允许您创build共享队列和线程间pipe道作为单个实体?
我想这么做是否意味着一个基本的devise缺陷?
我正在问这个关于C ++和Python的问题。 我对一个跨平台的解决scheme很感兴趣,但主要对Linux感兴趣。
更具体的例子
我有一些代码将在文件系统树中search的东西。 我有几个通过sockets向外界开放的沟通渠道。 可能(或不可能)导致需要search文件系统树中的东西的请求将到达。
我将隔离在一个或多个线程中search文件系统树中的东西的代码。 我想要请求导致需要search树,并把它们放在一个线程安全的队列中,由search器线程完成。 结果将被放入已完成search的队列中。
我希望能够在search过程中快速处理所有的非search请求。 我希望能够及时处理search结果。
处理传入的请求通常意味着某种使用epoll
的事件驱动架构。 磁盘search请求队列和结果返回队列将暗示使用互斥或信号量来实现线程安全的线程安全队列。
等待空队列的标准方法是使用条件variables。 但是如果我在等待的时候需要处理其他的请求,那将是行不通的。 要么我总是轮询结果队列(并且平均延迟轮询间隔的一半),阻塞和不处理请求。
每当使用事件驱动架构时,都需要一个机制来报告事件完成。 在Linux上,如果使用的是文件,则需要使用选择或轮询族中的一些内容,即使用管道来启动所有与文件无关的事件。
编辑 :Linux有eventfd和timerfd 。 这些可以被添加到您的epoll
列表中,并用于分别从另一个线程或定时器事件触发时突破epoll_wait
。
还有另一个选择,那就是信号。 可以使用fcntl
修改文件描述符,以便在文件描述符变为活动状态时发出信号。 信号处理程序然后可以将文件就绪消息推送到您选择的任何类型的队列中。 这可能是一个简单的信号量或互斥体/ condvar驱动队列。 由于现在不再使用select
/ poll
,因此不再需要使用管道排列没有基于文件的消息。
健康警告:我还没有尝试过,尽管我不明白为什么它不起作用,但我并不真正了解signal
方法的性能影响。
我已经解决了这个确切的问题,使用你提到的,pipe()和libevent(包装epoll)。 当其输出队列从空到非空时,工作线程向其管道FD写入一个字节。 这唤醒了主IO线程,然后可以获取工作线程的输出。 这个伟大的工程其实很简单的代码。
你有Linux的标签,所以我打算抛出这个: POSIX消息队列做这一切,这应该履行你的“内置”的要求,如果不是你不太想要的跨平台的愿望。
线程安全同步是内置的。 您可以让您的工作线程在读取队列时阻塞。 或者,当有新项目放入队列时,MQ可以使用mq_notify()来产生一个新的线程(或者发出一个现有的线程)。 而且由于看起来您将要使用select(),所以MQ的标识符(mqd_t)可以用作带有select的文件描述符。
在我看来,Duck's和Twk's实际上比Doron's(OP所选择的)要好。 Doron建议从信号处理程序的上下文中写入消息队列,并指出消息队列可以是“任何类型的队列”。 我强烈建议您不要这样做,因为许多C库/系统调用不能从信号处理程序中安全地调用(请参阅async-signal-safe )。
特别是,如果您选择受互斥锁保护的队列,则不应该从信号处理程序访问它。 考虑这种情况:消费者线程锁定队列以读取它。 紧接着,内核传递信号通知你一个文件描述符现在有数据。 你发信号处理程序在消费者线程中运行,必然),并试图把你的队列中的东西。 要做到这一点,首先要取得锁定。 但它已经锁定了,所以你现在已经陷入僵局。
根据我的经验,选择/轮询是UNIX / Linux中事件驱动程序唯一可行的解决方案。 我希望在一个多线程程序中有一个更好的方法,但是你需要一些机制来“唤醒”你的消费者线程。 我还没有找到一个不涉及系统调用的方法(因为在任何阻塞调用(例如select)期间,消费者线程在内核中等待队列)。
编辑:我忘记提及一个Linux特定的方式来处理信号时使用select / poll: signalfd(2) 。 你得到一个文件描述符,你可以选择/轮询,你处理的代码正常运行,而不是在信号处理程序的上下文中。
似乎没有人提到这个选择呢:
不要运行select
/ poll
/ etc。 在你的“主线程”中。 启动一个专用的辅助线程,执行I / O并在I / O操作完成时将通知推入线程安全队列(与其他线程用于与主线程通信的同一队列)。
那么你的主线程只需要等待通知队列。
这是一个非常常见的问题,特别是在开发网络服务器端程序的时候。 大多数Linux服务器端程序的主要外观将如下循环:
epoll_add(serv_sock); while(1){ ret = epoll_wait(); foreach(ret as fd){ req = fd.read(); resp = proc(req); fd.send(resp); } }
它是单线程(主线程),基于epoll的服务器框架。 问题是,它是单线程的,不是多线程的。 它要求proc()永远不应该阻塞或运行一段很长的时间(比如对于一般情况来说,就是10毫秒)。
如果proc()会运行很长时间,我们需要多线程,并在一个单独的线程(工作线程)中执行proc()。
我们可以使用基于互斥锁的消息队列将任务提交给工作线程而不阻塞主线程,这足够快。
epoll_add(serv_sock); while(1){ ret = epoll_wait(); foreach(ret as fd){ req = fd.read(); queue.add_job(req); // fast, non blockable } }
然后我们需要一种方法来从工作者线程获得任务结果。 怎么样? 如果我们直接在epoll_wait()之前或之后检查消息队列。
epoll_add(serv_sock); while(1){ ret = epoll_wait(); // may blocks for 10ms resp = queue.check_result(); // fast, non blockable foreach(ret as fd){ req = fd.read(); queue.add_job(req); // fast, non blockable } }
但是,检查操作将在epoll_wait()结束后执行,如果等待的所有文件描述符都不活动,则epoll_wait()通常会阻塞10微秒(常见情况)。
对于一台服务器来说,10毫秒是相当长的时间! 当任务结果产生时,我们可以通过信号epoll_wait()立即结束吗?
是! 我将在我的一个开源项目中描述它是如何完成的:
为所有工作线程创建一个管道,epoll也在该管道上等待。 一旦生成任务结果,工作线程将一个字节写入管道,然后epoll_wait()几乎在同一时间结束! – Linux管道有5 us到20 us的延迟。
在我的项目SSDB (Redis协议兼容的磁盘NoSQL数据库)中,我创建了一个SelectableQueue来在主线程和工作线程之间传递消息。 就像它的名字一样,SelectableQueue有一个文件描述符,可以由epoll等待。
SelectableQueue: https : //github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94
主线程中的用法:
epoll_add(serv_sock); epoll_add(queue->fd()); while(1){ ret = epoll_wait(); foreach(ret as fd){ if(fd is queue){ sock, resp = queue->pop_result(); sock.send(resp); } if(fd is client_socket){ req = fd.read(); queue->add_task(fd, req); } } }
工作线程中的用法:
fd, req = queue->pop_task(); resp = proc(req); queue->add_result(fd, resp);
完成你想要做的一个方法是通过实现观察者模式
你可以把你的主线程注册为一个观察者,并把它们的所有派生线程都注册下来,让他们在做完他们应该做的事情(或者在运行期间用你需要的信息更新)时通知它。
基本上,你想改变你的方法,以事件驱动模型。
C ++ 11有std :: mutex和std :: condition_variable。 当满足一定的条件时,可以使用两个线程信号。 这听起来像你将需要建立你的解决方案出这些原语。 如果你的环境还不支持这些C ++ 11库的功能,你可以找到非常类似的提升。 对不起,不能多说python。