Multithreading and threads sync ?

The Partridge Family were neither partridges nor a family. Discuss.
Post Reply
binbinhfr
Posts: 78
Joined: May 9th, 2019, 10:57 pm

Multithreading and threads sync ?

Post by binbinhfr » March 22nd, 2020, 3:13 pm

Hi,

I am looking for a way to launch several threads that should synchronize themselves at regular time interval (I mean that they should wait for each other, then continue all together).
I understood thread creation, and basic mutex stuff (in order to share a display or to share some variables), but syncing threads is another thing.
I tried this (see end of post) but it does not do exactly what I want (the SYNC does not trigger when it should, and even sometimes it blocks) and I don't understand why...
It seems that there is a problem with the waiting_threads variable that I use to count if all threads are waiting for a new start. I declared this variable as a atomic because it seems to me that it ensure a stable use between threads. But there must be an error somewhere...
I suspect also that I should use condition_variable and notify_all, but I did not succeed in implementing this without errors...
Any idea guys ?

PS : more than that, at the end, i would like the Thread method of my class to be a recursive function, resyncing with its peers at any depth of the recursion...

https://github.com/binbinhfr/multithread

or :

Code: Select all

// mthrd.cpp 

#include "conio.h"
#include <iostream>
#include <iomanip>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <random>
#include <string>

std::mutex mutex_display;		// used to lock shared display
std::mutex mutex_data;			// used to lock shared data
std::mutex mutex_sync;			// used to block all threads until released by the main thread.

#define tot_threads 5
std::atomic<int> launched_threads = 0; // atomic = to be shared between threads, without mutex control, but with some garanty of safe operations ?
std::atomic<int> waiting_threads = 0;

long count = 0;			// shared data

std::random_device random_device;
std::mt19937 random_engine(random_device());
std::uniform_int_distribution<int> distribution(2, 10);

void Print(const std::string& s)
{
	// shared display routine
	mutex_display.lock();
	std::cout << s << std::endl;
	mutex_display.unlock();
}

class Prog
{
public:
	std::string Message(std::string tag, char c);
	void Thread(int nLoops);

	int id = 0;
	int nLoop = 0;
};

std::string Prog::Message(std::string tag, char c)
{
	// format message
	std::string s1, s2;
	s1 = std::to_string(waiting_threads) + "/" + std::to_string(launched_threads) + "/" + std::to_string(tot_threads)
		+ " loop#" + std::to_string(nLoop) + " thread#" + std::to_string(id) + " " + tag;
	s1.resize(28, ' ');
	s2.resize(id, '.');
	s2 += c;
	s2.resize(tot_threads, '.');
	return(s1 + s2);
}

void Prog::Thread(int nLoops)
{
	// waiting all threads being created in main call...
	id = launched_threads++;

	mutex_sync.lock();
	mutex_sync.unlock();

	// go !
	Print("thread# " + std::to_string(id) + " : GO");

	double d = 0;
	std::string s;

	// heavy calculation, separated in nLoops steps, using shared display (cout) and data 
	// with all thread waiting for each other at the end of each steps.

	for (nLoop = 0; nLoop < nLoops; nLoop++)
	{
		int i, iMax, step;

		iMax = 10000 * distribution(random_engine);
		step = iMax / 10;

		Print(Message("start", 'S'));

		for (i = 0; i != iMax; i++)
		{
			d = tan(cos(d / 100) + sin(i / 200));

			mutex_data.lock();
			count++;
			mutex_data.unlock();

			if (i % step == 0)
			{
				// regularly print out some informations on the shared display
				Print(Message("calc", '0' + (i / step)));
			}
		}

		// I want all threads to sync HERE (ie. wait for each other), before next nLoop !!!

		++waiting_threads; // signal that job is done and wait for lock release...

		Print(Message("done", 'X'));

		while (waiting_threads < launched_threads);

		Print(Message("w all", 'A'));

		mutex_sync.lock();
		// wait for main to release all threads...
		mutex_sync.unlock();

		--waiting_threads; // signal that i'm unlocked
	}

	--launched_threads;
}

int RunOnce()
{
	int nLoops = 10;

	std::thread* threads = new std::thread[tot_threads];
	Prog* progs = new Prog[tot_threads];

	Print("CREATING " + std::to_string(tot_threads) + " THREADS");

	waiting_threads = 0;
	launched_threads = 0;

	mutex_sync.lock();

	for (int i = 0; i < tot_threads; ++i) {
		Print("LAUNCH THREAD #" + std::to_string(i));
		threads[i] = std::thread(&Prog::Thread, progs[i], nLoops);
	}

	while (launched_threads < tot_threads); // wait for all threads to be launched and ready to calculate
	mutex_sync.unlock(); // unlock so that threads can continue

	Print("ALL THREADS CREATED !");

	while (launched_threads > 0) // continue while there are threads running
	{
		mutex_sync.lock();
		// wait that all threads finish their current calculation step
		while (waiting_threads < launched_threads);
		mutex_sync.unlock();
		while (waiting_threads > 0);
		Print("                            SYNC!");
	}

	Print("ALL THREADS STOPPED !");

	for (int i = 0; i < tot_threads; ++i) {
		Print("JOIN (waiting) thread #" + std::to_string(i));
		threads[i].join();
	}

	Print("ALL THREADS JOINED and END");

	Print("SHARED DATA " + std::to_string(count));

	delete[] threads;
	delete[] progs;

	return 0;
}

int main()
{
	for (int n = 0; n != 1; n++)
	{
		Print("===================================================================================");
		Print(std::to_string(n));
		Print("===================================================================================");
		RunOnce();
	}

	while (!_kbhit());
}

binbinhfr
Posts: 78
Joined: May 9th, 2019, 10:57 pm

Re: Multithreading and threads sync ?

Post by binbinhfr » March 22nd, 2020, 5:52 pm

well after reading some docs, I finally managed to do something with condition_variable.
Here is my solution.
Maybe the "pros" of the forum can think about a better way to do this multithread-sync ?

Code: Select all

// mthrd.cpp 

#include "conio.h"
#include <iostream>
#include <iomanip>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
#include <random>
#include <string>

std::mutex mutex_display;		// used to lock shared display
std::mutex mutex_data;			// used to lock shared data

#define tot_threads 9
std::atomic<int> launched_threads = 0; // atomic = to be shared between threads, without mutex control, but with some garanty of safe operations ?
std::atomic<int> waiting_threads = 0;

long count = 0;			// shared data

std::random_device random_device;
std::mt19937 random_engine(random_device());
std::uniform_int_distribution<int> distribution100(0, 100);


void Print(const std::string& s)
{
	// shared display routine
	mutex_display.lock();
	std::cout << s << std::endl;
	mutex_display.unlock();
}

class Prog
{
public:
	std::string Message(std::string tag, char c);
	void Thread(int nLoops);

	int id = 0;
	int nLoop = 0;

	std::mutex mtx;
	std::condition_variable cv;
};

std::string Prog::Message(std::string tag, char c)
{
	// format message
	std::string s1, s2;
	s1 = std::to_string(waiting_threads) + "/" + std::to_string(launched_threads) + "/" + std::to_string(tot_threads)
		+ " loop#" + std::to_string(nLoop) + " thread#" + std::to_string(id) + " " + tag;
	s1.resize(30, ' ');
	s2.resize(id, '.');
	s2 += c;
	s2.resize(tot_threads, '.');
	return(s1 + s2);
}

void Prog::Thread(int nLoops)
{
	std::unique_lock<std::mutex> lck(mtx);

	// waiting all threads being created in main call...
	id = launched_threads++;

	cv.wait(lck);

	// go !
	Print("thread# " + std::to_string(id) + " : GO");

	std::string s;

	// heavy calculation, separated in nLoops steps, using shared display (cout) and data 
	// with all thread waiting for each other at the end of each steps.

	for (nLoop = 0; nLoop < nLoops; nLoop++)
	{
		Print(Message("start", 'S'));

		int time_offset = distribution100(random_engine) / 10;

		for (int i = 0; i != 10; i++)
		{
			std::this_thread::sleep_for(std::chrono::milliseconds(time_offset+distribution100(random_engine)/10));

			// update the shared data
			mutex_data.lock();
			count++;
			mutex_data.unlock();

			// regularly print out some informations on the shared display
			Print(Message("calc", '0' + i));
		}

		// all threads sync HERE (ie. wait for each other), before next nLoop !!!

		Print(Message("end ", 'E'));

		++waiting_threads; // signal that job is done and wait for lock release...
		cv.wait(lck);
		--waiting_threads; // signal that i'm unlocked
	}

	--launched_threads;
}

