]> git.sesse.net Git - casparcg/commitdiff
osc: Simplify with a regular lock.
authorRobert Nagy <ronag@live.com>
Sat, 6 Jul 2013 23:45:29 +0000 (01:45 +0200)
committerRobert Nagy <ronag@live.com>
Sat, 6 Jul 2013 23:45:29 +0000 (01:45 +0200)
protocol/osc/client.cpp

index 4008c09e5c042ee3a6eafc3254d7833e5dbc8777..261d50f89ebd06b200296eb38bd483d2aa1ce533 100644 (file)
@@ -11,9 +11,8 @@
 #include <boost/thread.hpp>
 #include <boost/timer.hpp>
 
-#include <tbb/concurrent_queue.h>
-#include <tbb/concurrent_unordered_map.h>
-#include <tbb/concurrent_unordered_set.h>
+#include <tbb/atomic.h>
+#include <tbb/cache_aligned_allocator.h>
 
 #include <array>
 #include <functional>
@@ -106,81 +105,82 @@ void write_osc_event(byte_vector& destination, const core::monitor::message& e)
 
 struct client::impl
 {
-       boost::asio::io_service                                                                                                 service_;
-                                                                                                                                                       
-       udp::endpoint                                                                                                                   endpoint_;
-       udp::socket                                                                                                                             socket_;        
+       boost::asio::io_service                                                 service_;
+                                                                                                       
+       udp::endpoint                                                                   endpoint_;
+       udp::socket                                                                             socket_;        
        
-       std::array<tbb::concurrent_unordered_map<std::string, byte_vector>, 2>  updates_;
-       tbb::atomic<unsigned int>                                                                                               updates_counter_;                 
-
-       Concurrency::call<core::monitor::message>                                                               call_;
-                                                                               
-       boost::mutex                                                                                                                    cond_mutex_;
-       boost::condition_variable                                                                                               cond_;
-       tbb::atomic<bool>                                                                                                               is_running_;
-       boost::thread                                                                                                                   thread_;
+       std::unordered_map<std::string, byte_vector>    updates_;
+       boost::mutex                                                                    updates_mutex_;
+                                                               
+       boost::condition_variable                                               cond_;
+       boost::mutex                                                                    cond_mutex_;
+
+       tbb::atomic<bool>                                                               is_running_;
+
+       boost::thread                                                                   thread_;
+
+       Concurrency::call<core::monitor::message>               on_next_;
        
 public:
        impl(udp::endpoint endpoint, 
                 Concurrency::ISource<core::monitor::message>& source)
                : endpoint_(endpoint)
                , socket_(service_, endpoint_.protocol())
-               , call_(boost::bind(&impl::on_next, this, _1))
                , thread_(boost::bind(&impl::run, this))
+               , on_next_(boost::bind(&impl::on_next, this, _1))
        {
-               source.link_target(&call_);             
+               source.link_target(&on_next_);  
        }
 
        ~impl()
        {               
                is_running_ = false;
 
-               cond_.notify_one();
+               cond_.notify_all();
 
                thread_.join();
        }
        
        void on_next(const core::monitor::message& msg)
-       {
-               write_osc_event(updates_[updates_counter_ % 1][msg.path()], msg);
-               cond_.notify_one();
+       {                               
+               {
+                       boost::lock_guard<boost::mutex> lock(updates_mutex_);
+                       write_osc_event(updates_[msg.path()], msg);
+               }
+
+               cond_.notify_all();
        }
 
        void run()
        {
-               std::unordered_map<std::string, byte_vector> state;
-
                is_running_ = true;
 
-               while(is_running_)
+               std::unordered_map<std::string, byte_vector> updates;
+
+               while(true)
                {
                        std::vector<boost::asio::mutable_buffers_1>     buffers;
 
                        boost::unique_lock<boost::mutex> cond_lock(cond_mutex_);
                                        
                        cond_.wait(cond_lock);
-                               
-                       auto& updates = updates_[(++updates_counter_ - 1) % 1];
 
-                       BOOST_FOREACH(auto& slot, updates)              
+                       if(!is_running_)
+                               break;
+                                                       
                        {
-                               if(slot.second.empty())
-                                       continue;
-
-                               auto it = state.lower_bound(slot.first);
-                               if(it == std::end(state) || state.key_comp()(slot.first, it->first))
-                                       it = state.insert(it, std::make_pair(slot.first, slot.second));
-                               else
-                                       std::swap(it->second, slot.second);
-
-                               slot.second.clear();
-
-                               buffers.push_back(boost::asio::buffer(it->second));
+                               boost::lock_guard<boost::mutex> lock(updates_mutex_);
+                               std::swap(updates, updates_);
                        }
+                                               
+                       BOOST_FOREACH(auto& slot, updates)              
+                               buffers.push_back(boost::asio::buffer(slot.second));
 
                        if(!buffers.empty())
                                socket_.send_to(buffers, endpoint_);
+
+                       updates.clear();
                }
        }
 };