手寫線程池 - C 語言版
線程池的組成主要分為 3 個(gè)部分,這三部分配合工作就可以得到一個(gè)完整的線程池:
- 任務(wù)隊(duì)列,存儲需要處理的任務(wù),由工作的線程來處理這些任務(wù)
- 通過線程池提供的 API 函數(shù),將一個(gè)待處理的任務(wù)添加到任務(wù)隊(duì)列,或者從任務(wù)隊(duì)列中刪除
- 已處理的任務(wù)會被從任務(wù)隊(duì)列中刪除
- 線程池的使用者,也就是調(diào)用線程池函數(shù)往任務(wù)隊(duì)列中添加任務(wù)的線程就是生產(chǎn)者線程
- 工作的線程(任務(wù)隊(duì)列任務(wù)的消費(fèi)者) ,N個(gè)
- 線程池中維護(hù)了一定數(shù)量的工作線程,他們的作用是是不停的讀任務(wù)隊(duì)列,從里邊取出任務(wù)并處理
- 工作的線程相當(dāng)于是任務(wù)隊(duì)列的消費(fèi)者角色,
- 如果任務(wù)隊(duì)列為空,工作的線程將會被阻塞 (使用條件變量 / 信號量阻塞)
- 如果阻塞之后有了新的任務(wù),由生產(chǎn)者將阻塞解除,工作線程開始工作
- 管理者線程(不處理任務(wù)隊(duì)列中的任務(wù)),1個(gè)
- 它的任務(wù)是周期性的對任務(wù)隊(duì)列中的任務(wù)數(shù)量以及處于忙狀態(tài)的工作線程個(gè)數(shù)進(jìn)行檢測
- 當(dāng)任務(wù)過多的時(shí)候,可以適當(dāng)?shù)膭?chuàng)建一些新的工作線程
- 當(dāng)任務(wù)過少的時(shí)候,可以適當(dāng)?shù)匿N毀一些工作的線程
2. 任務(wù)隊(duì)列
// 任務(wù)結(jié)構(gòu)體typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
3. 線程池定義
// 線程池結(jié)構(gòu)體struct ThreadPool
{
// 任務(wù)隊(duì)列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 當(dāng)前任務(wù)個(gè)數(shù)
int queueFront; // 隊(duì)頭 -> 取數(shù)據(jù)
int queueRear; // 隊(duì)尾 -> 放數(shù)據(jù)
pthread_t managerID; // 管理者線程ID
pthread_t *threadIDs; // 工作的線程ID
int minNum; // 最小線程數(shù)量
int maxNum; // 最大線程數(shù)量
int busyNum; // 忙的線程的個(gè)數(shù)
int liveNum; // 存活的線程的個(gè)數(shù)
int exitNum; // 要銷毀的線程個(gè)數(shù)
pthread_mutex_t mutexPool; // 鎖整個(gè)的線程池
pthread_mutex_t mutexBusy; // 鎖busyNum變量
pthread_cond_t notFull; // 任務(wù)隊(duì)列是不是滿了
pthread_cond_t notEmpty; // 任務(wù)隊(duì)列是不是空了
int shutdown; // 是不是要銷毀線程池, 銷毀為1, 不銷毀為0
};
4. 頭文件聲明
#ifndef _THREADPOOL_H#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
// 創(chuàng)建線程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// 銷毀線程池
int threadPoolDestroy(ThreadPool* pool);
// 給線程池添加任務(wù)
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 獲取線程池中工作的線程的個(gè)數(shù)
int threadPoolBusyNum(ThreadPool* pool);
// 獲取線程池中活著的線程的個(gè)數(shù)
int threadPoolAliveNum(ThreadPool* pool);
//////////////////////
// 工作的線程(消費(fèi)者線程)任務(wù)函數(shù)
void* worker(void* arg);
// 管理者線程任務(wù)函數(shù)
void* manager(void* arg);
// 單個(gè)線程退出
void threadExit(ThreadPool* pool);
#endif // _THREADPOOL_H
5. 源文件定義
ThreadPool* threadPoolCreate(int min, int max, int queueSize){
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小個(gè)數(shù)相等
pool->exitNum = 0;
if (pthread_mutex_init(