Page 1 of 1

How a Multiple Producer Multiple Consumer queue works

Posted: August 4th, 2019, 6:05 am
by cyboryxmen
A queue is a simple data structure that operates the same way as a real life queue. Objects get "pushed" to the back of the queue and get "popped" when they reach the front of the queue. The front and back of the queue are tracked using indices like unsigned int(s) or std::size_t(s). When an object gets pushed to the queue, the back index simply gets incremented to the next empty slot in the queue. Similarly, when an object gets popped from the queue, the front index gets incremented to the next object in the queue.


A Multiple Producer Multiple Consumer queue allows multiple threads to push/pop to it. The queue has virtually infinite size where every time you push/pop to it, you just increment the front/back index infinitely.


As you might imagine, the queue's actual physical size is not infinite. When converting the virtual front/back indices to the actual physical indices of the queue, you simply "wrap" the position around the queue's physical range.


In this diagram,
  • virtual position 3 == physical position 3
  • virtual position 7 == physical position 3
  • virtual position 11 == physical position 3
As for the indices themselves, indices have a limited range of their own and can't be incremented infinitely. Fortunately, C++ makes it such that unsigned integers(not signed integers) such as the front/back indices get wrapped back to 0 automatically when they overflow. This however means that the physical range of the queue has to be a size that the index range's size is divisible by in order for the ranges to line up.


The assumption is that by the time the back of the queue warps back to the start of the queue, they'll be empty spaces to push objects to. This warping mechanism is why we call structures like these circular buffers. Their indices circle around when they exceed the size of the buffer. This however makes the structure harder to understand as it is now possible for objects at the back of the buffer to physically be in front of other objects.


For this reason, it is easier to just conceptualise the queue to have infinite size and to use the virtual indices instead of the physical ones unless we need to.

The queue contains "slots" which represent the objects it contains. Each slot contains its virtual index and the object stored within it. The slot's physical index can just be inferred from its physical position on the queue.

Now this is where the multithreading comes in.

When multiple threads try to push to the queue, they'll first grab the queue's back index through atomic operations(always use atomics when using variables manipulated by multiple threads!). Then, they'll grab the slot's index and calculate the difference from it to the queue's back index.
  • If the difference is more than 0, they are ahead(they looped back to the beginning and the slot is not empty yet). The push function should just return false indicating that there are no empty slots to push to.
  • If the difference is less than 0, they are behind(one thread has already pushed to that slot). They'll just fetch the queue's new back index(the other threads might've incremented the queue's back index if they have successfully pushed) and try again.
  • If the difference is 0, they'll try to push to the slot first.
They'll do this using a compare and set atomic operation on the queue's back index. This atomic operation does what its name suggests: it reads the variable being manipulated and compares it to a certain value. If they are equal, it sets the variable to a new value. Otherwise, it does nothing and return false.

In this case, we'll read the queue's back index, see if it is still equal to the same value and increment it if so. If the function succeeds, it means that you got to the slot first and would increment the queue's back index to indicate this. You'll store the object into the slot and atomically increment the slot's index by 1 to indicate to the other threads that they've fallen behind and to indicate to the thread that'll pop the object that you have finished storing the object. It is VERY IMPORTANT that you store the object to the slot first before atomically incrementing the slot's index or otherwise, the thread that'll pop the object would think that you've finished storing the object and pop it immediately.

If the function fails, it could mean that you were not the first to get to the slot. It could also mean that the function has spontaneously failed for no good reason(compare and set atomic operations tend to do that sometimes). Just try again until you've definitely confirmed that another thread has gotten to it first when the slot's index has been changed.

The pop function basically does the same thing but when it compares the two indices, it checks if the difference is == 1 instead of 0(meaning that a thread has finished pushing to that slot). It also sets the slot's index to its next value by adding the queue's size to its previous index.

Putting it all together, you'll get this push function here in pseudo code:

Code: Select all

def push(data):
	current_back = atomic_read(back)

	physical_index = current_back % queue_size
	slot = buffer[physical_index]

	current_slot_index = atomic_read(slot.index)
	difference = current_back - current_slot_index

	if difference > 0:
		return false;
	else if difference < 0:
		# Try again
		return push(data)
		if atomic_compare_set(back, expected = current_back, new_value = current_back + 1) is True: = data
			atomic_increment(slot.index, 1)

			return true
			# Try again
			return push(data)
From here, you should infer what the rest of the code would be. A thread safe queue that multiple threads can push/pop to simultaneously isn't as hard to understand as you might think. The hardest part about them is dealing with atomic operations which are always a nightmare but conceptually, most people would be able to understand this. You'd have to have a brain of a wombat to not be able to understand how a Multiple Producer Multiple Consumer queue works! 😊