使用条件变量遇到的小问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include<pthread.h>
#include<inttypes.h>
#include<stdio.h>
#include<unistd.h>

#define CONSUMER_COUNT 1000
#define PRODUCTOR_COUNT 100

pthread_mutex_t mq_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mq_not_full_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t mq_not_empty_cond = PTHREAD_COND_INITIALIZER;

class Mq;
class QNode;

class QNode{
};

class Mq{
public:
Mq() = default;
Mq(uint32_t s):mq_size(s),mq_count(0){
mq = new QNode[mq_size];
head = 0;
tail = 0;
}

void addMg(const QNode& n){
pthread_mutex_lock(&mq_lock);
while(mq_count == mq_size){
pthread_cond_wait(&mq_not_full_cond,&mq_lock);
}
mq[tail++] = n;
if(tail >= mq_size) tail = 0;
printf("add a product \n");
if(mq_count++ >= 0) pthread_cond_signal(&mq_not_empty_cond);
pthread_mutex_unlock(&mq_lock);
}

QNode takeMg(){
QNode result;
pthread_mutex_lock(&mq_lock);
while(!mq_count){
pthread_cond_wait(&mq_not_empty_cond,&mq_lock);
}
result = mq[head++];
if(head >= mq_size) head = 0;
printf("take a product \n");
if(mq_count-- <= mq_size) pthread_cond_signal(&mq_not_full_cond);
pthread_mutex_unlock(&mq_lock);
return result;
}
private:
uint32_t mq_size;
uint32_t mq_count;
QNode* mq;
uint32_t head;
uint32_t tail;
};


void* product(void* arg){
Mq& mq = *(Mq*)arg;
for(int i = 0;i < 1000;i++){
mq.addMg(QNode());
}
return NULL;
}

void* consume(void* arg){
Mq& mq = *(Mq*)arg;
for(int i = 0;i < 100;i++){
QNode node = mq.takeMg();
}
return NULL;
}

Mq mq(100);

int main(){
pthread_t productor[PRODUCTOR_COUNT];
pthread_t consumer[CONSUMER_COUNT];

for(int i = 0;i < PRODUCTOR_COUNT;i++){
pthread_create(productor+i,NULL,product,(void*)(&mq));
}
for(int i = 0;i < CONSUMER_COUNT;i++){
pthread_create(consumer+i,NULL,consume,(void*)(&mq));
}

for(int i = 0;i < CONSUMER_COUNT;i++){
pthread_join(consumer[i],NULL);
}
for(int i = 0;i < PRODUCTOR_COUNT;i++){
pthread_join(productor[i],NULL);
}

}

这是一个简单的消息队列,开启多个生产者多个消费者线程使用该消息队列,使用互斥锁和条件变量进行同步。以上是通过测试的最终版本,在实际写的过程中遇到了一些小问题。

1
2
3
4
5
6
7
//takeMg
if(mq_count-- <= mq_size) pthread_cond_signal(&mq_not_full_cond);//最终版
if(mq_count-- == mq_size) pthread_cond_signal(&mq_not_full_cond);//有问题

//addMg
if(mq_count++ >= 0) pthread_cond_signal(&mq_not_empty_cond);//最终版
if(mq_count++ == 0) pthread_cond_signal(&mq_not_empty_cond);//有问题

以上是错误代码和最终代码的对比。可以看到,仅仅是修改了比较符,但影响很大。

在刚开始,写有问题的那段代码时,我是这样想的,需要把非空条件的集合唤醒,说明当前肯定是空变为非空了。比方说A、B都在等待非空条件,C投入一个资源,队列由空变为非空,唤醒A、B中的一个,比如说A。然后A拿走资源,队列再次变为空。C再投入一个资源,发现队列又由空变为非空,再次唤醒B。

似乎运行的很好。但是问题在于pthread_cond_signal的含义,由于我们是在锁内唤醒,即使唤醒也不能立刻拿到锁。所以pthread_cond_signal的含义是把线程从等待条件队列中移到等待互斥锁的队列里,并不是意味着唤醒就代表着它会是下一个拿到锁的线程,还是得经过在互斥锁队列里的竞争。

所以在刚才的步骤里就可能出现这种情况,C投入资源唤醒了A,A加入互斥锁队列,C再一次抢到了锁,这次因为资源数由1变为2,所以没有唤醒过程,如果C不再投入资源,那么B将永远醒不过来,一直在条件队列里。这也是我在实验的时候,程序一直停不下来。