如何在使用epoll_wait时正确读取数据

我正尝试将使用IOCP的现有Windows C ++代码移植到Linux。 在决定使用epoll_wait来实现高并发性之后,我已经面临一个理论问题,就是我们何时试图处理接收到的数据。

设想两个线程调用epoll_wait ,并收到两个相应的消息,以便Linux解除第一个线程的epoll_wait ,第二个线程即将到来。

例如:

 Thread 1 blocks on epoll_wait Thread 2 blocks on epoll_wait Client sends a chunk of data 1 Thread 1 deblocks from epoll_wait, performs recv and tries to process data Client sends a chunk of data 2 Thread 2 deblocks, performs recv and tries to process data. 

这种情况是可以想象的吗? 即它能发生?

有没有办法阻止它,以避免在recv /处理代码中实现同步?

如果您从同一套epoll手柄中读取多个线程,我建议您使用EPOLLONESHOT将您的epoll手柄置于一次性电平触发模式。 这将确保在一个线程观察到触发句柄之后,除非使用epoll_ctl重新设置句柄,否则其他线程将不会观察它。

如果需要独立处理读写路径,则可能需要完全拆分读写线程池; 有一个用于读取事件的epoll句柄,一个用于写入事件,并且专门为其中一个分配线程。 此外,有一个单独的读取和写入路径的锁。 当然,就修改每个套接字的状态而言,你必须小心读写线程之间的交互。

如果你确实采取了这种拆分方法,那么你需要考虑如何处理socket关闭。 最有可能的是,您将需要一个额外的共享数据锁,以及在共享数据锁下为读取和写入路径设置的“确认关闭”标志。 然后读取和写入线程可以竞争确认,最后一个确认获取清除共享数据结构。 就是这样的:

 void OnSocketClosed(shareddatastructure *pShared, int writer) { epoll_ctl(myepollhandle, EPOLL_CTL_DEL, pShared->fd, NULL); LOCK(pShared->common_lock); if (writer) pShared->close_ack_w = true; else pShared->close_ack_r = true; bool acked = pShared->close_ack_w && pShared->close_ack_r; UNLOCK(pShared->common_lock); if (acked) free(pShared); } 

我在这里假设你正在尝试处理的情况是这样的:

你有多个(也许很多)套接字,你想一次接收数据;

您希望在线程A第一次接收时从第一个连接开始处理数据,然后确保来自此连接的数据在任何其他线程上都没有处理,直到在线程A中完成。

当你这样做的时候,如果现在有一些数据在不同的连接上被接收到,你希望线程B选择这些数据并处理它,同时确保没有其他人能够处理这个连接,直到线程B完成它。

在这种情况下,使用epoll_wait()和多个线程中的epoll fd是相当有效的方法(我并没有声称它是最有效的)。

这里的技巧是用EPOLLONESHOT标志将单个连接fds添加到epoll fd中。 这确保了一旦fd从epoll_wait()返回,它将被监视,直到你明确告诉epoll再次监视它。 这确保了处理这个连接的线程不受干扰,因为没有其他的线程可以处理相同的连接,直到这个线程标记连接被再次监视。

您可以使用epoll_ctl()和EPOLL_CTL_MOD来设置fd以再次监视EPOLLIN或EPOLLOUT。

在多线程中使用epoll的一个重要好处是,当一个线程完成一个连接并将其添加回epoll监视集时,即使在前一个处理线程返回之前,仍然在epoll_wait()中的任何其他线程都会立即监视它到epoll_wait()。 顺便说一句,如果不同的线程立即拾取连接(因此需要获取该连接的数据结构并刷新前一个线程的缓存),则由于缺少缓存数据局部性,这也可能是缺点。 什么效果最好会敏感地取决于您的确切使用模式。

如果你正试图处理在不同线程的同一连接上接收到的消息,那么这个使用epoll的方案并不适合你,而使用一个监听线程提供一个高效的队列提供工作线程的方法可能会更好。

