从文件并发处理

考虑下面的代码:

std::vector<int> indices = /* Non overlapping ranges. */; std::istream& in = /*...*/; for(std::size_t i= 0; i< indices.size()-1; ++i) { in.seekg(indices[i]); std::vector<int> data(indices[i+1] - indices[i]); in.read(reinterpret_cast<char*>(data.data()), data.size()*sizeof(int)); process_data(data); } 

我想使这个代码并行,尽可能快。

使用PPL进行parallizing的一种方法是:

 std::vector<int> indices = /* Non overlapping ranges. */; std::istream& in = /*...*/; std::vector<concurrency::task<void>> tasks; for(std::size_t i= 0; i< indices.size()-1; ++i) { in.seekg(indices[i]); std::vector<int> data(indices[i+1] - indices[i]); in.read(reinterpret_cast<char*>(data.data()), data.size()*sizeof(int)); tasks.emplace_back(std::bind(&process_data, std::move(data))); } concurrency::when_all(tasks.begin(), tasks.end()).wait(); 

这种方法的问题是,我想处理与读入内存(数据在caching中的数据很热)相同的线程中的数据(它适合CPUcaching),这不是这里的情况,它只是简单的浪费了使用热门数据的机会。

我有两个想法如何改善这个,但是,我还没有能够实现。

  1. 在单独的任务上开始下一个迭代。

     /* ??? */ { in.seekg(indices[i]); std::vector<int> data(indices[i+1] - indices[i]); // data size will fit into CPU cache. in.read(reinterpret_cast<char*>(data.data()), data.size()*sizeof(int)); /* Start a task that begins the next iteration? */ process_data(data); } 
  2. 使用内存映射文件并映射文件的所需区域,而不是仅仅从指针读取正确的偏移量。 使用parallel_for_each处理数据范围。 但是,我不明白内存映射文件在读取内存和caching时的性能意义。 也许我甚至不必考虑caching,因为文件只是DMA:d到系统内存,从来没有通过CPU?

任何build议或意见?

这很可能是你追求错误的目标。 正如已经指出的那样,“热门数据”的任何优势都将被磁盘速度所压倒。 否则,有没有告诉你重要的细节。
1)文件是否“大”
2)单个记录是否“大”
3)处理是否“缓慢”

如果文件是“大”,那么最重要的是确保文件被顺序读取。 你的“指数”让我觉得不然。 根据我自己的经验,最近的例子是6秒比20分钟,这取决于随机和顺序读取。 没有开玩笑

如果这个文件很小,而且你确信它完全被缓存了,那么你只需要一个同步队列来向你的线程传递任务,那么在同一个线程中处理就不会有问题了。

另一种方式是将“索引”分成两半,每个线程一个。