multithreading单读写器单写FIFO队列

我需要一个队列来传递消息从一个线程(A)到另一个(B),但是我找不到一个真正做我想做的事情,因为他们通常允许添加一个项目失败,在我的情况下因为消息需要处理,所以非常致命,线程真的不能停下来等待空闲的空间。

  • 只有线程A添加项目,只有线程B读取它们
  • 线程A不能阻塞,但是线程B不是性能关键的,所以它可以
  • 添加项目必须总是成功,所以队列不能有一个大小上限(系统内存不足)
  • 如果队列是空的,线程B应该等待,直到有一个项目要处理

以下是如何在C ++中编写一个无锁队列:

http://www.ddj.com/hpc-high-performance-computing/210604448

但是当你说“线程A不能阻塞”时,你确定这是要求吗? Windows不是实时操作系统(在正常使用情况下,也不是Linux)。 如果您希望线程A能够使用所有可用的系统内存,则需要分配内存(或等待别人执行)。 如果读写器为了操纵列表而使用进程内锁(即非共享互斥锁),操作系统本身不能提供更好的定时保证。 而添加消息的最坏情况将不得不去OS得到内存。

简而言之,那些你不喜欢的队列有一个固定的容量是有原因的,这样他们就不必在低延迟线程中分配内存。

所以无锁的代码一般会少一些block-y,但是由于内存的分配,并不能保证是这样的,并且互斥体的性能不应该是那么简陋,除非你有一个真正巨大的事件流过程(例如,您正在编写网络驱动程序,并且这些消息是传入的以太网数据包)。

所以,在伪代码中,我会尝试的第一件事情是:

Writer: allocate message and fill it in acquire lock append node to intrusive list signal condition variable release lock Reader: for(;;) acquire lock for(;;) if there's a node remove it break else wait on condition variable endif endfor release lock process message free message endfor 

只有当这证明在编写器线程中引入了不可接受的延迟时,我才会去无锁的代码,除非我碰巧有一个合适的队列已经躺在那里。

Visual Studio 2010增加了2个支持这种场景的新库,即异步代理库和并行模式库。

代理程序库具有支持或异步消息传递,并包含用于将消息发送到“目标”和从“来源”接收消息的消息块

unbounded_buffer是一个模板类,它提供了我相信你正在寻找的东西:

 #include <agents.h> #include <ppl.h> #include <iostream> using namespace ::Concurrency; using namespace ::std; int main() { //to hold our messages, the buffer is unbounded... unbounded_buffer<int> buf1; task_group tasks; //thread 1 sends messages to the unbounded_buffer //without blocking tasks.run([&buf1](){ for(int i = 0 ; i < 10000; ++i) send(&buf1,i) //signal exit send(&buf1,-1); }); //thread 2 receives messages and blocks if there are none tasks.run([&buf1](){ int result; while(result = receive(&buf1)!=-1) { cout << "I got a " << result << endl; } }); //wait for the threads to end tasks.wait(); } 
  • 为什么不使用STL < list >或< deque >带有互斥体的add / remove? STL的线程安全性是否不足?

  • 为什么不创建自己的(单/双)链接列表节点类,其中包含一个指针,并添加/删除项目继承? 因此不需要额外的分配。 你只需要在threadA::add()threadB::remove() threadA::add()几个指针就可以了。 (虽然你想在互斥量下做到这一点,但除非你真的做了错误的事情,否则对threadA的阻塞效应是微不足道的)

  • 如果您正在使用pthreads,请查看sem_post()sem_wait() 。 这个想法是,threadB可以通过sem_wait()无限期地sem_wait()直到threadA把某些东西放在队列中。 然后threadA调用sem_post() 。 哪一个醒来threadB做它的工作。 之后threadB可以回去睡觉。 这是一种处理异步信号的有效方式,在threadB::remove()完成前支持多个threadA::add()之类的threadB::remove()

你可能想考虑你的要求 – 是不是真的是A不能丢弃任何队列项目? 或者你是不是希望B将两个连续的元素从队列中拉出来,而不是连续的项目进入,因为这会以某种方式歪曲一系列的事件?

例如,如果这是某种数据记录系统,那么您(可以理解)不会在记录中留下空白 – 但是没有无限的记忆,现实情况是在某个角落的情况下,您可能会在某个地方超出排队容量。 。

在这种情况下,一个解决方案是有一些可以放入队列的特殊元素,这代表了发现必须放弃项目的情况。 基本上你保留一个额外的元素,大部分时间是空的。 每当A去往队列中添加元素时,如果这个额外的元素不为空,就进入队列。如果A发现队列中没有空间,那么它配置这个额外的元素来表示“嘿,队列已满” 。

这样,A永远不会阻塞,当系统非常繁忙时,可以删除元素,但是您不会忽略元素被丢弃的事实,因为只要队列空间变得可用,这个标记就进入指示数据的位置发生下降。 过程B然后做它需要做的任何事情,当它发现它已经拉出这个溢出标记元素的队列。