在linux中使用命名pipe道和信号量

我一直试图让我的程序工作几个小时,而我只是不能找出我的代码有什么问题。 这是关于在使用pipe道的processessess之间传递一个variables。 每个进程增加M次。 当我使用共享内存时,该程序完美工作,但是当我将其更改为使用pipe道时,这是一个灾难。 创build或使用命名pipe道似乎没有工作,或者我想我只是做错了方式。 以下是源代码:

#include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/mman.h> #include <unistd.h> #include <memory.h> #include <fcntl.h> #include <sys/stat.h> #define PIPE_NAME "MY_PIPE" #define N 5 #define M 10 struct sembuf operations; int semid; key_t key; int marker; void semWait(int semid, int sempos) { operations.sem_num = sempos; operations.sem_op = -1; operations.sem_flg = 0; if (semop(semid, &operations, 1) < 0) { perror("ERROR: semop wait\n"); exit(-1); } } void semPost(int semid, int sempos) { operations.sem_num = sempos; operations.sem_op = 1; operations.sem_flg = IPC_NOWAIT; if (semop(semid, &operations, 1) < 0) { perror("ERROR: semop post\n"); exit(-1); } } void worker(int id) { int j, nmarker; int fd = open(PIPE_NAME, O_RDWR); read(fd, &nmarker, sizeof(int)); for (j = 0 ; j < M; j++) { semWait(semid, id); nmarker = nmarker + 1 ; printf("%d ", marker); semPost(semid, N); } write(fd, &nmarker, sizeof(nmarker)); close(fd); } main() { int i, tempPID; int sarray[N+1] = {0}; key = 23; marker = 0; if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) { perror("ERROR: semget\n"); exit(-1); } if ((semctl(semid, N+1, SETALL, sarray)) < 0) { perror("ERROR: semctl - val\n"); exit(-1); } if(mkfifo(PIPE_NAME, S_IFIFO | 0666) < 0) { perror("ERROR:pipe\n"); exit(-1); } int fd; if( fd = open(PIPE_NAME, O_WRONLY) < 0 ){ perror("ERROR:open\n"); exit(-1); } write(fd, &marker, sizeof(marker)); close(fd); for(i = 0; i < N; i++) { tempPID = fork(); if (tempPID < 0) { perror("ERROR: fork\n"); exit(-1); } else if (tempPID == 0) { // if child worker(i); exit(0); } } for (i = 0 ; i < (M*N); i++) { semPost(semid, i%N); semWait(semid, N); } printf("Marker = %d\n", marker); if (semctl( semid, 1, IPC_RMID ) == -1) { perror("ERROR: semctl free\n"); exit(-1); } unlinc(PIPE_NAME); } 

我创build了N个工作进程,每个进程必须将标记值增加M次。 我必须创build一个“睡眠”进程池,并使用信号灯一个接一个地唤醒它们,但是它们都是模糊的,所以现在的源代码就是我想到的:

这是同一个程序的一个版本,但是使用共享内存而不是pipe道:

  #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/mman.h> #define N 5 #define M 10 struct sembuf operations; int semid; key_t key; int *sharedmem; void semWait(int semid, int sempos) { operations.sem_num = sempos; operations.sem_op = -1; operations.sem_flg = 0; if (semop(semid, &operations, 1) < 0) { perror("ERROR: semop wait\n"); exit(-1); } } void semPost(int semid, int sempos) { operations.sem_num = sempos; operations.sem_op = 1; operations.sem_flg = IPC_NOWAIT; if (semop(semid, &operations, 1) < 0) { perror("ERROR: semop post\n"); exit(-1); } } void worker(int id) { int j; for (j = 0 ; j < M; j++) { semWait(semid, id); (*sharedmem)++; semPost(semid, N); } } main() { int i, tempPID; int sarray[N+1] = {0}; int protect = PROT_READ | PROT_WRITE; int flags = MAP_SHARED | MAP_ANONYMOUS; if ((key = ftok("/dev/null", 4343)) == -1) { perror("ERROR: ftok\n"); exit(-1); } if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) { perror("ERROR: semget\n"); exit(-1); } if ((semctl(semid, N+1, SETALL, sarray)) < 0) { perror("ERROR: semctl - val\n"); exit(-1); } sharedmem = (int*)mmap(NULL, sizeof(int), protect, flags, 0, 0); *(sharedmem) = 0; for(i = 0; i < N; i++) { tempPID = fork(); if (tempPID < 0) { perror("ERROR: fork\n"); exit(-1); } else if (tempPID == 0) { // if child worker(i); exit(0); } } for (i = 0 ; i < (M*N); i++) { semPost(semid, i%N); semWait(semid, N); } printf("Marker = %d\n", *sharedmem); if (semctl( semid, 1, IPC_RMID ) == -1) { perror("ERROR: semctl free\n"); exit(-1); } munmap(sharedmem, sizeof(int)); } 

