Program Listing for File ipc_queue.hpp
↰ Return to documentation for file (rcppsw/multiprocess/ipc_queue.hpp)
#pragma once
/*******************************************************************************
* Includes
******************************************************************************/
#include <boost/thread/thread_time.hpp>
#include <deque>
#include <memory>
#include "rcppsw/multiprocess/ipc.hpp"
#include "rcppsw/rcppsw.hpp"
/*******************************************************************************
* Namespaces/Decls
******************************************************************************/
namespace rcppsw::multiprocess {
/*******************************************************************************
* Class Definitions
******************************************************************************/
template <class T>
class ipc_queue {
public:
typedef bip::allocator<T, bip::managed_shared_memory::segment_manager>
allocator_type;
explicit ipc_queue(allocator_type alloc)
: m_queue(alloc), m_io_mutex(), m_wait_condition() {}
void push(T element) {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
m_queue.push_back(element);
m_wait_condition.notify_one();
}
bool is_empty() const {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
return m_queue.empty();
}
T pop(void) {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
T element = m_queue.front();
m_queue.pop_front();
return element;
}
bool pop_try(T* const element) {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
if (m_queue.empty()) {
return false;
}
*element = m_queue.front();
m_queue.pop_front();
return true;
}
void pop_wait(T* const element) {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
while (m_queue.empty()) {
m_wait_condition.wait(lock);
} /* while() */
*element = m_queue.front();
m_queue.pop_front();
}
bool pop_timed_wait(T* const element, int to_sec) {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
boost::system_time to =
boost::get_system_time() + boost::posix_time::seconds(to_sec);
m_wait_condition.timed_wait(lock, to);
if (m_queue.empty()) {
return false;
}
*element = m_queue.front();
m_queue.pop_front();
return true;
} /* pop_timed_wait() */
void clear(void) {
bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
m_queue.clear();
}
size_t size() const { return m_queue.size(); }
private:
/* clang-format off */
bip::deque<T, allocator_type> m_queue;
mutable bip::interprocess_mutex m_io_mutex;
mutable bip::interprocess_condition m_wait_condition;
/* clang-format on */
};
} /* namespace rcppsw::multiprocess */