-
-
Save tmacam/668080 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef PRODUCER_CONSUMER_QUEUE_HPP_ | |
#define PRODUCER_CONSUMER_QUEUE_HPP_ | |
// Based on code from http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html | |
// Original version by Anthony Williams | |
// Modifications by Michael Anderson -- https://gist.github.com/482342 | |
#include <boost/thread.hpp> | |
#include <deque> | |
template<typename Data> | |
class concurrent_queue | |
{ | |
private: | |
std::deque<Data> the_queue; | |
mutable boost::mutex the_mutex; | |
boost::condition_variable the_condition_variable; | |
bool is_canceled; | |
public: | |
concurrent_queue() : the_queue(), the_mutex(), the_condition_variable(), is_canceled(false) {} | |
struct Canceled{}; | |
void push(Data const& data) | |
{ | |
boost::mutex::scoped_lock lock(the_mutex); | |
if (is_canceled) throw Canceled(); | |
the_queue.push_back(data); | |
lock.unlock(); | |
the_condition_variable.notify_one(); | |
} | |
bool empty() const | |
{ | |
boost::mutex::scoped_lock lock(the_mutex); | |
if (is_canceled) throw Canceled(); | |
return the_queue.empty(); | |
} | |
bool try_pop(Data& popped_value) | |
{ | |
boost::mutex::scoped_lock lock(the_mutex); | |
if (is_canceled) throw Canceled(); | |
if(the_queue.empty()) | |
{ | |
return false; | |
} | |
popped_value=the_queue.front(); | |
the_queue.pop_front(); | |
return true; | |
} | |
void wait_and_pop(Data& popped_value) | |
{ | |
boost::mutex::scoped_lock lock(the_mutex); | |
while(the_queue.empty() && !is_canceled) | |
{ | |
the_condition_variable.wait(lock); | |
} | |
if (is_canceled) throw Canceled(); | |
popped_value=the_queue.front(); | |
the_queue.pop_front(); | |
} | |
/**Wait and retrieve all data in this queue. | |
* | |
* @param dest Destination queue that will be emptied and then will | |
* receive all data contained in this queue. | |
* | |
* @warning Previous contents of @c dest are discarded. | |
* | |
*/ | |
void wait_and_take_all(std::deque<Data>& dest) | |
{ | |
boost::mutex::scoped_lock lock(the_mutex); | |
while(the_queue.empty() && !is_canceled) | |
{ | |
the_condition_variable.wait(lock); | |
} | |
if (is_canceled) throw Canceled(); | |
dest.clear(); | |
dest.swap(the_queue); | |
} | |
void cancel() | |
{ | |
boost::mutex::scoped_lock lock(the_mutex); | |
if (is_canceled) throw Canceled(); | |
is_canceled = true; | |
lock.unlock(); | |
the_condition_variable.notify_all(); | |
} | |
}; | |
#endif /* PRODUCER_CONSUMER_QUEUE_HPP_ */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment