
Putting the Thread Pool Together: Submit, Execute, Wait, and Destroy

In the previous articles, we built the main pieces of a thread pool one by one.

First, we used:

function pointer + void* argument

to describe a task.

Then we used:

ring buffer

to store tasks.

Then we used:

mutex

to protect the queue from concurrent modification.

Finally, we used:

condition variables

to let workers sleep, wake, wait for space, and wait for completion.

Now we can put these parts together into a usable thread pool.

This version uses a common C-style interface:

ThreadPool pool;
thread_pool_init(&pool, thread_count, queue_capacity);

The caller owns the ThreadPool object itself.

The pool only allocates its internal threads array and queue.

That means thread_pool_destroy releases internal resources, but it does not call:

free(pool);

because pool was not allocated by the thread pool.


1. The thread pool interface

The public API is:

int thread_pool_init(ThreadPool* pool, int thread_count, int queue_capacity);

int thread_pool_submit(ThreadPool* pool, thread_task_fn fn, void* arg);

int thread_pool_wait(ThreadPool* pool);

int thread_pool_destroy(ThreadPool* pool);

The typical usage is:

prepare a ThreadPool variable
initialize it
submit tasks
wait for tasks to finish
destroy the pool

In code:

ThreadPool pool;

if (thread_pool_init(&pool, 4, 64) != 0) {
    return 1;
}

/* submit tasks */

thread_pool_wait(&pool);
thread_pool_destroy(&pool);

This interface style has an important ownership rule:

The caller owns the ThreadPool object.
The thread pool owns the resources inside it.

So destroy should release:

threads array
task queue
mutex
condition variables

but it should not release the ThreadPool object itself.
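The ownership rule can be shown without any threads. The following is a minimal sketch using a hypothetical IntBuffer type (not part of the thread pool; the names int_buffer_init, int_buffer_destroy, and ownership_demo are invented for illustration). Its init allocates only the internal array and its destroy releases only that array, so the same pair of functions works whether the caller places the object on the stack or on the heap.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical type, used only to illustrate the ownership rule. */
typedef struct {
  int* data;      /* owned by the buffer: allocated in init, freed in destroy */
  size_t length;
} IntBuffer;

int int_buffer_init(IntBuffer* buf, size_t length) {
  if (buf == NULL || length == 0) {
    return -1;
  }
  memset(buf, 0, sizeof(*buf));
  buf->data = (int*)calloc(length, sizeof(int));
  if (buf->data == NULL) {
    return -1;
  }
  buf->length = length;
  return 0;
}

/* Releases internal resources only; it never calls free(buf). */
void int_buffer_destroy(IntBuffer* buf) {
  if (buf == NULL) {
    return;
  }
  free(buf->data);
  memset(buf, 0, sizeof(*buf));
}

/* Returns 0 when the same init/destroy pair works for both placements. */
int ownership_demo(void) {
  IntBuffer on_stack;                                         /* automatic storage */
  IntBuffer* on_heap = (IntBuffer*)malloc(sizeof(*on_heap));  /* heap storage */

  if (on_heap == NULL) {
    return -1;
  }
  if (int_buffer_init(&on_stack, 8) != 0) {
    free(on_heap);
    return -1;
  }
  if (int_buffer_init(on_heap, 8) != 0) {
    int_buffer_destroy(&on_stack);
    free(on_heap);
    return -1;
  }

  int_buffer_destroy(&on_stack);
  int_buffer_destroy(on_heap);
  assert(on_stack.data == NULL && on_heap->data == NULL);

  free(on_heap);  /* the caller allocated the object, so the caller frees it */
  return 0;
}
```

If destroy called free(buf) itself, the stack-allocated case would be undefined behavior, which is why the thread pool follows the same split.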


2. The thread pool data structure

A minimal ThreadPool may look like this:

#include <pthread.h>

typedef void (*thread_task_fn)(void*);

typedef struct {
  thread_task_fn fn;
  void* arg;
} ThreadTask;

typedef struct {
  pthread_t* threads;
  int thread_count;

  ThreadTask* queue;
  int queue_capacity;
  int queue_size;
  int queue_head;
  int queue_tail;

  int working_count;
  int stop;

  pthread_mutex_t mutex;
  pthread_cond_t not_empty;
  pthread_cond_t not_full;
  pthread_cond_t all_done;
} ThreadPool;

The queue fields are:

ThreadTask* queue;
int queue_capacity;
int queue_size;
int queue_head;
int queue_tail;

They describe the ring buffer.
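As a reminder of how size, head, and tail interact, here is a small single-threaded sketch of a ring buffer. It stores plain int values instead of ThreadTask, and the names Ring, ring_push, ring_pop, and RING_CAPACITY are hypothetical. In the pool the same index arithmetic runs while the mutex is held.

```c
#include <assert.h>

#define RING_CAPACITY 4  /* hypothetical fixed capacity for the sketch */

typedef struct {
  int items[RING_CAPACITY];
  int size;  /* number of stored items */
  int head;  /* index of the next item to pop */
  int tail;  /* index of the next free slot */
} Ring;

/* Returns 0 on success, -1 if the ring is full. */
int ring_push(Ring* r, int value) {
  if (r->size == RING_CAPACITY) {
    return -1;
  }
  r->items[r->tail] = value;
  r->tail = (r->tail + 1) % RING_CAPACITY;
  r->size += 1;
  return 0;
}

/* Returns 0 on success, -1 if the ring is empty. */
int ring_pop(Ring* r, int* out) {
  if (r->size == 0) {
    return -1;
  }
  *out = r->items[r->head];
  r->head = (r->head + 1) % RING_CAPACITY;
  r->size -= 1;
  return 0;
}

/* Fills the ring, wraps around once, drains it, and returns the last value popped. */
int ring_demo(void) {
  Ring r = {{0}, 0, 0, 0};
  int value = 0;
  int i;

  for (i = 1; i <= RING_CAPACITY; ++i) {
    if (ring_push(&r, i) != 0) return -1;
  }
  if (ring_push(&r, 99) != -1) return -1;                  /* full: push is refused */
  if (ring_pop(&r, &value) != 0 || value != 1) return -1;  /* oldest item comes out first */
  if (ring_push(&r, 5) != 0) return -1;                    /* reuses slot 0 after wrap-around */
  assert(r.head == 1 && r.tail == 1 && r.size == RING_CAPACITY);

  while (ring_pop(&r, &value) == 0) {
    /* drains 2, 3, 4, 5 in FIFO order */
  }
  return value;
}
```

Note that head == tail both when the ring is empty and when it is full, which is why a separate size field is kept.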

The worker state fields are:

int working_count;
int stop;

working_count records how many workers are currently running tasks.

stop records whether the pool is shutting down.

The synchronization fields are:

pthread_mutex_t mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full;
pthread_cond_t all_done;

Their roles are:

mutex:      protects all shared pool state
not_empty:  workers wait here when the queue is empty
not_full:   submitters wait here when the queue is full
all_done:   waiters wait here until the queue is empty and no worker is active

3. Initializing the pool: thread_pool_init

Initialization needs to do several things:

1. Validate arguments.
2. Clear the ThreadPool struct.
3. Store thread_count and queue_capacity.
4. Allocate the worker thread array.
5. Allocate the task queue.
6. Initialize the mutex and condition variables.
7. Create worker threads.

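The implementation follows. It needs <stdlib.h> and <string.h> in addition to <pthread.h>, and thread_pool_worker (defined in the next section) must be declared before it: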

int thread_pool_init(ThreadPool* pool, int thread_count, int queue_capacity) {
  int i;

  if (pool == NULL || thread_count <= 0 || queue_capacity <= 0) {
    return -1;
  }

  memset(pool, 0, sizeof(*pool));
  pool->thread_count = thread_count;
  pool->queue_capacity = queue_capacity;
  pool->threads = (pthread_t*)malloc((size_t)thread_count * sizeof(pthread_t));
  pool->queue = (ThreadTask*)malloc((size_t)queue_capacity * sizeof(ThreadTask));
  if (pool->threads == NULL || pool->queue == NULL) {
    free(pool->threads);
    free(pool->queue);
    memset(pool, 0, sizeof(*pool));
    return -1;
  }

  /* Return values are ignored here for brevity; robust code should check each one. */
  pthread_mutex_init(&pool->mutex, NULL);
  pthread_cond_init(&pool->not_empty, NULL);
  pthread_cond_init(&pool->not_full, NULL);
  pthread_cond_init(&pool->all_done, NULL);

  for (i = 0; i < thread_count; ++i) {
    if (pthread_create(&pool->threads[i], NULL, thread_pool_worker, pool) != 0) {
      pthread_mutex_lock(&pool->mutex);
      pool->stop = 1;
      pthread_cond_broadcast(&pool->not_empty);
      pthread_mutex_unlock(&pool->mutex);

      while (--i >= 0) {
        pthread_join(pool->threads[i], NULL);
      }

      pthread_mutex_destroy(&pool->mutex);
      pthread_cond_destroy(&pool->not_empty);
      pthread_cond_destroy(&pool->not_full);
      pthread_cond_destroy(&pool->all_done);
      free(pool->threads);
      free(pool->queue);
      memset(pool, 0, sizeof(*pool));
      return -1;
    }
  }

  return 0;
}

The most important engineering detail here is rollback on failure.

Suppose we want to create four workers, but creating the third one fails.

The previously created workers must not be left running in the background.

So the code:

sets stop = 1
broadcasts not_empty to wake existing workers
joins already-created workers
destroys synchronization objects
frees threads and queue
clears the pool
returns -1

This is much better than simply returning an error in the middle of initialization.
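The init function above unwinds inline. A common alternative C idiom is to jump to cleanup labels arranged in reverse order of acquisition, which also makes it easy to check the return values of pthread_mutex_init and pthread_cond_init. The sketch below is not the article's implementation: the Context type is hypothetical, and the fail_at parameter exists only to simulate a failure at a chosen step.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical resource bundle, used only to illustrate staged rollback. */
typedef struct {
  int* buffer;
  pthread_mutex_t mutex;
  pthread_cond_t cond;
} Context;

/* fail_at simulates a failure at step 1, 2, or 3; pass 0 for a normal init. */
int context_init(Context* ctx, int fail_at) {
  if (ctx == NULL) {
    return -1;
  }
  memset(ctx, 0, sizeof(*ctx));

  /* Step 1: allocate the buffer. */
  ctx->buffer = (fail_at == 1) ? NULL : (int*)malloc(16 * sizeof(int));
  if (ctx->buffer == NULL) goto fail;

  /* Step 2: initialize the mutex (skipped when the failure is simulated). */
  if (fail_at == 2 || pthread_mutex_init(&ctx->mutex, NULL) != 0) goto fail_buffer;

  /* Step 3: initialize the condition variable. */
  if (fail_at == 3 || pthread_cond_init(&ctx->cond, NULL) != 0) goto fail_mutex;

  return 0;

  /* Each label undoes one completed step and falls through to the earlier ones. */
fail_mutex:
  pthread_mutex_destroy(&ctx->mutex);
fail_buffer:
  free(ctx->buffer);
fail:
  memset(ctx, 0, sizeof(*ctx));
  return -1;
}

void context_destroy(Context* ctx) {
  pthread_cond_destroy(&ctx->cond);
  pthread_mutex_destroy(&ctx->mutex);
  free(ctx->buffer);
  memset(ctx, 0, sizeof(*ctx));
}

/* Returns 0 if every simulated failure rolls back and a normal init succeeds. */
int rollback_demo(void) {
  Context ctx;
  int step;

  for (step = 1; step <= 3; ++step) {
    if (context_init(&ctx, step) != -1) return -1;
    assert(ctx.buffer == NULL);  /* the struct was cleared on failure */
  }
  if (context_init(&ctx, 0) != 0) return -1;
  context_destroy(&ctx);
  return 0;
}
```

Whichever style is used, the invariant is the same: after a failed init, nothing acquired so far is left behind.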


4. The worker main loop

Each worker repeatedly does:

wait for a task
take a task
run it
update completion state
exit when the pool is stopping and no task remains

The implementation:

static void* thread_pool_worker(void* arg) {
  ThreadPool* pool = (ThreadPool*)arg;

  for (;;) {
    ThreadTask task;

    pthread_mutex_lock(&pool->mutex);
    while (pool->queue_size == 0 && !pool->stop) {
      pthread_cond_wait(&pool->not_empty, &pool->mutex);
    }

    if (pool->stop && pool->queue_size == 0) {
      pthread_mutex_unlock(&pool->mutex);
      break;
    }

    task = pool->queue[pool->queue_head];
    pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
    pool->queue_size -= 1;
    pool->working_count += 1;
    pthread_cond_signal(&pool->not_full);
    pthread_mutex_unlock(&pool->mutex);

    task.fn(task.arg);

    pthread_mutex_lock(&pool->mutex);
    pool->working_count -= 1;
    if (pool->queue_size == 0 && pool->working_count == 0) {
      pthread_cond_broadcast(&pool->all_done);
    }
    pthread_mutex_unlock(&pool->mutex);
  }

  return NULL;
}

The first half takes a task:

pthread_mutex_lock(&pool->mutex);
while (pool->queue_size == 0 && !pool->stop) {
  pthread_cond_wait(&pool->not_empty, &pool->mutex);
}

if (pool->stop && pool->queue_size == 0) {
  pthread_mutex_unlock(&pool->mutex);
  break;
}

task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size -= 1;
pool->working_count += 1;
pthread_cond_signal(&pool->not_full);
pthread_mutex_unlock(&pool->mutex);

The logic is:

queue empty and pool not stopping: sleep
pool stopping and queue empty: exit
otherwise: take one task and mark one worker as active

The second half runs the task and updates state:

task.fn(task.arg);

pthread_mutex_lock(&pool->mutex);
pool->working_count -= 1;
if (pool->queue_size == 0 && pool->working_count == 0) {
  pthread_cond_broadcast(&pool->all_done);
}
pthread_mutex_unlock(&pool->mutex);

The task runs outside the mutex.

This is critical.

If a worker held the mutex while running task.fn, no other worker could take a task and no submitter could enqueue one until that task finished. The pool would effectively become serial.
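One way to see why this matters is a task that can only finish if another task is running at the same time. In the hedged sketch below, two workers do their bookkeeping under a mutex and then run a stand-in task that waits at a two-party barrier. Because the task runs with the mutex released, both reach the barrier and finish; if the lock were held across the task, the second worker could never start and the program would hang. The names Overlap, overlap_worker, and overlap_demo are hypothetical, and pthread_barrier_t is an optional POSIX feature (glibc provides it; some platforms do not).

```c
#define _POSIX_C_SOURCE 200809L  /* exposes pthread_barrier_t under strict -std modes */
#include <assert.h>
#include <pthread.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_barrier_t rendezvous;  /* passable only when two tasks run at once */
  int started;
  int finished;
} Overlap;

static void* overlap_worker(void* arg) {
  Overlap* o = (Overlap*)arg;

  pthread_mutex_lock(&o->mutex);
  o->started += 1;  /* bookkeeping under the lock, like taking a task */
  pthread_mutex_unlock(&o->mutex);

  /* Stand-in for task.fn(task.arg): runs with the mutex released. */
  pthread_barrier_wait(&o->rendezvous);

  pthread_mutex_lock(&o->mutex);
  o->finished += 1;  /* bookkeeping under the lock, like working_count -= 1 */
  pthread_mutex_unlock(&o->mutex);
  return NULL;
}

/* Returns the number of tasks that finished (2 when both workers ran), or -1 on setup failure. */
int overlap_demo(void) {
  Overlap o;
  pthread_t t[2];
  int result = -1;

  o.started = 0;
  o.finished = 0;
  if (pthread_mutex_init(&o.mutex, NULL) != 0) {
    return -1;
  }
  if (pthread_barrier_init(&o.rendezvous, NULL, 2) != 0) {
    pthread_mutex_destroy(&o.mutex);
    return -1;
  }

  if (pthread_create(&t[0], NULL, overlap_worker, &o) == 0) {
    if (pthread_create(&t[1], NULL, overlap_worker, &o) == 0) {
      pthread_join(t[1], NULL);
      assert(o.started == 2);
    } else {
      /* Take the missing place at the barrier so the first worker can finish. */
      pthread_barrier_wait(&o.rendezvous);
    }
    pthread_join(t[0], NULL);
    result = o.finished;
  }

  pthread_barrier_destroy(&o.rendezvous);
  pthread_mutex_destroy(&o.mutex);
  return result;
}
```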


5. submit: submitting tasks

Submitting a task is the producer side:

int thread_pool_submit(ThreadPool* pool, thread_task_fn fn, void* arg) {
  if (pool == NULL || fn == NULL) {
    return -1;
  }

  pthread_mutex_lock(&pool->mutex);
  while (pool->queue_size == pool->queue_capacity && !pool->stop) {
    pthread_cond_wait(&pool->not_full, &pool->mutex);
  }

  if (pool->stop) {
    pthread_mutex_unlock(&pool->mutex);
    return -1;
  }

  pool->queue[pool->queue_tail].fn = fn;
  pool->queue[pool->queue_tail].arg = arg;
  pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
  pool->queue_size += 1;
  pthread_cond_signal(&pool->not_empty);
  pthread_mutex_unlock(&pool->mutex);
  return 0;
}

The logic is:

queue full: wait on not_full
pool stopping: reject the task
queue has space: insert the task
task inserted: signal not_empty to wake one worker

This line:

pthread_cond_signal(&pool->not_empty);

does not try to wake all workers.

Only one task was added, so waking one worker is normally enough.


6. wait: waiting until all tasks finish

The wait function is:

int thread_pool_wait(ThreadPool* pool) {
  if (pool == NULL) {
    return -1;
  }

  pthread_mutex_lock(&pool->mutex);
  while (pool->queue_size > 0 || pool->working_count > 0) {
    pthread_cond_wait(&pool->all_done, &pool->mutex);
  }
  pthread_mutex_unlock(&pool->mutex);
  return 0;
}

The important field is working_count.

queue_size == 0 only means:

there are no queued tasks

It does not mean:

no worker is currently running a task

So the real completion condition is:

queue_size == 0 && working_count == 0

Written as a wait loop:

while (pool->queue_size > 0 || pool->working_count > 0) {
  pthread_cond_wait(&pool->all_done, &pool->mutex);
}

As long as there are queued tasks or active workers, the caller keeps waiting.
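The role of working_count can be checked with a stripped-down model. The sketch below is not the pool itself: Tracker, tracker_worker, and completion_demo are hypothetical names, the "queue" is just a counter of remaining tasks, and the task body is empty. It uses the same wait loop as thread_pool_wait and reports how many tasks had fully completed at the moment the loop exited.

```c
#include <assert.h>
#include <pthread.h>

#define TRACKED_TASKS 20
#define TRACKED_WORKERS 3

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t all_done;
  int queue_size;     /* tasks not yet taken */
  int working_count;  /* tasks taken but not yet finished */
  int completed;      /* tasks fully finished */
} Tracker;

static void* tracker_worker(void* arg) {
  Tracker* t = (Tracker*)arg;

  for (;;) {
    pthread_mutex_lock(&t->mutex);
    if (t->queue_size == 0) {
      pthread_mutex_unlock(&t->mutex);
      break;
    }
    t->queue_size -= 1;
    t->working_count += 1;  /* the task is now in flight, no longer queued */
    pthread_mutex_unlock(&t->mutex);

    /* The task body would run here, outside the mutex. */

    pthread_mutex_lock(&t->mutex);
    t->completed += 1;
    t->working_count -= 1;
    if (t->queue_size == 0 && t->working_count == 0) {
      pthread_cond_broadcast(&t->all_done);
    }
    pthread_mutex_unlock(&t->mutex);
  }
  return NULL;
}

/* Returns the number of completed tasks observed when the wait loop exits. */
int completion_demo(void) {
  Tracker t;
  pthread_t threads[TRACKED_WORKERS];
  int created = 0;
  int seen = -1;
  int i;

  t.queue_size = TRACKED_TASKS;
  t.working_count = 0;
  t.completed = 0;
  if (pthread_mutex_init(&t.mutex, NULL) != 0) {
    return -1;
  }
  if (pthread_cond_init(&t.all_done, NULL) != 0) {
    pthread_mutex_destroy(&t.mutex);
    return -1;
  }

  for (i = 0; i < TRACKED_WORKERS; ++i) {
    if (pthread_create(&threads[created], NULL, tracker_worker, &t) == 0) {
      created += 1;
    }
  }

  if (created > 0) {  /* with no workers at all, waiting would never end */
    pthread_mutex_lock(&t.mutex);
    while (t.queue_size > 0 || t.working_count > 0) {
      pthread_cond_wait(&t.all_done, &t.mutex);
    }
    seen = t.completed;
    assert(seen == TRACKED_TASKS);
    pthread_mutex_unlock(&t.mutex);
  }

  for (i = 0; i < created; ++i) {
    pthread_join(threads[i], NULL);
  }
  pthread_cond_destroy(&t.all_done);
  pthread_mutex_destroy(&t.mutex);
  return seen;
}
```

If the loop tested only queue_size > 0, the waiter could wake while the last few tasks were still running and observe fewer than 20 completions.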


7. destroy: destroying the thread pool

Destroying the pool means:

stop accepting new work
wake all waiting threads
join all workers
release internal resources
clear the struct

The implementation:

int thread_pool_destroy(ThreadPool* pool) {
  int i;

  if (pool == NULL) {
    return -1;
  }

  pthread_mutex_lock(&pool->mutex);
  pool->stop = 1;
  pthread_cond_broadcast(&pool->not_empty);
  pthread_cond_broadcast(&pool->not_full);
  pthread_mutex_unlock(&pool->mutex);

  for (i = 0; i < pool->thread_count; ++i) {
    pthread_join(pool->threads[i], NULL);
  }

  pthread_mutex_destroy(&pool->mutex);
  pthread_cond_destroy(&pool->not_empty);
  pthread_cond_destroy(&pool->not_full);
  pthread_cond_destroy(&pool->all_done);

  free(pool->threads);
  free(pool->queue);
  memset(pool, 0, sizeof(*pool));

  return 0;
}

This does not forcefully kill the workers.

It is a cooperative shutdown:

set stop = 1
wake waiters
let workers observe stop and exit naturally

A worker exits only when this check in its loop succeeds:

if (pool->stop && pool->queue_size == 0)

So if there are still tasks in the queue, workers continue taking and running them.

This shutdown strategy means:

reject new tasks
finish already queued tasks
wait for workers to exit
free resources
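The drain-then-exit behavior can be sketched with a single worker and a counter standing in for the queue. MiniPool, mini_worker, and shutdown_demo are hypothetical names, and the task body is empty. Three tasks are pending when stop is set, and the worker still processes all of them before it exits.

```c
#include <assert.h>
#include <pthread.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t not_empty;
  int pending;    /* stands in for queue_size */
  int stop;
  int processed;
} MiniPool;

static void* mini_worker(void* arg) {
  MiniPool* p = (MiniPool*)arg;

  for (;;) {
    pthread_mutex_lock(&p->mutex);
    while (p->pending == 0 && !p->stop) {
      pthread_cond_wait(&p->not_empty, &p->mutex);
    }
    if (p->stop && p->pending == 0) {  /* exit only once the queue is drained */
      pthread_mutex_unlock(&p->mutex);
      break;
    }
    p->pending -= 1;
    pthread_mutex_unlock(&p->mutex);

    /* The task body would run here, outside the mutex. */

    pthread_mutex_lock(&p->mutex);
    p->processed += 1;
    pthread_mutex_unlock(&p->mutex);
  }
  return NULL;
}

/* Returns how many pending tasks were processed before the worker exited. */
int shutdown_demo(void) {
  MiniPool p;
  pthread_t worker;
  int processed = -1;

  p.pending = 3;
  p.stop = 0;
  p.processed = 0;
  if (pthread_mutex_init(&p.mutex, NULL) != 0) {
    return -1;
  }
  if (pthread_cond_init(&p.not_empty, NULL) != 0) {
    pthread_mutex_destroy(&p.mutex);
    return -1;
  }

  if (pthread_create(&worker, NULL, mini_worker, &p) == 0) {
    /* Cooperative shutdown: set the flag, wake sleepers, then join. */
    pthread_mutex_lock(&p.mutex);
    p.stop = 1;
    pthread_cond_broadcast(&p.not_empty);
    pthread_mutex_unlock(&p.mutex);

    pthread_join(worker, NULL);
    processed = p.processed;
    assert(p.pending == 0);
  }

  pthread_cond_destroy(&p.not_empty);
  pthread_mutex_destroy(&p.mutex);
  return processed;
}
```

The result does not depend on whether the worker starts before or after stop is set, because the exit check requires both conditions.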

Also notice:

pthread_cond_broadcast(&pool->not_full);

This wakes submitters that may be blocked because the queue was full.

After waking, they check:

if (pool->stop) {
  pthread_mutex_unlock(&pool->mutex);
  return -1;
}

and return failure.


8. API usage contract

There is one lifecycle rule the API cannot magically enforce:

Once thread_pool_destroy has started, no thread should make a new call to thread_pool_submit.

Submitters already blocked inside submit can be woken safely:

pthread_cond_broadcast(&pool->not_full);

They wake up, see:

if (pool->stop) {
  pthread_mutex_unlock(&pool->mutex);
  return -1;
}

and fail cleanly.

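That wake-up path has a limit, though: destroy joins the workers, but it does not wait for woken submitters to leave thread_pool_submit. A submitter that is slow to re-acquire the mutex could still touch it after pthread_mutex_destroy. The robust contract is that every submitting thread has returned, or been joined, before thread_pool_destroy is called.

Likewise, if some thread calls thread_pool_submit after thread_pool_destroy has completed and the mutex has already been destroyed, that is a caller-side lifecycle bug.

The pool cannot protect itself from being used after destruction.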


9. Example usage

A complete usage example:

#include "thread_pool.h"

#include <stdio.h>

void print_number(void* arg) {
  int* number = (int*)arg;
  printf("task: %d\n", *number);
}

int main(void) {
  ThreadPool pool;
  int nums[100];
  int i;

  if (thread_pool_init(&pool, 4, 64) != 0) {
    return 1;
  }

  for (i = 0; i < 100; ++i) {
    nums[i] = i;
    if (thread_pool_submit(&pool, print_number, &nums[i]) != 0) {
      break;  /* the pool rejected the task; stop submitting */
    }
  }

  thread_pool_wait(&pool);
  thread_pool_destroy(&pool);

  return 0;
}

One important detail:

&nums[i]

must remain valid until the task has finished running, not just until it starts.

The thread pool only stores the pointer:

pool->queue[pool->queue_tail].arg = arg;

It does not copy the data pointed to by arg.

So this is dangerous:

for (int i = 0; i < 100; ++i) {
  int x = i;
  thread_pool_submit(&pool, print_number, &x);
}

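The task may run later, after the loop iteration that submitted it has ended.

But x is a local variable inside the loop body. Its lifetime ends with each iteration, so the stored pointer dangles and dereferencing it in the task is undefined behavior. In practice each iteration often reuses the same stack slot, so tasks may print repeated or unexpected values.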

The correct approach is to make the argument object live long enough.

For example, use an array, allocate memory on the heap, or make the caller clearly own task argument lifetimes.
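The heap option might look like the following sketch, in which each task receives its own allocation and frees it when done. To keep the snippet self-contained it runs each task with pthread_create instead of thread_pool_submit; the ownership idea is the same. The names add_number, run_task, heap_sum, and heap_args_demo are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define HEAP_TASKS 8

static pthread_mutex_t heap_sum_mutex = PTHREAD_MUTEX_INITIALIZER;
static int heap_sum = 0;

/* Task-style function: it takes ownership of arg and frees it when done. */
static void add_number(void* arg) {
  int* number = (int*)arg;

  pthread_mutex_lock(&heap_sum_mutex);
  heap_sum += *number;
  pthread_mutex_unlock(&heap_sum_mutex);

  free(number);
}

/* Adapter so the task can be run by pthread_create in this sketch. */
static void* run_task(void* arg) {
  add_number(arg);
  return NULL;
}

/* Returns the sum of all task arguments that were processed. */
int heap_args_demo(void) {
  pthread_t threads[HEAP_TASKS];
  int created = 0;
  int i;

  heap_sum = 0;
  for (i = 0; i < HEAP_TASKS; ++i) {
    int* x = (int*)malloc(sizeof(*x));  /* one allocation per task */
    if (x == NULL) {
      break;
    }
    *x = i;
    if (pthread_create(&threads[created], NULL, run_task, x) != 0) {
      free(x);  /* the task never ran, so ownership stays with the caller */
      break;
    }
    created += 1;
  }

  for (i = 0; i < created; ++i) {
    pthread_join(threads[i], NULL);
  }
  assert(created <= HEAP_TASKS);
  return heap_sum;  /* safe to read: all task threads have been joined */
}
```

With the real pool, the same rule applies to a failed thread_pool_submit: if the task was rejected, the caller still owns the argument and must free it.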


10. Summary

The lifecycle of this thread pool is:

init  ->  submit  ->  worker  ->  wait  ->  destroy

It is not:

create  ->  submit  ->  worker  ->  wait  ->  destroy

The difference is:

The caller owns the ThreadPool object.
The pool only allocates threads and queue internally.
destroy does not free(pool); it only releases internal resources.

The synchronization relationships are:

workers wait on not_empty
submitters wait on not_full
waiters wait on all_done
destroy sets stop and broadcasts to wake waiting threads

The state relationships are:

queue_size records how many tasks are queued
working_count records how many workers are executing tasks
stop records whether the pool is shutting down

The completion condition is:

pool->queue_size == 0 && pool->working_count == 0

The worker exit condition is:

pool->stop && pool->queue_size == 0

At this point, we have a small but complete C thread pool.

It has no complicated scheduling strategy.

It has no cancellation, priority, or dynamic resizing.

But it already contains the key engineering pieces:

task abstraction
ring buffer
mutex
condition variables
completion waiting
cooperative shutdown
initialization rollback

Once these are clear, more advanced thread pools become easier to understand.

They are no longer just a pile of APIs. Each state variable has a reason to exist, and each synchronization step protects a specific invariant.