例如,说我分配一个新的结构,并将指针写入匿名pipe道的写入结束。
如果我从相应的读取结束读取指针,我是否保证在结构中看到“正确的”内容?
另外值得关注的是,在Windows上,通过tcp loopback的unix&self连接上的socketpair()的结果是否具有相同的保证。
上下文是使用select / epoll集中事件分派的服务器devise
例如,说我分配一个新的结构,并将指针写入匿名管道的写入结束。
如果我从相应的读取结束读取指针,我是否保证在结构中看到“正确的”内容?
不可以。不能保证写入的CPU将写入的缓存写入,并使其可以读取的其他CPU可见。
另外值得关注的是,在Windows上,通过tcp loopback的unix&self连接上的socketpair()的结果是否具有相同的保证。
没有。
实际上,调用系统调用write()
将最终锁定内核中的一个或多个数据结构,这应该处理重新排序问题。 例如,POSIX需要后续的读取才能看到在调用之前写入的数据,这意味着锁定(或某种获取/释放)本身。
至于这是否是正式规范的一部分,可能不是。
一个指针只是一个内存地址,所以假设你在同一个进程上 ,指针在接收线程上是有效的,并且指向同一个结构体。 如果你在不同的进程,最好你会立即得到一个内存错误,更糟的是你会读(或写)到一个随机存储器,这本质上是未定义的行为。
你会阅读正确的内容? 如果您的指针处于两个线程共享的静态变量中,这两者都不会更好也不会更糟:如果要保持一致性,则仍然需要执行一些同步 。
请问在静态内存(由线程共享),匿名管道,套接字对,tcp环回等之间的传输地址的问题? 否:所有这些通道传输字节 ,所以如果你传递一个内存地址,你将得到你的内存地址。 剩下的就是同步,因为在这里你只是分享一个内存地址。
如果你不使用任何其他的同步,任何事情都可能发生(我已经谈到了未定义的行为?):
有趣的问题,迄今为止,只有一个来自Cornstalks的正确答案。
在同一个(多线程)进程中,由于指针和数据沿着不同的路径到达目的地,所以没有保证。 隐式获取/释放保证不适用,因为结构数据不能通过缓存搭载指针,并且正式处理数据竞争。
但是,看看指针和结构数据本身如何到达第二个线程(分别通过管道和内存高速缓存),这个机制实际上不会造成任何伤害。 发送指向对等线程的指针需要3个系统调用(发送线程中的write()
,接收线程中的select()
和read()
),这是相对昂贵的,并且在指针值在接收线程,结构数据可能早就到达了。
请注意,这只是一个观察,机制仍然不正确。
我相信,你的情况可能会减少到这2个线程模型:
int data = 0; std::atomic<int*> atomicPtr{nullptr}; //... void thread1() { data = 42; atomicPtr.store(&integer, std::memory_order_release); } void thread2() { int* ptr = nullptr; while(!ptr) ptr = atomicPtr.load(std::memory_order_consume); assert(*ptr == 42); }
既然你有两个进程,你不能在它们之间使用一个原子变量,但是因为你列出了窗口,所以你可以从消耗部分中省略atomicPtr.load(std::memory_order_consume)
,因为AFAIK所有的Windows运行的体系结构保证这个负载在装载方面没有任何障碍是正确的。 事实上,我认为在那里没有太多的架构,那里的指令不会是一个没有任何操作(我只听到关于DEC Alpha)
我同意Serge Ballesta的回答。 在同一个过程中,通过匿名管道发送和接收对象地址是可行的。
由于当消息大小低于PIPE_BUF
(通常为4096字节)时, write
系统调用保证是原子的,因此多生成器线程不会混淆彼此的对象地址(对于64位应用程序为8字节)。
谈话很便宜,这里是Linux的演示代码(为简单起见,防守代码和错误处理程序被省略)。 只需复制并粘贴到pipe_ipc_demo.cc
然后编译并运行测试。
#include <unistd.h> #include <string.h> #include <pthread.h> #include <string> #include <list> template<class T> class MPSCQ { // pipe based Multi Producer Single Consumer Queue public: MPSCQ(); ~MPSCQ(); int producerPush(const T* t); T* consumerPoll(double timeout = 1.0); private: void _consumeFd(); int _selectFdConsumer(double timeout); T* _popFront(); private: int _fdProducer; int _fdConsumer; char* _consumerBuf; std::string* _partial; std::list<T*>* _list; static const int _PTR_SIZE; static const int _CONSUMER_BUF_SIZE; }; template<class T> const int MPSCQ<T>::_PTR_SIZE = sizeof(void*); template<class T> const int MPSCQ<T>::_CONSUMER_BUF_SIZE = 1024; template<class T> MPSCQ<T>::MPSCQ() : _fdProducer(-1), _fdConsumer(-1) { _consumerBuf = new char[_CONSUMER_BUF_SIZE]; _partial = new std::string; // for holding partial pointer address _list = new std::list<T*>; // unconsumed T* cache int fd_[2]; int r = pipe(fd_); _fdConsumer = fd_[0]; _fdProducer = fd_[1]; } template<class T> MPSCQ<T>::~MPSCQ() { /* omitted */ } template<class T> int MPSCQ<T>::producerPush(const T* t) { return t == NULL ? 0 : write(_fdProducer, &t, _PTR_SIZE); } template<class T> T* MPSCQ<T>::consumerPoll(double timeout) { T* t = _popFront(); if (t != NULL) { return t; } if (_selectFdConsumer(timeout) <= 0) { // timeout or error return NULL; } _consumeFd(); return _popFront(); } template<class T> void MPSCQ<T>::_consumeFd() { memcpy(_consumerBuf, _partial->data(), _partial->length()); ssize_t r = read(_fdConsumer, _consumerBuf, _CONSUMER_BUF_SIZE - _partial->length()); if (r <= 0) { // EOF or error, error handler omitted return; } const char* p = _consumerBuf; int remaining_len_ = _partial->length() + r; T* t; while (remaining_len_ >= _PTR_SIZE) { memcpy(&t, p, _PTR_SIZE); _list->push_back(t); remaining_len_ -= _PTR_SIZE; p += _PTR_SIZE; } *_partial = std::string(p, remaining_len_); } template<class T> int MPSCQ<T>::_selectFdConsumer(double timeout) { int r; int nfds_ = _fdConsumer + 1; fd_set readfds_; struct timeval timeout_; int64_t usec_ = timeout * 1000000.0; while (true) { timeout_.tv_sec = usec_ / 1000000; timeout_.tv_usec = usec_ % 1000000; FD_ZERO(&readfds_); FD_SET(_fdConsumer, &readfds_); r = select(nfds_, &readfds_, NULL, NULL, &timeout_); if (r < 0 && errno == EINTR) { continue; } return r; } } template<class T> T* MPSCQ<T>::_popFront() { if (!_list->empty()) { T* t = _list->front(); _list->pop_front(); return t; } else { return NULL; } } // = = = = = test code below = = = = = #define _LOOP_CNT 5000000 #define _ONE_MILLION 1000000 #define _PRODUCER_THREAD_NUM 2 struct TestMsg { // all public int _threadId; int _msgId; int64_t _val; TestMsg(int thread_id, int msg_id, int64_t val) : _threadId(thread_id), _msgId(msg_id), _val(val) { }; }; static MPSCQ<TestMsg> _QUEUE; static int64_t _SUM = 0; void* functor_producer(void* arg) { int my_thr_id_ = pthread_self(); TestMsg* msg_; for (int i = 0; i <= _LOOP_CNT; ++ i) { if (i == _LOOP_CNT) { msg_ = new TestMsg(my_thr_id_, i, -1); } else { msg_ = new TestMsg(my_thr_id_, i, i + 1); } _QUEUE.producerPush(msg_); } return NULL; } void* functor_consumer(void* arg) { int msg_cnt_ = 0; int stop_cnt_ = 0; TestMsg* msg_; while (true) { if ((msg_ = _QUEUE.consumerPoll()) == NULL) { continue; } int64_t val_ = msg_->_val; delete msg_; if (val_ <= 0) { if ((++ stop_cnt_) >= _PRODUCER_THREAD_NUM) { printf("All done, _SUM=%ld\n", _SUM); break; } } else { _SUM += val_; if ((++ msg_cnt_) % _ONE_MILLION == 0) { printf("msg_cnt_=%d, _SUM=%ld\n", msg_cnt_, _SUM); } } } return NULL; } int main(int argc, char* const* argv) { pthread_t consumer_; pthread_create(&consumer_, NULL, functor_consumer, NULL); pthread_t producers_[_PRODUCER_THREAD_NUM]; for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) { pthread_create(&producers_[i], NULL, functor_producer, NULL); } for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) { pthread_join(producers_[i], NULL); } pthread_join(consumer_, NULL); return 0; }
这里是测试结果( 2 * sum(1..5000000) == (1 + 5000000) * 5000000 == 25000005000000
):
$ g++ -o pipe_ipc_demo pipe_ipc_demo.cc -lpthread $ ./pipe_ipc_demo ## output may vary except for the final _SUM msg_cnt_=1000000, _SUM=251244261289 msg_cnt_=2000000, _SUM=1000708879236 msg_cnt_=3000000, _SUM=2250159002500 msg_cnt_=4000000, _SUM=4000785160225 msg_cnt_=5000000, _SUM=6251640644676 msg_cnt_=6000000, _SUM=9003167062500 msg_cnt_=7000000, _SUM=12252615629881 msg_cnt_=8000000, _SUM=16002380952516 msg_cnt_=9000000, _SUM=20252025092401 msg_cnt_=10000000, _SUM=25000005000000 All done, _SUM=25000005000000
这里展示的技术被用于我们的生产应用。 一个典型的用法是消费者线程充当日志写入器,而工作者线程几乎可以异步地写入日志消息。 是的, 几乎意味着有时写入线程可能在管道已满时被write()
阻塞,这是OS提供的可靠拥塞控制功能。