chenty.in

Before Building a Thread Pool: How Do We Put Tasks into a Queue in C?

When people first hear "thread pool", the picture in their head is often:

Create a few threads in advance, then let them do work together.

That is true, but it is not complete.

The hard part is not only "creating threads". The hard part is:

How does the main thread hand different jobs to worker threads? How does a worker know what it should do next?

Suppose we want to implement a thread pool.

The main thread keeps submitting tasks:

task 1
task 2
task 3
task 4
...

Several worker threads wait in the background. When a task appears, one of them takes it and runs it.

The structure looks like this:

main thread submits tasks  ->  task queue  ->  workers take tasks and run them

Before writing the pool, we need to solve at least two questions:

1. What is a "task"?
2. How should tasks be queued?

This first article does not touch locks or condition variables yet.

We only build the first piece of the thread pool:

task abstraction + task queue

1. A thread pool should not know what the task really is

If the pool only needs to run one fixed function, such as:

process_image();

then the implementation is trivial.

Inside the worker thread, we can directly write:

process_image();

That is easy, but it is not a general-purpose thread pool.

It is just a special image-processing thread.

A real thread pool should be reusable.

Today we may ask it to process an image:

process_image(image);

Tomorrow we may ask it to calculate some data:

calculate(data);

The day after that, we may ask it to send a request:

send_request(req);

In other words, the thread pool itself should not care about business logic.

It only needs to know:

Here is a task.
Run it.

So the question becomes:

How can C pass "some code to execute" around as if it were data?

The answer is:

function pointer

2. Function pointers: storing an action

A normal pointer points to data.

For example:

int x = 10;
int* p = &x;

Here, p stores the address of variable x.

A function pointer points to a function.

A function is code, and code also lives somewhere in memory while the program is running. A function pointer stores the entry address of that code.

For a thread pool, we can define a task function type like this:

typedef void (*thread_task_fn)(void*);

This line looks a bit scary at first, but it is not complicated once we break it down.

thread_task_fn

is the new type name.

(*thread_task_fn)

says that this type is a pointer.

(void*)

says that the function it points to accepts one void* argument.

The leftmost:

void

says that the function returns nothing.

So the whole line means:

thread_task_fn is a function pointer type. It points to a function that accepts one void* argument and returns void.

In other words, any function shaped like this:

void some_task(void* arg) {
    // do something
}

can be submitted to the thread pool as a task.


3. A task = function + argument

A function alone is not enough.

For example:

void print_number(void* arg) {
    int* number = (int*)arg;
    printf("%d\n", *number);
}

This task function is only the action.

It still needs a concrete argument. For example, which number should it print?

So inside a thread pool, a task is usually made of two parts:

task function + task argument

We can write that as:

typedef void (*thread_task_fn)(void*);

typedef struct {
    thread_task_fn fn;
    void* arg;
} ThreadTask;

ThreadTask is the smallest unit of work in the thread pool.

Here:

fn

is the function to run.

arg

is the argument passed to that function.

When a worker receives a ThreadTask, it only needs to do:

task.fn(task.arg);

This line is the core of the abstraction.

The worker does not need to know whether task.fn is printing a number, processing an image, or calculating a hash.

It only follows the contract:

call this function with this argument

For example:

void print_number(void* arg) {
    int* number = (int*)arg;
    printf("%d\n", *number);
}

Submit it like this:

int value = 42;

ThreadTask task;
task.fn = print_number;
task.arg = &value;

Later, a worker executes:

task.fn(task.arg);

which is equivalent to:

print_number(&value);

This is how C turns an arbitrary piece of work into a uniform task object.


4. Why do we need a task queue?

Now we have a way to describe one task.

But a thread pool does not usually receive only one task.

The main thread may submit many tasks very quickly:

task 1
task 2
task 3
task 4

Worker threads may not be able to finish them immediately.

So we need a place to store pending tasks.

That place is the task queue.

The queue answers this question:

Where do tasks wait before workers take them?

The simplest model is:

submit task  ->  push into queue
worker takes ->  pop from queue

For the first version, we can use an array to store tasks:

ThreadTask* queue;

Then we use a few integers to track the queue state:

