C语言实现一个线程池

C语言实现一个线程池

先学习几个 pthread 的线程函数:

pthread_cond_wait

作用

  • 让线程等待某个条件成立。
  • 在等待期间,线程会释放关联的互斥锁,以避免死锁。
  • 当条件满足时,线程会被唤醒并重新获取互斥锁。

pthread_cond_signal

作用

  • 唤醒 一个 正在等待条件变量的线程。
  • 如果有多个线程在等待,具体唤醒哪一个线程是不确定的。

pthread_cond_broadcast

作用

  • 唤醒 所有 正在等待条件变量的线程。

上述三个函数,经常是配合是配合使用。

基本功能:

  • 有一个线程队列
  • 有任务需要执行时,从线程队列中取出一个线程执行任务
  • 任务执行完再次进入 pending 状态,等待唤醒

关键代码:

任务以及线程池结构体

// 线程执行的任务参数
typedef struct
{
    void (*func)(void *args); // 任务函数指针
    void *args
    } Task;

// 线程池参数
typedef struct
{
    pthread_mutex_t lock; // 线程池互斥锁
    pthread_cond_t cond;  // 线程池同步信号

    pthread_t *threads;      // 保存线程池创建的所有线程
    int32_t threadMaxNum;    // 最大可创建线程数
    int32_t threadStartStep; // 初始启动线程的个数
    int32_t threadStartCnt;  // 已启动线程个数
    int32_t threadPendCnt;   // 已启动但是处于Pending状态的线程

    Task *taskQueue; // 等待执行的任务队列
    int32_t taskQueueSize;     // 任务队列的大小
    int32_t taskQueueHead;     // 当前任务队列头索引
    int32_t taskQueueTail;     // 当前任务队列尾索引
    int32_t taskPendCnt;       // 等待执行的任务个数

    int32_t isShutdown; // 线程池正在关闭
} ThreadpoolInfo;

线程池的创建:

ThreadpoolInfo *threadpool_create(
    int32_t threadMaxNum,
    int32_t threadStartStep,
    int32_t taskQueueSize)
{
    ThreadpoolInfo *threadpool = NULL;

    if ((0 >= threadMaxNum) || (0 >= threadStartStep) || (0 >= taskQueueSize))
    {
        THREADPOOL_ERR("invalid param.\r\n");
        goto error_exit;
    }

    threadpool = (ThreadpoolInfo *)malloc(sizeof(ThreadpoolInfo));
    if (NULL == threadpool)
    {
        THREADPOOL_ERR("malloc threadpool failed.\r\n");
        goto error_exit;
    }

    memset(threadpool, 0, sizeof(ThreadpoolInfo));
    threadpool->threadMaxNum = threadMaxNum;
    threadpool->threadStartStep = threadStartStep;
    threadpool->taskQueueSize = taskQueueSize;

    // 分配线程存储资源 */
    threadpool->threads = (pthread_t *)calloc(threadMaxNum, sizeof(pthread_t));
    if (NULL == threadpool->threads)
    {
        THREADPOOL_ERR("malloc threads failed.\r\n");
        goto error_exit;
    }

    // 分配任务队列 */
    threadpool->taskQueue = (Task *)calloc(taskQueueSize, sizeof(Task));
    if (NULL == threadpool->taskQueue)
    {
        THREADPOOL_ERR("malloc task queue failed.\r\n");
        goto error_exit;
    }

    // 初始化互斥信号量和同步信号 */
    if (0 != THREADPOOL_LOCK_INIT(threadpool))
    {
        THREADPOOL_ERR("mutex init failed.\r\n");
        goto error_exit;
    }

    if (0 != THREADPOOL_COND_INIT(threadpool))
    {
        THREADPOOL_ERR("cond init failed.\r\n");
        goto error_exit;
    }

    return threadpool;

error_exit:

    if (threadpool != NULL)
    {
        threadpool_free(threadpool);
    }

    return NULL;
}

增加任务:

int32_t threadpool_addtask(
    ThreadpoolInfo *threadpool,
    THREADPOOLTASKFUNC taskfunc,
    void *args)
{
    int32_t ret = 0;

    if ((NULL == threadpool) || (NULL == taskfunc))
    {
        THREADPOOL_ERR("invalid param.\r\n");
        return -1;
    }

    THREADPOOL_LOCK(threadpool);

    do
    {
        if (threadpool->isShutdown)
        {
            THREADPOOL_ERR("threadpool is shutdown.\r\n");
            ret = -1;
            break;
        }

        // 判断等待执行的任务队列是否满 */
        if (threadpool->taskPendCnt == threadpool->taskQueueSize)
        {
            THREADPOOL_ERR("task queue is full.\r\n");
            ret = -1;
            break;
        }

        // 如果pending状态的线程已用完,则启动新的线程 */
        if (threadpool->threadPendCnt <= 0)
        {
            if (0 != threadpool_start(threadpool))
            {
                ret = -1;
                break;
            }
        }

        // 将任务放入对尾 */
        threadpool->taskQueue[threadpool->taskQueueTail].func = taskfunc;
        threadpool->taskQueue[threadpool->taskQueueTail].args = args;

        threadpool->taskQueueTail = (threadpool->taskQueueTail + 1) % threadpool->taskQueueSize;
        threadpool->taskPendCnt++;

        // 唤醒一个线程执行任务 */
        THREADPOOL_COND_SIGNAL(threadpool);

    } while (0);

    THREADPOOL_UNLOCK(threadpool);
    return ret;
}

线程回调:

void *thread_callback(void *arg)
{
    ThreadpoolInfo *threadpool = (ThreadpoolInfo *)arg;
    Task task;

    while (1)
    {
        THREADPOOL_LOCK(threadpool);

        // 等待任务分配的信号
        // 如果当前没有等待执行的任务,并且线程池没有关闭则继续等待信号
        while ((0 == threadpool->taskPendCnt) && (0 == threadpool->isShutdown))
        {
            THREADPOOL_COND_WAIT(threadpool);
        }

        // 如果线程池已关闭,则退出线程  */
        if (threadpool->isShutdown)
            break;

        // 取任务队列中当前第一个任务 */
        task.func = threadpool->taskQueue[threadpool->taskQueueHead].func;
        task.args = threadpool->taskQueue[threadpool->taskQueueHead].args;

        threadpool->taskQueueHead = (threadpool->taskQueueHead + 1) % threadpool->taskQueueSize;
        threadpool->taskPendCnt--;
        threadpool->threadPendCnt--;

        THREADPOOL_UNLOCK(threadpool);

        // 执行任务 */
        (*(task.func))(task.args);

        // 任务执行完成后,线程进入pending状态 */
        THREADPOOL_LOCK(threadpool);
        threadpool->threadPendCnt++;
        THREADPOOL_UNLOCK(threadpool);
    }

    threadpool->threadStartCnt--;
    THREADPOOL_UNLOCK(threadpool);

    pthread_exit(NULL);
}

源码地址:

https://gitee.com/jesson-deng/cthreadpool

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注