你的一些问题在工人代码中 – 这两行:

 int fd = open(PIPE_NAME, O_RDWR); read(fd, &nmarker, sizeof(int)); 
  1. 如果你打开管道进行阅读和写作,你正在寻求麻烦(IMNSHO)。 打开只读,读取,关闭它。 然后打开它只写,写信给它,关闭它。 现在,您必须考虑信号量操作的发生位置。 在尝试打开管道进行写入之前,实际上需要唤醒下一个进程,因为写入的开放会阻塞,直到有可用的进程读取。 同样,打开进行读取的进程将阻塞,直到有可用的进程写入。 所以,内核将协调这些进程。

  2. 你不检查open()的返回值,所以你不知道你是否有一个有效的文件描述符。 总是检查open()的返回状态。

  3. 你不检查read()的返回值,所以你不知道你是否有任何有效的管道。 总是检查read()的返回状态。

(如果没有有意义的错误恢复,可以决定忽略write()的返回状态,但是检查它是否工作并不是一个坏主意,你可以决定忽略close()的返回状态close()出于类似的原因,虽然你可能不知道问题,直到你做close() 。)

继续工人代码:

 for (j = 0 ; j < M; j++) { semWait(semid, id); nmarker = nmarker + 1 ; printf("%d ", marker); semPost(semid, N); } 

看到你打印marker而不是nmarker是令人惊讶的。 当然,基本的诊断技术在读取时打印nmarker的值。 您可能会或可能不会在每次迭代中打印jnmarker 。 请注意,由于此代码中没有增加marker ,因此打印的值不会更改。

这里的逻辑顺序很有趣……它最奇怪的是和main()的循环结合在一起。 父进程将一个值写入FIFO。 只有一个孩子可以读取这个值 – 其余的会立即得到EOF,或者无限期挂起(取决于你是否在孩子中使用O_RDONLYO_RDWR )。 每个孩子都会被告知增加其价值,然后再回到睡觉,直到再次醒来。 没有什么东西将递增的值发送​​给下一个孩子。 所以每个孩子都会独立地增加它所选择的价值 – 这可能是垃圾。 在共享内存的情况下,如果你有一个指向共享值的指针,那么所有进程都会同时看到这个增量,这就是为什么它被称为共享内存的原因。 但是这里没有共享内存,所以你必须明确地沟通才能使它工作。 (我想知道你的FIFO和共享内存的实现是否工作,因为通信是通过共享内存 – 换句话说,意外呢?)

所以,如果孩子每次增加它读取的变量,它必须读取当前值并且每次在循环中写入新的值。 当然,这将是一个错误检查的阅读。 由于信号量的原因,你或许可以用O_RDWR ,但是我个人更喜欢单独的读写打开 – 如果需要的话每次迭代都是如此。 但是我没有实现这个来检查它是否确实会遇到问题。 在FIFO上使用O_RDWR是简单的常规操作。

当你的孩子增加了N次的值后,它将结果写入管道。

 write(fd, &nmarker, sizeof(nmarker)); close(fd); 

主程序然后:

 printf("Marker = %d\n", marker); if (semctl( semid, 1, IPC_RMID ) == -1) { perror("ERROR: semctl free\n"); exit(-1); } unlinc(PIPE_NAME); 

由于没有修改marker ,因此打印的值将为0.您应该让主流程读取每个子项的回复。

unlink() FIFO的正确函数是unlink()remove()


讨论

正如评论中指出的那样,一个问题是打开FIFO是封锁 – 没有读者。 但是,这不是唯一的问题。

