C语言实现一个线程池
先学习几个 pthread 的线程函数:
pthread_cond_wait
作用
- 让线程等待某个条件成立。
- 在等待期间,线程会释放关联的互斥锁,以避免死锁。
- 当条件满足时,线程会被唤醒并重新获取互斥锁。
pthread_cond_signal
作用
- 唤醒 一个 正在等待条件变量的线程。
- 如果有多个线程在等待,具体唤醒哪一个线程是不确定的。
pthread_cond_broadcast
作用
- 唤醒 所有 正在等待条件变量的线程。
上述三个函数,经常是配合是配合使用。
基本功能:
- 有一个线程队列
- 有任务需要执行时,从线程队列中取出一个线程执行任务
- 任务执行完再次进入 pending 状态,等待唤醒
关键代码:
任务以及线程池结构体
// 线程执行的任务参数
typedef struct
{
void (*func)(void *args); // 任务函数指针
void *args
} Task;
// 线程池参数
typedef struct
{
pthread_mutex_t lock; // 线程池互斥锁
pthread_cond_t cond; // 线程池同步信号
pthread_t *threads; // 保存线程池创建的所有线程
int32_t threadMaxNum; // 最大可创建线程数
int32_t threadStartStep; // 初始启动线程的个数
int32_t threadStartCnt; // 已启动线程个数
int32_t threadPendCnt; // 已启动但是处于Pending状态的线程
Task *taskQueue; // 等待执行的任务队列
int32_t taskQueueSize; // 任务队列的大小
int32_t taskQueueHead; // 当前任务队列头索引
int32_t taskQueueTail; // 当前任务队列尾索引
int32_t taskPendCnt; // 等待执行的任务个数
int32_t isShutdown; // 线程池正在关闭
} ThreadpoolInfo;
线程池的创建:
ThreadpoolInfo *threadpool_create(
int32_t threadMaxNum,
int32_t threadStartStep,
int32_t taskQueueSize)
{
ThreadpoolInfo *threadpool = NULL;
if ((0 >= threadMaxNum) || (0 >= threadStartStep) || (0 >= taskQueueSize))
{
THREADPOOL_ERR("invalid param.\r\n");
goto error_exit;
}
threadpool = (ThreadpoolInfo *)malloc(sizeof(ThreadpoolInfo));
if (NULL == threadpool)
{
THREADPOOL_ERR("malloc threadpool failed.\r\n");
goto error_exit;
}
memset(threadpool, 0, sizeof(ThreadpoolInfo));
threadpool->threadMaxNum = threadMaxNum;
threadpool->threadStartStep = threadStartStep;
threadpool->taskQueueSize = taskQueueSize;
// 分配线程存储资源 */
threadpool->threads = (pthread_t *)calloc(threadMaxNum, sizeof(pthread_t));
if (NULL == threadpool->threads)
{
THREADPOOL_ERR("malloc threads failed.\r\n");
goto error_exit;
}
// 分配任务队列 */
threadpool->taskQueue = (Task *)calloc(taskQueueSize, sizeof(Task));
if (NULL == threadpool->taskQueue)
{
THREADPOOL_ERR("malloc task queue failed.\r\n");
goto error_exit;
}
// 初始化互斥信号量和同步信号 */
if (0 != THREADPOOL_LOCK_INIT(threadpool))
{
THREADPOOL_ERR("mutex init failed.\r\n");
goto error_exit;
}
if (0 != THREADPOOL_COND_INIT(threadpool))
{
THREADPOOL_ERR("cond init failed.\r\n");
goto error_exit;
}
return threadpool;
error_exit:
if (threadpool != NULL)
{
threadpool_free(threadpool);
}
return NULL;
}
增加任务:
int32_t threadpool_addtask(
ThreadpoolInfo *threadpool,
THREADPOOLTASKFUNC taskfunc,
void *args)
{
int32_t ret = 0;
if ((NULL == threadpool) || (NULL == taskfunc))
{
THREADPOOL_ERR("invalid param.\r\n");
return -1;
}
THREADPOOL_LOCK(threadpool);
do
{
if (threadpool->isShutdown)
{
THREADPOOL_ERR("threadpool is shutdown.\r\n");
ret = -1;
break;
}
// 判断等待执行的任务队列是否满 */
if (threadpool->taskPendCnt == threadpool->taskQueueSize)
{
THREADPOOL_ERR("task queue is full.\r\n");
ret = -1;
break;
}
// 如果pending状态的线程已用完,则启动新的线程 */
if (threadpool->threadPendCnt <= 0)
{
if (0 != threadpool_start(threadpool))
{
ret = -1;
break;
}
}
// 将任务放入对尾 */
threadpool->taskQueue[threadpool->taskQueueTail].func = taskfunc;
threadpool->taskQueue[threadpool->taskQueueTail].args = args;
threadpool->taskQueueTail = (threadpool->taskQueueTail + 1) % threadpool->taskQueueSize;
threadpool->taskPendCnt++;
// 唤醒一个线程执行任务 */
THREADPOOL_COND_SIGNAL(threadpool);
} while (0);
THREADPOOL_UNLOCK(threadpool);
return ret;
}
线程回调:
void *thread_callback(void *arg)
{
ThreadpoolInfo *threadpool = (ThreadpoolInfo *)arg;
Task task;
while (1)
{
THREADPOOL_LOCK(threadpool);
// 等待任务分配的信号
// 如果当前没有等待执行的任务,并且线程池没有关闭则继续等待信号
while ((0 == threadpool->taskPendCnt) && (0 == threadpool->isShutdown))
{
THREADPOOL_COND_WAIT(threadpool);
}
// 如果线程池已关闭,则退出线程 */
if (threadpool->isShutdown)
break;
// 取任务队列中当前第一个任务 */
task.func = threadpool->taskQueue[threadpool->taskQueueHead].func;
task.args = threadpool->taskQueue[threadpool->taskQueueHead].args;
threadpool->taskQueueHead = (threadpool->taskQueueHead + 1) % threadpool->taskQueueSize;
threadpool->taskPendCnt--;
threadpool->threadPendCnt--;
THREADPOOL_UNLOCK(threadpool);
// 执行任务 */
(*(task.func))(task.args);
// 任务执行完成后,线程进入pending状态 */
THREADPOOL_LOCK(threadpool);
threadpool->threadPendCnt++;
THREADPOOL_UNLOCK(threadpool);
}
threadpool->threadStartCnt--;
THREADPOOL_UNLOCK(threadpool);
pthread_exit(NULL);
}
源码地址: