#include <iostream>
#include <cstdint>
+#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/convenience.hpp>
{
try
{
- boost::this_thread::interruption_point();
handler_(event, file);
- boost::this_thread::interruption_point();
- }
- catch (const boost::thread_interrupted&)
- {
- throw;
}
catch (...)
{
class polling_filesystem_monitor : public filesystem_monitor
{
+ std::shared_ptr<boost::asio::io_service> scheduler_;
directory_monitor root_monitor_;
- boost::thread scanning_thread_;
+ executor executor_;
+ boost::asio::deadline_timer timer_;
tbb::atomic<bool> running_;
int scan_interval_millis_;
std::promise<void> initial_scan_completion_;
filesystem_event events_of_interest_mask,
bool report_already_existing,
int scan_interval_millis,
+ std::shared_ptr<boost::asio::io_service> scheduler,
const filesystem_monitor_handler& handler,
const initial_files_handler& initial_files_handler)
- : root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+ : scheduler_(std::move(scheduler))
+ , root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
+ , executor_(L"polling_filesystem_monitor")
+ , timer_(*scheduler_)
, scan_interval_millis_(scan_interval_millis)
{
running_ = true;
reemmit_all_ = false;
- scanning_thread_ = boost::thread([this] { scanner(); });
+ executor_.begin_invoke([this]
+ {
+ scan();
+ initial_scan_completion_.set_value();
+ schedule_next();
+ });
}
- virtual ~polling_filesystem_monitor()
+ ~polling_filesystem_monitor()
{
running_ = false;
- scanning_thread_.interrupt();
- scanning_thread_.join();
+ boost::system::error_code e;
+ timer_.cancel(e);
}
- virtual std::future<void> initial_files_processed()
+ std::future<void> initial_files_processed() override
{
return initial_scan_completion_.get_future();
}
- virtual void reemmit_all()
+ void reemmit_all() override
{
reemmit_all_ = true;
}
- virtual void reemmit(const boost::filesystem::path& file)
+ void reemmit(const boost::filesystem::path& file) override
{
to_reemmit_.push(file);
}
private:
- void scanner()
+ void schedule_next()
{
- ensure_gpf_handler_installed_for_thread("polling_filesystem_monitor");
+ if (!running_)
+ return;
- bool running = scan(false);
- initial_scan_completion_.set_value();
+ timer_.expires_from_now(
+ boost::posix_time::milliseconds(scan_interval_millis_));
+ timer_.async_wait([this](const boost::system::error_code& e)
+ {
+ begin_scan();
+ });
+ }
+
+ void begin_scan()
+ {
+ if (!running_)
+ return;
- if (running)
- while (scan(true));
+ executor_.begin_invoke([this]()
+ {
+ scan();
+ schedule_next();
+ });
}
- bool scan(bool sleep)
+ void scan()
{
+ if (!running_)
+ return;
+
try
{
- if (sleep)
- boost::this_thread::sleep_for(boost::chrono::milliseconds(scan_interval_millis_));
-
if (reemmit_all_.fetch_and_store(false))
root_monitor_.reemmit_all();
else
{
- boost::filesystem::path file;
+ boost::filesystem::wpath file;
while (to_reemmit_.try_pop(file))
root_monitor_.reemmit(file);
root_monitor_.scan([=] { return !running_; });
}
- catch (const boost::thread_interrupted&)
- {
- }
catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
-
- return running_;
}
};
struct polling_filesystem_monitor_factory::impl
{
+ std::shared_ptr<boost::asio::io_service> scheduler_;
int scan_interval_millis;
- impl(int scan_interval_millis)
- : scan_interval_millis(scan_interval_millis)
+ impl(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis)
+ : scheduler_(std::move(scheduler))
+ , scan_interval_millis(scan_interval_millis)
{
}
};
-polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(int scan_interval_millis)
- : impl_(new impl(scan_interval_millis))
+polling_filesystem_monitor_factory::polling_filesystem_monitor_factory(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis)
+ : impl_(new impl(std::move(scheduler), scan_interval_millis))
{
}
events_of_interest_mask,
report_already_existing,
impl_->scan_interval_millis,
+ impl_->scheduler_,
handler,
initial_files_handler);
}
#include "filesystem_monitor.h"
+#include <boost/asio.hpp>
+
namespace caspar {
/**
/**
* Constructor.
*
+ * @param scheduler The io_service that will be used for
+ * scheduling periodic scans.
* @param scan_interval_millis The number of milliseconds between each
* scheduled scan. Lower values lowers the reaction
* time but causes more I/O.
*/
- polling_filesystem_monitor_factory(int scan_interval_millis = 5000);
+ polling_filesystem_monitor_factory(
+ std::shared_ptr<boost::asio::io_service> scheduler,
+ int scan_interval_millis = 5000);
virtual ~polling_filesystem_monitor_factory();
virtual filesystem_monitor::ptr create(
const boost::filesystem::path& folder_to_watch,
amcp/AMCPProtocolStrategy.cpp
amcp/amcp_command_repository.cpp
- asio/io_service_manager.cpp
-
cii/CIICommandsImpl.cpp
cii/CIIProtocolStrategy.cpp
amcp/amcp_command_repository.h
amcp/amcp_shared.h
- asio/io_service_manager.h
-
cii/CIICommand.h
cii/CIICommandsImpl.h
cii/CIIProtocolStrategy.h
include_directories(${TBB_INCLUDE_PATH})
source_group(sources\\amcp amcp/*)
-source_group(sources\\asio asio/*)
source_group(sources\\cii cii/*)
source_group(sources\\clk clk/*)
source_group(sources\\osc\\oscpack osc/oscpack/*)
+++ /dev/null
-/*
-* Copyright 2013 Sveriges Television AB http://casparcg.com/
-*
-* This file is part of CasparCG (www.casparcg.com).
-*
-* CasparCG is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
-* the Free Software Foundation, either version 3 of the License, or
-* (at your option) any later version.
-*
-* CasparCG is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
-*
-* Author: Helge Norberg, helge.norberg@svt.se
-*/
-
-#include "../StdAfx.h"
-
-#include "io_service_manager.h"
-
-#include <memory>
-
-#include <boost/asio/io_service.hpp>
-#include <boost/thread/thread.hpp>
-
-#include <common/os/general_protection_fault.h>
-
-namespace caspar { namespace protocol { namespace asio {
-
-struct io_service_manager::impl
-{
- boost::asio::io_service service_;
- // To keep the io_service::run() running although no pending async
- // operations are posted.
- std::unique_ptr<boost::asio::io_service::work> work_;
- boost::thread thread_;
-
- impl()
- : work_(new boost::asio::io_service::work(service_))
- , thread_([this] { run(); })
- {
- }
-
- void run()
- {
- ensure_gpf_handler_installed_for_thread("asio-thread");
-
- service_.run();
- }
-
- ~impl()
- {
- work_.reset();
- service_.stop();
- thread_.join();
- }
-};
-
-io_service_manager::io_service_manager()
- : impl_(new impl)
-{
-}
-
-io_service_manager::~io_service_manager()
-{
-}
-
-boost::asio::io_service& io_service_manager::service()
-{
- return impl_->service_;
-}
-
-}}}
+++ /dev/null
-/*
-* Copyright 2013 Sveriges Television AB http://casparcg.com/
-*
-* This file is part of CasparCG (www.casparcg.com).
-*
-* CasparCG is free software: you can redistribute it and/or modify
-* it under the terms of the GNU General Public License as published by
-* the Free Software Foundation, either version 3 of the License, or
-* (at your option) any later version.
-*
-* CasparCG is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU General Public License for more details.
-*
-* You should have received a copy of the GNU General Public License
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
-*
-* Author: Helge Norberg, helge.norberg@svt.se
-*/
-
-#pragma once
-
-#include <memory>
-
-#include <boost/noncopyable.hpp>
-
-namespace boost { namespace asio {
- class io_service;
-}}
-
-namespace caspar { namespace protocol { namespace asio {
-
-class io_service_manager : boost::noncopyable
-{
-public:
- io_service_manager();
- ~io_service_manager();
- boost::asio::io_service& service();
-private:
- struct impl;
- std::unique_ptr<impl> impl_;
-};
-
-}}}
struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
{
- udp::socket socket_;
+ std::shared_ptr<boost::asio::io_service> service_;
+ udp::socket socket_;
tbb::spin_mutex endpoints_mutex_;
std::map<udp::endpoint, int> reference_counts_by_endpoint_;
boost::thread thread_;
public:
- impl(boost::asio::io_service& service)
- : socket_(service, udp::v4())
+ impl(std::shared_ptr<boost::asio::io_service> service)
+ : service_(std::move(service))
+ , socket_(*service_, udp::v4())
, thread_(boost::bind(&impl::run, this))
{
}
}
};
-client::client(boost::asio::io_service& service)
- : impl_(new impl(service))
+client::client(std::shared_ptr<boost::asio::io_service> service)
+ : impl_(new impl(std::move(service)))
{
}
// Constructors
- client(boost::asio::io_service& service);
+ client(std::shared_ptr<boost::asio::io_service> service);
client(client&&);
#include <functional>
#include <boost/asio.hpp>
-#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <tbb/mutex.h>
struct AsyncEventServer::implementation
{
- boost::asio::io_service service_;
- tcp::acceptor acceptor_;
- protocol_strategy_factory<char>::ptr protocol_factory_;
- spl::shared_ptr<connection_set> connection_set_;
- boost::thread thread_;
- std::vector<lifecycle_factory_t> lifecycle_factories_;
+ std::shared_ptr<boost::asio::io_service> service_;
+ tcp::acceptor acceptor_;
+ protocol_strategy_factory<char>::ptr protocol_factory_;
+ spl::shared_ptr<connection_set> connection_set_;
+ std::vector<lifecycle_factory_t> lifecycle_factories_;
tbb::mutex mutex_;
- implementation(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
- : acceptor_(service_, tcp::endpoint(tcp::v4(), port))
+ implementation(std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
+ : service_(std::move(service))
+ , acceptor_(*service_, tcp::endpoint(tcp::v4(), port))
, protocol_factory_(protocol)
- , thread_([&] { service_.run(); })
{
start_accept();
}
CASPAR_LOG_CURRENT_EXCEPTION();
}
- service_.post([=]
+ service_->post([=]
{
auto connections = *connection_set_;
for (auto& connection : connections)
connection->stop();
});
-
- thread_.join();
}
- void start_accept()
+ void start_accept()
{
- spl::shared_ptr<tcp::socket> socket(new tcp::socket(service_));
+ spl::shared_ptr<tcp::socket> socket(new tcp::socket(*service_));
acceptor_.async_accept(*socket, std::bind(&implementation::handle_accept, this, socket, std::placeholders::_1));
}
void add_client_lifecycle_object_factory(const lifecycle_factory_t& factory)
{
- service_.post([=]{ lifecycle_factories_.push_back(factory); });
+ service_->post([=]{ lifecycle_factories_.push_back(factory); });
}
};
AsyncEventServer::AsyncEventServer(
- const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
- : impl_(new implementation(protocol, port)) {}
+ std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
+ : impl_(new implementation(std::move(service), protocol, port)) {}
AsyncEventServer::~AsyncEventServer() {}
void AsyncEventServer::add_client_lifecycle_object_factory(const lifecycle_factory_t& factory) { impl_->add_client_lifecycle_object_factory(factory); }
#include "protocol_strategy.h"
+#include <boost/asio.hpp>
+
namespace caspar { namespace IO {
typedef std::function<std::pair<std::wstring, std::shared_ptr<void>> (const std::string& ipv4_address)>
class AsyncEventServer
{
public:
- explicit AsyncEventServer(const protocol_strategy_factory<char>::ptr& protocol, unsigned short port);
+ explicit AsyncEventServer(
+ std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port);
~AsyncEventServer();
void add_client_lifecycle_object_factory(const lifecycle_factory_t& lifecycle_factory);
#include <modules/image/consumer/image_consumer.h>
-#include <protocol/asio/io_service_manager.h>
#include <protocol/amcp/AMCPProtocolStrategy.h>
#include <protocol/amcp/amcp_command_repository.h>
#include <protocol/amcp/AMCPCommandsImpl.h>
#include <boost/lexical_cast.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/xml_parser.hpp>
+#include <boost/asio.hpp>
#include <tbb/atomic.h>
using namespace core;
using namespace protocol;
+std::shared_ptr<boost::asio::io_service> create_running_io_service()
+{
+ auto service = std::make_shared<boost::asio::io_service>();
+ // To keep the io_service::run() running although no pending async
+ // operations are posted.
+ auto work = std::make_shared<boost::asio::io_service::work>(*service);
+ auto thread = std::make_shared<boost::thread>([service]
+ {
+ ensure_gpf_handler_installed_for_thread("asio-thread");
+
+ service->run();
+ });
+
+ return std::shared_ptr<boost::asio::io_service>(
+ service.get(),
+ [service, work, thread](void*) mutable
+ {
+ work.reset();
+ service->stop();
+ thread->join();
+ });
+}
+
struct server::impl : boost::noncopyable
{
- protocol::asio::io_service_manager io_service_manager_;
+ std::shared_ptr<boost::asio::io_service> io_service_ = create_running_io_service();
spl::shared_ptr<monitor::subject> monitor_subject_;
spl::shared_ptr<monitor::subject> diag_subject_ = core::diagnostics::get_or_create_subject();
accelerator::accelerator accelerator_;
explicit impl(std::promise<bool>& shutdown_server_now)
: accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
- , osc_client_(io_service_manager_.service())
+ , osc_client_(io_service_)
, media_info_repo_(create_in_memory_media_info_repository())
, producer_registry_(spl::make_shared<core::frame_producer_registry>(help_repo_))
, consumer_registry_(spl::make_shared<core::frame_consumer_registry>(help_repo_))
auto scan_interval_millis = pt.get(L"configuration.thumbnails.scan-interval-millis", 5000);
- polling_filesystem_monitor_factory monitor_factory(scan_interval_millis);
+ polling_filesystem_monitor_factory monitor_factory(io_service_, scan_interval_millis);
thumbnail_generator_.reset(new thumbnail_generator(
monitor_factory,
env::media_folder(),
{
unsigned int port = xml_controller.second.get(L"port", 5250);
auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(
+ io_service_,
create_protocol(protocol, L"TCP Port " + boost::lexical_cast<std::wstring>(port)),
port);
async_servers_.push_back(asyncbootstrapper);
{
initial_media_info_thread_ = boost::thread([this]
{
+ ensure_gpf_handler_installed_for_thread("initial media scan");
+
for (boost::filesystem::wrecursive_directory_iterator iter(env::media_folder()), end; iter != end; ++iter)
{
if (running_)