pthread_cond_wait和pthread_mutex_lock优先级?

我有多个读取线程和一个写入线程。 如果我在其中一个读线程上locking了互斥并且从它发送了广播,那么保证互斥将被等待在pthread_cond_wait()上的写入线程locking,或者有可能在pthread_mutex_lock()上包含的另一个读线程locking互斥? 主要问题是pthread_cond_wait()的优先级高于pthread_mutex_lock()吗?

如果没有,我怎样才能实现互斥锁总是会被写在pthread_cond_broadcast()上的线程locking?

阅读主题:

pthread_mutex_lock(mutex); pthread_cond_broadcast(cond); pthread_mutex_unlock(mutex); 

写线程:

 pthread_mutex_lock(&mutex); pthread_cond_wait(&cond, &mutex); 

Solutions Collecting From Web of "pthread_cond_wait和pthread_mutex_lock优先级?"

让我们假设读写两个线程在同一时刻到达pthread_mutex_lock 。 所以,无论是写线程获取pthread_mutex_lock调用的互斥锁还是读取线程。

如果它是写入线程,读取的将在pthread_mutex_lock等待。 写入,通过调用pthread_cond_wait释放cond上的mutex和块。 这是完成原子。 所以,当读线程是mutex ,我们可以肯定读线程在cond等待。 所以, cond广播到达了写入线程,它不再等待cond但仍然在pthread_cond_wait范围内 – 试图获得mutex (保持被读取线程)。 读取线程在广播cond后释放mutex ,并转到写入线程。 所以写线程终于从pthread_cond_wait退出了,锁定了mutex锁。 记得稍后解锁。

如果它是读线程,写一个将在pthread_mutex_lock等待,读将在cond上广播一个信号,然后释放mutex 。 之后写入线程获取pthread_mutex_lock上的mutex并立即释放pthread_cond_wait等待cond (请注意,先前的cond广播对当前的pthread_cond_wait没有影响)。 在读取线程的下一次迭代中,它获取mutex ,在cond上发送广播并解锁mutex 。 这意味着写线程在cond上向前移动,并获得mutex

它是否回答你关于优先权的问题?


更新后的评论。

让我们假设我们有一个线程(让我们将其命名为A以备将来参考)持有mutex的锁,而其他试图获取同一个锁的其他mutex也是如此。 只要第一个线程释放锁,就无法预测哪个线程会获得锁定。 而且,如果A线程有一个循环并且试图重新获得对mutex锁定,那么就有机会获得这个锁,而其他的线程会一直等待。 添加pthread_cond_wait不会更改授予锁的范围内的任何内容。

