Linux 多线程条件变量同步

条件变量是线程同步的另一种方式,实际上,条件变量是信号量的底层实现,这也就意味着,使用条件变量可以拥有更大的*度,同时也就需要更加小心的进行同步操作。条件变量使用的条件本身是需要使用互斥量进行保护的,线程在改变条件状态之前必须首先锁住互斥量,其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定之后才能计算条件。

模型

#include<pthread.h>
pthread_cond_t cond //准备条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化静态的条件变量
pthread_cond_init() //初始化一个动态的条件变量
pthread_cond_wait() //等待条件变量变为真
pthread_cond_timedwait() //等待条件变量变为真,等待有时间限制。
pthread_cond_signal() //至少唤醒一个等待该条件的线程
pthread_cond_broadcast() //唤醒等待该条件的所有线程
pthread_cond_destroy() //销毁一个条件变量

pthread_cond_init()

//初始化一个动态的条件变量
//成功返回0,失败返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

cond:条件变量指针,这里使用了restrict关键字

attr:条件变量属性指针,默认属性赋NULL

pthread_cond_wait() / pthread_cond_timedwait()

//等待条件变量为真。收到pthread_cond_broadcast()或pthread_cond_signal()就唤醒
//成功返回0,失败返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

条件变量的使在多线程的程序中,因为各个线程都可以访问大部分的进程资源,所以我们为了保证公有资源的使用是可以控制的,在一个线程开始使用公有资源之前要尝试获取互斥锁,在使用完毕之后要释放互斥锁,这样就能保证每个公共资源在一个时刻都只能被一个线程使用,在一定程度上得到了控制,但这并不能解决同步的问题,考虑如下两个线程:

Q=create_queue();
pthread_t mutex
//线程A,入队
while(1){
lock(mutex);
in_queue(Q);
unlock(mutex);
} //线程B,出队
while(1){
lock(mutex);
out_queue(Q);
unlock(mutex);
}

上述代码可以实现两个线程的互斥,即同一时刻只有一个线程在使用公有资源-队列。但如果线程B获取了锁,但队列中是空的,它的out_queue()也就是没有意义的,所以我们这里更需要一种方法将两个线程进行同步:只有当队列中有数据的时候才进行出队。

我们设计这样一种逻辑:

//线程B,出队
while(1){
lock(mutex);
if(队列是空,线程不应该执行){
释放锁;
continue;

out_queue(Q);
unlock(mutex);
}

这个程序就解决了上述的问题,即便线程B抢到了互斥锁,但是如果队列是空的,他就释放锁让两个线程重新抢锁,希望这次线程A能抢到并往里放一个数据。

但这个逻辑还有一个问题,就是多线程并发的问题,很有可能发生的一种情况是:线程B抢到了锁,发现没有数据,释放锁->线程A立即抢到了锁并往里放了一个数据->线程B执行continue,显然,这种情况下是不应该continue的,因为线程B想要的条件在释放锁之后立即就被满足了,它错过了条件。

So,我们想一种反过来的逻辑:

//线程B,出队
while(1){
lock(mutex);
if(队列是空,线程不应该执行){
continue;
释放锁;

out_queue(Q);
unlock(mutex);
}

显然这种方法有个致命的问题:一旦continue了,线程B自己获得锁就没有被释放,这样线程A不可能抢到锁,而B继续加锁就会形成死锁!

Finaly,我们希望看到一个函数fcn,如果条件不满足,能同时释放锁+停止执行线程,如果条件满足,自己当时获得的锁还在

//线程B,出队
while(1){
lock(mutex);
fcn(当前线程不应该执行,mutex) //if(当前线程不应该执行){释放锁“同时” continue;}
out_queue(Q);
unlock(mutex);
}

OK,这个就是pthread_cond_wait()的原理了,只不过它把continue变成了"休眠"这种由OS负责的操作,可以大大的节约资源。

当然,线程执行的条件是很难当作参数传入一个函数的,POSIX多线程的模型使用系统提供的"条件变量"+"我们自己定义的具体条件" 来确定一个线程是否应该执行接下来的内容。"条件变量"只有,所以一种典型的多线程同步的结构如下

//线程B,出队
while(1){
lock(mutex);
while(条件不满足)
pthread_cond_wait(cond,mutex)
//获得互斥锁可以同时保护while里的条件和cond的判断,二者总是用一把锁保护,并一同释放
//cond为假,就休眠同时释放锁,等待被cond为真唤醒,把自己获得的锁拿回来
//拿回自己的锁再检查线程执行条件,条件不满足继续循环,直到条件满足跳出循环
//这个函数是带着"线程的执行条件为真"+"cond为真"走出循环的
//这个函数返回后cond被重新设置为0
out_queue(Q);
unlock(mutex);
}

pthread_cond_braoadcast()/pthread_cond_signal()

//使条件变量为真并唤醒wait中的线程,前者唤醒所有wait的,后者唤醒一个
//成功返回0,失败返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_destroy()

//销毁条件变量
//成功返回0,失败返回error number
int pthread_cond_destroy(pthread_cond_t *cond);

例子-线程池

\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100 typedef struct{//每个节点的封装格式,fcn是用户自定义函数,arg是用户自定义函数参数指针,保证通用性声明为void
void* (*fcn)(void* arg);
void* arg;
}task_t;
typedef struct{ //用户自定义函数的参数结构体
int x;
}argfcn_t; //#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t #endif //__THREAD_H__
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread_pool.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h" //互斥量和条件变量
pthread_mutex_t lock;
pthread_cond_t cond; //全局的表
lqueue_t* Q; //每个线程的任务,必须是这个形式的函数指针
void* do_task(void* p){
task_t data;
int ret=0;
while(1){
pthread_mutex_lock(&lock);
while(is_empty_lqueue(Q)){ //大家收到广播,因为延迟,可能醒了好几个,要判断一下是不是自己
pthread_cond_wait(&cond,&lock); //先抢到锁再醒
}
ret=out_lqueue(Q,&data);
pthread_mutex_unlock(&lock);
data.fcn(data.arg);
}
} //创建线程池
void create_pool(void){
//初始化队列
Q=create_lqueue();
//初始化互斥量
pthread_mutex_init(&lock,NULL);
//初始化条件变量
pthread_cond_init(&cond,NULL);
int i=THREAD_NUM;
pthread_t tid[THREAD_NUM];
while(i--)
pthread_create(&tid[i],NULL,do_task,NULL);
} //准备函数
void* fcn(void* parg){ //用户自定义的需要线程执行的函数
argfcn_t* i=(argfcn_t*)parg;
printf("this is task1\n");
printf("task1:%d\n",i->x);
} //添加任务
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
task_t task;
task.fcn=pfcn;
task.arg=arg; in_lqueue(Q,task);
pthread_cond_signal(&cond); //添加了一个任务,用signal更好
} int main(int argc, const char *argv[])
{
//创建线程池
create_pool(); //准备参数
argfcn_t argfcn;
argfcn.x=5; //添加任务
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pause();
return 0;
}
上一篇:oracle、postgres、mysql数据库的建库、创建用户、导人导出备份总结


下一篇:Mysql 导入导出备份