以前的回答指出,从多个线程中调用epoll_wait()是一个坏主意,这个问题几乎肯定是正确的,但是我对这个问题充满了兴趣,试图弄清楚从同一个句柄的多个线程调用它时会发生什么,等待相同的套接字。 我写了下面的测试代码:

 #include <netinet/in.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/epoll.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> struct thread_info { int number; int socket; int epoll; }; void * thread(struct thread_info * arg) { struct epoll_event events[10]; int s; char buf[512]; sleep(5 * arg->number); printf("Thread %d start\n", arg->number); do { s = epoll_wait(arg->epoll, events, 10, -1); if (s < 0) { perror("wait"); exit(1); } else if (s == 0) { printf("Thread %d No data\n", arg->number); exit(1); } if (recv(arg->socket, buf, 512, 0) <= 0) { perror("recv"); exit(1); } printf("Thread %d got data\n", arg->number); } while (s == 1); printf("Thread %d end\n", arg->number); return 0; } int main() { pthread_attr_t attr; pthread_t threads[2]; struct thread_info thread_data[2]; int s; int listener, client, epollfd; struct sockaddr_in listen_address; struct sockaddr_storage client_address; socklen_t client_address_len; struct epoll_event ev; listener = socket(AF_INET, SOCK_STREAM, 0); if (listener < 0) { perror("socket"); exit(1); } memset(&listen_address, 0, sizeof(struct sockaddr_in)); listen_address.sin_family = AF_INET; listen_address.sin_addr.s_addr = INADDR_ANY; listen_address.sin_port = htons(6799); s = bind(listener, (struct sockaddr*)&listen_address, sizeof(listen_address)); if (s != 0) { perror("bind"); exit(1); } s = listen(listener, 1); if (s != 0) { perror("listen"); exit(1); } client_address_len = sizeof(client_address); client = accept(listener, (struct sockaddr*)&client_address, &client_address_len); epollfd = epoll_create(10); if (epollfd == -1) { perror("epoll_create"); exit(1); } ev.events = EPOLLIN; ev.data.fd = client; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, client, &ev) == -1) { perror("epoll_ctl: listen_sock"); exit(1); } thread_data[0].number = 0; thread_data[1].number = 1; thread_data[0].socket = client; thread_data[1].socket = client; thread_data[0].epoll = epollfd; thread_data[1].epoll = epollfd; s = pthread_attr_init(&attr); if (s != 0) { perror("pthread_attr_init"); exit(1); } s = pthread_create(&threads[0], &attr, (void*(*)(void*))&thread, &thread_data[0]); if (s != 0) { perror("pthread_create"); exit(1); } s = pthread_create(&threads[1], &attr, (void*(*)(void*))&thread, &thread_data[1]); if (s != 0) { perror("pthread_create"); exit(1); } pthread_join(threads[0], 0); pthread_join(threads[1], 0); return 0; } 

当数据到达时,两个线程都在等待epoll_wait(),只有一个线程会返回,但随着后续数据到达,唤醒处理数据的线程在两个线程之间是随机的。 我无法找到影响哪个线程被唤醒的方法。

看起来可能是一个调用epoll_wait的线程最有意义,事件传递给工作线程来抽取IO。

我相信使用epoll和每个内核线程的高性能软件会创建多个epoll句柄,每个句柄处理所有连接的子集。 通过这种方式,工作是分开的,但是你所描述的问题是可以避免的。

一般来说,当你有一个线程监听单个异步源的数据时,会使用epoll 。 为了避免繁忙等待(手动轮询),您可以使用epoll让您知道数据何时准备就绪(非常类似于select )。

多个线程从单个数据源读取是不标准的做法,至少我会认为这是不好的做法。

如果要使用多个线程,但只有一个输入源,那么请指定其中一个线程来监听和排队数据,以便其他线程可以从队列中读取单个片段。