A Multiple Producer Multiple Consumer (MPMC) queue allows multiple threads to push to and pop from it at the same time. Conceptually, the queue has infinite size: every push increments the back index and every pop increments the front index, forever.
As you might imagine, the queue's actual physical size is not infinite. When converting the virtual front/back indices to the actual physical indices of the queue, you simply "wrap" the position around the queue's physical range.
In this diagram,
- virtual position 3 == physical position 3
- virtual position 7 == physical position 3
- virtual position 11 == physical position 3
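The conversion is just a modulo by the queue's physical size, which the diagram implies is 4 (since virtual positions 3, 7 and 11 all land on physical position 3). A quick sketch of that mapping:

Code:
queue_size = 4                       # physical size implied by the diagram
for virtual in (3, 7, 11):
    physical = virtual % queue_size  # wrap the virtual index around the buffer
    print(virtual, "->", physical)   # all three print "-> 3"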
The assumption is that by the time the back of the queue wraps back to the start of the queue, there'll be empty spaces to push objects to. This wrapping mechanism is why we call structures like these circular buffers. Their indices circle around when they exceed the size of the buffer. This, however, makes the structure harder to understand, as it is now possible for objects at the back of the buffer to physically be in front of other objects.
For this reason, it is easier to just conceptualise the queue as having infinite size and to use the virtual indices instead of the physical ones unless we need to.
The queue is made up of "slots" which hold the objects it contains. Each slot stores its virtual index and the object placed in it. The slot's physical index can just be inferred from its physical position in the queue.
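As a rough sketch of that layout (the initial indices are my assumption: for the push check below to start with a difference of 0, each slot's index has to begin as its own physical position):

Code:
queue_size = 4  # or whatever physical size you pick

class Slot:
    def __init__(self, virtual_index):
        self.index = virtual_index  # virtual index this slot expects next (read/written atomically)
        self.data = None            # the object currently stored in the slot

# Each slot's index starts out equal to its physical position.
buffer = [Slot(i) for i in range(queue_size)]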
Now this is where the multithreading comes in.
When multiple threads try to push to the queue, they'll first grab the queue's back index through an atomic operation (always use atomics for variables manipulated by multiple threads!). Then, they'll grab the index of the slot that back position maps to and calculate the difference between the queue's back index and the slot's index.
- If the difference is more than 0, they are ahead (the back index has looped around to this slot before it has been emptied). The push function should just return false, indicating that there are no empty slots to push to.
- If the difference is less than 0, they are behind (another thread has already pushed to that slot). They'll just fetch the queue's new back index (the other threads might've incremented it after successfully pushing) and try again.
- If the difference is 0, the slot is free, so they'll try to be the first to push to it.
In this case, we'll atomically check that the queue's back index is still equal to the value we read earlier and increment it if so (a compare-and-set). If the operation succeeds, it means you got to the slot first, and the back index has been incremented to say so. You then store the object into the slot and atomically increment the slot's index by 1. This tells the other producers that they've fallen behind, and tells the thread that will pop the object that you have finished storing it. It is VERY IMPORTANT that you store the object into the slot before atomically incrementing the slot's index; otherwise, the popping thread could think you've finished storing the object and pop it immediately, before the data is actually there.
If the operation fails, it could mean that you were not the first to get to the slot. It could also mean that the compare-and-set failed spuriously for no good reason (weak compare-and-set operations are allowed to do that on some hardware). Just try again; you'll know for certain that another thread got there first once the slot's index has changed.
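To make that step concrete, here's a sketch of what the atomic_compare_set helper used in the pseudocode below is assumed to do, with the shared counter imagined as a little cell object with a .value field. On real hardware this is a single indivisible instruction; the body here only shows the logic:

Code:
def atomic_compare_set(variable, expected, new_value):
    # All of this happens as one indivisible step on the hardware.
    # Weak variants may also return False spuriously even when the values matched.
    if variable.value == expected:
        variable.value = new_value
        return True
    return False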
The pop function basically does the same thing, but when it compares the two indices, it checks whether the slot's index is one more than the front index instead of equal to it (meaning that a thread has finished pushing to that slot). It also sets the slot's index to its next value: the index the slot had before the push plus the queue's size (in other words, the front index plus the queue's size), so the slot reads as empty the next time the back index wraps around to it. A matching pop sketch is given after the push pseudocode below.
Putting it all together, you'll get this push function here in pseudocode:
Code:
def push(data):
    # Read the back index and find the slot it maps to.
    current_back = atomic_read(back)
    physical_index = current_back % queue_size
    slot = buffer[physical_index]

    current_slot_index = atomic_read(slot.index)
    difference = current_back - current_slot_index

    if difference > 0:
        # The slot hasn't been emptied yet: the queue is full.
        return False
    elif difference < 0:
        # Another thread already pushed to this slot; re-read back and try again.
        return push(data)
    else:
        # difference == 0: race to claim the slot by incrementing the back index.
        if atomic_compare_set(back, expected = current_back, new_value = current_back + 1):
            slot.data = data                  # store the object FIRST...
            atomic_increment(slot.index, 1)   # ...then publish it to the popping thread
            return True
        else:
            # Lost the race (or the compare-and-set failed spuriously); try again.
            return push(data)
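And here's the matching pop function, following the description above. It assumes a shared front index alongside back, an atomic_write helper as the counterpart of atomic_read, and that an empty queue is reported by returning None; treat it as a sketch rather than the one true implementation:

Code:
def pop():
    # Read the front index and find the slot it maps to.
    current_front = atomic_read(front)
    physical_index = current_front % queue_size
    slot = buffer[physical_index]

    current_slot_index = atomic_read(slot.index)
    # Note the flipped order compared to push, so a fully pushed slot gives 1.
    difference = current_slot_index - current_front

    if difference < 1:
        # No thread has finished pushing to this slot yet: the queue is empty.
        return None
    elif difference > 1:
        # Another thread already popped this slot; re-read front and try again.
        return pop()
    else:
        # difference == 1: race to claim the slot by incrementing the front index.
        if atomic_compare_set(front, expected = current_front, new_value = current_front + 1):
            data = slot.data
            # Mark the slot as free for the next lap: its next virtual index is
            # the old front index plus the queue's size.
            atomic_write(slot.index, current_front + queue_size)
            return data
        else:
            # Lost the race (or the compare-and-set failed spuriously); try again.
            return pop()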