让我引用POSIX规范的片段(请参阅https://stackoverflow.com/a/9625267/2989411以供参考):

这些函数以原子方式释放互斥并导致调用线程在条件变量cond上阻塞; 原子地这里的意思是“原子地相对于另一个线程访问互斥体,然后是条件变量”。 也就是说,如果另一个线程能够在约程序线程释放它之后获取该互斥体,则该线程中的pthread_cond_broadcast()或pthread_cond_signal()的后续调用应该如同在about-阻塞线程被阻塞。

这只是关于操作顺序的标准。 授予其他线程锁定的顺序是相当不可预知的,并且取决于一些非常微妙的时序波动而改变。

只有与互斥体相关的代码,请用以下代码玩一下:

 #define _GNU_SOURCE #include <pthread.h> #include <stdio.h> #include <unistd.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void *th(void *arg) { int i; char *s = arg; for (i = 0; i < 10; ++i) { pthread_mutex_lock(&mutex); printf("%s %d\n", s, i); //sleep(1); pthread_mutex_unlock(&mutex); #if 0 pthread_yield(); #endif } return NULL; } int main() { int i; for (i = 0; i < 10; ++i) { pthread_t t1, t2, t3; printf("================================\n"); pthread_create(&t1, NULL, th, "t1"); pthread_create(&t2, NULL, th, " t2"); pthread_create(&t3, NULL, th, " t3"); pthread_join(t1, NULL); pthread_join(t2, NULL); pthread_join(t3, NULL); } return 0; } 

在一台机器上(单CPU),它总是显示从t3开始的整个循环,然后是t2,最后是从t1开始。 在另一个(2个内核)线程的顺序是更随机的,但几乎总是显示整个循环的每个线程之前授予互斥其他线程。 很少有这样的情况:

 t1 8 t1 9 t3 0 t2 0 t2 1 [removed other t2 output] t2 8 t2 9 t3 1 t3 2 

通过用#if 0 #if 1替换#if 0启用pthread_yield并观察结果并检查输出。 对我来说,它的工作方式是两个线程交错显示输出,然后第三个线程终于有机会工作。 添加另一个或多个线程。 玩着睡觉等,它确认了随机行为。

如果你想稍微尝试一下,编译并运行下面的一段代码。 这是一个单一的生产者 – 多个消费者模型的例子。 它可以用两个参数运行:第一个是消费者线程的数量,第二个是产生的数据序列的长度。 如果没有给出参数,则有一个消费者线程和120个要处理的项目。 我还推荐在标有/* play here */地方睡觉/休眠/* play here */ :改变参数的值,根本不需要睡觉,在合适的时候移动到临界区或者用pthread_yield替换,观察行为的变化。

 #define _GNU_SOURCE #include <assert.h> #include <limits.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <time.h> #include <unistd.h> struct data_t { int seq; int payload; struct data_t *next; }; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; struct data_t *first = NULL, *last = NULL; int in_progress = 1; int num_data = 120; void push(int seq, int payload) { struct data_t *e; e = malloc(sizeof(struct data_t)); e->seq = seq; e->payload = payload; e->next = NULL; if (last == NULL) { assert(first == NULL); first = last = e; } else { last->next = e; last = e; } } struct data_t pop() { struct data_t res = {0}; if (first == NULL) { res.seq = -1; } else { res.seq = first->seq; res.payload = first->payload; first = first->next; if (first == NULL) { last = NULL; } } return res; } void *producer(void *arg __attribute__((unused))) { int i; printf("producer created\n"); for (i = 0; i < num_data; ++i) { int val; sleep(1); /* play here */ pthread_mutex_lock(&mutex); val = rand() / (INT_MAX / 1000); push(i, val); pthread_mutex_unlock(&mutex); pthread_cond_signal(&cond); printf("prod %3d %3d signaled\n", i, val); } in_progress = 0; printf("prod end\n"); pthread_cond_broadcast(&cond); printf("prod end signaled\n"); return NULL; } void *consumer(void *arg) { char c_id[1024]; int t_id = *(int *)arg; sprintf(c_id, "%*sc %02d", t_id % 10, "", t_id); printf("%s created\n", c_id); while (1) { struct data_t item; pthread_mutex_lock(&mutex); item = pop(); while (item.seq == -1 && in_progress) { printf("%s waits for data\n", c_id); pthread_cond_wait(&cond, &mutex); printf("%s got signal\n", c_id); item = pop(); } if (!in_progress && item.seq == -1) { printf("%s detected end of data.\n", c_id); pthread_mutex_unlock(&mutex); break; } pthread_mutex_unlock(&mutex); printf("%s processing %3d %3d\n", c_id, item.seq, item.payload); sleep(item.payload % 10); /* play here */ printf("%s processed %3d %3d\n", c_id, item.seq, item.payload); } printf("%s end\n", c_id); return NULL; } int main(int argc, char *argv[]) { int num_cons = 1; pthread_t t_prod; pthread_t *t_cons; int i; int *nums; if (argc > 1) { num_cons = atoi(argv[1]); if (num_cons == 0) { num_cons = 1; } if (num_cons > 99) { num_cons = 99; } } if (argc > 2) { num_data = atoi(argv[2]); if (num_data < 10) { num_data = 10; } if (num_data > 600) { num_data = 600; } } printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data); t_cons = malloc(sizeof(pthread_t) * num_cons); nums = malloc(sizeof(int) * num_cons); if (!t_cons || !nums) { printf("Out of memory!\n"); exit(1); } srand(time(NULL)); pthread_create(&t_prod, NULL, producer, NULL); for (i = 0; i < num_cons; ++i) { nums[i] = i + 1; usleep(100000); /* play here */ pthread_create(t_cons + i, NULL, consumer, nums + i); } pthread_join(t_prod, NULL); for (i = 0; i < num_cons; ++i) { pthread_join(t_cons[i], NULL); } free(nums); free(t_cons); return 0; } 

我希望我已经清除你的疑惑,并给你一些代码来试验,并获得有关pthread行为的信心。