Created
September 25, 2020 10:37
-
-
Save SemenMartynov/b0d8a80df9a1a88688edb5a8fed8a823 to your computer and use it in GitHub Desktop.
[Multiple producers - one consumer] with modern C++
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>
// State shared between the producer threads and the consumer thread.

// Held while the queue is pushed to / drained and while log lines are printed.
std::mutex mtx;

// Producers notify it after every push and when they finish; the consumer waits on it.
std::condition_variable condvar;

// Workers in progress: number of producers that have been started and have not
// finished yet. Starts at zero.
std::atomic<int> wip{0};

// Items handed from the producers to the consumer.
std::queue<int> queue;
void producer(int n, std::mt19937 &generator) | |
{ | |
for (int i = 0; i != n; ++i) | |
{ | |
auto sleep_time = 100 + generator() % 1000; | |
{ | |
std::lock_guard<std::mutex> lg(mtx); | |
queue.push(i); | |
std::cout << "[tid=" << std::this_thread::get_id() << "] pushing " << i << " add sleep for " << sleep_time << "ms" << std::endl; | |
} | |
condvar.notify_all(); | |
// sleep | |
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); | |
} | |
--wip; | |
condvar.notify_all(); | |
} | |
void consumer() | |
{ | |
while (true) | |
{ | |
std::unique_lock<std::mutex> ul(mtx); | |
condvar.wait(ul, [] { return !wip || !queue.empty(); }); // spurious wakeup protection, i.e: | |
//while (wip || queue.empty()) | |
// condvar.wait(ul); | |
while (!queue.empty()) | |
{ | |
std::cout << "[tid=" << std::this_thread::get_id() << "] consuming " << queue.front() << std::endl; | |
queue.pop(); | |
} | |
if (wip == 0) | |
break; | |
} | |
} | |
int main() | |
{ | |
const int max_tasks = 10; // how many int do we need from the each producer? | |
// Random generator | |
std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); | |
std::vector<std::thread *> threads(std::thread::hardware_concurrency()); // threads == vCPUs | |
// start all | |
std::transform(threads.begin(), threads.end(), threads.begin(), [&generator](auto *thr) { | |
++wip; | |
return new std::thread(producer, max_tasks, std::ref(generator)); | |
}); | |
std::thread cons(consumer); | |
// join all | |
std::for_each(threads.begin(), threads.end(), [](auto *thr) { thr->join(); delete thr; }); | |
cons.join(); | |
std::cout << "Completed!" << std::endl; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment