单个生产者和多个消费者

我正在尝试使用一个生产者产生一个值到缓冲区(state.value)的scheme,并且多个消费者正在读取缓冲区并将其更新到数组中。 下面是代码。

#include <pthread.h> #include <stdio.h> #include <stdlib.h> pthread_mutex_t mutex; pthread_cond_t prod, cons; static int count = 0; struct shared_state{ int done; int value; int value_available; int *array; int j; }state; void * producer(void *arg){ int a[] = {12,11,10,9,8,7}; int size = sizeof(a)/sizeof(a[0]); int i = 0; while(i<size){ pthread_mutex_lock(&mutex); while(state.value_available) pthread_cond_wait(&prod, &mutex); state.value = a[i++]; printf("In producer: %d\n",state.value); state.value_available = 1; pthread_cond_signal(&cons); pthread_mutex_unlock(&mutex); } state.done = 1; count++; printf("Producer count: %d\n",count); pthread_exit(NULL); } void * consumer(void *arg){ while(!(state.done)){ pthread_mutex_lock(&mutex); while(!state.value_available) pthread_cond_wait(&cons,&mutex); state.array[(state.j)] = state.value; printf("In consumer: %d\t%d\n",state.array[state.j], state.j); (state.j)++; state.value_available = 0; pthread_cond_signal(&prod); pthread_mutex_unlock(&mutex); } int i; for(i=0;i<6;i++) printf("%d-->",state.array[i]); printf("\n"); count++; printf("Consumer count: %d\n",count); } int main(void){ state.done = 0; pthread_t pro,con,con2; state.value_available = 0; state.j = 0; state.array = (int *)malloc(sizeof(int)*6); pthread_create(&pro, NULL, producer, (void *)NULL); pthread_create(&con, NULL, consumer, (void *)NULL); pthread_create(&con2, NULL, consumer, (void *)NULL); pthread_join(pro,NULL); pthread_join(con,NULL); pthread_join(con2,NULL); pthread_exit(NULL); printf("\n"); return 0; } 

以下是我收到的输出。 但是,第二个消费者线程不会退出并进入无限循环。 如果有人可以帮助我找出错误,那将会很有帮助。 谢谢。

  In producer: 12 In consumer: 12 In producer: 11 In consumer: 11 In producer: 10 In consumer: 10 In producer: 9 In consumer: 9 In producer: 8 In consumer: 8 In producer: 7 Producer count: 1 In consumer: 7 Consumer array: 12-->11-->10-->9-->8-->7--> Consumer count: 2 

首先,你不能初始化你的互斥量和你的条件变量。 因为它们是全局变量,所以它们的初始状态不是这个原因不确定的,但它也不一定是有效的。 您必须使用适当的初始化函数或使用for-purpose初始化宏进行初始化。 例如,

 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t prod = PTHREAD_COND_INITIALIZER; pthread_cond_t cons = PTHREAD_COND_INITIALIZER; 

你似乎正在讨论这个问题,但这并不意味着你不应该修复它。

其次,你不检查函数调用的返回值是否有错误代码。 你真的必须这样做你的代码是健壮的。 错误的发生既是因为代码被破坏,也是由于不受控制的运行时问题,如果你认为你的函数调用总会成功,那么迟早你会悲伤的。

然而,你有一个更大的问题: pthread_cond_signal()唤醒等待给定条件变量的一个线程,如果确实有任何等待。 在生产者最后一次发送CV的情况下,消费者线程可能会被阻塞。 在这种情况下,一个醒来并执行处理,但另一个仍然被阻止。 因为消费者在从等待状态唤醒之后对状态谓词执行了适当的检查,所以可以使用pthread_cond_broadcast()来解决该问题。

但这只是一个解决方案。 因为你确实执行了一个合适的谓词检查,并且由于消费者会在释放互斥体之前更新共享状态以使其自己的谓词是假的,所以如果第二个消费者从等待中醒来,它将会继续等待。 此外,如果不再继续等待,会怎样呢? 没有可用的价值可供消费,也没有替代的途径。

底线:

  • 你的制片人必须向消费者的简历播放,而不是发信号给它,至少在产生最后一个项目之后。
  • 当消费者从等待CV时醒来,不仅要检查价值是否可用,还要检查生产者是否完成。 在任何情况下都不能重新等待,但只有在实际可用的情况下才能消耗一定的价值。