int queue_capacity;
int queue_size;
int queue_head;
int queue_tail;

Their meanings are:

queue_capacity: maximum number of tasks the queue can hold
queue_size:     current number of tasks in the queue
queue_head:     index of the next task to take
queue_tail:     index where the next task will be inserted

For example, if the queue looks like this:

index:  0   1   2   3   4
task:  [A] [B] [C] [ ] [ ]

then the state may be:

queue_head = 0
queue_tail = 3
queue_size = 3

The next worker takes the task at queue_head, which is A.

The next submit operation inserts a task at queue_tail, which is index 3.


5. Why use a ring buffer?

If we use a normal array queue, one problem appears quickly.

Suppose tasks keep being submitted and taken.

The head keeps moving forward:

0 -> 1 -> 2 -> 3 -> 4

The tail also keeps moving forward:

0 -> 1 -> 2 -> 3 -> 4

Soon, both reach the end of the array.

But the front part of the array may already be empty.

For example:

index:  0   1   2   3   4
task:  [ ] [ ] [C] [D] [E]

Positions 0 and 1 are empty, but a tail index that only moves forward has already run off the end of the array, so no more tasks can be inserted even though there is free space.

We do not want to move all existing tasks every time. That would be unnecessary work.

So we let the queue wrap around.

When the index reaches the end, it goes back to zero:

next = (current + 1) % capacity;

This is a ring buffer.

The key idea is:

If we reach the end of the array, wrap back to the beginning.

For example:

capacity = 5
current = 4

next = (4 + 1) % 5 = 0

So the array is used as if it were a circle:

0 -> 1 -> 2 -> 3 -> 4
^                   |
|___________________|

For a fixed-size thread pool queue, this is simple and efficient.


6. A first submit function without thread safety

Now we can write the first version of thread_pool_submit.

This version does not consider multi-thread safety yet.

It only shows the queue logic:

int thread_pool_submit(ThreadPool* pool, thread_task_fn fn, void* arg) {
    if (pool->queue_size == pool->queue_capacity) {
        return -1;
    }

    pool->queue[pool->queue_tail].fn = fn;
    pool->queue[pool->queue_tail].arg = arg;

    pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
    pool->queue_size++;

    return 0;
}

The logic is:

1. If the queue is full, reject the task.
2. Store fn and arg at queue_tail.
3. Move queue_tail forward.
4. Increase queue_size.

Taking a task is symmetrical:

int thread_pool_take(ThreadPool* pool, ThreadTask* out) {
    if (pool->queue_size == 0) {
        return -1;
    }

    *out = pool->queue[pool->queue_head];

    pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
    pool->queue_size--;

    return 0;
}

The logic is:

1. If the queue is empty, there is no task to take.
2. Copy the task at queue_head to out.
3. Move queue_head forward.
4. Decrease queue_size.

At this point, the thread pool has a task representation and a queue.

But it is still not safe for multiple threads.


7. What is still missing?

So far, we have solved the first problem:

How can C describe a task and put it into a queue?

The answer is:

function pointer + void* argument + ThreadTask + ring buffer

But the real thread pool problem has only just started.

Because in a real thread pool:

one main thread may submit tasks
multiple worker threads may take tasks
all of them access the same queue

That means these fields become shared state:

queue
queue_head
queue_tail
queue_size

If two worker threads call thread_pool_take at the same time, what happens?

For example, both may see:

queue_size = 1
queue_head = 0

Then both may try to take the same task.

Or two submitters may update queue_tail at the same time and overwrite each other's tasks.

This is why the next article is about the first real concurrency problem:

Why do multiple threads corrupt the queue, and what exactly does a mutex protect?


Summary

Before writing the synchronization part of a thread pool, we need a task model.

In C, the common model is:

thread_task_fn fn;
void* arg;

Together they form:

ThreadTask = function + argument

Then tasks are stored in a ring buffer:

queue_head points to the next task to take
queue_tail points to the next slot to insert into
queue_size records how many tasks are currently queued

This gives us the first shape of a thread pool.

But this version is only a single-threaded model.

Once multiple threads start reading and writing the queue at the same time, the queue state may be corrupted.

That is where mutexes enter the story.