#include <boost/thread.hpp>
#include <boost/timer.hpp>
-#include <tbb/concurrent_queue.h>
-#include <tbb/concurrent_unordered_map.h>
-#include <tbb/concurrent_unordered_set.h>
+#include <tbb/atomic.h>
+#include <tbb/cache_aligned_allocator.h>
#include <array>
#include <functional>
struct client::impl
{
- boost::asio::io_service service_;
-
- udp::endpoint endpoint_;
- udp::socket socket_;
+ boost::asio::io_service service_;
+
+ udp::endpoint endpoint_;
+ udp::socket socket_;
- std::array<tbb::concurrent_unordered_map<std::string, byte_vector>, 2> updates_;
- tbb::atomic<unsigned int> updates_counter_;
-
- Concurrency::call<core::monitor::message> call_;
-
- boost::mutex cond_mutex_;
- boost::condition_variable cond_;
- tbb::atomic<bool> is_running_;
- boost::thread thread_;
+ std::unordered_map<std::string, byte_vector> updates_;
+ boost::mutex updates_mutex_;
+
+ boost::condition_variable cond_;
+ boost::mutex cond_mutex_;
+
+ tbb::atomic<bool> is_running_;
+
+ boost::thread thread_;
+
+ Concurrency::call<core::monitor::message> on_next_;
public:
impl(udp::endpoint endpoint,
Concurrency::ISource<core::monitor::message>& source)
: endpoint_(endpoint)
, socket_(service_, endpoint_.protocol())
- , call_(boost::bind(&impl::on_next, this, _1))
, thread_(boost::bind(&impl::run, this))
+ , on_next_(boost::bind(&impl::on_next, this, _1))
{
- source.link_target(&call_);
+ source.link_target(&on_next_);
}
~impl()
{
is_running_ = false;
- cond_.notify_one();
+ cond_.notify_all();
thread_.join();
}
void on_next(const core::monitor::message& msg)
- {
- write_osc_event(updates_[updates_counter_ % 1][msg.path()], msg);
- cond_.notify_one();
+ {
+ {
+ boost::lock_guard<boost::mutex> lock(updates_mutex_);
+ write_osc_event(updates_[msg.path()], msg);
+ }
+
+ cond_.notify_all();
}
void run()
{
- std::unordered_map<std::string, byte_vector> state;
-
is_running_ = true;
- while(is_running_)
+ std::unordered_map<std::string, byte_vector> updates;
+
+ while(true)
{
std::vector<boost::asio::mutable_buffers_1> buffers;
boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
cond_.wait(cond_lock);
-
- auto& updates = updates_[(++updates_counter_ - 1) % 1];
- BOOST_FOREACH(auto& slot, updates)
+ if(!is_running_)
+ break;
+
{
- if(slot.second.empty())
- continue;
-
- auto it = state.lower_bound(slot.first);
- if(it == std::end(state) || state.key_comp()(slot.first, it->first))
- it = state.insert(it, std::make_pair(slot.first, slot.second));
- else
- std::swap(it->second, slot.second);
-
- slot.second.clear();
-
- buffers.push_back(boost::asio::buffer(it->second));
+ boost::lock_guard<boost::mutex> lock(updates_mutex_);
+ std::swap(updates, updates_);
}
+
+ BOOST_FOREACH(auto& slot, updates)
+ buffers.push_back(boost::asio::buffer(slot.second));
if(!buffers.empty())
socket_.send_to(buffers, endpoint_);
+
+ updates.clear();
}
}
};