I am looking for a way to launch several threads that synchronize with each other at regular intervals (that is, each thread waits until all the others have reached the same point, and then they all continue together).
I understand thread creation and basic mutex usage (to share a display or some variables), but synchronizing threads is another matter.
I tried this (see the end of the post), but it does not do exactly what I want (the SYNC does not trigger when it should, and sometimes it even blocks), and I don't understand why...
It seems that there is a problem with the waiting_threads variable, which I use to count whether all threads are waiting for a new start. I declared this variable as an atomic because, as far as I understand, that ensures it can be used safely from several threads. But there must be an error somewhere...
I also suspect that I should use condition_variable and notify_all, but I did not succeed in implementing this without errors...
Any ideas, guys?
PS: beyond that, in the end I would like the Thread method of my class to be a recursive function that resyncs with its peers at any depth of the recursion...
https://github.com/binbinhfr/multithread
or :
Code: Select all
// mthrd.cpp
#include "conio.h"
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <vector>
// ---- Shared state used by every thread of the demo ----

std::mutex mutex_display; // serialises writes to the shared display (std::cout), see Print()
std::mutex mutex_data;    // protects the shared data (count) below
std::mutex mutex_sync;    // shared by RunOnce() and the worker threads to coordinate when workers may proceed

// Number of worker threads launched by RunOnce().
// A typed constant instead of a lowercase macro, so it obeys scope and type rules.
constexpr int tot_threads = 5;

// Counters shared between threads. std::atomic makes each individual read,
// increment or decrement indivisible; it does NOT make a sequence of several
// operations (e.g. "compare two counters, then act") atomic as a whole.
std::atomic<int> launched_threads{0}; // workers that have entered Prog::Thread and not yet left it
std::atomic<int> waiting_threads{0};  // workers that have finished their current calculation step

long count = 0; // shared data, incremented by the workers while holding mutex_data

// Random source used to choose the length of each calculation step (a factor in [2, 10]).
std::random_device random_device;
std::mt19937 random_engine(random_device());
std::uniform_int_distribution<int> distribution(2, 10);
// Writes one line to the shared display (std::cout).
//
// The whole line is written while holding mutex_display so that output coming
// from different threads is never interleaved within a line.
//
// @param s  text to print; a newline is appended and the stream is flushed.
void Print(const std::string& s)
{
    // RAII lock: the mutex is released even if the stream insertion throws,
    // which a manual lock()/unlock() pair would not guarantee.
    const std::lock_guard<std::mutex> guard(mutex_display);
    std::cout << s << std::endl;
}
// One worker of the demo: holds the per-thread state and the thread entry point.
class Prog
{
public:
    // Worker index, assigned at the start of Thread() from the launched_threads counter.
    int id{0};
    // Index of the calculation step that Thread() is currently running.
    int nLoop{0};

    // Formats a status line for this worker: a fixed-width text column followed
    // by a row of dots in which character `c` marks this worker's column.
    std::string Message(std::string tag, char c);

    // Thread entry point: runs `nLoops` calculation steps.
    void Thread(int nLoops);
};
// Builds one status line for this worker.
//
// Layout: "<waiting>/<launched>/<total> loop#<nLoop> thread#<id> <tag>" padded
// with spaces (or cut) to exactly 28 characters, followed by a lane of
// tot_threads characters made of '.' in which `c` sits at position `id`.
//
// @param tag  short label describing what the worker is doing
// @param c    marker character placed in this worker's column
// @return     the formatted line (without a trailing newline)
std::string Prog::Message(std::string tag, char c)
{
    // Fixed-width text column.
    std::string header = std::to_string(waiting_threads);
    header += "/";
    header += std::to_string(launched_threads);
    header += "/";
    header += std::to_string(tot_threads);
    header += " loop#";
    header += std::to_string(nLoop);
    header += " thread#";
    header += std::to_string(id);
    header += " ";
    header += tag;
    header.resize(28, ' ');

    // Per-worker lane: `id` dots, the marker, then dots up to tot_threads columns.
    std::string lane(static_cast<std::size_t>(id), '.');
    lane.push_back(c);
    lane.resize(tot_threads, '.');

    return header + lane;
}
void Prog::Thread(int nLoops)
{
// waiting all threads being created in main call...
id = launched_threads++;
mutex_sync.lock();
mutex_sync.unlock();
// go !
Print("thread# " + std::to_string(id) + " : GO");
double d = 0;
std::string s;
// heavy calculation, separated in nLoops steps, using shared display (cout) and data
// with all thread waiting for each other at the end of each steps.
for (nLoop = 0; nLoop < nLoops; nLoop++)
{
int i, iMax, step;
iMax = 10000 * distribution(random_engine);
step = iMax / 10;
Print(Message("start", 'S'));
for (i = 0; i != iMax; i++)
{
d = tan(cos(d / 100) + sin(i / 200));
mutex_data.lock();
count++;
mutex_data.unlock();
if (i % step == 0)
{
// regularly print out some informations on the shared display
Print(Message("calc", '0' + (i / step)));
}
}
// I want all threads to sync HERE (ie. wait for each other), before next nLoop !!!
++waiting_threads; // signal that job is done and wait for lock release...
Print(Message("done", 'X'));
while (waiting_threads < launched_threads);
Print(Message("w all", 'A'));
mutex_sync.lock();
// wait for main to release all threads...
mutex_sync.unlock();
--waiting_threads; // signal that i'm unlocked
}
--launched_threads;
}
int RunOnce()
{
int nLoops = 10;
std::thread* threads = new std::thread[tot_threads];
Prog* progs = new Prog[tot_threads];
Print("CREATING " + std::to_string(tot_threads) + " THREADS");
waiting_threads = 0;
launched_threads = 0;
mutex_sync.lock();
for (int i = 0; i < tot_threads; ++i) {
Print("LAUNCH THREAD #" + std::to_string(i));
threads[i] = std::thread(&Prog::Thread, progs[i], nLoops);
}
while (launched_threads < tot_threads); // wait for all threads to be launched and ready to calculate
mutex_sync.unlock(); // unlock so that threads can continue
Print("ALL THREADS CREATED !");
while (launched_threads > 0) // continue while there are threads running
{
mutex_sync.lock();
// wait that all threads finish their current calculation step
while (waiting_threads < launched_threads);
mutex_sync.unlock();
while (waiting_threads > 0);
Print(" SYNC!");
}
Print("ALL THREADS STOPPED !");
for (int i = 0; i < tot_threads; ++i) {
Print("JOIN (waiting) thread #" + std::to_string(i));
threads[i].join();
}
Print("ALL THREADS JOINED and END");
Print("SHARED DATA " + std::to_string(count));
delete[] threads;
delete[] progs;
return 0;
}
// Program entry point: runs the demo once under a numbered banner, then keeps
// the console open until a key is pressed.
int main()
{
    for (int n = 0; n != 1; n++)
    {
        Print("===================================================================================");
        Print(std::to_string(n));
        Print("===================================================================================");
        RunOnce();
    }
    // Wait for a key press. Sleep briefly between polls so the wait does not
    // keep a CPU core at 100% the way an empty spin loop does.
    while (!_kbhit())
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}