- Made KILL and RESTART shutdown as cleanly as "q".
- Added logging to some destructors to help when debugging shutdown issues.
- Fixed some deadlocks/lifetime issues.
13 files changed:
+ CASPAR_LOG(trace) << L"Shutting down " << name_;
+
- internal_begin_invoke([=]
- {
- is_running_ = false;
- }).wait();
+ if (is_running_)
+ internal_begin_invoke([=]
+ {
+ is_running_ = false;
+ }).wait();
class polling_filesystem_monitor : public filesystem_monitor
{
class polling_filesystem_monitor : public filesystem_monitor
{
+ tbb::atomic<bool> running_;
std::shared_ptr<boost::asio::io_service> scheduler_;
directory_monitor root_monitor_;
std::shared_ptr<boost::asio::io_service> scheduler_;
directory_monitor root_monitor_;
boost::asio::deadline_timer timer_;
boost::asio::deadline_timer timer_;
- tbb::atomic<bool> running_;
int scan_interval_millis_;
std::promise<void> initial_scan_completion_;
tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
tbb::atomic<bool> reemmit_all_;
int scan_interval_millis_;
std::promise<void> initial_scan_completion_;
tbb::concurrent_queue<boost::filesystem::path> to_reemmit_;
tbb::atomic<bool> reemmit_all_;
public:
polling_filesystem_monitor(
const boost::filesystem::path& folder_to_watch,
public:
polling_filesystem_monitor(
const boost::filesystem::path& folder_to_watch,
const initial_files_handler& initial_files_handler)
: scheduler_(std::move(scheduler))
, root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
const initial_files_handler& initial_files_handler)
: scheduler_(std::move(scheduler))
, root_monitor_(report_already_existing, folder_to_watch, events_of_interest_mask, handler, initial_files_handler)
- , executor_(L"polling_filesystem_monitor")
, timer_(*scheduler_)
, scan_interval_millis_(scan_interval_millis)
, timer_(*scheduler_)
, scan_interval_millis_(scan_interval_millis)
+ , executor_(L"polling_filesystem_monitor")
{
running_ = true;
reemmit_all_ = false;
{
running_ = true;
reemmit_all_ = false;
#include <common/except.h>
#include <common/future.h>
#include <common/except.h>
#include <common/future.h>
+#include <common/os/general_protection_fault.h>
#include <core/video_format.h>
#include <core/frame/frame.h>
#include <core/video_format.h>
#include <core/frame/frame.h>
impl_->preconfigured_consumer_factories.insert(std::make_pair(element_name, factory));
}
impl_->preconfigured_consumer_factories.insert(std::make_pair(element_name, factory));
}
+tbb::atomic<bool>& destroy_consumers_in_separate_thread()
+{
+ static tbb::atomic<bool> state;
+
+ return state;
+}
+
+void destroy_consumers_synchronously()
+{
+ destroy_consumers_in_separate_thread() = false;
+}
+
class destroy_consumer_proxy : public frame_consumer
{
std::shared_ptr<frame_consumer> consumer_;
class destroy_consumer_proxy : public frame_consumer
{
std::shared_ptr<frame_consumer> consumer_;
destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer)
: consumer_(std::move(consumer))
{
destroy_consumer_proxy(spl::shared_ptr<frame_consumer>&& consumer)
: consumer_(std::move(consumer))
{
+ destroy_consumers_in_separate_thread() = true;
}
~destroy_consumer_proxy()
}
~destroy_consumer_proxy()
static tbb::atomic<int> counter;
static std::once_flag counter_init_once;
std::call_once(counter_init_once, []{ counter = 0; });
static tbb::atomic<int> counter;
static std::once_flag counter_init_once;
std::call_once(counter_init_once, []{ counter = 0; });
+
+ if (!destroy_consumers_in_separate_thread())
+ return;
++counter;
CASPAR_VERIFY(counter < 8);
++counter;
CASPAR_VERIFY(counter < 8);
boost::thread([=]
{
std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);
boost::thread([=]
{
std::unique_ptr<std::shared_ptr<frame_consumer>> pointer_guard(consumer);
auto str = (*consumer)->print();
auto str = (*consumer)->print();
- if(!consumer->unique())
+ ensure_gpf_handler_installed_for_thread(u8(L"Destroyer: " + str).c_str());
+
+ if (!consumer->unique())
CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();
else
CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";
CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << consumer->use_count();
else
CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";
spl::shared_ptr<impl> impl_;
};
spl::shared_ptr<impl> impl_;
};
+void destroy_consumers_synchronously();
+
static spl::shared_ptr<frame_producer> producer = spl::make_shared<empty_frame_producer>();
return producer;
static spl::shared_ptr<frame_producer> producer = spl::make_shared<empty_frame_producer>();
return producer;
+}
+
+tbb::atomic<bool>& destroy_producers_in_separate_thread()
+{
+ static tbb::atomic<bool> state;
+
+ return state;
+}
+
+void destroy_producers_synchronously()
+{
+ destroy_producers_in_separate_thread() = false;
+}
class destroy_producer_proxy : public frame_producer
{
class destroy_producer_proxy : public frame_producer
{
destroy_producer_proxy(spl::shared_ptr<frame_producer>&& producer)
: producer_(std::move(producer))
{
destroy_producer_proxy(spl::shared_ptr<frame_producer>&& producer)
: producer_(std::move(producer))
{
+ destroy_producers_in_separate_thread() = true;
}
virtual ~destroy_producer_proxy()
}
virtual ~destroy_producer_proxy()
static tbb::atomic<int> counter;
static std::once_flag counter_init_once;
std::call_once(counter_init_once, []{ counter = 0; });
static tbb::atomic<int> counter;
static std::once_flag counter_init_once;
std::call_once(counter_init_once, []{ counter = 0; });
- if(producer_ == core::frame_producer::empty())
+ if(producer_ == core::frame_producer::empty() || !destroy_producers_in_separate_thread())
auto str = (*producer)->print();
try
{
auto str = (*producer)->print();
try
{
- if(!producer->unique())
+ ensure_gpf_handler_installed_for_thread(u8(L"Destroyer: " + str).c_str());
+
+ if (!producer->unique())
CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << producer->use_count();
else
CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";
CASPAR_LOG(trace) << str << L" Not destroyed on asynchronous destruction thread: " << producer->use_count();
else
CASPAR_LOG(trace) << str << L" Destroying on asynchronous destruction thread.";
};
spl::shared_ptr<core::frame_producer> create_destroy_proxy(spl::shared_ptr<core::frame_producer> producer);
};
spl::shared_ptr<core::frame_producer> create_destroy_proxy(spl::shared_ptr<core::frame_producer> producer);
+void destroy_producers_synchronously();
CASPAR_LOG(info) << print() << " Successfully Initialized.";
}
CASPAR_LOG(info) << print() << " Successfully Initialized.";
}
+
+ ~impl()
+ {
+ CASPAR_LOG(info) << print() << " Uninitializing.";
+ }
core::video_format_desc video_format_desc() const
{
core::video_format_desc video_format_desc() const
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
CASPAR_LOG_CURRENT_EXCEPTION();
}
- executor_.begin_invoke([=]{tick();});
+ if (executor_.is_running())
+ executor_.begin_invoke([=]{tick();});
}
std::wstring print() const
}
std::wstring print() const
+ g_cef_executor.reset();
}
class cef_task : public CefTask
}
class cef_task : public CefTask
- auto frame = core::const_frame::empty();
+ core::const_frame frame;
frame_buffer_.pop(frame);
render_and_draw_frame(frame);
frame_buffer_.pop(frame);
render_and_draw_frame(frame);
{
boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
{
boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
+ if (!is_running_)
+ return;
+
if (updates_.empty())
updates_cond_.wait(cond_lock);
if (updates_.empty())
updates_cond_.wait(cond_lock);
typedef tbb::concurrent_queue<std::string> send_queue;
const spl::shared_ptr<tcp::socket> socket_;
typedef tbb::concurrent_queue<std::string> send_queue;
const spl::shared_ptr<tcp::socket> socket_;
- boost::asio::io_service& service_;
+ std::shared_ptr<boost::asio::io_service> service_;
const spl::shared_ptr<connection_set> connection_set_;
const std::wstring name_;
protocol_strategy_factory<char>::ptr protocol_factory_;
const spl::shared_ptr<connection_set> connection_set_;
const std::wstring name_;
protocol_strategy_factory<char>::ptr protocol_factory_;
- static spl::shared_ptr<connection> create(spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
+ static spl::shared_ptr<connection> create(std::shared_ptr<boost::asio::io_service> service, spl::shared_ptr<tcp::socket> socket, const protocol_strategy_factory<char>::ptr& protocol, spl::shared_ptr<connection_set> connection_set)
- spl::shared_ptr<connection> con(new connection(std::move(socket), std::move(protocol), std::move(connection_set)));
+ spl::shared_ptr<connection> con(new connection(std::move(service), std::move(socket), std::move(protocol), std::move(connection_set)));
con->read_some();
return con;
}
con->read_some();
return con;
}
virtual void send(std::string&& data)
{
send_queue_.push(std::move(data));
virtual void send(std::string&& data)
{
send_queue_.push(std::move(data));
- service_.dispatch([=] { do_write(); });
+ service_->dispatch([=] { do_write(); });
}
virtual void disconnect()
{
}
virtual void disconnect()
{
- service_.dispatch([=] { stop(); });
+ service_->dispatch([=] { stop(); });
}
void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
}
void add_lifecycle_bound_object(const std::wstring& key, const std::shared_ptr<void>& lifecycle_bound)
return socket_->is_open() ? socket_->remote_endpoint().address().to_string() : "no-address";
}
return socket_->is_open() ? socket_->remote_endpoint().address().to_string() : "no-address";
}
- connection(const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
+ connection(const std::shared_ptr<boost::asio::io_service>& service, const spl::shared_ptr<tcp::socket>& socket, const protocol_strategy_factory<char>::ptr& protocol_factory, const spl::shared_ptr<connection_set>& connection_set)
- , service_(socket->get_io_service())
, name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
, connection_set_(connection_set)
, protocol_factory_(protocol_factory)
, name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast<std::string>(socket_->local_endpoint().port())) : L"no-address"))
, connection_set_(connection_set)
, protocol_factory_(protocol_factory)
protocol_strategy_factory<char>::ptr protocol_factory_;
spl::shared_ptr<connection_set> connection_set_;
std::vector<lifecycle_factory_t> lifecycle_factories_;
protocol_strategy_factory<char>::ptr protocol_factory_;
spl::shared_ptr<connection_set> connection_set_;
std::vector<lifecycle_factory_t> lifecycle_factories_;
implementation(std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
: service_(std::move(service))
implementation(std::shared_ptr<boost::asio::io_service> service, const protocol_strategy_factory<char>::ptr& protocol, unsigned short port)
: service_(std::move(service))
+ acceptor_.cancel();
+ acceptor_.close();
- auto conn = connection::create(socket, protocol_factory_, connection_set_);
+ auto conn = connection::create(service_, socket, protocol_factory_, connection_set_);
connection_set_->insert(conn);
for (auto& lifecycle_factory : lifecycle_factories_)
connection_set_->insert(conn);
for (auto& lifecycle_factory : lifecycle_factories_)
print_info();
// Create server object which initializes channels, protocols and controllers.
print_info();
// Create server object which initializes channels, protocols and controllers.
- server caspar_server(shutdown_server_now);
+ std::unique_ptr<server> caspar_server(new server(shutdown_server_now));
// Print environment information.
// Print environment information.
- print_system_info(caspar_server.get_system_info_provider_repo());
+ print_system_info(caspar_server->get_system_info_provider_repo());
std::wstringstream str;
boost::property_tree::xml_writer_settings<std::wstring> w(' ', 3);
boost::property_tree::write_xml(str, env::properties(), w);
CASPAR_LOG(info) << config_file_name << L":\n-----------------------------------------\n" << str.str() << L"-----------------------------------------";
std::wstringstream str;
boost::property_tree::xml_writer_settings<std::wstring> w(' ', 3);
boost::property_tree::write_xml(str, env::properties(), w);
CASPAR_LOG(info) << config_file_name << L":\n-----------------------------------------\n" << str.str() << L"-----------------------------------------";
+ caspar_server->start();
// Create a dummy client which prints amcp responses to console.
auto console_client = spl::make_shared<IO::ConsoleClientInfo>();
// Create a amcp parser for console commands.
// Create a dummy client which prints amcp responses to console.
auto console_client = spl::make_shared<IO::ConsoleClientInfo>();
// Create a amcp parser for console commands.
- auto amcp = spl::make_shared<caspar::IO::delimiter_based_chunking_strategy_factory<wchar_t>>(
+ std::shared_ptr<IO::protocol_strategy<wchar_t>> amcp = spl::make_shared<caspar::IO::delimiter_based_chunking_strategy_factory<wchar_t>>(
L"\r\n",
spl::make_shared<caspar::IO::legacy_strategy_adapter_factory>(
spl::make_shared<protocol::amcp::AMCPProtocolStrategy>(
L"Console",
L"\r\n",
spl::make_shared<caspar::IO::legacy_strategy_adapter_factory>(
spl::make_shared<protocol::amcp::AMCPProtocolStrategy>(
L"Console",
- caspar_server.get_amcp_command_repository())))->create(console_client);
+ caspar_server->get_amcp_command_repository())))->create(console_client);
+ std::weak_ptr<IO::protocol_strategy<wchar_t>> weak_amcp = amcp;
// Use separate thread for the blocking console input, will be terminated
// anyway when the main thread terminates.
// Use separate thread for the blocking console input, will be terminated
// anyway when the main thread terminates.
- boost::thread stdin_thread(std::bind(do_run, amcp, std::ref(shutdown_server_now))); //compiler didn't like lambda here...
+ boost::thread stdin_thread(std::bind(do_run, weak_amcp, std::ref(shutdown_server_now))); //compiler didn't like lambda here...
- return shutdown_server.get();
+ bool should_restart = shutdown_server.get();
+ amcp.reset();
+
+ while (weak_amcp.lock());
+
+ return should_restart;
struct tbb_thread_installer : public tbb::task_scheduler_observer
{
tbb_thread_installer(){observe(true);}
struct tbb_thread_installer : public tbb::task_scheduler_observer
{
tbb_thread_installer(){observe(true);}
- void on_scheduler_entry(bool is_worker)
+ void on_scheduler_entry(bool is_worker) override
{
ensure_gpf_handler_installed_for_thread("tbb-worker-thread");
}
{
ensure_gpf_handler_installed_for_thread("tbb-worker-thread");
}
setup_console_window();
return_code = run(config_file_name) ? 5 : 0;
setup_console_window();
return_code = run(config_file_name) ? 5 : 0;
-
- boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
+ for (auto& thread : get_thread_infos())
+ {
+ if (thread->name != "main thread" && thread->name != "tbb-worker-thread")
+ CASPAR_LOG(warning) << L"Thread left running: " << thread->name << L" (" << thread->native_id << L")";
+ }
+
CASPAR_LOG(info) << "Successfully shutdown CasparCG Server.";
}
catch(boost::property_tree::file_parser_error&)
CASPAR_LOG(info) << "Successfully shutdown CasparCG Server.";
}
catch(boost::property_tree::file_parser_error&)
{
work.reset();
service->stop();
{
work.reset();
service->stop();
+ if (thread->get_id() != boost::this_thread::get_id())
+ thread->join();
+ else
+ thread->detach();
std::shared_ptr<amcp::amcp_command_repository> amcp_command_repo_;
std::vector<spl::shared_ptr<IO::AsyncEventServer>> async_servers_;
std::shared_ptr<IO::AsyncEventServer> primary_amcp_server_;
std::shared_ptr<amcp::amcp_command_repository> amcp_command_repo_;
std::vector<spl::shared_ptr<IO::AsyncEventServer>> async_servers_;
std::shared_ptr<IO::AsyncEventServer> primary_amcp_server_;
- osc::client osc_client_;
+ std::shared_ptr<osc::client> osc_client_ = std::make_shared<osc::client>(io_service_);
std::vector<std::shared_ptr<void>> predefined_osc_subscriptions_;
std::vector<spl::shared_ptr<video_channel>> channels_;
spl::shared_ptr<media_info_repository> media_info_repo_;
std::vector<std::shared_ptr<void>> predefined_osc_subscriptions_;
std::vector<spl::shared_ptr<video_channel>> channels_;
spl::shared_ptr<media_info_repository> media_info_repo_;
explicit impl(std::promise<bool>& shutdown_server_now)
: accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
explicit impl(std::promise<bool>& shutdown_server_now)
: accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
- , osc_client_(io_service_)
, media_info_repo_(create_in_memory_media_info_repository())
, producer_registry_(spl::make_shared<core::frame_producer_registry>(help_repo_))
, consumer_registry_(spl::make_shared<core::frame_consumer_registry>(help_repo_))
, media_info_repo_(create_in_memory_media_info_repository())
, producer_registry_(spl::make_shared<core::frame_producer_registry>(help_repo_))
, consumer_registry_(spl::make_shared<core::frame_consumer_registry>(help_repo_))
initial_media_info_thread_.join();
}
initial_media_info_thread_.join();
}
+ std::weak_ptr<boost::asio::io_service> weak_io_service = io_service_;
+ io_service_.reset();
+ osc_client_.reset();
thumbnail_generator_.reset();
thumbnail_generator_.reset();
+ amcp_command_repo_.reset();
primary_amcp_server_.reset();
async_servers_.clear();
primary_amcp_server_.reset();
async_servers_.clear();
+ destroy_producers_synchronously();
+ destroy_consumers_synchronously();
-
- boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
- //Sleep(500); // HACK: Wait for asynchronous destruction of producers and consumers.
+
+ while (weak_io_service.lock())
+ boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
uninitialize_modules();
core::diagnostics::osd::shutdown();
uninitialize_modules();
core::diagnostics::osd::shutdown();
using boost::property_tree::wptree;
using namespace boost::asio::ip;
using boost::property_tree::wptree;
using namespace boost::asio::ip;
- monitor_subject_->attach_parent(osc_client_.sink());
+ monitor_subject_->attach_parent(osc_client_->sink());
auto default_port =
pt.get<unsigned short>(L"configuration.osc.default-port", 6250);
auto default_port =
pt.get<unsigned short>(L"configuration.osc.default-port", 6250);
const auto port =
predefined_client.second.get<unsigned short>(L"port");
predefined_osc_subscriptions_.push_back(
const auto port =
predefined_client.second.get<unsigned short>(L"port");
predefined_osc_subscriptions_.push_back(
- osc_client_.get_subscription_token(udp::endpoint(
+ osc_client_->get_subscription_token(udp::endpoint(
address_v4::from_string(u8(address)),
port)));
}
address_v4::from_string(u8(address)),
port)));
}
return std::make_pair(
std::wstring(L"osc_subscribe"),
return std::make_pair(
std::wstring(L"osc_subscribe"),
- osc_client_.get_subscription_token(
+ osc_client_->get_subscription_token(
udp::endpoint(
address_v4::from_string(
ipv4_address),
udp::endpoint(
address_v4::from_string(
ipv4_address),