From: hellgore Date: Tue, 25 Sep 2012 15:28:25 +0000 (+0000) Subject: Merged CLK changes from trunk, and separated delimiter message splitting and codepage... X-Git-Tag: 2.1.0_Beta1~495 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=f3553f118beae8eb4ec9ae62ef8b7902bbe784cd;p=casparcg Merged CLK changes from trunk, and separated delimiter message splitting and codepage conversion from AsyncEventServer to separate protocol adapters git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@3365 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- diff --git a/common/blocking_priority_queue.h b/common/blocking_priority_queue.h index 101ae31b0..f8cd0198c 100644 --- a/common/blocking_priority_queue.h +++ b/common/blocking_priority_queue.h @@ -124,7 +124,6 @@ public: { acquire_transaction transaction(elements_available_); - pop_acquired_any_priority(element, transaction); } diff --git a/common/semaphore.h b/common/semaphore.h index 1a384397e..c50394f85 100644 --- a/common/semaphore.h +++ b/common/semaphore.h @@ -110,15 +110,18 @@ public: boost::mutex::scoped_lock lock(mutex_); auto num_acquired = 0u; - while (permits_ == 0u && num_acquired < permits) + while (true) { - permits_available_.wait(lock); - auto num_wanted = permits - num_acquired; auto to_drain = std::min(num_wanted, permits_); permits_ -= to_drain; num_acquired += to_drain; + + if (num_acquired == permits) + break; + + permits_available_.wait(lock); } } diff --git a/protocol/clk/CLKCommand.cpp b/protocol/clk/CLKCommand.cpp deleted file mode 100644 index 23813d39b..000000000 --- a/protocol/clk/CLKCommand.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* -* Copyright (c) 2011 Sveriges Television AB -* -* This file is part of CasparCG (www.casparcg.com). -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -* Author: Nicklas P Andersson -*/ - - -#include "..\stdafx.h" -#include -#include -#include "CLKCommand.h" - -namespace caspar { namespace protocol { namespace CLK { - -CLKCommand::CLKCommand() : clockID_(0), command_(CLKInvalidCommand) {} - -CLKCommand::~CLKCommand() {} - -const std::wstring& CLKCommand::GetData() -{ - std::wstringstream dataStream; - - dataStream << TEXT(""); - dataStream << TEXT(""); - dataStream << TEXT(""); - - std::vector::const_iterator it = parameters_.begin(); - std::vector::const_iterator end = parameters_.end(); - for(; it != end; ++it) { - dataStream << TEXT("") << (*it) << TEXT(""); - } - - dataStream << TEXT(""); - dataStream << TEXT(""); - dataStream << TEXT(""); - - dataCache_ = dataStream.str(); - return dataCache_; -} - -bool CLKCommand::SetCommand() -{ - bool bResult = true; - std::transform(commandString_.begin(), commandString_.end(), commandString_.begin(), toupper); - - if(commandString_ == TEXT("DUR")) - command_ = CLKDuration; - else if(commandString_ == TEXT("NEWDUR")) - command_ = CLKNewDuration; - else if(commandString_ == TEXT("NEXTEVENT")) - command_ = CLKNextEvent; - else if(commandString_ == TEXT("STOP")) - command_ = CLKStop; - else if(commandString_ == TEXT("UNTIL")) - command_ = CLKUntil; - else if(commandString_ == TEXT("ADD")) - command_ = CLKAdd; - else if(commandString_ == TEXT("SUB")) - command_ = CLKSub; - else if(commandString_ == TEXT("RESET")) - command_ = CLKReset; - else - { - command_ = CLKInvalidCommand; - bResult = false; - } - - return bResult; -} - -void CLKCommand::Clear() -{ - dataCache_.clear(); - commandString_.clear(); - time_.clear(); - command_ = CLKDuration; - clockID_ = 0; - parameters_.clear(); -} - -}}} \ No newline at end of file diff --git a/protocol/clk/CLKProtocolStrategy.cpp b/protocol/clk/CLKProtocolStrategy.cpp index 531b75932..30b5f7d9a 100644 --- a/protocol/clk/CLKProtocolStrategy.cpp +++ b/protocol/clk/CLKProtocolStrategy.cpp @@ -23,124 +23,138 @@ #include "..\stdafx.h" #include "CLKProtocolStrategy.h" +#include "clk_commands.h" #include #include -#include #include +#include +#include +#include namespace caspar { namespace protocol { namespace CLK { - -CLKProtocolStrategy::CLKProtocolStrategy(const std::vector>& channels) - : currentState_(ExpectingNewCommand), bClockLoaded_(false), pChannel_(channels.at(0)) -{} -void CLKProtocolStrategy::Parse(const TCHAR* pData, int charCount, IO::ClientInfoPtr pClientInfo) +class CLKProtocolStrategy : public IO::protocol_strategy { - for(int index = 0; index < charCount; ++index) +public: + CLKProtocolStrategy( + const IO::client_connection::ptr& client_connection, + clk_command_processor& command_processor) + : currentState_(ExpectingNewCommand) + , command_processor_(command_processor) + , client_connection_(client_connection) { - if(currentState_ == ExpectingNewCommand) - currentCommandString_.str(TEXT("")); - - TCHAR currentByte = pData[index]; - if(currentByte < 32) - currentCommandString_ << TEXT("<") << (int)currentByte << TEXT(">"); - else - currentCommandString_ << currentByte; + } - if(currentByte != 0) + void parse(const std::basic_string& data) + { + for (int index = 0; index < data.length(); ++index) { - switch(currentState_) + wchar_t currentByte = data[index]; + + if (currentByte < 32) + currentCommandString_ << L"<" << static_cast(currentByte) << L">"; + else + currentCommandString_ << currentByte; + + if (currentByte != 0) { - case ExpectingNewCommand: - if(currentByte == 1) - currentState_ = ExpectingCommand; - //just throw anything else away - break; - - case ExpectingCommand: - if(currentByte == 2) - { - if(!currentCommand_.SetCommand()) + switch (currentState_) + { + case ExpectingNewCommand: + if (currentByte == 1) + currentState_ = ExpectingCommand; + //just throw anything else away + break; + case ExpectingCommand: + if (currentByte == 2) + currentState_ = ExpectingParameter; + else + command_name_ += currentByte; + break; + case ExpectingParameter: + //allocate new parameter + if (parameters_.size() == 0 || currentByte == 2) + parameters_.push_back(std::wstring()); + + //add the character to end end of the last parameter + if (currentByte != 2) { - CASPAR_LOG(error) << "CLK: Failed to interpret command"; - currentState_ = ExpectingNewCommand; - currentCommand_.Clear(); + //add the character to end end of the last parameter + if (currentByte == L'<') + parameters_.back() += L"<"; + else if (currentByte == L'>') + parameters_.back() += L">"; + else if (currentByte == L'\"') + parameters_.back() += L"""; + else + parameters_.back() += currentByte; } - else - currentState_ = ExpectingClockID; - } - else - currentCommand_.commandString_ += currentByte; - break; - case ExpectingClockID: - if(currentByte == 2) - currentState_ = currentCommand_.NeedsTime() ? ExpectingTime : ExpectingParameter; - else - currentCommand_.clockID_ = currentByte - TCHAR('0'); - break; - - case ExpectingTime: - if(currentByte == 2) - currentState_ = ExpectingParameter; - else - currentCommand_.time_ += currentByte; - break; - - case ExpectingParameter: - //allocate new parameter - if(currentCommand_.parameters_.size() == 0 || currentByte == 2) - currentCommand_.parameters_.push_back(std::wstring()); - - //add the character to end end of the last parameter - if(currentByte == TEXT('<')) - currentCommand_.parameters_[currentCommand_.parameters_.size()-1] += TEXT("<"); - else if(currentByte == TEXT('>')) - currentCommand_.parameters_[currentCommand_.parameters_.size()-1] += TEXT(">"); - else if(currentByte == TEXT('\"')) - currentCommand_.parameters_[currentCommand_.parameters_.size()-1] += TEXT("""); - else - currentCommand_.parameters_[currentCommand_.parameters_.size()-1] += currentByte; - - break; + break; + } } - } - else - { - if(currentState_ == ExpectingCommand) + else { - if(!currentCommand_.SetCommand()) - CASPAR_LOG(error) << "CLK: Failed to interpret command"; - } + std::transform( + command_name_.begin(), command_name_.end(), + command_name_.begin(), + toupper); - if(currentCommand_.command_ == CLKCommand::CLKReset) - { - pChannel_->stage().clear(flash::cg_proxy::DEFAULT_LAYER); - bClockLoaded_ = false; - - CASPAR_LOG(info) << L"CLK: Recieved and executed reset-command"; - } - else if(currentCommand_.command_ != CLKCommand::CLKInvalidCommand) - { - if(!bClockLoaded_) + try { - flash::create_cg_proxy(pChannel_).add(0, TEXT("hawrysklocka/clock.ft"), true, TEXT(""), currentCommand_.GetData()); - bClockLoaded_ = true; + if (!command_processor_.handle(command_name_, parameters_)) + CASPAR_LOG(error) << "CLK: Unknown command: " << command_name_; + else + CASPAR_LOG(debug) << L"CLK: Executed valid command: " + << currentCommandString_.str(); + } + catch (...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + CASPAR_LOG(error) << "CLK: Failed to interpret command: " + << currentCommandString_.str(); } - else - flash::create_cg_proxy(pChannel_).update(0, currentCommand_.GetData()); - - CASPAR_LOG(debug) << L"CLK: Clockdata sent: " << currentCommand_.GetData(); - CASPAR_LOG(debug) << L"CLK: Executed valid command: " << currentCommandString_.str(); - } - currentState_ = ExpectingNewCommand; - currentCommand_.Clear(); + reset(); + } } } +private: + void reset() + { + currentState_ = ExpectingNewCommand; + currentCommandString_.str(L""); + command_name_.clear(); + parameters_.clear(); + } + + enum ParserState + { + ExpectingNewCommand, + ExpectingCommand, + ExpectingParameter + }; + + ParserState currentState_; + std::wstringstream currentCommandString_; + std::wstring command_name_; + std::vector parameters_; + clk_command_processor& command_processor_; + IO::client_connection::ptr client_connection_; +}; + +clk_protocol_strategy_factory::clk_protocol_strategy_factory( + const std::vector>& channels) +{ + add_command_handlers(command_processor_, channels.at(0)); +} + +IO::protocol_strategy::ptr clk_protocol_strategy_factory::create( + const IO::client_connection::ptr& client_connection) +{ + return spl::make_shared(client_connection, command_processor_); } -} //namespace CLK -}} //namespace caspar \ No newline at end of file +}}} diff --git a/protocol/clk/CLKProtocolStrategy.h b/protocol/clk/CLKProtocolStrategy.h index b813b540e..c82534cec 100644 --- a/protocol/clk/CLKProtocolStrategy.h +++ b/protocol/clk/CLKProtocolStrategy.h @@ -22,39 +22,21 @@ #pragma once -#include "CLKCommand.h" -#include "../util/ProtocolStrategy.h" +#include "../util/protocol_strategy.h" +#include "clk_command_processor.h" #include -#include - namespace caspar { namespace protocol { namespace CLK { -class CLKProtocolStrategy : public IO::IProtocolStrategy +class clk_protocol_strategy_factory : public IO::protocol_strategy_factory { + clk_command_processor command_processor_; public: - CLKProtocolStrategy(const std::vector>& channels); - - void Parse(const TCHAR* pData, int charCount, IO::ClientInfoPtr pClientInfo); - std::string GetCodepage() { return "ISO-8859-1"; } //ISO 8859-1 - -private: - enum ParserState - { - ExpectingNewCommand, - ExpectingCommand, - ExpectingClockID, - ExpectingTime, - ExpectingParameter - }; - - ParserState currentState_; - CLKCommand currentCommand_; - std::wstringstream currentCommandString_; - - spl::shared_ptr pChannel_; + clk_protocol_strategy_factory( + const std::vector>& channels); - bool bClockLoaded_; + virtual IO::protocol_strategy::ptr create( + const IO::client_connection::ptr& client_connection); }; }}} diff --git a/protocol/clk/clk_command_processor.cpp b/protocol/clk/clk_command_processor.cpp new file mode 100644 index 000000000..d614a6ae1 --- /dev/null +++ b/protocol/clk/clk_command_processor.cpp @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#include "../stdafx.h" + +#include "clk_command_processor.h" + +namespace caspar { namespace protocol { namespace CLK { + +clk_command_processor& clk_command_processor::add_handler( + const std::wstring& command_name, const clk_command_handler& handler) +{ + handlers_.insert(std::make_pair(command_name, handler)); + + return *this; +} + +bool clk_command_processor::handle( + const std::wstring& command_name, const std::vector& parameters) +{ + auto handler = handlers_.find(command_name); + + if (handler == handlers_.end()) + return false; + + handler->second(parameters); + + return true; +} + +}}} diff --git a/protocol/clk/clk_command_processor.h b/protocol/clk/clk_command_processor.h new file mode 100644 index 000000000..831390254 --- /dev/null +++ b/protocol/clk/clk_command_processor.h @@ -0,0 +1,71 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include +#include +#include + +#include + +#include + +namespace caspar { namespace protocol { namespace CLK { + +typedef boost::function&)> clk_command_handler; + +/** + * CLK command processor composed by multiple command handlers. + * + * Not thread-safe. + */ +class clk_command_processor +{ + std::map handlers_; +public: + /** + * Register a handler for a specific command. + * + * @param command_name The command name to use this handler for. + * @param handler The handler that will handle all commands with the + * specified name. + * + * @return this command processor for method chaining. + */ + clk_command_processor& add_handler( + const std::wstring& command_name, const clk_command_handler& handler); + + /** + * Handle an incoming command. + * + * @param command_name The command name. + * @param parameters The raw parameters supplied with the command. + * + * @return true if the command was handled, false if no handler was + * registered to handle the command. + */ + bool handle( + const std::wstring& command_name, + const std::vector& parameters); +}; + +}}} diff --git a/protocol/clk/clk_commands.cpp b/protocol/clk/clk_commands.cpp new file mode 100644 index 000000000..95b218d3f --- /dev/null +++ b/protocol/clk/clk_commands.cpp @@ -0,0 +1,182 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#include "../stdafx.h" + +#include +#include + +#include + +#include + +#include + +#include "clk_commands.h" + +namespace caspar { namespace protocol { namespace CLK { + +class command_context +{ + bool clock_loaded_; + spl::shared_ptr channel_; +public: + command_context(const spl::shared_ptr& channel) + : clock_loaded_(false) + , channel_(channel) + { + } + + void send_to_flash(const std::wstring& data) + { + if (!clock_loaded_) + { + flash::create_cg_proxy(channel_).add( + 0, L"hawrysklocka/clock.ft", true, L"", data); + clock_loaded_ = true; + } + else + { + flash::create_cg_proxy(channel_).update(0, data); + } + + CASPAR_LOG(debug) << L"CLK: Clockdata sent: " << data; + } + + void reset() + { + channel_->stage().clear(flash::cg_proxy::DEFAULT_LAYER); + clock_loaded_ = false; + CASPAR_LOG(info) << L"CLK: Recieved and executed reset-command"; + } +}; + +template +T require_param( + std::vector::const_iterator& params_current, + const std::vector::const_iterator& params_end, + const std::string& param_name) +{ + if (params_current == params_end) + throw std::runtime_error(param_name + " required"); + + T value = boost::lexical_cast(*params_current); + + ++params_current; + + return std::move(value); +} + +std::wstring get_xml( + const std::wstring& command_name, + bool has_clock_id, + bool has_time, + const std::vector& parameters) +{ + std::wstringstream stream; + + stream << L""; + stream << L""; + stream << L"::const_iterator it = parameters.begin(); + std::vector::const_iterator end = parameters.end(); + + if (has_clock_id) + { + stream << L" clockID=\"" + << require_param(it, end, "clock id") << L"\""; + } + + if (has_time) + { + stream << L" time=\"" + << require_param(it, end, "time") << L"\""; + } + + bool has_parameters = it != end; + + stream << (has_parameters ? L">" : L" />"); + + if (has_parameters) + { + for (; it != end; ++it) + { + stream << L"" << (*it) << L""; + } + + stream << L""; + } + + stream << L""; + stream << L""; + + return stream.str(); +} + +clk_command_handler create_send_xml_handler( + const std::wstring& command_name, + bool expect_clock, + bool expect_time, + const spl::shared_ptr& context) +{ + return [=] (const std::vector& params) + { + context->send_to_flash(get_xml( + command_name, expect_clock, expect_time, params)); + }; +} + +void add_command_handlers( + clk_command_processor& processor, + const spl::shared_ptr& channel) +{ + auto context = spl::make_shared(channel); + + processor + .add_handler(L"DUR", + create_send_xml_handler(L"DUR", true, true, context)) + .add_handler(L"NEWDUR", + create_send_xml_handler(L"NEWDUR", true, true, context)) + .add_handler(L"UNTIL", + create_send_xml_handler(L"UNTIL", true, true, context)) + .add_handler(L"NEXTEVENT", + create_send_xml_handler(L"NEXTEVENT", true, false, context)) + .add_handler(L"STOP", + create_send_xml_handler(L"STOP", true, false, context)) + .add_handler(L"ADD", + create_send_xml_handler(L"ADD", true, true, context)) + .add_handler(L"SUB", + create_send_xml_handler(L"SUB", true, true, context)) + .add_handler(L"TIMELINE_LOAD", + create_send_xml_handler(L"TIMELINE_LOAD", false, false, context)) + .add_handler(L"TIMELINE_PLAY", + create_send_xml_handler(L"TIMELINE_PLAY", false, false, context)) + .add_handler(L"TIMELINE_STOP", + create_send_xml_handler(L"TIMELINE_STOP", false, false, context)) + .add_handler(L"RESET", [=] (const std::vector& params) + { + context->reset(); + }) + ; +} + +}}} diff --git a/protocol/clk/CLKCommand.h b/protocol/clk/clk_commands.h similarity index 59% rename from protocol/clk/CLKCommand.h rename to protocol/clk/clk_commands.h index 72e6c92d6..02baf0e14 100644 --- a/protocol/clk/CLKCommand.h +++ b/protocol/clk/clk_commands.h @@ -1,63 +1,40 @@ -/* -* Copyright (c) 2011 Sveriges Television AB -* -* This file is part of CasparCG (www.casparcg.com). -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -* Author: Nicklas P Andersson -*/ - - -#pragma once - -namespace caspar { namespace protocol { namespace CLK { - -class CLKCommand -{ -public: - enum CLKCommands - { - CLKDuration, - CLKNewDuration, - CLKNextEvent, - CLKStop, - CLKUntil, - CLKAdd, - CLKSub, - CLKReset, - CLKInvalidCommand - }; - - CLKCommand(); - virtual ~CLKCommand(); - - bool SetCommand(); - bool NeedsTime() const - { - return !(command_ == CLKNextEvent || command_ == CLKStop); - } - - void Clear(); - const std::wstring& GetData(); - - std::wstring dataCache_; - std::wstring commandString_; - CLKCommands command_; - int clockID_; - std::wstring time_; - std::vector parameters_; -}; - -}}} \ No newline at end of file +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include + +#include "clk_command_processor.h" + +namespace caspar { namespace protocol { namespace CLK { + +/** + * Add the CLK command handlers to a command processor. + * + * @param processor The command processor to add the command handlers to. + * @param channel The channel to play the flash graphics on. + */ +void add_command_handlers( + clk_command_processor& processor, + const spl::shared_ptr& channel); + +}}} diff --git a/protocol/protocol.vcxproj b/protocol/protocol.vcxproj index 27f37eeea..b48b49c75 100644 --- a/protocol/protocol.vcxproj +++ b/protocol/protocol.vcxproj @@ -35,8 +35,9 @@ - + + @@ -50,6 +51,8 @@ + + @@ -72,13 +75,17 @@ ../StdAfx.h ../StdAfx.h - + ../StdAfx.h ../StdAfx.h - + + ../StdAfx.h ../StdAfx.h + + ../StdAfx.h + ../StdAfx.h NotUsing @@ -108,6 +115,10 @@ ../StdAfx.h ../StdAfx.h + + ../StdAfx.h + ../StdAfx.h + {2040B361-1FB6-488E-84A5-38A580DA90DE} diff --git a/protocol/protocol.vcxproj.filters b/protocol/protocol.vcxproj.filters index 77b575a32..21f469fa2 100644 --- a/protocol/protocol.vcxproj.filters +++ b/protocol/protocol.vcxproj.filters @@ -51,9 +51,6 @@ source\clk - - source\clk - source\util @@ -88,6 +85,18 @@ source\osc + + source\clk + + + source\clk + + + source\util + + + source\util + @@ -108,9 +117,6 @@ source\cii - - source\clk - source\clk @@ -130,5 +136,14 @@ source\osc + + source\clk + + + source\clk + + + source\util + \ No newline at end of file diff --git a/protocol/util/AsyncEventServer.cpp b/protocol/util/AsyncEventServer.cpp index 97c5fcfd4..2552d7869 100644 --- a/protocol/util/AsyncEventServer.cpp +++ b/protocol/util/AsyncEventServer.cpp @@ -23,19 +23,15 @@ #include "AsyncEventServer.h" -#include "ProtocolStrategy.h" - #include #include #include #include -#include +#include #include #include #include -#include -#include using boost::asio::ip::tcp; @@ -45,18 +41,18 @@ class connection; typedef std::set> connection_set; -class connection : public spl::enable_shared_from_this, public ClientInfo +class connection : public spl::enable_shared_from_this, public client_connection { const spl::shared_ptr socket_; const spl::shared_ptr connection_set_; const std::wstring name_; - const spl::shared_ptr protocol_; + protocol_strategy_factory::ptr protocol_factory_; + std::shared_ptr> protocol_; std::array data_; - std::string input_; public: - static spl::shared_ptr create(spl::shared_ptr socket, const ProtocolStrategyPtr& protocol, spl::shared_ptr connection_set) + static spl::shared_ptr create(spl::shared_ptr socket, const protocol_strategy_factory::ptr& protocol, spl::shared_ptr connection_set) { spl::shared_ptr con(new connection(std::move(socket), std::move(protocol), std::move(connection_set))); con->read_some(); @@ -70,12 +66,12 @@ public: /* ClientInfo */ - virtual void Send(const std::wstring& data) + virtual void send(std::string&& data) { - write_some(data); + write_some(std::move(data)); } - virtual void Disconnect() + virtual void disconnect() { stop(); } @@ -97,14 +93,22 @@ public: } private: - connection(const spl::shared_ptr& socket, const ProtocolStrategyPtr& protocol, const spl::shared_ptr& connection_set) + connection(const spl::shared_ptr& socket, const protocol_strategy_factory::ptr& protocol, const spl::shared_ptr& connection_set) : socket_(socket) , name_((socket_->is_open() ? u16(socket_->local_endpoint().address().to_string() + ":" + boost::lexical_cast(socket_->local_endpoint().port())) : L"no-address")) , connection_set_(connection_set) - , protocol_(protocol) + , protocol_factory_(protocol) { CASPAR_LOG(info) << print() << L" Connected."; } + + protocol_strategy& protocol() + { + if (!protocol_) + protocol_ = protocol_factory_->create(shared_from_this()); + + return *protocol_; + } void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { @@ -112,21 +116,11 @@ private: { try { - CASPAR_LOG(trace) << print() << L" Received: " << u16(std::string(data_.begin(), data_.begin() + bytes_transferred)); - - input_.append(data_.begin(), data_.begin() + bytes_transferred); - - std::vector split; - boost::iter_split(split, input_, boost::algorithm::first_finder("\r\n")); - - input_ = std::move(split.back()); - split.pop_back(); - - BOOST_FOREACH(auto cmd, split) - { - auto u16cmd = boost::locale::conv::to_utf(cmd, protocol_->GetCodepage()) + L"\r\n"; - protocol_->Parse(u16cmd.data(), static_cast(u16cmd.size()), shared_from_this()); - } + std::string data(data_.begin(), data_.begin() + bytes_transferred); + + CASPAR_LOG(trace) << print() << L" Received: " << u16(data); + + protocol().parse(data); } catch(...) { @@ -152,9 +146,9 @@ private: socket_->async_read_some(boost::asio::buffer(data_.data(), data_.size()), std::bind(&connection::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } - void write_some(const std::wstring& data) + void write_some(std::string&& data) { - auto str = spl::make_shared(boost::locale::conv::from_utf(data, protocol_->GetCodepage())); + auto str = spl::make_shared(std::move(data)); socket_->async_write_some(boost::asio::buffer(str->data(), str->size()), std::bind(&connection::handle_write, shared_from_this(), str, std::placeholders::_1, std::placeholders::_2)); } }; @@ -163,11 +157,11 @@ struct AsyncEventServer::implementation { boost::asio::io_service service_; tcp::acceptor acceptor_; - spl::shared_ptr protocol_; + protocol_strategy_factory::ptr protocol_; spl::shared_ptr connection_set_; boost::thread thread_; - implementation(const spl::shared_ptr& protocol, unsigned short port) + implementation(const protocol_strategy_factory::ptr& protocol, unsigned short port) : acceptor_(service_, tcp::endpoint(tcp::v4(), port)) , protocol_(protocol) , thread_(std::bind(&boost::asio::io_service::run, &service_)) @@ -214,6 +208,14 @@ struct AsyncEventServer::implementation } }; -AsyncEventServer::AsyncEventServer(const spl::shared_ptr& protocol, unsigned short port) : impl_(new implementation(protocol, port)){} -AsyncEventServer::~AsyncEventServer(){} +AsyncEventServer::AsyncEventServer( + const protocol_strategy_factory::ptr& protocol, unsigned short port) + : impl_(new implementation(protocol, port)) +{ +} + +AsyncEventServer::~AsyncEventServer() +{ +} + }} \ No newline at end of file diff --git a/protocol/util/AsyncEventServer.h b/protocol/util/AsyncEventServer.h index 1212b03fb..ace053a6a 100644 --- a/protocol/util/AsyncEventServer.h +++ b/protocol/util/AsyncEventServer.h @@ -22,16 +22,14 @@ ////////////////////////////////////////////////////////////////////// #pragma once -#include +#include "protocol_strategy.h" namespace caspar { namespace IO { -class IProtocolStrategy; - class AsyncEventServer { public: - explicit AsyncEventServer(const spl::shared_ptr& protocol, unsigned short port); + explicit AsyncEventServer(const protocol_strategy_factory::ptr& protocol, unsigned short port); ~AsyncEventServer(); private: struct implementation; diff --git a/protocol/util/ProtocolStrategy.h b/protocol/util/ProtocolStrategy.h index 687d22a23..73bc2990c 100644 --- a/protocol/util/ProtocolStrategy.h +++ b/protocol/util/ProtocolStrategy.h @@ -22,7 +22,7 @@ #pragma once #include -#include "clientInfo.h" +#include "ClientInfo.h" namespace caspar { namespace IO { diff --git a/protocol/util/protocol_strategy.h b/protocol/util/protocol_strategy.h new file mode 100644 index 000000000..e4c016d33 --- /dev/null +++ b/protocol/util/protocol_strategy.h @@ -0,0 +1,89 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include + +#include + +namespace caspar { namespace IO { + +/** + * A protocol strategy handles a single client connection. A client_connection + * instance is needed in order to send data to the client. + */ +template +class protocol_strategy +{ +public: + typedef spl::shared_ptr> ptr; + + virtual ~protocol_strategy() { } + + /** + * Parse some data received. If used directly by the async event server, + * then the data will be what was received from the TCP/IP stack, but if + * a delimiter based protocol is used, delimiter_based_chunking_strategy + * can be used to ensure that the strategy implementation is only + * provided complete messages. + * + * @param data The data received. + */ + virtual void parse(const std::basic_string& data) = 0; +}; + +/** + * A handle for a protocol_strategy to use when interacting with the client. + */ +template +class client_connection +{ +public: + typedef spl::shared_ptr> ptr; + + virtual ~client_connection() { } + + virtual void send(std::basic_string&& data) = 0; + virtual void disconnect() = 0; + virtual std::wstring print() const = 0; +}; + +/** + * Creates unique instances of protocol_strategy implementations. + * + * Each async event server will have one instance of this factory, but create + * unique protocol_strategy instances for each connected client. + * + * Any shared state between client interactions could be held in the factory. + */ +template +class protocol_strategy_factory +{ +public: + typedef spl::shared_ptr> ptr; + + virtual ~protocol_strategy_factory() { } + virtual typename protocol_strategy::ptr create( + const typename client_connection::ptr& client_connection) = 0; +}; + +}} diff --git a/protocol/util/strategy_adapters.cpp b/protocol/util/strategy_adapters.cpp new file mode 100644 index 000000000..c4ded52cd --- /dev/null +++ b/protocol/util/strategy_adapters.cpp @@ -0,0 +1,165 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#include "../stdafx.h" + +#include "strategy_adapters.h" + +#include + +namespace caspar { namespace IO { + +class to_unicode_adapter : public protocol_strategy +{ + std::string codepage_; + protocol_strategy::ptr unicode_strategy_; +public: + to_unicode_adapter( + const std::string& codepage, + const protocol_strategy::ptr& unicode_strategy) + : codepage_(codepage) + , unicode_strategy_(unicode_strategy) + { + } + + virtual void parse(const std::basic_string& data) + { + auto utf_data = boost::locale::conv::to_utf(data, codepage_); + + unicode_strategy_->parse(utf_data); + } +}; + +class from_unicode_client_connection : public client_connection +{ + client_connection::ptr client_; + std::string codepage_; +public: + from_unicode_client_connection( + const client_connection::ptr& client, const std::string& codepage) + : client_(client) + , codepage_(codepage) + { + } + + virtual void send(std::basic_string&& data) + { + auto str = boost::locale::conv::from_utf(std::move(data), codepage_); + + client_->send(std::move(str)); + } + + virtual void disconnect() + { + client_->disconnect(); + } + + virtual std::wstring print() const + { + return client_->print(); + } +}; + +to_unicode_adapter_factory::to_unicode_adapter_factory( + const std::string& codepage, + const protocol_strategy_factory::ptr& unicode_strategy_factory) + : codepage_(codepage) + , unicode_strategy_factory_(unicode_strategy_factory) +{ +} + +protocol_strategy::ptr to_unicode_adapter_factory::create( + const client_connection::ptr& client_connection) +{ + auto client = spl::make_shared(client_connection, codepage_); + + return spl::make_shared(codepage_, unicode_strategy_factory_->create(client)); +} + +class legacy_client_info : public ClientInfo +{ + client_connection::ptr client_connection_; +public: + legacy_client_info(const client_connection::ptr& client_connection) + : client_connection_(client_connection) + { + } + + virtual void Disconnect() + { + client_connection_->disconnect(); + } + + virtual void Send(const std::wstring& data) + { + client_connection_->send(std::wstring(data)); + } + + virtual std::wstring print() const + { + return client_connection_->print(); + } +}; + +class legacy_strategy_adapter : public protocol_strategy +{ + ProtocolStrategyPtr strategy_; + ClientInfoPtr client_info_; +public: + legacy_strategy_adapter( + const ProtocolStrategyPtr& strategy, + const client_connection::ptr& client_connection) + : strategy_(strategy) + , client_info_(std::make_shared(client_connection)) + { + } + + virtual void parse(const std::basic_string& data) + { + auto p = data.c_str(); + strategy_->Parse(p, static_cast(data.length()), client_info_); + } +}; + +legacy_strategy_adapter_factory::legacy_strategy_adapter_factory( + const ProtocolStrategyPtr& strategy) + : strategy_(strategy) +{ +} + +protocol_strategy::ptr legacy_strategy_adapter_factory::create( + const client_connection::ptr& client_connection) +{ + return spl::make_shared(strategy_, client_connection); +} + +protocol_strategy_factory::ptr wrap_legacy_protocol( + const std::string& delimiter, + const ProtocolStrategyPtr& strategy) +{ + return spl::make_shared>( + delimiter, + spl::make_shared( + strategy->GetCodepage(), + spl::make_shared(strategy))); +} + +}} diff --git a/protocol/util/strategy_adapters.h b/protocol/util/strategy_adapters.h new file mode 100644 index 000000000..463d4c171 --- /dev/null +++ b/protocol/util/strategy_adapters.h @@ -0,0 +1,149 @@ +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Helge Norberg, helge.norberg@svt.se +*/ + +#pragma once + +#include + +#include "protocol_strategy.h" +#include "ProtocolStrategy.h" + +namespace caspar { namespace IO { + +/** + * A protocol strategy factory adapter for converting incoming data from a + * specific codepage to utf-16. The client_connection will do the reversed + * conversion. + * + * The adapter is not safe if the codepage contains multibyte-characters and + * the data is chunked with potentially incomplete characters, therefore it + * must be wrapped in an adapter providing complete chunks like + * delimiter_based_chunking_strategy_factory. + */ +class to_unicode_adapter_factory : public protocol_strategy_factory +{ + std::string codepage_; + protocol_strategy_factory::ptr unicode_strategy_factory_; +public: + to_unicode_adapter_factory( + const std::string& codepage, + const protocol_strategy_factory::ptr& unicode_strategy_factory); + + virtual protocol_strategy::ptr create( + const client_connection::ptr& client_connection); +}; + +/** + * Protocol strategy adapter for ensuring that only complete chunks or + * "packets" are delivered to the wrapped strategy. The chunks are determined + * by a given delimiter. + */ +template +class delimiter_based_chunking_strategy : public protocol_strategy +{ + std::basic_string delimiter_; + std::basic_string input_; + protocol_strategy::ptr strategy_; +public: + delimiter_based_chunking_strategy( + const std::basic_string& delimiter, + const protocol_strategy::ptr& strategy) + : delimiter_(delimiter) + , strategy_(strategy) + { + } + + virtual void parse(const std::basic_string& data) + { + input_ += data; + + std::vector> split; + boost::iter_split(split, input_, boost::algorithm::first_finder(delimiter_)); + + input_ = std::move(split.back()); + split.pop_back(); + + BOOST_FOREACH(auto cmd, split) + { + // TODO: perhaps it would be better to not append the delimiter. + strategy_->parse(cmd + delimiter_); + } + } +}; + +template +class delimiter_based_chunking_strategy_factory + : public protocol_strategy_factory +{ + std::basic_string delimiter_; + protocol_strategy_factory::ptr strategy_factory_; +public: + delimiter_based_chunking_strategy_factory( + const std::basic_string& delimiter, + const protocol_strategy_factory::ptr& strategy_factory) + : delimiter_(delimiter) + , strategy_factory_(strategy_factory) + { + } + + virtual typename protocol_strategy::ptr create( + const typename client_connection::ptr& client_connection) + { + return spl::make_shared>( + delimiter_, strategy_factory_->create(client_connection)); + } +}; + +/** + * Adapts an IProtocolStrategy to be used as a + * protocol_strategy_factory. + * + * Use wrap_legacy_protocol() to wrap it as a protocol_strategy_factory + * for use directly by the async event server. + */ +class legacy_strategy_adapter_factory + : public protocol_strategy_factory +{ + ProtocolStrategyPtr strategy_; +public: + legacy_strategy_adapter_factory(const ProtocolStrategyPtr& strategy); + + virtual protocol_strategy::ptr create( + const client_connection::ptr& client_connection); +}; + +/** + * Wraps an IProtocolStrategy in a legacy_strategy_adapter_factory, wrapped in + * a to_unicode_adapter_factory (using the codepage reported by the + * IProtocolStrategy) wrapped in a delimiter_based_chunking_strategy_factory + * with the given delimiter string. + * + * @param delimiter The delimiter to use to separate messages. + * @param strategy The legacy protocol strategy (the same instance will serve + * all connections). + * + * @return the adapted strategy. + */ +protocol_strategy_factory::ptr wrap_legacy_protocol( + const std::string& delimiter, + const ProtocolStrategyPtr& strategy); + +}} diff --git a/shell/server.cpp b/shell/server.cpp index 8d3cd24c7..4d2c7cdc0 100644 --- a/shell/server.cpp +++ b/shell/server.cpp @@ -27,10 +27,13 @@ #include #include #include +#include #include #include #include +#include +#include #include #include @@ -51,6 +54,7 @@ #include #include #include +#include #include #include @@ -95,6 +99,8 @@ struct server::impl : boost::noncopyable flash::init(); CASPAR_LOG(info) << L"Initialized flash module."; + core::register_producer_factory(core::create_diag_producer); + setup_channels(env::properties()); CASPAR_LOG(info) << L"Initialized channels."; @@ -183,17 +189,22 @@ struct server::impl : boost::noncopyable } } - spl::shared_ptr create_protocol(const std::wstring& name) const + IO::protocol_strategy_factory::ptr create_protocol(const std::wstring& name) const { + using namespace IO; + if(boost::iequals(name, L"AMCP")) - return spl::make_shared(channels_); + return wrap_legacy_protocol("\r\n", spl::make_shared(channels_)); else if(boost::iequals(name, L"CII")) - return spl::make_shared(channels_); + return wrap_legacy_protocol("\r\n", spl::make_shared(channels_)); else if(boost::iequals(name, L"CLOCK")) - return spl::make_shared(channels_); + return spl::make_shared( + "ISO-8859-1", + spl::make_shared(channels_)); CASPAR_THROW_EXCEPTION(caspar_exception() << arg_name_info(L"name") << arg_value_info(name) << msg_info(L"Invalid protocol")); } + }; server::server() : impl_(new impl()){}