]> git.sesse.net Git - cubemap/commitdiff
Add suppor for raw (non-Metacube) inputs over HTTP. Only really useful for TS.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 9 Apr 2016 23:15:21 +0000 (01:15 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 9 Apr 2016 23:15:37 +0000 (01:15 +0200)
15 files changed:
config.cpp
config.h
cubemap.config.sample
httpinput.cpp
httpinput.h
input.cpp
input.h
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stream.cpp
stream.h

index 0b5ed20bdb5192ffa60e7ca2783844836f3741a7..51a8b8d71082d9287e173d404ffbff2ee3a7b6ad 100644 (file)
@@ -242,7 +242,7 @@ bool parse_stream(const ConfigLine &line, Config *config)
                stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str());
        }
 
-       // Parse encoding.
+       // Parse ouptut encoding.
        map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
        if (encoding_parm_it == line.parameters.end() ||
            encoding_parm_it->second == "raw") {
@@ -254,6 +254,18 @@ bool parse_stream(const ConfigLine &line, Config *config)
                return false;
        }
 
+       // Parse input encoding.
+       map<string, string>::const_iterator src_encoding_parm_it = line.parameters.find("src_encoding");
+       if (src_encoding_parm_it == line.parameters.end() ||
+           src_encoding_parm_it->second == "metacube") {
+               stream.src_encoding = StreamConfig::STREAM_ENCODING_METACUBE;
+       } else if (src_encoding_parm_it->second == "raw") {
+               stream.src_encoding = StreamConfig::STREAM_ENCODING_RAW;
+       } else {
+               log(ERROR, "Parameter 'src_encoding' must be either 'raw' or 'metacube' (default)");
+               return false;
+       }
+
        // Parse the pacing rate, converting from kilobits to bytes as needed.
        map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
        if (pacing_rate_it == line.parameters.end()) {
@@ -318,6 +330,18 @@ bool parse_udpstream(const ConfigLine &line, Config *config)
                }
        }
 
+       // Parse input encoding.
+       map<string, string>::const_iterator src_encoding_parm_it = line.parameters.find("src_encoding");
+       if (src_encoding_parm_it == line.parameters.end() ||
+           src_encoding_parm_it->second == "metacube") {
+               udpstream.src_encoding = StreamConfig::STREAM_ENCODING_METACUBE;
+       } else if (src_encoding_parm_it->second == "raw") {
+               udpstream.src_encoding = StreamConfig::STREAM_ENCODING_RAW;
+       } else {
+               log(ERROR, "Parameter 'src_encoding' must be either 'raw' or 'metacube' (default)");
+               return false;
+       }
+
        config->udpstreams.push_back(udpstream);
        return true;
 }
