Program Listing for File ipc_queue.hpp

Return to documentation for file (rcppsw/multiprocess/ipc_queue.hpp)

#pragma once

/*******************************************************************************
 * Includes
 ******************************************************************************/
#include <boost/thread/thread_time.hpp>
#include <deque>
#include <memory>

#include "rcppsw/multiprocess/ipc.hpp"
#include "rcppsw/rcppsw.hpp"

/*******************************************************************************
 * Namespaces/Decls
 ******************************************************************************/
namespace rcppsw::multiprocess {

/*******************************************************************************
 * Class Definitions
 ******************************************************************************/
template <class T>
class ipc_queue {
 public:
  typedef bip::allocator<T, bip::managed_shared_memory::segment_manager>
      allocator_type;

  explicit ipc_queue(allocator_type alloc)
      : m_queue(alloc), m_io_mutex(), m_wait_condition() {}

  void push(T element) {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
    m_queue.push_back(element);
    m_wait_condition.notify_one();
  }
  bool is_empty() const {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
    return m_queue.empty();
  }

  T pop(void) {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);

    T element = m_queue.front();
    m_queue.pop_front();

    return element;
  }

  bool pop_try(T* const element) {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);

    if (m_queue.empty()) {
      return false;
    }

    *element = m_queue.front();
    m_queue.pop_front();

    return true;
  }

  void pop_wait(T* const element) {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);

    while (m_queue.empty()) {
      m_wait_condition.wait(lock);
    } /* while() */

    *element = m_queue.front();
    m_queue.pop_front();
  }

  bool pop_timed_wait(T* const element, int to_sec) {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
    boost::system_time to =
        boost::get_system_time() + boost::posix_time::seconds(to_sec);
    m_wait_condition.timed_wait(lock, to);
    if (m_queue.empty()) {
      return false;
    }
    *element = m_queue.front();
    m_queue.pop_front();
    return true;
  } /* pop_timed_wait() */

  void clear(void) {
    bip::scoped_lock<bip::interprocess_mutex> lock(m_io_mutex);
    m_queue.clear();
  }

  size_t size() const { return m_queue.size(); }

 private:
  /* clang-format off */
  bip::deque<T, allocator_type>       m_queue;
  mutable bip::interprocess_mutex     m_io_mutex;
  mutable bip::interprocess_condition m_wait_condition;
  /* clang-format on */
};

} /* namespace rcppsw::multiprocess */