From 6544fa0ec3f3a501bcb89ea977756911bd7f3ebd Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sun, 10 Apr 2016 01:15:21 +0200 Subject: [PATCH] Add suppor for raw (non-Metacube) inputs over HTTP. Only really useful for TS. --- config.cpp | 26 +++++++++++++++++- config.h | 5 +++- cubemap.config.sample | 5 ++++ httpinput.cpp | 31 +++++++++++++++++++--- httpinput.h | 5 +++- input.cpp | 7 +++-- input.h | 19 ++++++++------ main.cpp | 61 ++++++++++++++++++++++++++++--------------- server.cpp | 11 ++++++-- server.h | 3 ++- serverpool.cpp | 11 ++++++-- serverpool.h | 7 +++-- state.proto | 1 + stream.cpp | 3 ++- stream.h | 5 +++- 15 files changed, 154 insertions(+), 46 deletions(-) diff --git a/config.cpp b/config.cpp index 0b5ed20..51a8b8d 100644 --- a/config.cpp +++ b/config.cpp @@ -242,7 +242,7 @@ bool parse_stream(const ConfigLine &line, Config *config) stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str()); } - // Parse encoding. + // Parse ouptut encoding. map::const_iterator encoding_parm_it = line.parameters.find("encoding"); if (encoding_parm_it == line.parameters.end() || encoding_parm_it->second == "raw") { @@ -254,6 +254,18 @@ bool parse_stream(const ConfigLine &line, Config *config) return false; } + // Parse input encoding. + map::const_iterator src_encoding_parm_it = line.parameters.find("src_encoding"); + if (src_encoding_parm_it == line.parameters.end() || + src_encoding_parm_it->second == "metacube") { + stream.src_encoding = StreamConfig::STREAM_ENCODING_METACUBE; + } else if (src_encoding_parm_it->second == "raw") { + stream.src_encoding = StreamConfig::STREAM_ENCODING_RAW; + } else { + log(ERROR, "Parameter 'src_encoding' must be either 'raw' or 'metacube' (default)"); + return false; + } + // Parse the pacing rate, converting from kilobits to bytes as needed. map::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit"); if (pacing_rate_it == line.parameters.end()) { @@ -318,6 +330,18 @@ bool parse_udpstream(const ConfigLine &line, Config *config) } } + // Parse input encoding. + map::const_iterator src_encoding_parm_it = line.parameters.find("src_encoding"); + if (src_encoding_parm_it == line.parameters.end() || + src_encoding_parm_it->second == "metacube") { + udpstream.src_encoding = StreamConfig::STREAM_ENCODING_METACUBE; + } else if (src_encoding_parm_it->second == "raw") { + udpstream.src_encoding = StreamConfig::STREAM_ENCODING_RAW; + } else { + log(ERROR, "Parameter 'src_encoding' must be either 'raw' or 'metacube' (default)"); + return false; + } + config->udpstreams.push_back(udpstream); return true; } diff --git a/config.h b/config.h index 9a5cc57..a230731 100644 --- a/config.h +++ b/config.h @@ -15,7 +15,9 @@ struct StreamConfig { size_t backlog_size; size_t prebuffering_bytes; uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). - enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; + enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; + Encoding encoding; + Encoding src_encoding; }; struct UDPStreamConfig { @@ -24,6 +26,7 @@ struct UDPStreamConfig { uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit). int ttl; // Default is -1 (use operating system default). int multicast_iface_index; // Default is -1 (use operating system default). + StreamConfig::Encoding src_encoding; }; struct Gen204Config { diff --git a/cubemap.config.sample b/cubemap.config.sample index 35bfe5e..0d283dd 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -51,6 +51,11 @@ stream /test-jwplayer.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_p # for sending on to another Cubemap instance. stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube +# A stream where the input is _not_ Metacube framed. Note that the stream needs to +# have no header and be self-synchronizing (like with UDP input below), and most formats +# are not like this. A typical example, however, is MPEG-TS. +stream /test.ts src=http://gruessi.zrh.sesse.net:4013/test.ts src_encoding=raw + # UDP input. TS is the most common container to use over UDP (you cannot # take any arbitrary container and expect it to work). # backlog_size= overrides the backlog, which is normally 10 MB. diff --git a/httpinput.cpp b/httpinput.cpp index 50a975c..5fb55b3 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -32,9 +32,10 @@ using namespace std; extern ServerPool *servers; -HTTPInput::HTTPInput(const string &url) +HTTPInput::HTTPInput(const string &url, Input::Encoding encoding) : state(NOT_CONNECTED), url(url), + encoding(encoding), has_metacube_header(false), sock(-1) { @@ -48,6 +49,9 @@ HTTPInput::HTTPInput(const string &url) HTTPInput::HTTPInput(const InputProto &serialized) : state(State(serialized.state())), url(serialized.url()), + encoding(serialized.is_metacube_encoded() ? + Input::INPUT_ENCODING_METACUBE : + Input::INPUT_ENCODING_RAW), request(serialized.request()), request_bytes_sent(serialized.request_bytes_sent()), response(serialized.response()), @@ -100,6 +104,12 @@ InputProto HTTPInput::serialize() const serialized.set_bytes_received(stats.bytes_received); serialized.set_data_bytes_received(stats.data_bytes_received); serialized.set_connect_time(stats.connect_time); + if (encoding == Input::INPUT_ENCODING_METACUBE) { + serialized.set_is_metacube_encoded(true); + } else { + assert(encoding == Input::INPUT_ENCODING_RAW); + serialized.set_is_metacube_encoded(false); + } return serialized; } @@ -412,8 +422,14 @@ void HTTPInput::do_work() process_data(&extra_data[0], extra_data.size()); } - log(INFO, "[%s] Connected to '%s', receiving data.", - url.c_str(), url.c_str()); + if (encoding == Input::INPUT_ENCODING_RAW) { + log(INFO, "[%s] Connected to '%s', receiving raw data.", + url.c_str(), url.c_str()); + } else { + assert(encoding == Input::INPUT_ENCODING_METACUBE); + log(INFO, "[%s] Connected to '%s', receiving data.", + url.c_str(), url.c_str()); + } state = RECEIVING_DATA; break; } @@ -472,6 +488,15 @@ void HTTPInput::process_data(char *ptr, size_t bytes) stats.bytes_received += bytes; } + if (encoding == Input::INPUT_ENCODING_RAW) { + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->add_data(stream_indices[i], ptr, bytes, SUITABLE_FOR_STREAM_START); + } + return; + } + + assert(encoding == Input::INPUT_ENCODING_METACUBE); + for ( ;; ) { // If we don't have enough data (yet) for even the Metacube header, just return. if (pending_data.size() < sizeof(metacube2_block_header)) { diff --git a/httpinput.h b/httpinput.h index 6b0c875..823204d 100644 --- a/httpinput.h +++ b/httpinput.h @@ -12,7 +12,7 @@ class InputProto; class HTTPInput : public Input { public: - HTTPInput(const std::string &url); + HTTPInput(const std::string &url, Input::Encoding encoding); // Serialization/deserialization. HTTPInput(const InputProto &serialized); @@ -59,6 +59,9 @@ private: std::string url; std::string host, port, path; + // What the input stream is to be interpreted as (normally Metacube). + Input::Encoding encoding; + // The HTTP request, with headers and all. // Only relevant for SENDING_REQUEST. std::string request; diff --git a/input.cpp b/input.cpp index bbd2c3a..25012a6 100644 --- a/input.cpp +++ b/input.cpp @@ -94,16 +94,19 @@ bool parse_url(const string &url, string *protocol, string *user, string *host, return true; } -Input *create_input(const string &url) +Input *create_input(const string &url, Input::Encoding encoding) { string protocol, user, host, port, path; if (!parse_url(url, &protocol, &user, &host, &port, &path)) { return NULL; } if (protocol == "http") { - return new HTTPInput(url); + return new HTTPInput(url, encoding); } if (protocol == "udp") { + if (encoding == Input::INPUT_ENCODING_METACUBE) { + return NULL; + } return new UDPInput(url); } return NULL; diff --git a/input.h b/input.h index 1cea2f5..1b94e59 100644 --- a/input.h +++ b/input.h @@ -10,14 +10,6 @@ class Input; class InputProto; -// Extremely rudimentary URL parsing. -bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path); - -// Figure out the right type of input based on the URL, and create a new Input of the right type. -// Will return NULL if unknown. -Input *create_input(const std::string &url); -Input *create_input(const InputProto &serialized); - // Digested statistics for writing to logs etc. struct InputStats { std::string url; @@ -43,6 +35,9 @@ struct InputStats { class Input : public Thread { public: + // Must be in sync with StreamConfig::Encoding. + enum Encoding { INPUT_ENCODING_RAW = 0, INPUT_ENCODING_METACUBE }; + virtual ~Input(); virtual InputProto serialize() const = 0; virtual std::string get_url() const = 0; @@ -53,4 +48,12 @@ public: virtual InputStats get_stats() const = 0; }; +// Extremely rudimentary URL parsing. +bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path); + +// Figure out the right type of input based on the URL, and create a new Input of the right type. +// Will return NULL if unknown. +Input *create_input(const std::string &url, Input::Encoding encoding); +Input *create_input(const InputProto &serialized); + #endif // !defined(_INPUT_H) diff --git a/main.cpp b/main.cpp index 1b6fb4f..a7ad64c 100644 --- a/main.cpp +++ b/main.cpp @@ -39,6 +39,8 @@ ServerPool *servers = NULL; volatile bool hupped = false; volatile bool stopped = false; +typedef pair InputKey; + namespace { struct OrderByConnectionTime { @@ -70,7 +72,7 @@ void do_nothing(int signum) CubemapStateProto collect_state(const timespec &serialize_start, const vector acceptors, - const multimap inputs, + const multimap inputs, ServerPool *servers) { CubemapStateProto state = servers->serialize(); // Fills streams() and clients(). @@ -81,7 +83,7 @@ CubemapStateProto collect_state(const timespec &serialize_start, state.add_acceptors()->MergeFrom(acceptors[i]->serialize()); } - for (multimap::const_iterator input_it = inputs.begin(); + for (multimap::const_iterator input_it = inputs.begin(); input_it != inputs.end(); ++input_it) { state.add_inputs()->MergeFrom(input_it->second.input->serialize()); @@ -124,44 +126,45 @@ vector create_acceptors( return acceptors; } -void create_config_input(const string &src, multimap *inputs) +void create_config_input(const string &src, Input::Encoding encoding, multimap *inputs) { if (src.empty()) { return; } - if (inputs->count(src) != 0) { + InputKey key(src, encoding); + if (inputs->count(key) != 0) { return; } InputWithRefcount iwr; - iwr.input = create_input(src); + iwr.input = create_input(src, encoding); if (iwr.input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", + log(ERROR, "did not understand URL '%s' or source encoding was invalid, clients will not get any data.", src.c_str()); return; } iwr.refcount = 0; - inputs->insert(make_pair(src, iwr)); + inputs->insert(make_pair(key, iwr)); } // Find all streams in the configuration file, and create inputs for them. -void create_config_inputs(const Config &config, multimap *inputs) +void create_config_inputs(const Config &config, multimap *inputs) { for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; if (stream_config.src != "delete") { - create_config_input(stream_config.src, inputs); + create_config_input(stream_config.src, Input::Encoding(stream_config.src_encoding), inputs); } } for (unsigned i = 0; i < config.udpstreams.size(); ++i) { const UDPStreamConfig &udpstream_config = config.udpstreams[i]; - create_config_input(udpstream_config.src, inputs); + create_config_input(udpstream_config.src, Input::Encoding(udpstream_config.src_encoding), inputs); } } void create_streams(const Config &config, const set &deserialized_urls, - multimap *inputs) + multimap *inputs) { // HTTP streams. set expecting_urls = deserialized_urls; @@ -181,7 +184,8 @@ void create_streams(const Config &config, stream_index = servers->add_stream(stream_config.url, stream_config.backlog_size, stream_config.prebuffering_bytes, - Stream::Encoding(stream_config.encoding)); + Stream::Encoding(stream_config.encoding), + Stream::Encoding(stream_config.src_encoding)); } else { stream_index = servers->lookup_stream_by_url(stream_config.url); assert(stream_index != -1); @@ -189,13 +193,16 @@ void create_streams(const Config &config, servers->set_prebuffering_bytes(stream_index, stream_config.prebuffering_bytes); servers->set_encoding(stream_index, Stream::Encoding(stream_config.encoding)); + servers->set_src_encoding(stream_index, + Stream::Encoding(stream_config.src_encoding)); } servers->set_pacing_rate(stream_index, stream_config.pacing_rate); string src = stream_config.src; + Input::Encoding src_encoding = Input::Encoding(stream_config.src_encoding); if (!src.empty()) { - multimap::iterator input_it = inputs->find(src); + multimap::iterator input_it = inputs->find(make_pair(src, src_encoding)); if (input_it != inputs->end()) { input_it->second.input->add_destination(stream_index); ++input_it->second.refcount; @@ -224,8 +231,9 @@ void create_streams(const Config &config, udpstream_config.multicast_iface_index); string src = udpstream_config.src; + Input::Encoding src_encoding = Input::Encoding(udpstream_config.src_encoding); if (!src.empty()) { - multimap::iterator input_it = inputs->find(src); + multimap::iterator input_it = inputs->find(make_pair(src, src_encoding)); assert(input_it != inputs->end()); input_it->second.input->add_destination(stream_index); ++input_it->second.refcount; @@ -400,7 +408,7 @@ start: timespec serialize_start; set deserialized_urls; map deserialized_acceptors; - multimap inputs; // multimap due to older versions without deduplication. + multimap inputs; // multimap due to older versions without deduplication. if (state_fd != -1) { log(INFO, "Deserializing state from previous process..."); string serialized; @@ -445,7 +453,12 @@ start: InputWithRefcount iwr; iwr.input = create_input(serialized_input); iwr.refcount = 0; - inputs.insert(make_pair(serialized_input.url(), iwr)); + + Input::Encoding src_encoding = serialized_input.is_metacube_encoded() ? + Input::INPUT_ENCODING_METACUBE : + Input::INPUT_ENCODING_RAW; + InputKey key(serialized_input.url(), src_encoding); + inputs.insert(make_pair(key, iwr)); } // Deserialize the acceptors. @@ -503,11 +516,17 @@ start: servers->run(); // Now delete all inputs that are longer in use, and start the others. - for (multimap::iterator input_it = inputs.begin(); + for (multimap::iterator input_it = inputs.begin(); input_it != inputs.end(); ) { if (input_it->second.refcount == 0) { - log(WARNING, "Input '%s' no longer in use, closing.", - input_it->first.c_str()); + if (input_it->first.second == Input::INPUT_ENCODING_RAW) { + log(WARNING, "Raw input '%s' no longer in use, closing.", + input_it->first.first.c_str()); + } else { + assert(input_it->first.second == Input::INPUT_ENCODING_METACUBE); + log(WARNING, "Metacube input '%s' no longer in use, closing.", + input_it->first.first.c_str()); + } input_it->second.input->close_socket(); delete input_it->second.input; inputs.erase(input_it++); @@ -527,7 +546,7 @@ start: InputStatsThread *input_stats_thread = NULL; if (!config.input_stats_file.empty()) { vector inputs_no_refcount; - for (multimap::iterator input_it = inputs.begin(); + for (multimap::iterator input_it = inputs.begin(); input_it != inputs.end(); ++input_it) { inputs_no_refcount.push_back(input_it->second.input); } @@ -575,7 +594,7 @@ start: for (size_t i = 0; i < acceptors.size(); ++i) { acceptors[i]->stop(); } - for (multimap::iterator input_it = inputs.begin(); + for (multimap::iterator input_it = inputs.begin(); input_it != inputs.end(); ++input_it) { input_it->second.input->stop(); diff --git a/server.cpp b/server.cpp index 9b83ddc..d3b936f 100644 --- a/server.cpp +++ b/server.cpp @@ -295,11 +295,11 @@ int Server::lookup_stream_by_url(const string &url) const return stream_url_it->second; } -int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) +int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding) { MutexLock lock(&mutex); stream_url_map.insert(make_pair(url, streams.size())); - streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding)); + streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding)); return streams.size() - 1; } @@ -331,6 +331,13 @@ void Server::set_encoding(int stream_index, Stream::Encoding encoding) assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); streams[stream_index]->encoding = encoding; } + +void Server::set_src_encoding(int stream_index, Stream::Encoding encoding) +{ + MutexLock lock(&mutex); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->src_encoding = encoding; +} void Server::set_header(int stream_index, const string &http_header, const string &stream_header) { diff --git a/server.h b/server.h index 073ef73..320fcbc 100644 --- a/server.h +++ b/server.h @@ -55,12 +55,13 @@ public: // at the same time). CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); - int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding); int add_stream_from_serialized(const StreamProto &stream, int data_fd); int lookup_stream_by_url(const std::string &url) const; void set_backlog_size(int stream_index, size_t new_size); void set_prebuffering_bytes(int stream_index, size_t new_amount); void set_encoding(int stream_index, Stream::Encoding encoding); + void set_src_encoding(int stream_index, Stream::Encoding encoding); void add_gen204(const std::string &url, const std::string &allow_origin); private: diff --git a/serverpool.cpp b/serverpool.cpp index c77c30e..2c663fb 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -73,14 +73,14 @@ int ServerPool::lookup_stream_by_url(const string &url) const return servers[0].lookup_stream_by_url(url); } -int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) +int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding) { // Adding more HTTP streams after UDP streams would cause the UDP stream // indices to move around, which is obviously not good. assert(udp_streams.empty()); for (int i = 0; i < num_servers; ++i) { - int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding); + int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding); assert(stream_index == num_http_streams); } return num_http_streams++; @@ -221,3 +221,10 @@ void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding) servers[i].set_encoding(stream_index, encoding); } } + +void ServerPool::set_src_encoding(int stream_index, Stream::Encoding encoding) +{ + for (int i = 0; i < num_servers; ++i) { + servers[i].set_src_encoding(stream_index, encoding); + } +} diff --git a/serverpool.h b/serverpool.h index 44084bc..e52aee8 100644 --- a/serverpool.h +++ b/serverpool.h @@ -29,7 +29,7 @@ public: void add_client_from_serialized(const ClientProto &client); // Adds the given stream to all the servers. Returns the stream index. - int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); void delete_stream(const std::string &url); int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index); @@ -52,9 +52,12 @@ public: // Changes the given stream's amount of forced prebuffering on all the servers. void set_prebuffering_bytes(int stream_index, size_t new_amount); - // Changes the given stream's encoding type on all the servers. + // Changes the given stream's output encoding type on all the servers. void set_encoding(int stream_index, Stream::Encoding encoding); + // Changes the given stream's input encoding type on all the servers. + void set_src_encoding(int stream_index, Stream::Encoding encoding); + // Adds the given gen204 endpoint to all the servers. void add_gen204(const std::string &url, const std::string &allow_origin); diff --git a/state.proto b/state.proto index c6f0e00..2a788eb 100644 --- a/state.proto +++ b/state.proto @@ -47,6 +47,7 @@ message InputProto { optional int64 bytes_received = 11; optional int64 data_bytes_received = 12; optional int64 connect_time = 13; + optional bool is_metacube_encoded = 15 [default=true]; }; // Corresponds to class Acceptor. diff --git a/stream.cpp b/stream.cpp index 64bf2e8..b498b2e 100644 --- a/stream.cpp +++ b/stream.cpp @@ -20,9 +20,10 @@ using namespace std; -Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding) +Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding) : url(url), encoding(encoding), + src_encoding(src_encoding), data_fd(make_tempfile("")), backlog_size(backlog_size), prebuffering_bytes(prebuffering_bytes), diff --git a/stream.h b/stream.h index c98feb4..057a48c 100644 --- a/stream.h +++ b/stream.h @@ -24,7 +24,7 @@ struct Stream { // Must be in sync with StreamConfig::Encoding. enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; - Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding); + Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding); ~Stream(); // Serialization/deserialization. @@ -51,6 +51,9 @@ struct Stream { // be Metacube, for reflecting to another Cubemap instance). Encoding encoding; + // What encoding we expect the incoming data to be in (usually Metacube). + Encoding src_encoding; + // The stream data itself, stored in a circular buffer. // // We store our data in a file, so that we can send the data to the -- 2.39.2