-
-
Save swr1bm86/4725caa2e5bc22905b37596fea53d7fc to your computer and use it in GitHub Desktop.
Go channels in C++
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
template <typename T> | |
class concurrent_queue { | |
private: | |
std::queue<T> _queue; | |
std::mutex _mutex; | |
public: | |
bool empty() { | |
std::lock_guard<std::mutex> l(_mutex); | |
return _queue.empty(); | |
} | |
size_t size() { | |
std::lock_guard<std::mutex> l(_mutex); | |
return _queue.size(); | |
} | |
bool try_pop(T& out) { | |
std::lock_guard<std::mutex> l(_mutex); | |
if (_queue.empty()) { | |
return false; | |
} | |
out = std::move(_queue.front()); | |
_queue.pop(); | |
return true; | |
} | |
void push(T val) { | |
std::lock_guard<std::mutex> l(_mutex); | |
_queue.push(std::move(val)); | |
} | |
}; | |
class channel_closed : public std::runtime_error { | |
using std::runtime_error::runtime_error; | |
}; | |
template <typename T> | |
class buffered_channel { | |
private: | |
concurrent_queue<T> _queue; | |
std::atomic_bool _open; | |
public: | |
buffered_channel() { | |
_open.store(true); | |
} | |
~buffered_channel() { | |
drain(); | |
} | |
void drain() { | |
_open.store(false); | |
} | |
bool is_open() { | |
return _open || !_queue.empty(); | |
} | |
void send(T val) { | |
if (!_open) { | |
throw channel_closed("send attempt while closed"); | |
} | |
_queue.push(std::move(val)); | |
} | |
bool recv(T& val) { | |
while (is_open()) { | |
if (_queue.try_pop(val)) { | |
return true; | |
} | |
std::this_thread::yield(); | |
} | |
return false; | |
} | |
}; | |
int main() { | |
buffered_channel<size_t> input; | |
buffered_channel<size_t> output; | |
std::vector<std::thread> producers; | |
std::vector<std::thread> consumers; | |
for (int i = 0; i < 2; ++i) { | |
producers.emplace_back([&]{ | |
for (size_t i = 0; i < 1000; ++i) { | |
input.send(1); | |
} | |
}); | |
consumers.emplace_back([&]{ | |
size_t total = 0; | |
size_t next = 0; | |
while (input.recv(next)) { | |
total += next; | |
} | |
output.send(total); | |
}); | |
} | |
for (auto&& t : producers) { | |
t.join(); | |
} | |
input.drain(); | |
for (auto&& t : consumers) { | |
t.join(); | |
} | |
output.drain(); | |
size_t total = 0; | |
size_t next = 0; | |
while (output.recv(next)) { | |
total += next; | |
} | |
std::cout << "total: " << total << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment