如何实现无锁,但阻塞的行为?

我正在为一个密集的networking应用程序实现一个无锁的单生产者单消费者队列。 我有一堆工作线程在他们自己的单独队列中接收工作,然后他们出队和处理。

从这些队列中移除locking已经极大地提高了高负载下的性能, 但是当队列空时不再阻塞 ,这又导致CPU使用率急剧上升。

我怎样才能有效地导致一个线程阻塞,直到它可以成功地出队或被杀死/中断?

如果你在Linux上,使用Futex来看看 。 它通过使用原子操作而不是像互斥体那样的内核调用来提供非锁定实现的性能,但是如果您需要将进程设置为空闲,因为某些条件不成立(即锁争用),它将然后进行适当的内核调用,让进程进入休眠状态,并在以后的事件中将其唤醒。 它基本上就像一个非常快的信号量。

在Linux上, futex可以用来阻塞一个线程。 但要知道, 互斥是棘手的 !

更新:条件变量比futexes更安全,更便携。 然而,一个条件变量与一个互斥体结合使用,所以严格来说结果将不再是无锁的。 但是,如果您的主要目标是性能(而不是全球进步的保证),并且锁定的部分(即线程唤醒后检查的条件)很小,则可能会发生无需进入即可获得满意的结果将futexes整合到算法中的细微之处。

如果您使用的是Windows,则无法使用futex,但Windows Vista有类似的机制,称为键控事件 。 不幸的是,这不是发布的API(它是NTDLL本地API)的一部分,但只要您接受在未来版本的Windows中可能更改的警告,就可以使用它(并且不需要运行Vista之前的内核)。 一定要阅读我上面链接的文章。 这是一个未经测试的草图,它可能如何工作:

 /* Interlocked SList queue using keyed event signaling */ struct queue { SLIST_HEADER slist; // Note: Multiple queues can (and should) share a keyed event handle HANDLE keyed_event; // Initial value: 0 // Prior to blocking, the queue_pop function increments this to 1, then // rechecks the queue. If it finds an item, it attempts to compxchg back to // 0; if this fails, then it's racing with a push, and has to block LONG block_flag; }; void init_queue(queue *qPtr) { NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0); InitializeSListHead(&qPtr->slist); qPtr->blocking = 0; } void queue_push(queue *qPtr, SLIST_ENTRY *entry) { InterlockedPushEntrySList(&qPtr->slist, entry); // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we // have committed to a keyed-event handshake LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1); if (oldv) { NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL); } } SLIST_ENTRY *queue_pop(queue *qPtr) { SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist); if (entry) return entry; // fast path // Transition block flag 0 -> 1. We must recheck the queue after this point // in case we race with queue_push; however since ReleaseKeyedEvent // blocks until it is matched up with a wait, we must perform the wait if // queue_push sees us LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0); assert(oldv == 0); entry = InterlockedPopEntrySList(&qPtr->slist); if (entry) { // Try to abort oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1); if (oldv == 1) return entry; // nobody saw us, we can just exit with the value } // Either we don't have an entry, or we are forced to wait because // queue_push saw our block flag. So do the wait NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL); // block_flag has been reset by queue_push if (!entry) entry = InterlockedPopEntrySList(&qPtr->slist); assert(entry); return entry; } 

您也可以使用Slim Read Write锁和条件变量的类似协议,并使用无锁快速路径。 这些是关键事件的封装,所以它们可能比直接使用键控事件承担更多的开销。

你有没有尝试有条件等待? 当队列变空时,开始等待新的工作。 将作业放入队列中的线程应该触发信号。 这样你只能在队列为空时使用锁。

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

您可以通过使用sigwait()函数使线程进入睡眠状态。 你可以用pthread_kill唤醒线程。 这比条件变量快得多。

等待的时候可以加入睡眠。 只要选择你想要的最大的等待,然后做这样的事情(伪代码,因为我不记得pthread语法):

 WAIT_TIME = 100; // Set this to whatever you're happy with while(loop_condition) { thing = get_from_queue() if(thing == null) { sleep(WAIT_TIME); } else { handle(thing); } } 

即使像100 ms睡眠这样的短时间内,也会显着降低CPU使用率。 我不确定在什么情况下,上下文切换会比忙于等待更糟糕。