我有一个有两个线程的进程。 一个线程(线程A)将设置timerfd定时器,而另一个线程(线程B)将在这些定时器上执行“select”。 一旦定时器到期,线程B将指出这个线程A.
要添加定时器,线程A将创build一个新的定时器,然后它将唤醒线程B,将该定时器包含在其调用中以进行select。 我试图通过使用文件描述符唤醒线程B只是为了这个目的。 然后线程B将调用该FD上的select以及调用timerfd返回的所有FD。
问题是,我无法设法创build一个FD,我可以通过某种方式进行控制,以便在select时阻止或返回。
我试图使用shm_open与调用fcntl,我已经试图只使用打开一个文件,但没有一个会导致select阻止。 我所有的尝试导致select立即返回。 有什么办法来创build一个FD,将导致select阻止,直到我更新该FD不知何故?
尝试1 – 使用shm_open创build一个FD并使用fcntl来设置读取locking:
从线程A创buildFD
if((wakeUpFd = shm_open("/wakeup", O_RDWR|O_CREAT|O_TRUNC, 0)) == -1) printf("Failed to open /wakeup, Errno = %d\n", errno); else { fcntl(wakeUpFd, F_SETLK, F_RDLCK); }
从线程A添加定时器
#create a timer and add it to a list /* wake up timer thread */ fcntl(wakeUpFd, F_SETLK, ~F_RDLCK);
唤醒线程B
#when select returns if(FD_ISSET(wakeUpFd, &timerSet)) { fcntl(wakeUpFd, F_SETLK, F_RDLCK); } #check all other timer FD's
尝试2 – 使用shm_open并读取/写入数据:
从线程A创buildFD
if((wakeUpFd = shm_open("/wakeup", O_RDWR|O_CREAT|O_TRUNC, 0)) == -1) printf("Failed to open /wakeup, Errno = %d\n", errno); else { if(ftruncate(wakeUpFd, 2) == -1) { printf("Failed with ftruncate, Errno = %d\n", errno); } }
从线程A添加定时器
#create a timer and add it to a list /* wake up timer thread */ if(write(wakeUpFd, wakeUpStr, 1) != 1) printf("Failed to write to wakeUpFd\n");
唤醒线程B
#when select returns if(FD_ISSET(wakeUpFd, &timerSet)) { read(wakeUpFd, wakeUpBuf, 10); } #check all other timer FD's
尝试3 – 几乎与尝试2相同,但使用开放而不是shm_open。
尝试4 – 与try 1相同,但使用fcntl(wakeUpFd,F_SETFL,〜O_NONBLOCK)而不是fcntl(wakeUpFd,F_SETLK,〜F_RDLCK)
阅读select()
规范,特别是它所说的位:
与常规文件相关联的文件描述符应始终选择true以准备好读取,准备好写入和错误条件。
您不能在常规文件的文件描述符上生成select()
块。 你必须有一个管道或套接字或者这些行的东西,就像你select()
的文件select()
。
使用一个Unix域套接字对,例如socketpair(AF_UNIX, SOCK_DGRAM, 0, commsd)
进行通信。 当线程A创建一个新的timerfd时,它只是将新的描述符( int
写入通信套接字commsd[0]
。
当线程B注意到通信套接字commsd[1]
是可读的时,它从中读取一个或多个int
。 每个int
显然是一个新的timerd描述符,因此线程B必须将它们添加到它所select()
的set中。
在线程B中,我建议在循环中使用非阻塞式读取bytes = recv(commfd, ptr, len, MSG_DONTWAIT)
来读取通信套接字:
char buffer[8 * sizeof(int)]; size_t head = 0, tail = 0; ssize_t bytes; int new_timerfd; while (1) { if (head >= tail) head = tail = 0; else if (head > 0) { memmove(buffer, buffer + head, tail - head); tail -= head; head = 0; } if (tail < sizeof buffer) { bytes = recv(commsd[1], buffer + head, sizeof buffer - tail, MSG_DONTWAIT); if (bytes > (ssize_t)0) head += bytes; } if (head >= tail) break; while (head + sizeof (int) <= tail) { /* Unaligned version of new_timerfd = *(int *)(buffer + head); */ memmove(&new_timerfd, buffer + head, sizeof new_timerfd); head += sizeof (int); /* * Add new_timerfd to select()ed set */ } }
上面的循环确实做了一个额外的非阻塞recv()
调用来检测它已经读取了所有立即挂起的,但这种方式是非常强大的。 它甚至不假定你总是可以读取一个完整的int
。 (由于_POSIX_PIPE_BUF
总是sizeof int
的倍数,所以你可以假设你总是读完整的int
。)
只要有可用的数据,上面的循环会从套接字中进行非阻塞接收,循环体将new_timerfd
提取描述符到new_timerfd
。 我省略了将它添加到select()
ed集的代码。
最后,这种方法也适用于一般情况,当你将一个较大的结构传递给一个循环中select()
的线程时。 只要确保缓冲区足够大,至少有两个结构,你就设置好了; memmove()
处理任何你可能拥有的缓冲区包装和结构对齐问题。