index 9a5cc571f50a6868623a88370d6a39afa9c0f777..a230731cee52b3a89566ec3f2df143439f258d97 100644 (file)
--- a/config.h
+++ b/config.h
@@ -15,7 +15,9 @@ struct StreamConfig {
        size_t backlog_size;
        size_t prebuffering_bytes;
        uint32_t pacing_rate;  // In bytes per second. Default is ~0U (no limit).
-       enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
+       enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
+       Encoding encoding;
+       Encoding src_encoding;
 };
 
 struct UDPStreamConfig {
@@ -24,6 +26,7 @@ struct UDPStreamConfig {
        uint32_t pacing_rate;  // In bytes per second. Default is ~0U (no limit).
        int ttl;  // Default is -1 (use operating system default).
        int multicast_iface_index;  // Default is -1 (use operating system default).
+       StreamConfig::Encoding src_encoding;
 };
 
 struct Gen204Config {
index 35bfe5ea221a1b69f97324807a2d33b7f176a275..0d283ddbb748a49dca2ceb03b14046a9bf691567 100644 (file)
@@ -51,6 +51,11 @@ stream /test-jwplayer.flv src=http://gruessi.zrh.sesse.net:4013/test.flv force_p
 # for sending on to another Cubemap instance.
 stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
 
+# A stream where the input is _not_ Metacube framed. Note that the stream needs to
+# have no header and be self-synchronizing (like with UDP input below), and most formats
+# are not like this. A typical example, however, is MPEG-TS.
+stream /test.ts src=http://gruessi.zrh.sesse.net:4013/test.ts src_encoding=raw
+
 # UDP input. TS is the most common container to use over UDP (you cannot
 # take any arbitrary container and expect it to work).
 # backlog_size=<number of bytes> overrides the backlog, which is normally 10 MB.
index 50a975cfece1fa70f9535b553fc6c7f322571579..5fb55b378610e846f2d74f6074f82f1f95ee58b7 100644 (file)
@@ -32,9 +32,10 @@ using namespace std;
 
 extern ServerPool *servers;
 
-HTTPInput::HTTPInput(const string &url)
+HTTPInput::HTTPInput(const string &url, Input::Encoding encoding)
        : state(NOT_CONNECTED),
          url(url),
+         encoding(encoding),
          has_metacube_header(false),
          sock(-1)
 {
@@ -48,6 +49,9 @@ HTTPInput::HTTPInput(const string &url)
 HTTPInput::HTTPInput(const InputProto &serialized)
        : state(State(serialized.state())),
          url(serialized.url()),
+         encoding(serialized.is_metacube_encoded() ?
+                  Input::INPUT_ENCODING_METACUBE :
+                  Input::INPUT_ENCODING_RAW),
          request(serialized.request()),
          request_bytes_sent(serialized.request_bytes_sent()),
          response(serialized.response()),
@@ -100,6 +104,12 @@ InputProto HTTPInput::serialize() const
        serialized.set_bytes_received(stats.bytes_received);
        serialized.set_data_bytes_received(stats.data_bytes_received);
        serialized.set_connect_time(stats.connect_time);
+       if (encoding == Input::INPUT_ENCODING_METACUBE) {
+               serialized.set_is_metacube_encoded(true);
+       } else {
+               assert(encoding == Input::INPUT_ENCODING_RAW);
+               serialized.set_is_metacube_encoded(false);
+       }
        return serialized;
 }
 
@@ -412,8 +422,14 @@ void HTTPInput::do_work()
                                process_data(&extra_data[0], extra_data.size());
                        }
 
-                       log(INFO, "[%s] Connected to '%s', receiving data.",
-                                  url.c_str(), url.c_str());
+                       if (encoding == Input::INPUT_ENCODING_RAW) {
+                               log(INFO, "[%s] Connected to '%s', receiving raw data.",
+                                          url.c_str(), url.c_str());
+                       } else {
+                               assert(encoding == Input::INPUT_ENCODING_METACUBE);
+                               log(INFO, "[%s] Connected to '%s', receiving data.",
+                                          url.c_str(), url.c_str());
+                       }
                        state = RECEIVING_DATA;
                        break;
                }
@@ -472,6 +488,15 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                stats.bytes_received += bytes;
        }
 