int RunOnce()
{
	int nLoops = 10;

	std::thread* threads = new std::thread[tot_threads];
	Prog* progs = new Prog[tot_threads];

	Print("CREATING " + std::to_string(tot_threads) + " THREADS");

	waiting_threads = 0;
	launched_threads = 0;

	for (int i = 0; i != tot_threads; ++i) 
	{
		Print("LAUNCH THREAD #" + std::to_string(i));
		threads[i] = std::thread(&Prog::Thread, std::ref(progs[i]), nLoops);
	}

	while (launched_threads < tot_threads);

	Print("ALL THREADS CREATED !");

	for (int i = 0; i != tot_threads; ++i)
		progs[i].cv.notify_one();

	Print("ALL THREADS NOTIFIED !");

	while (launched_threads > 0) // continue while there are threads running
	{
		while (waiting_threads < launched_threads );

		Print(std::string(30,' ') + "SYNC!");

		for (int i = 0; i != tot_threads; ++i)
			progs[i].cv.notify_one();
	}

	Print("ALL THREADS STOPPED !");

	for (int i = 0; i != tot_threads; ++i) {
		Print("JOIN (waiting) thread #" + std::to_string(i));
		threads[i].join();
	}

	Print("ALL THREADS JOINED and END");

	Print("SHARED DATA " + std::to_string(count));

	delete[] threads;
	delete[] progs;

	return 0;
}

int main()
{
	for (int n = 0; n != 2; n++)
	{
		Print("===================================================================================");
		Print(std::to_string(n));
		Print("===================================================================================");
		RunOnce();
	}

	while (!_kbhit());
}

binbinhfr
Posts: 78
Joined: May 9th, 2019, 10:57 pm

Re: Multithreading and threads sync ?

Post by binbinhfr » March 23rd, 2020, 1:57 pm

Well, I went even further, because, as I said, I want to use recursive concurrent threads that sync at regular intervals.
So I changed all to recursive_mutex and condition_variable_any

here is the code. Try it, you will see, it's funny to watch it run in parallel and sync back from time to time.

Guys, if you have some suggestions to improve, I am listening.

Code: Select all

// mthrd.cpp 

#include "conio.h"
#include <iostream>
#include <iomanip>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
#include <random>
#include <string>

std::mutex mutex_display;		// used to lock shared display
std::mutex mutex_data;			// used to lock shared data

#define level_max 4
#define width_max 4

#define tot_threads 9
std::atomic<int> launched_threads = 0; // atomic = to be shared between threads, without mutex control, but with some garanty of safe operations ?
std::atomic<int> waiting_threads = 0;

long count = 0;			// shared data
long nbSyncs = 0;		// shared data

std::random_device random_device;
std::mt19937 random_engine(random_device());
std::uniform_int_distribution<int> distribution100(0, 100);
std::uniform_real_distribution<double> distributionReal(0.0, 1.0);


void Print(const std::string& s)
{
	// shared display routine
	mutex_display.lock();
	std::cout << s << std::endl;
	mutex_display.unlock();
}

class Prog
{
public:
	Prog() {};
	~Prog();

	std::string MessageRecurs(int level, int width, const std::string& tag);
	void ThreadRecursInit();
	void ThreadRecurs(int level);

	int id = 0;
	int nLoop = 0;

	std::thread* pThread = nullptr;;
	std::recursive_mutex rmtx;
	std::condition_variable_any cva;
};

Prog::~Prog()
{
	if (pThread != nullptr) delete pThread;
}

std::string Prog::MessageRecurs(int level, int width, const std::string& tag)
{
	// format message
	std::string s;
	s = std::to_string(waiting_threads) + "/" + std::to_string(launched_threads) + "/" + std::to_string(tot_threads)
		+ " thread#" + std::to_string(id);
	s.resize(20, ' ');
	s.resize(size_t(20 + id * 2), '.');
	s += std::to_string(level) + std::to_string(width);
	s.resize(20 + tot_threads * 2, '.');
	s += ' ' + tag;
	return(s);
}

void Prog::ThreadRecursInit()
{
	id = launched_threads++;

	Print("thread# " + std::to_string(id) + " : GO");

	ThreadRecurs(0);

	Print(MessageRecurs(0, 0, "end"));

	--launched_threads;
}

void Prog::ThreadRecurs(int level)
{
	if (level == level_max) return;

	for (int n = 0; n != width_max; n++)
	{
		Print(MessageRecurs(level, n, ""));

		std::this_thread::sleep_for(std::chrono::milliseconds(1 + distribution100(random_engine) / 30));

		// update the shared data
		mutex_data.lock();
		count++;
		mutex_data.unlock();

		ThreadRecurs(level + 1);

		if (distributionReal(random_engine) < 0.008 * (1.0+2.0*double(waiting_threads)))
		{
			std::unique_lock<std::recursive_mutex> lck(rmtx);
			Print(MessageRecurs(level, n, "pause"));

			++waiting_threads; // signal that job is done and wait for lock release...
			cva.wait(lck);
			--waiting_threads; // signal that i'm unlocked
		}
	}
}

