chenty.in

Why Do Multiple Threads Corrupt a Queue?

In the previous article, we solved the first problem in a thread pool:

How do we put a "task" into a queue in C?

The answer was:

function pointer + void* argument + ThreadTask + ring buffer

So now we have a task queue.

The main thread puts tasks into it:

submit -> queue

Worker threads take tasks out of it:

queue -> worker

At first glance, the shape of the thread pool is already visible.

But this is exactly where we must slow down.

As soon as multiple threads access this queue at the same time, things can go wrong very quickly.

This article is about the first real concurrency problem in the thread pool:

Why can multiple threads corrupt the same queue? What exactly does a mutex protect?


1. In a single thread, queue logic is simple

Recall the task queue from the previous article.

We use an array to store tasks:

ThreadTask* queue;

Then we use a few fields to record the queue state:

int queue_capacity;
int queue_size;
int queue_head;
int queue_tail;

Their meanings are:

queue_head: the position of the next task to take
queue_tail: the position where the next task will be inserted
queue_size: how many tasks are currently in the queue

When submitting a task, we write it at queue_tail:

pool->queue[pool->queue_tail].fn = fn;
pool->queue[pool->queue_tail].arg = arg;

pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
pool->queue_size++;

When taking a task, we read it from queue_head:

ThreadTask task = pool->queue[pool->queue_head];

pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;

If only one thread operates on the queue, everything is fine.
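To make the single-threaded behavior concrete, here is a minimal sketch that puts the two snippets above into one small program. It assumes a task function has the shape void (*)(void*), which the article only implies through task.fn(task.arg). The names DemoQueue, demo_push, demo_pop and single_thread_demo are made up for this illustration, and the four-slot capacity is arbitrary.

```c
#include <assert.h>

/* Assumed from the previous article: a task is a function pointer plus an argument. */
typedef void (*thread_task_fn)(void*);

typedef struct {
    thread_task_fn fn;
    void* arg;
} ThreadTask;

#define DEMO_CAPACITY 4  /* illustrative size */

/* Hypothetical stand-in for the queue fields of the pool. */
typedef struct {
    ThreadTask queue[DEMO_CAPACITY];
    int queue_capacity;
    int queue_size;
    int queue_head;
    int queue_tail;
} DemoQueue;

/* Insert at queue_tail; returns -1 when the queue is full. */
int demo_push(DemoQueue* q, thread_task_fn fn, void* arg) {
    if (q->queue_size == q->queue_capacity) {
        return -1;
    }
    q->queue[q->queue_tail].fn = fn;
    q->queue[q->queue_tail].arg = arg;
    q->queue_tail = (q->queue_tail + 1) % q->queue_capacity;
    q->queue_size++;
    return 0;
}

/* Take from queue_head; returns -1 when the queue is empty. */
int demo_pop(DemoQueue* q, ThreadTask* out) {
    if (q->queue_size == 0) {
        return -1;
    }
    *out = q->queue[q->queue_head];
    q->queue_head = (q->queue_head + 1) % q->queue_capacity;
    q->queue_size--;
    return 0;
}

static void add_one(void* arg) {
    int* hits = arg;
    (*hits)++;
}

/* Pushes six tasks through a four-slot queue, so both indices wrap around.
   Returns how many tasks ran. */
int single_thread_demo(void) {
    DemoQueue q = { .queue_capacity = DEMO_CAPACITY };  /* other fields start at 0 */
    int hits = 0;
    ThreadTask task;

    for (int round = 0; round < 3; ++round) {
        demo_push(&q, add_one, &hits);
        demo_push(&q, add_one, &hits);
        while (demo_pop(&q, &task) == 0) {
            task.fn(task.arg);
        }
        assert(q.queue_size == 0);  /* everything pushed was taken */
    }
    return hits;
}
```

With one thread, every pushed task is taken exactly once, even after queue_head and queue_tail wrap past the end of the array.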

But a thread pool is not a single-threaded program.

It contains multiple worker threads.

They all try to do the same thing:

take a task from the task queue

That is where the problem begins.


2. What happens if two threads take a task at the same time?

Suppose there is only one task in the queue.

The queue state is:

queue_head = 0
queue_size = 1

So index 0 contains one task that can be taken.

Worker A starts running.

It reads:

queue_head = 0

So it prepares to take the task at index 0.

But taking a task is not one indivisible operation.

It contains several steps:

1. Read queue_head.
2. Read the task at queue_head.
3. Update queue_head.
4. Update queue_size.

These steps are not atomic.

"Not atomic" means:

The thread may be interrupted between any two of these steps.

For example, worker A reads queue_head = 0, but before it updates queue_head, the operating system switches it out.

Now worker B starts running.

Worker B also sees:

queue_head = 0
queue_size = 1

So B also takes the task at index 0.

B then updates the state:

queue_head = 1
queue_size = 0

After that, worker A is scheduled again.

A still remembers that it previously saw:

queue_head = 0

So A continues and takes the same task from index 0.

The final result may be:

the same task runs twice
queue_head advances twice for a single task
queue_size drops to -1
the queue state no longer matches what is actually stored in the array

If we are unlucky, the program may even read invalid memory and crash.
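Real races are hard to reproduce on demand, so the sketch below replays the interleaving by hand in a single thread. It is only a simulation: the "threads" are just blocks of statements executed in the unlucky order. FakeQueue, ReplayResult and replay_double_take are hypothetical names, and a task is reduced to a plain integer id.

```c
#include <assert.h>

#define CAPACITY 4

/* Simplified queue state: each slot just holds a task id. */
typedef struct {
    int slots[CAPACITY];
    int queue_head;
    int queue_size;
} FakeQueue;

typedef struct {
    int task_a;      /* task id worker A ends up with */
    int task_b;      /* task id worker B ends up with */
    int queue_head;  /* final queue_head */
    int queue_size;  /* final queue_size */
} ReplayResult;

ReplayResult replay_double_take(void) {
    FakeQueue q = { .slots = {42, 0, 0, 0}, .queue_head = 0, .queue_size = 1 };
    ReplayResult r;

    /* Worker A: reads queue_head, then is "switched out". */
    int a_seen_head = q.queue_head;

    /* Worker B: runs the whole take operation without interruption. */
    r.task_b = q.slots[q.queue_head];
    q.queue_head = (q.queue_head + 1) % CAPACITY;
    q.queue_size--;
    assert(q.queue_size == 0);  /* at this point the queue is consistent and empty */

    /* Worker A: resumes and keeps using the index it read earlier. */
    r.task_a = q.slots[a_seen_head];
    q.queue_head = (q.queue_head + 1) % CAPACITY;
    q.queue_size--;

    r.queue_head = q.queue_head;
    r.queue_size = q.queue_size;
    return r;
}
```

In this replay both workers end up with task 42, queue_head ends at 2 although only one task existed, and queue_size ends at -1.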

This is one of the most common and dangerous problems in multi-threaded programming:

data race

3. What is a data race?

In simple terms, a data race happens when:

Multiple threads access the same shared data at the same time, at least one thread modifies it, and those accesses are not properly synchronized.

Inside a thread pool, many fields are shared:

queue
queue_head
queue_tail
queue_size
stop
working_count

Worker threads read and write them.

The submitting thread also reads and writes them.

If there is no protection, these variables can be touched by several threads at once.

For example:

pool->queue_size--;

This line looks simple, but at the machine level it is usually not one step.

You can think of it roughly as:

1. Read queue_size from memory.
2. Subtract 1.
3. Write the new value back to memory.

If two threads do this at the same time, we may get the classic lost update problem.

Suppose the initial value is:

queue_size = 2

Thread A reads 2.

Thread B also reads 2.

Thread A computes 2 - 1 and writes back 1.

Thread B also computes 2 - 1 and writes back 1.

Both threads executed queue_size--, but the final value is:

queue_size = 1

It should have been 0.

This is the kind of bug that makes concurrent programs hard to reason about.
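The same trick works for the lost update. The sketch below spells out the three machine-level steps as separate statements and runs them in the order described above. It is a single-threaded replay, not a real race, and the variable names a_reg and b_reg are only there to suggest per-thread registers.

```c
#include <assert.h>

/* Replays two overlapping queue_size-- operations step by step.
   Returns the final value of the shared variable. */
int replay_lost_update(void) {
    int queue_size = 2;      /* the shared value */

    int a_reg = queue_size;  /* Thread A: read  (sees 2) */
    int b_reg = queue_size;  /* Thread B: read  (also sees 2) */

    a_reg = a_reg - 1;       /* Thread A: subtract 1 */
    queue_size = a_reg;      /* Thread A: write back 1 */

    b_reg = b_reg - 1;       /* Thread B: subtract 1 from its stale copy */
    queue_size = b_reg;      /* Thread B: write back 1, overwriting A's update */

    assert(a_reg == 1 && b_reg == 1);  /* both threads computed the same result */
    return queue_size;
}
```

Two decrements ran, but the function returns 1: one of the updates was lost.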


4. The root cause: a thread can be scheduled away

When writing single-threaded code, we often read code as if it runs from top to bottom without interruption.

But in a multi-threaded program, that assumption is wrong.

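The operating system scheduler can pause the current thread and run another thread. On a multi-core machine, two threads can also run on different cores at literally the same time.

Either way, another thread's work may land between almost any two instructions: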

Thread A reads queue_head.
Thread A is paused.
Thread B reads queue_head.
Thread B updates queue_head.
Thread A resumes.
Thread A continues using the old value.

The important rule is:

You cannot assume a block of code will run continuously from beginning to end.

Even this small piece:

task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;

may be interleaved with another worker.

So we need a way to say:

Only one thread may execute this part at a time.

That is what a mutex gives us.


5. Critical resource and critical section

A shared object that cannot be safely modified by multiple threads at the same time is called a critical resource.

In the thread pool, the task queue is a critical resource.

More precisely, these fields belong to the same shared state:

pool->queue
pool->queue_head
pool->queue_tail
pool->queue_size

The code that reads or writes this shared state is the critical section.

For example:

take a task from the queue
insert a task into the queue
update queue_head
update queue_tail
update queue_size

These operations must not be interleaved by multiple threads.


6. What exactly is a mutex?

A mutex is a mutual exclusion lock.

The idea is simple:

Only one thread may hold the lock at a time.

Before a thread enters a critical section, it locks the mutex:

pthread_mutex_lock(&pool->mutex);

After it finishes touching the shared state, it unlocks the mutex:

pthread_mutex_unlock(&pool->mutex);

The protected region looks like this:

lock
  read/write shared state
unlock

If thread A already holds the mutex, thread B will block when it tries to lock the same mutex.

Thread B cannot enter the critical section until thread A unlocks it.

So the mutex does not make the code "atomic" at the CPU level.

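It establishes a convention:

as long as every thread locks the same mutex before touching the shared state, no two of them will be inside the protected region at the same time

Nothing stops a thread that skips the lock, so the protection only works if every code path cooperates.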
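One way to watch the "only one holder" rule in action is pthread_mutex_trylock, which is not used elsewhere in this article. It is the non-blocking sibling of pthread_mutex_lock: instead of waiting, it returns EBUSY when the mutex is already held. The sketch below is an illustration under that assumption; demo_mutex, try_to_enter and second_thread_is_refused are hypothetical names.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Runs in a second thread: asks for the mutex without waiting for it. */
static void* try_to_enter(void* arg) {
    int* result = arg;
    *result = pthread_mutex_trylock(&demo_mutex);
    if (*result == 0) {
        pthread_mutex_unlock(&demo_mutex);  /* we got it, so give it back */
    }
    return NULL;
}

/* Returns 1 if the second thread was refused with EBUSY, 0 if it got in. */
int second_thread_is_refused(int main_holds_lock) {
    pthread_t other;
    int result = -1;

    if (main_holds_lock) {
        pthread_mutex_lock(&demo_mutex);
    }

    int rc = pthread_create(&other, NULL, try_to_enter, &result);
    assert(rc == 0);
    (void)rc;
    pthread_join(other, NULL);

    if (main_holds_lock) {
        pthread_mutex_unlock(&demo_mutex);
    }
    return result == EBUSY;
}
```

Calling second_thread_is_refused(1) reports that the other thread was turned away, while second_thread_is_refused(0) reports that it got in. With pthread_mutex_lock the second thread would simply wait instead of being refused.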

7. A mutex does not prevent scheduling

A common misunderstanding is:

After a thread locks a mutex, the operating system will not switch it out.

That is not true.

A thread can still be scheduled away while holding a mutex.

For example:

Thread A locks the mutex.
Thread A is scheduled away.
Thread B runs and tries to lock the mutex.
Thread B blocks.
Thread A resumes.
Thread A unlocks the mutex.
Thread B continues.

The mutex does not stop scheduling.

It stops another thread from entering the same critical section before the first thread leaves it.

The key guarantee is:

Even if threads are interrupted and scheduled, two threads will not execute the same protected critical section at the same time.
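The following sketch makes that guarantee visible. The main thread deliberately goes to sleep in the middle of a two-step update while holding the mutex, which stands in for being scheduled away. A second thread that wants to read the value has to wait. The names state_mutex, shared_value, reader and holder_sleeps_with_lock are hypothetical, and the 50 ms pause is an arbitrary choice.

```c
#define _POSIX_C_SOURCE 200809L  /* for nanosleep under strict -std=c11 */
#include <assert.h>
#include <pthread.h>
#include <time.h>

static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
static int shared_value = 0;

/* Second thread: reads the shared value under the mutex. */
static void* reader(void* arg) {
    int* seen = arg;
    pthread_mutex_lock(&state_mutex);   /* blocks while main holds the mutex */
    *seen = shared_value;
    pthread_mutex_unlock(&state_mutex);
    return NULL;
}

/* Returns the value the reader thread observed. */
int holder_sleeps_with_lock(void) {
    pthread_t other;
    int seen = -1;

    pthread_mutex_lock(&state_mutex);
    shared_value = 1;                   /* first half of the update */

    int rc = pthread_create(&other, NULL, reader, &seen);
    assert(rc == 0);
    (void)rc;

    /* Give up the CPU for 50 ms while still holding the mutex. */
    struct timespec nap = { .tv_sec = 0, .tv_nsec = 50 * 1000 * 1000 };
    nanosleep(&nap, NULL);

    shared_value = 2;                   /* second half of the update */
    pthread_mutex_unlock(&state_mutex);

    pthread_join(other, NULL);
    return seen;
}
```

The reader always observes 2 and never the half-finished 1. The holder was off the CPU for a long time, but the critical section still looked uninterrupted to the other thread.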


8. Add a lock to taking tasks

Now we can protect thread_pool_take.

The unsafe version was:

int thread_pool_take(ThreadPool* pool, ThreadTask* out) {
    if (pool->queue_size == 0) {
        return -1;
    }

    *out = pool->queue[pool->queue_head];

    pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
    pool->queue_size--;

    return 0;
}

The shared state is:

queue
queue_head
queue_size

So the locked version becomes:

int thread_pool_take(ThreadPool* pool, ThreadTask* out) {
    pthread_mutex_lock(&pool->mutex);

    if (pool->queue_size == 0) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    *out = pool->queue[pool->queue_head];
    pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
    pool->queue_size--;

    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

Now only one thread at a time can inspect and update the queue state.

The important thing is that every return path must unlock the mutex.

This path:

if (pool->queue_size == 0) {
    pthread_mutex_unlock(&pool->mutex);
    return -1;
}

is just as important as the successful path.


9. Submitting tasks also needs the lock

Submitting a task also modifies shared state:

queue
queue_tail
queue_size

So thread_pool_submit must also hold the same mutex while it updates the queue.

The unsafe version was:

pool->queue[pool->queue_tail].fn = fn;
pool->queue[pool->queue_tail].arg = arg;
pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
pool->queue_size++;

The locked version is:

int thread_pool_submit(ThreadPool* pool, thread_task_fn fn, void* arg) {
    pthread_mutex_lock(&pool->mutex);

    if (pool->queue_size == pool->queue_capacity) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    pool->queue[pool->queue_tail].fn = fn;
    pool->queue[pool->queue_tail].arg = arg;
    pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
    pool->queue_size++;

    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

Now submitters and workers cannot update the queue at the same time.


10. A lock protects shared state, not business logic

A very important rule:

A lock protects shared state. It should not protect slow business logic unless that logic really needs the shared state.

For example, this is bad:

pthread_mutex_lock(&pool->mutex);

task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;

task.fn(task.arg);

pthread_mutex_unlock(&pool->mutex);

The task is executed while the lock is still held.

That means:

one worker is running a task
all other workers are blocked

Even if we created many worker threads, the pool behaves almost like a single-threaded executor.

The correct pattern is:

lock
  take task from shared queue
unlock

run task outside the lock

In code:

pthread_mutex_lock(&pool->mutex);

task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;

pthread_mutex_unlock(&pool->mutex);

task.fn(task.arg);

The lock only covers the queue operation.

The actual task runs outside the critical section.

The rule is:

Make the locked region as small as possible, but large enough to cover all reads and writes of the shared state.
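Putting the locked submit, the locked take, and "run the task outside the lock" together gives something we can actually test. The sketch below is one possible arrangement, not the final thread pool: it fills the queue up front, starts four workers that drain it and then exit, and counts how many tasks ran exactly once. The void (*)(void*) task signature is assumed, and storage, run_count, mark_ran, drain_worker and run_locked_queue_demo are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

/* Assumed task shape: the article only shows task.fn(task.arg). */
typedef void (*thread_task_fn)(void*);

typedef struct {
    thread_task_fn fn;
    void* arg;
} ThreadTask;

typedef struct {
    pthread_mutex_t mutex;
    ThreadTask* queue;
    int queue_capacity;
    int queue_size;
    int queue_head;
    int queue_tail;
} ThreadPool;

int thread_pool_submit(ThreadPool* pool, thread_task_fn fn, void* arg) {
    pthread_mutex_lock(&pool->mutex);
    if (pool->queue_size == pool->queue_capacity) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }
    pool->queue[pool->queue_tail].fn = fn;
    pool->queue[pool->queue_tail].arg = arg;
    pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
    pool->queue_size++;
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

int thread_pool_take(ThreadPool* pool, ThreadTask* out) {
    pthread_mutex_lock(&pool->mutex);
    if (pool->queue_size == 0) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }
    *out = pool->queue[pool->queue_head];
    pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
    pool->queue_size--;
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

#define TASK_COUNT 1000   /* illustrative numbers */
#define WORKER_COUNT 4

static ThreadTask storage[TASK_COUNT];
static int run_count[TASK_COUNT];   /* one slot per task */

static void mark_ran(void* arg) {
    int* slot = arg;
    (*slot)++;
}

/* Takes tasks until the queue is empty, then exits. */
static void* drain_worker(void* arg) {
    ThreadPool* pool = arg;
    ThreadTask task;
    while (thread_pool_take(pool, &task) == 0) {
        task.fn(task.arg);   /* the mutex is not held here */
    }
    return NULL;
}

/* Returns how many of the TASK_COUNT tasks ran exactly once. */
int run_locked_queue_demo(void) {
    ThreadPool pool = { .queue = storage, .queue_capacity = TASK_COUNT };
    pthread_t workers[WORKER_COUNT];
    int exactly_once = 0;

    pthread_mutex_init(&pool.mutex, NULL);
    for (int i = 0; i < TASK_COUNT; ++i) {
        run_count[i] = 0;
        int rc = thread_pool_submit(&pool, mark_ran, &run_count[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < WORKER_COUNT; ++i) {
        int rc = pthread_create(&workers[i], NULL, drain_worker, &pool);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < WORKER_COUNT; ++i) {
        pthread_join(workers[i], NULL);
    }
    pthread_mutex_destroy(&pool.mutex);

    for (int i = 0; i < TASK_COUNT; ++i) {
        if (run_count[i] == 1) {
            exactly_once++;
        }
    }
    return exactly_once;
}
```

Each task writes only to its own slot in run_count, so if the queue hands out every task exactly once, the slots need no lock of their own. If you remove the lock and unlock calls from thread_pool_take, the count may come out lower, or the program may misbehave in other ways.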


11. A small data race example

Here is a tiny program with a data race:

#include <pthread.h>
#include <stdio.h>

int counter = 0;

void* worker(void* arg) {
    for (int i = 0; i < 100000; ++i) {
        counter++;
    }
    return NULL;
}

int main(void) {
    pthread_t t1;
    pthread_t t2;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    printf("counter = %d\n", counter);
    return 0;
}

You might expect:

counter = 200000

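But the result is often smaller, and it can differ from run to run. Formally, a data race is undefined behavior in C, so the exact outcome also depends on the compiler and the optimization level.

The reason is that this line: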

counter++;

is a read-modify-write operation.

Two threads can read the same old value and overwrite each other's updates.

Now add a mutex:

#include <pthread.h>
#include <stdio.h>

int counter = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* worker(void* arg) {
    for (int i = 0; i < 100000; ++i) {
        pthread_mutex_lock(&mutex);
        counter++;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main(void) {
    pthread_t t1;
    pthread_t t2;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    printf("counter = %d\n", counter);
    return 0;
}

Now the protected operation is:

read counter
increment it
write it back

Only one thread can do that protected operation at a time.


12. Common pthread mutex APIs

Define a mutex:

pthread_mutex_t mutex;

Initialize it:

pthread_mutex_init(&mutex, NULL);

Lock it:

pthread_mutex_lock(&mutex);

Unlock it:

pthread_mutex_unlock(&mutex);

Destroy it:

pthread_mutex_destroy(&mutex);

For a statically allocated mutex, this is also common:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

In a thread pool struct, the mutex is usually a field:

typedef struct {
    pthread_mutex_t mutex;
    ThreadTask* queue;
    int queue_capacity;
    int queue_size;
    int queue_head;
    int queue_tail;
} ThreadPool;
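When the mutex lives inside the struct, it has to be initialized and destroyed together with the rest of the pool. The sketch below shows one way this could look. The helper names thread_pool_queue_init and thread_pool_queue_destroy are hypothetical, and so is the choice to allocate the queue with malloc. The pthread calls report failure through their return value (0 means success).

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef void (*thread_task_fn)(void*);   /* assumed task signature */

typedef struct {
    thread_task_fn fn;
    void* arg;
} ThreadTask;

typedef struct {
    pthread_mutex_t mutex;
    ThreadTask* queue;
    int queue_capacity;
    int queue_size;
    int queue_head;
    int queue_tail;
} ThreadPool;

/* Hypothetical helper: returns 0 on success, -1 on failure. */
int thread_pool_queue_init(ThreadPool* pool, int capacity) {
    if (capacity <= 0) {
        return -1;
    }
    pool->queue = malloc((size_t)capacity * sizeof *pool->queue);
    if (pool->queue == NULL) {
        return -1;
    }
    /* NULL attributes select the default mutex type. */
    if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
        free(pool->queue);
        pool->queue = NULL;
        return -1;
    }
    pool->queue_capacity = capacity;
    pool->queue_size = 0;
    pool->queue_head = 0;
    pool->queue_tail = 0;
    return 0;
}

/* Hypothetical helper: call only when no thread uses the pool any more. */
void thread_pool_queue_destroy(ThreadPool* pool) {
    int rc = pthread_mutex_destroy(&pool->mutex);  /* mutex must be unlocked */
    assert(rc == 0);
    (void)rc;
    free(pool->queue);
    pool->queue = NULL;
}
```

The order matters on the way out: all workers should have stopped before the mutex is destroyed, because destroying a mutex that is locked or still in use is not safe.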

13. Common mistakes when writing lock-based code

Mistake 1: forgetting to unlock

This is wrong:

pthread_mutex_lock(&pool->mutex);

if (pool->queue_size == 0) {
    return -1;
}

pthread_mutex_unlock(&pool->mutex);

If the queue is empty, the function returns while still holding the lock.

Every later thread trying to lock the same mutex will block forever.

The correct version unlocks before returning:

pthread_mutex_lock(&pool->mutex);

if (pool->queue_size == 0) {
    pthread_mutex_unlock(&pool->mutex);
    return -1;
}

pthread_mutex_unlock(&pool->mutex);
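Another way to avoid this mistake is to give the function a single exit, so that there is only one unlock to get right. The sketch below is one possible shape, using a trimmed-down struct. MiniPool, mini_pool_take_one and mutex_is_free_after_empty_take are hypothetical names, and pthread_mutex_trylock is used only to check that the mutex was released.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical struct, trimmed to what this example needs. */
typedef struct {
    pthread_mutex_t mutex;
    int queue_size;
} MiniPool;

/* One lock, one unlock: every path goes through the same exit. */
int mini_pool_take_one(MiniPool* pool) {
    int result = -1;                 /* assume "queue empty" until proven otherwise */

    pthread_mutex_lock(&pool->mutex);
    if (pool->queue_size > 0) {
        pool->queue_size--;
        result = 0;
    }
    pthread_mutex_unlock(&pool->mutex);

    return result;
}

/* Returns 1 if the mutex is free again after a take on an empty queue. */
int mutex_is_free_after_empty_take(void) {
    MiniPool pool;
    pthread_mutex_init(&pool.mutex, NULL);
    pool.queue_size = 0;

    int rc = mini_pool_take_one(&pool);
    assert(rc == -1);                /* the queue was empty */
    (void)rc;

    int is_free = 0;
    if (pthread_mutex_trylock(&pool.mutex) == 0) {
        is_free = 1;                 /* nobody left the lock held */
        pthread_mutex_unlock(&pool.mutex);
    }
    pthread_mutex_destroy(&pool.mutex);
    return is_free;
}
```

The early-return style used earlier in this article is just as correct. The single-exit style simply leaves fewer places where an unlock can be forgotten.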

Mistake 2: locking too little

This is wrong:

pthread_mutex_lock(&pool->mutex);
task = pool->queue[pool->queue_head];
pthread_mutex_unlock(&pool->mutex);

pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;

Only the read of the task is protected.

But queue_head and queue_size are also shared state.

The whole queue operation must be protected together.


Mistake 3: locking too much

This is often bad:

pthread_mutex_lock(&pool->mutex);

task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;
task.fn(task.arg);

pthread_mutex_unlock(&pool->mutex);

It is correct in the sense that the queue state is protected.

But it holds the lock while executing the task.

The better pattern is:

pthread_mutex_lock(&pool->mutex);

task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_size--;

pthread_mutex_unlock(&pool->mutex);

task.fn(task.arg);

Mistake 4: thinking every variable needs a lock

Not every variable needs a mutex.

Local variables owned by one thread do not need protection:

void* worker(void* arg) {
    ThreadTask task;       /* lives on this thread's stack */
    int local_count = 0;   /* no other thread can see it */
    return NULL;
}

These are local to the current worker thread.

They are not shared.

The mutex is needed for data that can be accessed by multiple threads:

shared queue
shared counters
shared flags
shared resources

The question is not:

Is this variable important?

The question is:

Can multiple threads access it at the same time, and can at least one of them modify it?

14. Mutexes alone are not enough

After adding a mutex, the queue state is protected.

But the thread pool still has another problem.

What should a worker do if the queue is empty?

One naive version is:

ThreadTask task;

while (1) {
    if (thread_pool_take(pool, &task) == 0) {
        task.fn(task.arg);
    }
    /* queue empty: loop around and try again immediately */
}

This is safe from queue corruption if thread_pool_take uses a mutex.

But it has a new problem.

When the queue is empty, the worker keeps looping:

take failed
take failed
take failed
take failed
...

That wastes CPU.
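To get a feeling for how much work is wasted, the sketch below counts the failed attempts. It reduces the queue to a single counter, starts one spinning worker, and holds the only task back until the worker has already asked at least 1000 times in vain. All names here (poll_mutex, pending_tasks, failed_polls, try_take and so on) are hypothetical, and the threshold of 1000 is arbitrary.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>

static pthread_mutex_t poll_mutex = PTHREAD_MUTEX_INITIALIZER;
static int pending_tasks = 0;     /* stand-in for queue_size */
static long failed_polls = 0;     /* times the worker found nothing */

/* Same shape as thread_pool_take: lock, check, update, unlock. */
static int try_take(void) {
    int result = -1;
    pthread_mutex_lock(&poll_mutex);
    if (pending_tasks > 0) {
        pending_tasks--;
        result = 0;
    } else {
        failed_polls++;
    }
    pthread_mutex_unlock(&poll_mutex);
    return result;
}

static void* spinning_worker(void* arg) {
    (void)arg;
    while (try_take() != 0) {
        /* nothing to do: loop around and ask again */
    }
    return NULL;
}

static long read_failed_polls(void) {
    pthread_mutex_lock(&poll_mutex);
    long value = failed_polls;
    pthread_mutex_unlock(&poll_mutex);
    return value;
}

/* Returns how often the worker polled an empty queue before its task arrived. */
long count_wasted_polls(void) {
    pthread_t worker;
    int rc = pthread_create(&worker, NULL, spinning_worker, NULL);
    assert(rc == 0);
    (void)rc;

    /* Hold the task back until the worker has polled 1000 times in vain. */
    while (read_failed_polls() < 1000) {
        sched_yield();
    }

    pthread_mutex_lock(&poll_mutex);
    pending_tasks++;              /* "submit" one task */
    pthread_mutex_unlock(&poll_mutex);

    pthread_join(worker, NULL);
    return failed_polls;          /* safe to read: the worker has exited */
}
```

Every one of those failed polls locked the mutex, looked at the state, and unlocked again without doing anything useful. The exact number varies from run to run, but by construction it is at least 1000.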

What we really want is:

if there is no task, the worker should sleep
when a new task is submitted, wake a worker

A mutex only protects shared state.

It does not provide a waiting and notification mechanism.

So after mutexes, the next piece is:

condition variable

Summary

In this article, we used the task queue to understand the first real concurrency issue in a thread pool.

The important points are:

multiple threads can interleave while reading and writing shared state
operations like queue_size-- are not atomic
data races corrupt queue state
a mutex allows only one thread to enter a critical section at a time
the lock should cover the whole shared-state operation
the task itself should run outside the lock

At this point, the queue will not be corrupted by multiple workers.

But workers still need a way to sleep when there is no task, and wake up when a task arrives.

That is what the next article is about:

How does a thread sleep when a condition is not satisfied, and how is it woken up when the condition becomes true?