条件变量
条件变量(Condition Variable)是多线程编程中常用的一种同步机制,它主要用于线程之间的协调和通信,使线程能够在某些条件满足时被唤醒,或者等待某些条件发生。条件变量通常和互斥锁(Mutex)一起使用。 工作过程:
- 线程在检查到某个条件不满足时,会在条件变量上等待(阻塞),同时释放相关的互斥锁。
- 其他线程改变条件后,可以通过通知机制(如
signal
或broadcast
)唤醒等待的线程。
函数pthread_cond_init
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
作用
初始化条件变量 cond
,使用 cond_attr
指定的条件属性,如果 cond_attr
为 NULL 则使用默认属性。LinuxThreads 实现不支持条件变量的属性,因此 cond_attr
参数实际上被忽略。
参数
pthread_cond_t *cond
:指向要初始化的条件变量的指针。pthread_condattr_t *cond_attr
:指向条件变量属性的指针,如果为 NULL 则使用默认属性。
返回值
- 成功时返回 0。
- 失败时返回非零错误码。
函数pthread_cond_signal
int pthread_cond_signal(pthread_cond_t *cond);
作用
唤醒一个在条件变量 cond
上等待的线程。如果没有线程在等待,则不执行任何操作。如果有多个线程在等待,只有一个会被唤醒,但具体哪一个未指定。
参数
pthread_cond_t *cond
:指向条件变量的指针。
返回值
- 成功时返回 0。
- 失败时返回非零错误码。
函数pthread_cond_broadcast
int pthread_cond_broadcast(pthread_cond_t *cond);
作用
唤醒所有在条件变量 cond
上等待的线程。如果没有线程在等待,则不执行任何操作。
**参数**- `pthread_cond_t *cond`:指向条件变量的指针。
**返回值**- 成功时返回 0。- 失败时返回非零错误码。
### 函数pthread_cond_wait```cint pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
作用
原子地解锁互斥量 mutex
(如同调用 pthread_unlock_mutex
),然后等待条件变量 cond
被信号唤醒。线程执行被挂起,直到条件变量被信号唤醒。调用线程在进入 pthread_cond_wait
之前必须已经锁定了互斥量。在返回到调用线程之前,pthread_cond_wait
会重新获取互斥量(如同调用 pthread_lock_mutex
)。
参数
pthread_cond_t *cond
:指向条件变量的指针。pthread_mutex_t *mutex
:指向互斥量的指针。
返回值
- 成功时返回 0。
- 失败时返回非零错误码。
函数pthread_cond_timedwait
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
作用
原子地解锁互斥量 mutex
并等待条件变量 cond
,如同 pthread_cond_wait
所做的一样,但它还限定了等待的持续时间。如果在 abstime
指定的时间内条件变量未被信号唤醒,则重新获取互斥量 mutex
并返回错误 ETIMEDOUT
。abstime
参数指定一个绝对时间,与 time(2)
和 gettimeofday(2)
具有相同的起点:abstime
为 0 对应于 1970 年 1 月 1 日 GMT 00:00:00。
参数
pthread_cond_t *cond
:指向条件变量的指针。pthread_mutex_t *mutex
:指向互斥量的指针。const struct timespec *abstime
:指向绝对时间的指针。
返回值
- 成功时返回 0。
- 失败时返回非零错误码,可能的错误码包括:
ETIMEDOUT
:在abstime
指定的时间内条件变量未被信号唤醒。EINTR
:pthread_cond_timedwait
被信号中断。
函数pthread_cond_destroy
int pthread_cond_destroy(pthread_cond_t *cond);
作用
销毁条件变量,释放它可能持有的资源。在进入 pthread_cond_destroy
时,不能有任何线程在等待该条件变量。在 LinuxThreads 实现中,条件变量不关联任何资源,因此 pthread_cond_destroy
实际上什么都不做,只是检查条件变量上没有等待的线程。
参数
pthread_cond_t *cond
:指向要销毁的条件变量的指针。
返回值
- 成功时返回 0。
- 失败时返回非零错误码,可能的错误码包括:
EBUSY
:有些线程当前正在等待cond
。
条件变量优化质数筛
#include "pthread.h"#include "sys/wait.h"#include <stdio.h>#include <stdlib.h>#include <sys/types.h>#include <unistd.h>#define LEFT 30000000#define RIGHT 30000200
#define THREAD_NUM 12
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;static pthread_t thread_array[THREAD_NUM];static int num = 0;
int sieve(int is_primer) { int mask = 0; for (int j = 2; j <= is_primer / 2; j++) { if (is_primer % j == 0) { mask = 1; break; } } if (!mask) { printf("The Num [%d] is Primer \n", is_primer); } fflush(stdout); return 0;}
void *thread_task() { while (1) { pthread_mutex_lock(&mutex); if (num == 0) { pthread_mutex_unlock(&mutex); pthread_yield(); } else if (num == -1) { pthread_mutex_unlock(&mutex); pthread_exit(NULL); } else { sieve(num); num = 0; pthread_mutex_unlock(&mutex); pthread_yield(); } }}
int main() {
for (size_t i = 0; i < THREAD_NUM; i++) { pthread_create(thread_array + i, NULL, thread_task, NULL); fprintf(stdout, "thread %d is created !\n", i); fflush(stdout); }
for (size_t i = LEFT; i <= RIGHT; i++) { while (1) { pthread_mutex_lock(&mutex); if (num == 0) { // fprintf(stdout, "put num %d \n", i); fflush(stdout); num = i; pthread_mutex_unlock(&mutex); break; } pthread_mutex_unlock(&mutex); pthread_yield(); // usleep(500000); } } while (1) { pthread_mutex_lock(&mutex); if (num == 0) { num = -1; pthread_mutex_unlock(&mutex); break; } else { pthread_mutex_unlock(&mutex); pthread_yield(); } } for (size_t i = 0; i < THREAD_NUM; i++) { pthread_join(thread_array[i], NULL); } exit(0);}
性能比较
➜ build time ./primer_cond11677./primer_cond 221.12s user 1.02s system 1185% cpu 18.735 total
➜ build time ./primer11677./primer 232.46s user 13.76s system 1295% cpu 19.008 total
可以看到由于不用反复抢锁,用户空间和内核空间的CPU时间都有所减小。
线程令牌桶🪣
tbf.c:
#include "tbf.h"#include "error.h"#include "pthread.h"#include "stdint.h"#include "stdlib.h"
#define TBF_MAX 12struct tbf_st { int cps; int burst; int token; int pos;};
static pthread_t ptid;static pthread_once_t pth_once = PTHREAD_ONCE_INIT;static struct tbf_st *tbf_job[TBF_MAX];static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;static int cntnums = 0;
void *handler(void *p) { struct timespec ts; ts.tv_sec = 1; ts.tv_nsec = 0; while (cntnums) { pthread_mutex_lock(&mutex); for (size_t i = 0; i < cntnums; i++) { struct tbf_st *curt = tbf_job[i]; curt->token += curt->cps; if (curt->token > curt->burst) { curt->token = curt->burst; } } pthread_mutex_unlock(&mutex); pthread_cond_broadcast(&cond);
nanosleep(&ts, NULL); }
return (void *)1;}
int fetch_token(tbf_st *tbf_ptr, int size) { struct tbf_st *tbf = tbf_ptr; if (size <= 0) { return -1; } pthread_mutex_lock(&mutex); if (tbf->token == 0) { pthread_cond_wait(&cond, &mutex); } if (tbf->token <= size) { int ret_token = tbf->token; tbf->token = 0; pthread_mutex_unlock(&mutex); return ret_token; } else { tbf->token -= size; pthread_mutex_unlock(&mutex); return size; } pthread_mutex_unlock(&mutex); return -1;}
int try_fetch_token(tbf_st *tbf_ptr, int size) { struct tbf_st *tbf = tbf_ptr; if (size <= 0) { return -1; } pthread_mutex_lock(&mutex); if (tbf->token == 0) { pthread_mutex_unlock(&mutex); return 0; } if (tbf->token <= size) { int ret_token = tbf->token; tbf->token = 0; pthread_mutex_unlock(&mutex); return ret_token; } else { tbf->token -= size; pthread_mutex_unlock(&mutex); return size; } return -1;}
int return_token(tbf_st *tbf_ptr, int size) { struct tbf_st *tbf = tbf_ptr;
if (size <= 0) { return -1; }
tbf->token += size; if (tbf->token > tbf->burst) { tbf->token = tbf->burst; } return 0;}
tbf_st *tbf_init(int cps, int burst) { struct tbf_st *tbf = malloc(sizeof(struct tbf_st)); if (tbf == NULL) { return NULL; }
tbf->burst = burst; tbf->cps = cps; tbf->token = 0; if (cntnums == TBF_MAX) { free(tbf); return NULL; } tbf_job[cntnums] = tbf; tbf_job[cntnums]->pos = cntnums;
if (cntnums++ == 0) { if (pthread_create(&ptid, NULL, handler, NULL)) { return NULL; } }
return tbf;}
int tbf_destory(tbf_st *tbf_p) { if (tbf_p == NULL) { return -1; } free(tbf_p); tbf_p = NULL; cntnums--; if (cntnums == 0) { pthread_join(ptid, NULL); } return 0;}
tbf.h
#ifndef TBF_H__#define TBF_H__
typedef void tbf_st;
tbf_st *tbf_init(int cps, int burst);
int tbf_destory(tbf_st *tbf_p);int return_token(tbf_st *tbf, int size);int try_fetch_token(tbf_st *tbf, int size);int fetch_token(tbf_st *tbf, int size);#endif
tbf_test.c
#include "stdio.h"#include "tbf.h"int main(int argc, char *argv[]) { tbf_st *tbf;
if ((tbf = tbf_init(10, 500)) == NULL) { return 1; }
if (argc != 2) {
fprintf(stderr, "参数错误"); return 1; } char *path = argv[1]; FILE *file = fopen(path, "r"); if (file == NULL) { perror("fopen"); return 1; } char fc; int reading = 1; int token = 0; while (1) { token += fetch_token(tbf, 1); if (token) { if ((fc = getc(file)) == EOF) { break; } fputc(fc, stdout); fflush(stdout); token--; } }
fclose(file);}
简易线程池
NOTE暂略
信号量
信号量(Semaphore)是并发编程中用来在多个线程或进程之间协调对共享资源访问的同步原语。它本质上是一个带有原子操作的计数器,通过“等待/减少”和“释放/增加”两个基本操作来控制同时访问某资源的数量。
#include "sem.h"#include <pthread.h>#include <stdio.h>#include <stdlib.h>// 信号量结构体struct sem_t { int value; // 信号量的值 pthread_mutex_t mutex; // 互斥锁 pthread_cond_t cond; // 条件变量};
// 初始化信号量sem_t *sem_init(unsigned int value) { struct sem_t *sem = malloc(sizeof(struct sem_t)); sem->value = value; pthread_mutex_init(&sem->mutex, NULL); pthread_cond_init(&sem->cond, NULL); return sem;}
// 等待信号量(P操作)int sem_wait(sem_t *sem_ptr) { struct sem_t *sem = sem_ptr;
pthread_mutex_lock(&sem->mutex);
// 如果信号量值为0,等待 while (sem->value <= 0) { pthread_cond_wait(&sem->cond, &sem->mutex); }
// 消耗一个资源 sem->value--;
pthread_mutex_unlock(&sem->mutex); return 0;}
// 非阻塞等待信号量(尝试P操作)int sem_trywait(sem_t *sem_ptr) { struct sem_t *sem = sem_ptr;
int result = 0;
pthread_mutex_lock(&sem->mutex);
if (sem->value > 0) { sem->value--; result = 0; } else { result = -1; // 资源不可用 }
pthread_mutex_unlock(&sem->mutex); return result;}
// 释放信号量(V操作)int sem_post(sem_t *sem_ptr) { struct sem_t *sem = sem_ptr;
pthread_mutex_lock(&sem->mutex);
// 增加资源 sem->value++;
// 唤醒等待的线程 pthread_cond_signal(&sem->cond);
pthread_mutex_unlock(&sem->mutex); return 0;}
// 获取信号量当前值int sem_getvalue(sem_t *sem_ptr, int *sval) { struct sem_t *sem = sem_ptr;
pthread_mutex_lock(&sem->mutex); *sval = sem->value; pthread_mutex_unlock(&sem->mutex); return 0;}
// 销毁信号量int sem_destroy(sem_t *sem_ptr) { struct sem_t *sem = sem_ptr;
pthread_mutex_destroy(&sem->mutex); pthread_cond_destroy(&sem->cond); return 0;}