The Lock Free Queue
In a question in Stackoverflow about lock-free queues, the most upvoted answer (jan/20) is:
I've made a particular study of lock-free data structures in the last couple of years. I've read most of the papers in the field (there's only about fourty or so - although only about ten or fifteen are any real use :-)
AFAIK, a lock-free circular buffer has not been invented.
In another S.O. question someone said:
Lock-free queues are unicorns.
Searching the literature, we found no more encouraging words: The book The Art of Multiprocessor Programming asserts that the construction of a wait-free queue is impossible:
Corollary 5.4.1. It is impossible to construct a wait-free implementation of a queue, stack, priority queue, set, or list from a set of atomic registers.
Although FIFO queues solve two-thread consensus, they cannot solve 3-thread consensus. (pg. 107)
After years of investigation and lot of tests, I finally destiled a bare minimum solution to this problem, which I'm pretty sure that it is wait-free. (If you can refute, please do it).
In this paper/repository, I did my best to bring only the essential to the compreension of the problem and its solution. Focusing on what really matters. And yet giving some code ready to be used.
Don't let its simplicity fools you. This is the result of years of work, and I believe that is the smallest/simplest solution ever found.
After releasing the idea, I received some objections about the asuredness of my "claims". The most common is:
How can you asure that it is really lock-free?
This is somewhat hard to proof, because the problem its not well defined, and considered impossible by the academic luminars. But the main point is that the cost per operation is O(1) for any number of concurrent threads. I've tested with up to 512 threads reading and writing a single position buffer and got the same result.
But...
The Curry-Howard Correspondence says:
a proof is a program, and the formula it proves is the type for the program.
I think that's a fancy way to say: The program is the proof of itself.
Here is some verified facts and features of this program/formula:
- N threads (tested up to 512).
- N buffer size (minimum 1).
- Constant cost per operation O(1).
- Only 2 atomic variables.
- No locks or mutexes.
- Freely preempted.
- Zero checksum.
Follow a compreensive description of the algorithm.
If you like to put your hands dirt and dive right into de code, start at test.cpp and queue.h. (We have implementations in C# and pascal too.)
A bare minimum solution to the 3-thread consensus, implemented as a MRMW (multi-read/multi-write) circular buffer. In the context of a multi-threaded producer/consumer testcase.
For the sake of simplicity, in this docs I used a simplified JavaScript pseudocode, familiar for anyone using C-like languages. For the real thing, refer to source code.
class Queue(size) {
data = Array(size);
in = 0, out = 0
mask = size-1
push(item) // send the item & return the id of the job
pop() // get the next item.
}
data
Is a common array (non atomic). Updated without locking. Secured by in
& out
.
size
must be a power of 2 (1, 2, 4, 8, 16...) The buffer will be indexed by binary mask (data[t & mask]
) limiting the memory access inside. At minimum, a single bit.
in
Holds a the ID of the next element. Always ahead or equal out
.
out
Is the ID of the last element (next to be removed). Never greater than in
Both are atomic variables, always incremented.
No boundary check is needed, because mask
and the way integer overflow happens.
Here, the heart of the solution, where we solve the 3-thread consensus.
push(item)
{
let i
do {
i = in // get a local copy the next id
while (i - out == size) sleep() // if full, wait for space
} while ( (data[i & mask]) || (CompareAndSwap(in, i+1, i) != i) )
// if the seat is lost or CAS failed, try again.
data[i & mask] = item // now is safe to update the buffer using local i
return i // id of the job
}
-
If the thread is preempted at any point between
i = in
andCompareAndSwap(input, i+1, i)
, on return the CAS will fail and the loop go to the next seat. Without any kind of locking. -
I think that
(data[i & mask]) ||
should not be really neeed, but my computer hangs without it. And it prevents the use of the expensive CAS instruction. -
Here I check for nullability of the content. But in the C++ implementation it was replaced by an isFree boolean array.
pop()
{
let o
do {
o = out // id of the next candidate
while (o == in) sleep() // if empty, wait for items
} while ( !(data[o & mask]) || CompareAndSwap(out, o+1, o) != o )
// if the candidate is gone or CAS failed, try again.
o &= mask // round to fit the buffer
int r = data[o] // save the return
buffer[o] = 0 // release the seat
return r
}
Both methods have two nested while()
loops:
-
First we get the next seat/candidates with
i = in
ando = out
-
Then we check if the buffer is full or empty,
sleep()
ing until state change. -
If the seat/candidate is available, try increment the atomic register
CompareAndSwap(out, o+1, o)
. -
If the CAS fail (
!=o
), go to the next seat.
-
The load operation used by
data[h] = 0
and!(data[o & mask])
are naturaly atomic. -
The last thing done by
pop()
is release the seat. -
The flow happens without any kind of lock.
If in == out
the Queue is empty,
If in-out == size
the Queue is full.
In these cases the queue do not lock, but make a voluntary preemption calling the sleep()
function.
Now, with our Queue defined, its time to put it on fire...
Lets create two groups of threads: one producing and another consuming data.
The first type of thread is the producer
, it will put a bunch of numbers into the queue Q
.
producer()
{
for(let i=1; i <= N; i++)
Q.push(i)
Q.push(-1) // end of job
Total += N
log("Produced:", N)
}
Here we push -1 into the queue to signal the termination of the job. Also can be implemented with external flags.
Now, the work of the consumer
is remove elements out of the queue, until receiving a termination.
consumer()
{
let value, sum = 0
do {
value = Q.pop()
sum += value
} while( value != -1 )
Total -= sum
log("Consumed:", sum)
}
Producers increment Total
and consumers decrement it. At the end, it must be zero, proofing that there is no leaks.
Lets start our threads and check if what we get out the queue its equal to what we pushed into.
let producers = []
, consumers = []
for(int i=0; i < 4; i++)
{
producers[i] = Thread(producer)
consumers[i] = Thread(consumer)
}
wait(producers, consumers)
log("Total: %d", Total)
- Here we create 8 threads to flow data. For sake of simplicity, with the same number of producers and consumers, but it works equally in asymmetric conditions.
- I've tested with at most 512 threads on an old Windows XP machine, (where I got the better performace).
This is the output I got from the C++ implementation, flowing 10M items:
Creating 4 producers & 4 consumers
Flowing 10.000.000 items trough the queue.
Produced: 2.500.000
Consumed: 1.349.413
Produced: 2.500.000
Consumed: 1.359.293
Consumed: 3.876.642
Produced: 2.500.000
Consumed: 3.414.652
Produced: 2.500.000
Total: 0
real 0m0,581s
Note that producers always pushed the same amount of items (2.5M), but consumers get different quantities, ranging from 1.3 to 3.8M. This is the expected behaviour.
Follow our expected Total:0
proofing that all produced was consumed.
Then the time took by the operation: 581 ms. A throughput of 17.2 M flow/s. (Enough for an old Dell M6300 Core duo).
I made a series of benchmarks, varying the buffer size and the number of threads. Follow the results.
With default buffer size (64), varying the number of threads:
- The cost per operation with 2 threads is the same for 512. I think this is the definitive proof lock-freedom.
Comments, benchmarks and use cases are welcome.
Code released under GNU 3.0 and docs under Creative Commons (CC BY-SA 3.0) licenses.