int RunOnce()
{
	int nLoops = 10;

	Prog* progs = new Prog[tot_threads];

	Print("CREATING " + std::to_string(tot_threads) + " THREADS");

	waiting_threads = 0;
	launched_threads = 0;

	for (int i = 0; i != tot_threads; ++i)
	{
		Print("LAUNCH THREAD #" + std::to_string(i));
		progs[i].pThread = new std::thread(&Prog::ThreadRecursInit, std::ref(progs[i]));
	}

	while (launched_threads > 0) // continue while there are threads running
	{
		while (waiting_threads < launched_threads);

		nbSyncs++;

		Print(std::string(15, ' ') + "SYNC!");

		for (int i = 0; i != tot_threads; ++i)
			progs[i].cva.notify_one();
	}

	Print("ALL THREADS STOPPED !");

	for (int i = 0; i != tot_threads; ++i) {
		Print("JOIN (waiting) thread #" + std::to_string(i));
		progs[i].pThread->join();
	}

	Print("ALL THREADS JOINED and END");

	Print("SHARED COUNT " + std::to_string(count));
	Print("EXPECTED     " + std::to_string( (std::pow(width_max,level_max)-1)/(width_max-1)* width_max * tot_threads));
	Print("SHARED SYNCS " + std::to_string(nbSyncs));

	delete[] progs;

	return 0;
}

int main()
{
	// to test several tries....

	for (int n = 0; n != 1; n++)
	{
		Print("===================================================================================");
		Print(std::to_string(n));
		Print("===================================================================================");

		count = 0;
		nbSyncs = 0;

		RunOnce();

		std::this_thread::sleep_for(std::chrono::seconds(3));
	}

	while (!_kbhit());
}

binbinhfr
Posts: 78
Joined: May 9th, 2019, 10:57 pm

Re: Multithreading and threads sync ?

Post by binbinhfr » March 24th, 2020, 5:51 am

As said with albinopapa in another thread, I would like to use the notify_all command, and maybe only one mutex for all waiting threads, but I do not know how. I suppose that I should use a condition_variable testing part with (waiting_threads < launched_threads) but I cannot figure out how to make it work...

If someone has an idea...

albinopapa
Posts: 4373
Joined: February 28th, 2013, 3:23 am
Location: Oklahoma, United States

Re: Multithreading and threads sync ?

Post by albinopapa » March 24th, 2020, 6:20 pm

Out of curiosity, why would you need to synchronize all threads at the same point before moving on?

In GPU programming there are fence and barriers I think that are designed for this exact usage, allowing all threads to wait until given some signal. It seems that fence/barriers cpu side deal mostly with memory access ordering instead of stalling thread execution. I only bring this up because I was hoping to make things less complicated by using std::async or std::atomic_thread_fence, but I still haven't come up with anything.
Last edited by albinopapa on March 24th, 2020, 6:35 pm, edited 1 time in total.
Reason: I originally had std::atomic, when I meant std::async
If you think paging some data from disk into RAM is slow, try paging it into a simian cerebrum over a pair of optical nerves. - gameprogrammingpatterns.com

binbinhfr
Posts: 78
Joined: May 9th, 2019, 10:57 pm

Re: Multithreading and threads sync ?

Post by binbinhfr » March 25th, 2020, 5:39 am

It's still my robot programming game :
On one map, each robot is driven by a user-written script that is stored as a N-tree.
Running one script = recursively parsing the tree with a recursive function.
To ease things (and at the end, also to have a MT game, taking advantage of actual multicores CPU) I created 1 thread per robot script + 1 thread for the map script itself + 1 thread for the screen refresh. As the game is based on cycle (each action of one robot take 1 cycle of 1 second), in order for each script to be synchronize with each other, I have to make them sync at the end of every cycle. During one cycle (between 2 robot actions), the robot script can compute various things, communicate with another robot, prepare a file or whatever you need to fulfil the present game solution.
I must admit that it's in good way. The game is still in console mode with ugly characters on screen, but the user-language part and MT part are almost done. Everything is sync now.
When it will be time, I'll probably use SDL2 for the graphic part, that I already sued in another project.

albinopapa
Posts: 4373
Joined: February 28th, 2013, 3:23 am
Location: Oklahoma, United States

Re: Multithreading and threads sync ?

Post by albinopapa » March 25th, 2020, 4:36 pm

Sweet, thanks for responding. That definitely clears things up for me. Glad to see that you are making good progress.
If you think paging some data from disk into RAM is slow, try paging it into a simian cerebrum over a pair of optical nerves. - gameprogrammingpatterns.com

Post Reply