下面的代码运行。 我还没有证实这个数字正在增加,但它正在递增。 我没有检查过每一个过程都轮到了。 我已经修改了错误处理(每个调用一行,而不是3或4),并在输出中添加了包含PID的打印功能。 我错误检查了每个系统调用(但没有打印语句)。 if (fd = open(...) < 0)我解决了一个问题。 据我所知,关闭主进程中的FIFO会丢弃写入的内容 – 因此父进程不再立即关闭FIFO。 但主要是我把FIFO的读写写入工作循环 – 在外面打开和关闭。 代码还包含诊断打印,所以我可以看到它出错的地方。 我还没有完成标题最小化或任何应该发生的其他清理。 但是,除main()之外的所有内容都是静态的,所以不需要预先声明。 它编译清洁下:

 /usr/bin/gcc -O3 -g -std=c99 -Wall -Wextra fifocircle.c -o fifocircle 

 #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/mman.h> #include <unistd.h> #include <memory.h> #include <fcntl.h> #include <sys/stat.h> #include <stdarg.h> #include <errno.h> #include <string.h> static const char *arg0 = "undefined"; static void err_error(const char *fmt, ...) { int errnum = errno; va_list args; fflush(0); fprintf(stderr, "%s: pid %d:", arg0, (int)getpid()); va_start(args, fmt); vfprintf(stderr, fmt, args); va_end(args); if (errnum != 0) fprintf(stderr, "(%d: %s)", errnum, strerror(errnum)); fputc('\n', stderr); exit(1); } static void print(const char *fmt, ...) { va_list args; printf("pid %d: ", (int)getpid()); va_start(args, fmt); vfprintf(stdout, fmt, args); va_end(args); fflush(0); } #define PIPE_NAME "MY_PIPE" #define N 5 #define M 10 static struct sembuf operations; static int semid; static key_t key; static int marker; static void semWait(int semid, int sempos) { operations.sem_num = sempos; operations.sem_op = -1; operations.sem_flg = 0; if (semop(semid, &operations, 1) < 0) err_error("semop wait"); } static void semPost(int semid, int sempos) { operations.sem_num = sempos; operations.sem_op = 1; operations.sem_flg = IPC_NOWAIT; if (semop(semid, &operations, 1) < 0) err_error("semop post"); } static void worker(int id) { int j; int fd = open(PIPE_NAME, O_RDWR); if (fd < 0) err_error("failed to open FIFO %s for read & write", PIPE_NAME); print("Worker %d: fd %d\n", id, fd); for (j = 0 ; j < M; j++) { int nmarker; print("waiting for %d\n", id); semWait(semid, id); if (read(fd, &nmarker, sizeof(int)) != sizeof(int)) err_error("short read from FIFO"); print("Got %d from FIFO\n", nmarker); nmarker = nmarker + 1 ; if (write(fd, &nmarker, sizeof(nmarker)) != sizeof(nmarker)) err_error("short write to FIFO"); print("Wrote %d to FIFO\n", nmarker); print("posting %d\n", id); semPost(semid, N); } if (close(fd) != 0) err_error("failed to close FIFO"); print("done\n"); } int main(int argc, char **argv) { int i; int sarray[N+1] = {0}; key = 23; marker = 0; arg0 = argv[0]; if (argc != 1) err_error("Usage: %s\n", arg0); if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) err_error("semget"); if ((semctl(semid, N+1, SETALL, sarray)) < 0) { perror("ERROR: semctl - val\n"); exit(-1); } if (mkfifo(PIPE_NAME, S_IFIFO | 0666) < 0) err_error("failed to create FIFO %s\n", PIPE_NAME); print("FIFO created\n"); int fd; if ((fd = open(PIPE_NAME, O_RDWR)) < 0 ) err_error("failed to open FIFO %s\n", PIPE_NAME); print("FIFO opened\n"); if (write(fd, &marker, sizeof(marker)) != sizeof(marker)) err_error("short write to FIFO"); print("FIFO loaded\n"); print("Master: about to fork\n"); for (i = 0; i < N; i++) { pid_t pid = fork(); if (pid < 0) err_error("failed to fork"); else if (pid == 0) { worker(i); exit(0); } } print("Master: about to loop\n"); for (i = 0 ; i < (M*N); i++) { print("posting to %d\n", i%N); semPost(semid, i%N); print("waiting for %d\n", N); semWait(semid, N); } if (close(fd) != 0) err_error("failed to close FIFO"); print("Marker = %d\n", marker); if (semctl( semid, 1, IPC_RMID ) == -1) err_error("semctl remove"); if (unlink(PIPE_NAME) != 0) err_error("failed to remove FIFO %s", PIPE_NAME); return(0); }