+       if (encoding == Input::INPUT_ENCODING_RAW) {
+               for (size_t i = 0; i < stream_indices.size(); ++i) {
+                       servers->add_data(stream_indices[i], ptr, bytes, SUITABLE_FOR_STREAM_START);
+               }
+               return;
+       }
+
+       assert(encoding == Input::INPUT_ENCODING_METACUBE);
+
        for ( ;; ) {
                // If we don't have enough data (yet) for even the Metacube header, just return.
                if (pending_data.size() < sizeof(metacube2_block_header)) {
index 6b0c87512dffee116dea7c86c89404a5e0090e56..823204d5b1cff4c264d348b70a731a93e29f1949 100644 (file)
@@ -12,7 +12,7 @@ class InputProto;
 
 class HTTPInput : public Input {
 public:
-       HTTPInput(const std::string &url);
+       HTTPInput(const std::string &url, Input::Encoding encoding);
 
        // Serialization/deserialization.
        HTTPInput(const InputProto &serialized);
@@ -59,6 +59,9 @@ private:
        std::string url;
        std::string host, port, path;
 
+       // What the input stream is to be interpreted as (normally Metacube).
+       Input::Encoding encoding;
+
        // The HTTP request, with headers and all.
        // Only relevant for SENDING_REQUEST.
        std::string request;
index bbd2c3ae829d64c20a72576b8d5eea62331103af..25012a6bbad930f63a53914ed53fd0e9d8cc4fcf 100644 (file)
--- a/input.cpp
+++ b/input.cpp
@@ -94,16 +94,19 @@ bool parse_url(const string &url, string *protocol, string *user, string *host,
        return true;
 }
 
-Input *create_input(const string &url)
+Input *create_input(const string &url, Input::Encoding encoding)
 {
        string protocol, user, host, port, path;
        if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
                return NULL;
        }
        if (protocol == "http") {
-               return new HTTPInput(url);
+               return new HTTPInput(url, encoding);
        }
        if (protocol == "udp") {
+               if (encoding == Input::INPUT_ENCODING_METACUBE) {
+                       return NULL;
+               }
                return new UDPInput(url);
        }
        return NULL;
diff --git a/input.h b/input.h
index 1cea2f541f31aabe027f4b4ce41f5ef20cf64c14..1b94e59ffef5faf1b8952e797caa62653f337af8 100644 (file)
--- a/input.h
+++ b/input.h
 class Input;
 class InputProto;
 
-// Extremely rudimentary URL parsing.
-bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path);
-
-// Figure out the right type of input based on the URL, and create a new Input of the right type.
-// Will return NULL if unknown.
-Input *create_input(const std::string &url);
-Input *create_input(const InputProto &serialized);
-
 // Digested statistics for writing to logs etc.
 struct InputStats {
        std::string url;
@@ -43,6 +35,9 @@ struct InputStats {
 
 class Input : public Thread {
 public:
+       // Must be in sync with StreamConfig::Encoding.
+       enum Encoding { INPUT_ENCODING_RAW = 0, INPUT_ENCODING_METACUBE };
+
        virtual ~Input();
        virtual InputProto serialize() const = 0;
        virtual std::string get_url() const = 0;
@@ -53,4 +48,12 @@ public:
        virtual InputStats get_stats() const = 0;
 };
 
+// Extremely rudimentary URL parsing.
+bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path);
+
+// Figure out the right type of input based on the URL, and create a new Input of the right type.
+// Will return NULL if unknown.
+Input *create_input(const std::string &url, Input::Encoding encoding);
+Input *create_input(const InputProto &serialized);
+
 #endif  // !defined(_INPUT_H)
index 1b6fb4f8103db87287646f258546e045e11dba5c..a7ad64c5748d07e8e7a2b6b66bacb01340d7b44d 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -39,6 +39,8 @@ ServerPool *servers = NULL;
 volatile bool hupped = false;
 volatile bool stopped = false;
 
+typedef pair<string, Input::Encoding> InputKey;
+
 namespace {
 
 struct OrderByConnectionTime {
@@ -70,7 +72,7 @@ void do_nothing(int signum)
 
 CubemapStateProto collect_state(const timespec &serialize_start,
                                 const vector<Acceptor *> acceptors,
-                                const multimap<string, InputWithRefcount> inputs,
+                                const multimap<InputKey, InputWithRefcount> inputs,
                                 ServerPool *servers)
 {
        CubemapStateProto state = servers->serialize();  // Fills streams() and clients().
@@ -81,7 +83,7 @@ CubemapStateProto collect_state(const timespec &serialize_start,
                state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
        }
 
-       for (multimap<string, InputWithRefcount>::const_iterator input_it = inputs.begin();
+       for (multimap<InputKey, InputWithRefcount>::const_iterator input_it = inputs.begin();
             input_it != inputs.end();
             ++input_it) {
                state.add_inputs()->MergeFrom(input_it->second.input->serialize());
@@ -124,44 +126,45 @@ vector<Acceptor *> create_acceptors(
        return acceptors;
 }
 
-void create_config_input(const string &src, multimap<string, InputWithRefcount> *inputs)
+void create_config_input(const string &src, Input::Encoding encoding, multimap<InputKey, InputWithRefcount> *inputs)
 {
        if (src.empty()) {
                return;
        }
-       if (inputs->count(src) != 0) {
+       InputKey key(src, encoding);
+       if (inputs->count(key) != 0) {
                return;
        }
 
        InputWithRefcount iwr;
-       iwr.input = create_input(src);
+       iwr.input = create_input(src, encoding);
        if (iwr.input == NULL) {
-               log(ERROR, "did not understand URL '%s', clients will not get any data.",
+               log(ERROR, "did not understand URL '%s' or source encoding was invalid, clients will not get any data.",
                        src.c_str());
                return;
        }
        iwr.refcount = 0;
-       inputs->insert(make_pair(src, iwr));
+       inputs->insert(make_pair(key, iwr));
 }
 
 // Find all streams in the configuration file, and create inputs for them.
-void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
+void create_config_inputs(const Config &config, multimap<InputKey, InputWithRefcount> *inputs)
 {
        for (unsigned i = 0; i < config.streams.size(); ++i) {
                const StreamConfig &stream_config = config.streams[i];
                if (stream_config.src != "delete") {
-                       create_config_input(stream_config.src, inputs);
+                       create_config_input(stream_config.src, Input::Encoding(stream_config.src_encoding), inputs);
                }
        }
        for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
                const UDPStreamConfig &udpstream_config = config.udpstreams[i];
-               create_config_input(udpstream_config.src, inputs);
+               create_config_input(udpstream_config.src, Input::Encoding(udpstream_config.src_encoding), inputs);
        }
 }
 
 void create_streams(const Config &config,
                     const set<string> &deserialized_urls,
-                    multimap<string, InputWithRefcount> *inputs)
+                    multimap<InputKey, InputWithRefcount> *inputs)
 {
        // HTTP streams.
        set<string> expecting_urls = deserialized_urls;
@@ -181,7 +184,8 @@ void create_streams(const Config &config,
                        stream_index = servers->add_stream(stream_config.url,
                                                           stream_config.backlog_size,
                                                           stream_config.prebuffering_bytes,
-                                                          Stream::Encoding(stream_config.encoding));
+                                                          Stream::Encoding(stream_config.encoding),
+                                                          Stream::Encoding(stream_config.src_encoding));
                } else {
                        stream_index = servers->lookup_stream_by_url(stream_config.url);
                        assert(stream_index != -1);
@@ -189,13 +193,16 @@ void create_streams(const Config &config,
                        servers->set_prebuffering_bytes(stream_index, stream_config.prebuffering_bytes);
                        servers->set_encoding(stream_index,
                                              Stream::Encoding(stream_config.encoding));
+                       servers->set_src_encoding(stream_index,
+                                                 Stream::Encoding(stream_config.src_encoding));
                }
 
                servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
 
                string src = stream_config.src;
+               Input::Encoding src_encoding = Input::Encoding(stream_config.src_encoding);
                if (!src.empty()) {
-                       multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+                       multimap<InputKey, InputWithRefcount>::iterator input_it = inputs->find(make_pair(src, src_encoding));
                        if (input_it != inputs->end()) {
                                input_it->second.input->add_destination(stream_index);
                                ++input_it->second.refcount;
@@ -224,8 +231,9 @@ void create_streams(const Config &config,
                        udpstream_config.multicast_iface_index);
 
                string src = udpstream_config.src;
+               Input::Encoding src_encoding = Input::Encoding(udpstream_config.src_encoding);
                if (!src.empty()) {
-                       multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+                       multimap<InputKey, InputWithRefcount>::iterator input_it = inputs->find(make_pair(src, src_encoding));
                        assert(input_it != inputs->end());
                        input_it->second.input->add_destination(stream_index);
                        ++input_it->second.refcount;
@@ -400,7 +408,7 @@ start:
        timespec serialize_start;
        set<string> deserialized_urls;
        map<sockaddr_in6, Acceptor *, Sockaddr6Compare> deserialized_acceptors;
-       multimap<string, InputWithRefcount> inputs;  // multimap due to older versions without deduplication.
+       multimap<InputKey, InputWithRefcount> inputs;  // multimap due to older versions without deduplication.
        if (state_fd != -1) {
                log(INFO, "Deserializing state from previous process...");
                string serialized;
@@ -445,7 +453,12 @@ start:
                        InputWithRefcount iwr;
                        iwr.input = create_input(serialized_input);
                        iwr.refcount = 0;
-                       inputs.insert(make_pair(serialized_input.url(), iwr));
+
+                       Input::Encoding src_encoding = serialized_input.is_metacube_encoded() ?
+                               Input::INPUT_ENCODING_METACUBE :
+                               Input::INPUT_ENCODING_RAW;
+                       InputKey key(serialized_input.url(), src_encoding);
+                       inputs.insert(make_pair(key, iwr));
                } 
 
                // Deserialize the acceptors.
@@ -503,11 +516,17 @@ start:
        servers->run();
 
        // Now delete all inputs that are longer in use, and start the others.
-       for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+       for (multimap<InputKey, InputWithRefcount>::iterator input_it = inputs.begin();
             input_it != inputs.end(); ) {
                if (input_it->second.refcount == 0) {
-                       log(WARNING, "Input '%s' no longer in use, closing.",
-                           input_it->first.c_str());
+                       if (input_it->first.second == Input::INPUT_ENCODING_RAW) {
+                               log(WARNING, "Raw input '%s' no longer in use, closing.",
+                                   input_it->first.first.c_str());
+                       } else {
+                               assert(input_it->first.second == Input::INPUT_ENCODING_METACUBE);
+                               log(WARNING, "Metacube input '%s' no longer in use, closing.",
+                                   input_it->first.first.c_str());
+                       }
                        input_it->second.input->close_socket();
                        delete input_it->second.input;
                        inputs.erase(input_it++);
@@ -527,7 +546,7 @@ start:
        InputStatsThread *input_stats_thread = NULL;
        if (!config.input_stats_file.empty()) {
                vector<Input*> inputs_no_refcount;
-               for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+               for (multimap<InputKey, InputWithRefcount>::iterator input_it = inputs.begin();
                     input_it != inputs.end(); ++input_it) {
                        inputs_no_refcount.push_back(input_it->second.input);
                }
@@ -575,7 +594,7 @@ start:
        for (size_t i = 0; i < acceptors.size(); ++i) {
                acceptors[i]->stop();
        }
-       for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+       for (multimap<InputKey, InputWithRefcount>::iterator input_it = inputs.begin();
             input_it != inputs.end();
             ++input_it) {
                input_it->second.input->stop();
index 9b83ddccd4fd0ff53f39bd08a702495312282d5b..d3b936f0b9c1f869c34c5dbf05c1beb32ad90a8e 100644 (file)
@@ -295,11 +295,11 @@ int Server::lookup_stream_by_url(const string &url) const
        return stream_url_it->second;
 }
 
-int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
 {
        MutexLock lock(&mutex);
        stream_url_map.insert(make_pair(url, streams.size()));
-       streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
+       streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
        return streams.size() - 1;
 }
 
@@ -331,6 +331,13 @@ void Server::set_encoding(int stream_index, Stream::Encoding encoding)
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
        streams[stream_index]->encoding = encoding;
 }
+
+void Server::set_src_encoding(int stream_index, Stream::Encoding encoding)
+{
+       MutexLock lock(&mutex);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->src_encoding = encoding;
+}
        
 void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
 {
index 073ef73eb4f6c340d7adf36e6a18047578e29a38..320fcbcd8da8a274277a9633fbc0e0f1ca95a90c 100644 (file)
--- a/server.h
+++ b/server.h
@@ -55,12 +55,13 @@ public:
        // at the same time).
        CubemapStateProto serialize();
        void add_client_from_serialized(const ClientProto &client);
-       int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding);
+       int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
        int lookup_stream_by_url(const std::string &url) const;
        void set_backlog_size(int stream_index, size_t new_size);
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
        void set_encoding(int stream_index, Stream::Encoding encoding);
+       void set_src_encoding(int stream_index, Stream::Encoding encoding);
        void add_gen204(const std::string &url, const std::string &allow_origin);
 
 private:
index c77c30ef85c262084a0d92ad862da38a4717ceb5..2c663fbce37dd3d4e851a05de866bc7ceb9fb7e6 100644 (file)
@@ -73,14 +73,14 @@ int ServerPool::lookup_stream_by_url(const string &url) const
        return servers[0].lookup_stream_by_url(url);
 }
 
-int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
+int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
 {
        // Adding more HTTP streams after UDP streams would cause the UDP stream
        // indices to move around, which is obviously not good.
        assert(udp_streams.empty());
 
        for (int i = 0; i < num_servers; ++i) {
-               int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding);
+               int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding);
                assert(stream_index == num_http_streams);
        }
        return num_http_streams++;
@@ -221,3 +221,10 @@ void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding)
                servers[i].set_encoding(stream_index, encoding);
        }       
 }
+
+void ServerPool::set_src_encoding(int stream_index, Stream::Encoding encoding)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_src_encoding(stream_index, encoding);
+       }
+}
index 44084bc180e6ebb5466b0e908acd78a3253b2f14..e52aee8acf6ac92db9001dfbd72e12aa473c61af 100644 (file)
@@ -29,7 +29,7 @@ public:
        void add_client_from_serialized(const ClientProto &client);
 
        // Adds the given stream to all the servers. Returns the stream index.
-       int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding);
+       int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
        int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
        void delete_stream(const std::string &url);
        int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
@@ -52,9 +52,12 @@ public:
        // Changes the given stream's amount of forced prebuffering on all the servers.
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
 
-       // Changes the given stream's encoding type on all the servers.
+       // Changes the given stream's output encoding type on all the servers.
        void set_encoding(int stream_index, Stream::Encoding encoding);
 
+       // Changes the given stream's input encoding type on all the servers.
+       void set_src_encoding(int stream_index, Stream::Encoding encoding);
+
        // Adds the given gen204 endpoint to all the servers.
        void add_gen204(const std::string &url, const std::string &allow_origin);
 
index c6f0e004624ec6545cf40d935890a3528b4df7a0..2a788eb07e6fdcba86dfb0fa50c0e8384d51fd50 100644 (file)
@@ -47,6 +47,7 @@ message InputProto {
        optional int64 bytes_received = 11;
        optional int64 data_bytes_received = 12;
        optional int64 connect_time = 13;
+       optional bool is_metacube_encoded = 15 [default=true];
 };
 
 // Corresponds to class Acceptor.
index 64bf2e8053e7c0edd79b17f43ba12160a3dd8795..b498b2e6f7108ced5fab2f0f4d87109521671ca8 100644 (file)
 
 using namespace std;
 
-Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding)
+Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding)
        : url(url),
          encoding(encoding),
+         src_encoding(src_encoding),
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
          prebuffering_bytes(prebuffering_bytes),
index c98feb46e3bb2e09f5913c1ea31a018e02680075..057a48cbe08e516be02f756ce4aeac3701672ee2 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -24,7 +24,7 @@ struct Stream {
        // Must be in sync with StreamConfig::Encoding.
        enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
 
-       Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding);
+       Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding);
        ~Stream();
 
        // Serialization/deserialization.
@@ -51,6 +51,9 @@ struct Stream {
        // be Metacube, for reflecting to another Cubemap instance).
        Encoding encoding;
 
+       // What encoding we expect the incoming data to be in (usually Metacube).
+       Encoding src_encoding;
+
        // The stream data itself, stored in a circular buffer.
        //
        // We store our data in a file, so that we can send the data to the