跨进程内存障碍

我正在使用内存映射文件进行跨进程数据共享。

我有两个进程,一个写入数据块和一个或多个读取这些块的其他进程。 为了让读者知道一个块是否准备好了,我正在写两个“标签”值,一个在开始,一个在每个块的末尾,表示已经准备就绪。

它看起来像这样:

注:在这个例子中,我不包括读者进程可以寻求到前面的块的事实。

static const int32_t START_TAG = 0xFAFAFAFA; static const int32_t END_TAG = 0x06060606; void writer_process(int32_t* memory_mapped_file_ptr) { auto ptr = memory_mapped_file_ptr; while (true) { std::vector<int32_t> chunk = generate_chunk(); std::copy(ptr + 2, chunk.begin(), chunk.end()); // We are done writing. Write the tags. *ptr = START_TAG; ptr += 1; *ptr = chunk.size(); ptr += 1 + chunk.size(); *ptr = END_TAG; ptr += 1; } } void reader_process(int32_t* memory_mapped_file_ptr) { auto ptr = memory_mapped_file_ptr; while (true) { auto ptr2 = ptr; std::this_thread::sleep_for(std::chrono::milliseconds(20)); if (*ptr2 != START_TAG) continue; ptr2 += 1; auto len = *ptr2; ptr2 += 1; if (*(ptr2 + len) != END_TAG) continue; std::vector<int32_t> chunk(ptr2, ptr2 + len); process_chunk(chunk); } } 

迄今为止这种作品。 但在我看来,这是一个非常糟糕的主意,并可能导致各种奇怪的错误由于caching行为。

有没有更好的方法来实现这一目标?

我看过:

  • 消息队列:效率低下,仅适用于单个阅读器。 我也无法寻求前面的块。

  • 互斥体:不知道如何locking当前块而不是整个内存。 我不能为每个可能的块(特别是他们有dynamic大小)有一个互斥体。 我已经考虑将内存分成一个互斥块,但由于在写入和读取之间发生延迟,这对我来说不起作用。

正如其他人所提到的,您需要有一些内存屏障来确保多个处理器(和进程)之间的事情正确同步。

我建议你改变你的方案,定义一组当前可用的条目,并使用互锁增量,只要有新的条目可用。

http://msdn.microsoft.com/en-us/library/windows/desktop/ms683614%28v=vs.85%29.aspx

我建议的结构是这样的,所以你可以实际达到你想要的,并迅速做到这一点:

 // at the very start, the number of buffers you might have total uint32_t m_size; // if you know the max. number maybe use a const instead... // then m_size structures, one per buffer: uint32_t m_offset0; // offset to your data uint32_t m_size0; // size of that buffer uint32_t m_busy0; // whether someone is working on the buffer uint32_t m_offset1; uint32_t m_size1; uint32_t m_busy1; ... uint32_t m_offsetN; uint32_t m_sizeN; uint32_t m_busyN; 

利用偏移量和大小,您可以直接访问映射区域中的任何缓冲区。 要分配一个缓冲区,你可能想要实现类似于malloc()所做的事情,尽管所有必要的信息都在这个表中,所以不需要链接列表等。但是,如果你要释放一些缓冲区,你需要跟踪它的大小。 如果你一直分配/释放,你将会有分裂的乐趣。 无论如何…

另一种方法是使用环形缓冲区(本质上是一个“管道”),所以你总是在最后一个缓冲区之后分配空间,如果没有足够的空间,在开始时分配,按照新的缓冲区大小关闭N个缓冲区要求…这可能会更容易实施。 但是,这意味着您可能需要知道从哪里开始寻找一个缓冲区(即有一个索引,目前认为是“第一个”[最旧的]缓冲区,这将成为下一个被重用)。

但是既然你没有解释一个缓冲区是如何变得“老”并且是可重用的(被释放以便可以被重用),所以我不能真正给你一个确切的实现。 但是像下面这样的东西可能会为你做。

在头结构中,如果m_offset为零,那么缓冲区当前不被分配,因此与该条目无关。 如果m_busy为零,则没有进程正在访问该缓冲区。 我还提供了一个m_free字段,它可以是0或1.只要需要更多的缓冲区来保存刚刚收到的数据,作者就会将该参数设置为1。 我对这个问题并没有太深入的了解,因为我不知道你是如何释放你的缓冲区的。 如果你永远不释放缓冲区也不是必需的。

0)结构

 // only if the size varies between runs, otherwise use a constant like: // namespace { uint32_t const COUNT = 123; } struct header_count_t { uint32_t m_size; }; struct header_t { uint32_t m_offset; uint32_t m_size; uint32_t m_busy; // to use with Interlocked...() you may want to use LONG instead }; // and from your "ptr" you'd do: header_count_t *header_count = (header_count_t *) ptr; header_count->m_size = ...; // your dynamic size (if dynamic it needs to be) header_t *header = (header_t *) (header_count + 1); // first buffer will be at: data = (char *) (header + header_count->m_size) for(size_t n(0); n < header_count->m_size; ++n) { // do work (see below) on header[n] ... } 

1)访问数据的写入者必须首先锁定缓冲区,如果不可用,再次尝试下一个; 用InterlockedIncrement()完成锁定,用InterlockedIncrement()解锁:

 InterlockedIncrement(&header[n]->m_busy); if(header[n]->m_offset == nullptr) { // buffer not allocated yet, allocate now and copy data, // but do not save the offset until "much" later uint32_t offset = malloc_buffer(); memcpy(ptr + offset, source_data, size); header[n]->m_size = size; // extra memory barrier to make sure that the data copied // in the buffer is all there before we save the offset InterlockedIncrement(&header[n]->m_busy); header[n]->m_offset = offset; InterlockedDecrement(&header[n]->m_busy); } InterlockedDecrement(&header[n]->m_busy); 

现在如果你想释放一个缓冲区,这是不够的。 在这种情况下,需要另一个标志来防止其他进程重新使用旧的缓冲区。 这又取决于你的实现…(见下面的例子)

2)访问数据的读写器必须先用InterlockedIncrement()将缓冲区锁定一次,然后使用InterlockedDecrement()释放缓冲区。 请注意,即使m_offset为nullptr,锁也适用。

 InterlockedIncrement(&header[n]->m_busy); if(header[n]->m_offset) { // do something with the buffer uint32_t size(header[n]->m_size); char const *buffer_ptr = ptr + header[n]->m_offset; ... } InterlockedDecrement(header[n]->m_busy); 

所以在这里我只是测试是否设置m_offset。

3)如果你想释放一个缓冲区,你还需要测试另一个标志(见下面),如果另一个标志是真的(或错误的),那么缓冲区即将被释放(所有进程释放它),然后该标志可以在以前的代码片段中使用(即m_offset为零,或该标志为1, m_busy计数器正好是1)。

对于作者来说这样的事情:

 LONG lock = InterlockedIncrement(&header[n]->m_busy); if(header[n]->m_offset == nullptr || (lock == 1 && header[n]->m_free == 1)) { // new buffer (nullptr) or reusing an old buffer // reset the offset first InterlockedIncrement(&header[n]->m_busy); header[n]->m_offset = nullptr; InterlockedDecrement(&header[n]->m_busy); // then clear m_free header[n]->m_free = 0; InterlockedIncrement(&header[n]->m_busy); // WARNING: you need another Decrement against this one... // code as before (malloc_buffer, memcpy, save size & offset...) ... } InterlockedDecrement(&header[n]->m_busy); 

在读者中,测试的变化是:

 if(header[n]->m_offset && header[n]->m_free == 0) 

作为一个方面说明:所有的Interlocked …()函数都是完整的内存屏障(fence),所以你在这方面都很好。 你必须使用其中的许多来确保你得到正确的同步。

请注意,这是未经测试的代码…但如果您想避免进程间信号量(这可能不会简化这么多),那就是要走的路。 请注意,20ms的sleep()本身并不是必需的,除非为了避免每个阅读器绑定一个CPU,显然。