Support Metacube _output_. Required splitting HTTP headers from stream headers, which...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 17 Apr 2013 23:02:28 +0000 (01:02 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 17 Apr 2013 23:02:28 +0000 (01:02 +0200)
13 files changed:
config.cpp
config.h
cubemap.config.sample
httpinput.cpp
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stream.cpp
stream.h
udpinput.cpp

index fad29a5..c5547bd 100644 (file)
@@ -207,6 +207,18 @@ bool parse_stream(const ConfigLine &line, Config *config)
                stream.backlog_size = atoi(backlog_it->second.c_str());
        }
 
+       // Parse encoding.
+       map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
+       if (encoding_parm_it == line.parameters.end() ||
+           encoding_parm_it->second == "raw") {
+               stream.encoding = StreamConfig::STREAM_ENCODING_RAW;
+       } else if (encoding_parm_it->second == "metacube") {
+               stream.encoding = StreamConfig::STREAM_ENCODING_METACUBE;
+       } else {
+               log(ERROR, "Parameter 'encoding' must be either 'raw' (default) or 'metacube'");
+               return false;
+       }
+
        // Parse marks, if so desired.
        map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
        if (mark_parm_it == line.parameters.end()) {
index 47dabe9..5395d5a 100644 (file)
--- a/config.h
+++ b/config.h
@@ -16,6 +16,7 @@ struct StreamConfig {
        std::string src;  // Can be empty.
        size_t backlog_size;
        int mark_pool;  // -1 for none.
+       enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
 };
 
 struct AcceptorConfig {
index 8504d2a..f3e2e0c 100644 (file)
@@ -26,4 +26,5 @@ error_log type=console
 # now the streams!
 #
 stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
+stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
 stream /udp.ts src=udp://@:1234 backlog_size=1048576
index 51b4f41..457c216 100644 (file)
@@ -50,6 +50,13 @@ HTTPInput::HTTPInput(const InputProto &serialized)
 
        string protocol;
        parse_url(url, &protocol, &host, &port, &path);  // Don't care if it fails.
+
+       // Older versions stored the extra \r\n in the HTTP header.
+       // Strip it if we find it.
+       if (http_header.size() >= 4 &&
+           memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) {
+               http_header.resize(http_header.size() - 2);
+       }
 }
 
 void HTTPInput::close_socket()
@@ -200,10 +207,9 @@ bool HTTPInput::parse_response(const std::string &request)
             ++it) {
                http_header.append(it->first + ": " + it->second + "\r\n");
        }
-       http_header.append("\r\n");
 
        for (size_t i = 0; i < stream_ids.size(); ++i) {
-               servers->set_header(stream_ids[i], http_header);
+               servers->set_header(stream_ids[i], http_header, "");
        }
 
        return true;
@@ -237,7 +243,7 @@ void HTTPInput::do_work()
                        response.clear();
                        pending_data.clear();
                        for (size_t i = 0; i < stream_ids.size(); ++i) {
-                               servers->set_header(stream_ids[i], "");
+                               servers->set_header(stream_ids[i], "", "");
                        }
 
                        {
@@ -444,7 +450,7 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                if (flags & METACUBE_FLAGS_HEADER) {
                        string header(inner_data, inner_data + size);
                        for (size_t i = 0; i < stream_ids.size(); ++i) {
-                               servers->set_header(stream_ids[i], http_header + header);
+                               servers->set_header(stream_ids[i], http_header, header);
                        }
                } else { 
                        for (size_t i = 0; i < stream_ids.size(); ++i) {
index fa63549..b45713d 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -143,9 +143,13 @@ void create_streams(const Config &config,
        for (unsigned i = 0; i < config.streams.size(); ++i) {
                const StreamConfig &stream_config = config.streams[i];
                if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
-                       servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
+                       servers->add_stream(stream_config.stream_id,
+                                           stream_config.backlog_size,
+                                           Stream::Encoding(stream_config.encoding));
                } else {
                        servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
+                       servers->set_encoding(stream_config.stream_id,
+                                             Stream::Encoding(stream_config.encoding));
                }
                expecting_stream_ids.erase(stream_config.stream_id);
 
index 8c3382d..e4ea647 100644 (file)
@@ -1,3 +1,4 @@
+#include <arpa/inet.h>
 #include <assert.h>
 #include <errno.h>
 #include <pthread.h>
@@ -17,6 +18,7 @@
 #include "accesslog.h"
 #include "log.h"
 #include "markpool.h"
+#include "metacube.h"
 #include "mutexlock.h"
 #include "parse.h"
 #include "server.h"
@@ -194,10 +196,10 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 }
 
-void Server::add_stream(const string &stream_id, size_t backlog_size)
+void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size)));
+       streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
 }
 
 void Server::add_stream_from_serialized(const StreamProto &stream)
@@ -206,17 +208,25 @@ void Server::add_stream_from_serialized(const StreamProto &stream)
        streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
 }
        
-void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+void Server::set_backlog_size(const string &stream_id, size_t new_size)
 {
        MutexLock lock(&mutex);
        assert(streams.count(stream_id) != 0);
        streams[stream_id]->set_backlog_size(new_size);
 }
        
-void Server::set_header(const string &stream_id, const string &header)
+void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       find_stream(stream_id)->header = header;
+       assert(streams.count(stream_id) != 0);
+       streams[stream_id]->encoding = encoding;
+}
+       
+void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+{
+       MutexLock lock(&mutex);
+       find_stream(stream_id)->http_header = http_header;
+       find_stream(stream_id)->stream_header = stream_header;
 
        // If there are clients we haven't sent anything to yet, we should give
        // them the header, so push back into the SENDING_HEADER state.
@@ -231,7 +241,7 @@ void Server::set_header(const string &stream_id, const string &header)
        }
 }
        
-void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
 {
        MutexLock lock(&mutex);
        assert(clients.empty());
@@ -454,7 +464,25 @@ int Server::parse_request(Client *client)
 
 void Server::construct_header(Client *client)
 {
-       client->header_or_error = find_stream(client->stream_id)->header;
+       Stream *stream = find_stream(client->stream_id);
+       if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
+               client->header_or_error = stream->http_header +
+                       "\r\n" +
+                       stream->stream_header;
+       } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+               metacube_block_header hdr;
+               memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+               hdr.size = htonl(stream->stream_header.size());
+               hdr.flags = htonl(METACUBE_FLAGS_HEADER);
+
+               client->header_or_error = stream->http_header +
+                       "Content-encoding: metacube\r\n" +
+                       "\r\n" +
+                       string(reinterpret_cast<char *>(&hdr), sizeof(hdr)) +
+                       stream->stream_header;
+       } else {
+               assert(false);
+       }
 
        // Switch states.
        client->state = Client::SENDING_HEADER;
index 2d227b6..523f5c5 100644 (file)
--- a/server.h
+++ b/server.h
@@ -12,6 +12,7 @@
 #include <vector>
 
 #include "client.h"
+#include "stream.h"
 #include "thread.h"
 
 class ClientProto;
@@ -34,7 +35,9 @@ public:
        std::vector<ClientStats> get_client_stats() const;
 
        // Set header (both HTTP header and any stream headers) for the given stream.
-       void set_header(const std::string &stream_id, const std::string &header);
+       void set_header(const std::string &stream_id,
+                       const std::string &http_header,
+                       const std::string &stream_header);
 
        // Set that the given stream should use the given mark pool from now on.
        // NOTE: This should be set before any clients are connected!
@@ -51,9 +54,10 @@ public:
        // at the same time).
        CubemapStateProto serialize();
        void add_client_from_serialized(const ClientProto &client);
-       void add_stream(const std::string &stream_id, size_t bytes_received);
+       void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding);
        void add_stream_from_serialized(const StreamProto &stream);
        void set_backlog_size(const std::string &stream_id, size_t new_size);
+       void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
 
 private:
        // Mutex protecting queued_data only. Note that if you want to hold both this
index 352f719..60a913d 100644 (file)
@@ -48,10 +48,10 @@ void ServerPool::add_client_from_serialized(const ClientProto &client)
        servers[clients_added++ % num_servers].add_client_from_serialized(client);
 }
 
-void ServerPool::add_stream(const std::string &stream_id, size_t backlog_size)
+void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_stream(stream_id, backlog_size);
+               servers[i].add_stream(stream_id, backlog_size, encoding);
        }
 }
 
@@ -62,14 +62,14 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream)
        }
 }
 
-void ServerPool::set_header(const std::string &stream_id, const std::string &header)
+void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].set_header(stream_id, header);
+               servers[i].set_header(stream_id, http_header, stream_header);
        }
 }
 
-void ServerPool::add_data(const std::string &stream_id, const char *data, size_t bytes)
+void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
 {
        for (int i = 0; i < num_servers; ++i) {
                servers[i].add_data_deferred(stream_id, data, bytes);
@@ -100,16 +100,23 @@ vector<ClientStats> ServerPool::get_client_stats() const
        return ret;
 }
        
-void ServerPool::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
+void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
 {
        for (int i = 0; i < num_servers; ++i) {
                servers[i].set_mark_pool(stream_id, mark_pool);
        }       
 }
 
-void ServerPool::set_backlog_size(const std::string &stream_id, size_t new_size)
+void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
 {
        for (int i = 0; i < num_servers; ++i) {
                servers[i].set_backlog_size(stream_id, new_size);
        }       
 }
+
+void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_encoding(stream_id, encoding);
+       }       
+}
index ba60d7c..84a201f 100644 (file)
@@ -26,11 +26,13 @@ public:
        void add_client_from_serialized(const ClientProto &client);
 
        // Adds the given stream to all the servers.
-       void add_stream(const std::string &stream_id, size_t backlog_size);
+       void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding);
        void add_stream_from_serialized(const StreamProto &stream);
 
        // Adds the given data to all the servers.
-       void set_header(const std::string &stream_id, const std::string &header);
+       void set_header(const std::string &stream_id,
+                       const std::string &http_header,
+                       const std::string &stream_header);
        void add_data(const std::string &stream_id, const char *data, size_t bytes);
 
        // Connects the given stream to the given mark pool for all the servers.
@@ -39,6 +41,9 @@ public:
        // Changes the given stream's backlog size on all the servers.
        void set_backlog_size(const std::string &stream_id, size_t new_size);
 
+       // Changes the given stream's encoding type on all the servers.
+       void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
+
        // Starts all the servers.
        void run();
 
index 87b028c..5d38a6b 100644 (file)
@@ -16,11 +16,15 @@ message ClientProto {
 
 // Corresponds to struct Stream.
 message StreamProto {
-       optional bytes header = 1;
+       optional bytes http_header = 6;
+       optional bytes stream_header = 7;
        optional bytes data = 2;
        optional int64 backlog_size = 5 [default=1048576];
        optional int64 bytes_received = 3;
        optional string stream_id = 4;
+
+       // Older versions stored the HTTP and video headers together in this field.
+       optional bytes header = 1;
 };
 
 // Corresponds to class Input.
index 2b9a695..e61428a 100644 (file)
@@ -1,3 +1,4 @@
+#include <arpa/inet.h>
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -7,13 +8,15 @@
 
 #include "state.pb.h"
 #include "log.h"
+#include "metacube.h"
 #include "stream.h"
 #include "util.h"
 
 using namespace std;
 
-Stream::Stream(const string &stream_id, size_t backlog_size)
+Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
        : stream_id(stream_id),
+         encoding(encoding),
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
          bytes_received(0),
@@ -39,7 +42,9 @@ Stream::~Stream()
 
 Stream::Stream(const StreamProto &serialized)
        : stream_id(serialized.stream_id()),
-         header(serialized.header()),
+         http_header(serialized.http_header()),
+         stream_header(serialized.stream_header()),
+         encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
          data_fd(make_tempfile(serialized.data())),
          backlog_size(serialized.backlog_size()),
          bytes_received(serialized.bytes_received()),
@@ -48,12 +53,26 @@ Stream::Stream(const StreamProto &serialized)
        if (data_fd == -1) {
                exit(1);
        }
+
+       // Split old-style headers into HTTP and video headers.
+       if (!serialized.header().empty()) {
+               string header = serialized.header();
+               size_t split = header.find("\r\n\r\n");
+               if (split == string::npos) {
+                       http_header = header;
+                       stream_header = "";
+               } else {
+                       http_header = header.substr(0, split + 2);  // Split off the second \r\n.
+                       stream_header = header.substr(split, string::npos);
+               }
+       }
 }
 
 StreamProto Stream::serialize()
 {
        StreamProto serialized;
-       serialized.set_header(header);
+       serialized.set_http_header(http_header);
+       serialized.set_stream_header(stream_header);
        if (!read_tempfile(data_fd, serialized.mutable_data())) {  // Closes data_fd.
                exit(1);
        }
@@ -105,6 +124,26 @@ void Stream::put_client_to_sleep(Client *client)
 }
 
 void Stream::add_data(const char *data, ssize_t bytes)
+{
+       if (encoding == Stream::STREAM_ENCODING_RAW) {
+               add_data_raw(data, bytes);
+       } else if (encoding == STREAM_ENCODING_METACUBE) {
+               metacube_block_header hdr;
+               memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+               hdr.size = htonl(bytes);
+               hdr.flags = htonl(0);
+
+               char *block = new char[bytes + sizeof(hdr)];
+               memcpy(block, &hdr, sizeof(hdr));
+               memcpy(block + sizeof(hdr), data, bytes);
+               add_data_raw(block, bytes + sizeof(hdr));
+               delete[] block;
+       } else {
+               assert(false);
+       }
+}
+
+void Stream::add_data_raw(const char *data, ssize_t bytes)
 {
        size_t pos = bytes_received % backlog_size;
        bytes_received += bytes;
index a4aa642..22a162d 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -15,7 +15,10 @@ class StreamProto;
 struct Client;
 
 struct Stream {
-       Stream(const std::string &stream_id, size_t backlog_size);
+       // Must be in sync with StreamConfig::Encoding.
+       enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
+
+       Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding);
        ~Stream();
 
        // Serialization/deserialization.
@@ -27,8 +30,15 @@ struct Stream {
 
        std::string stream_id;
 
-       // The HTTP response header, plus the video stream header (if any).
-       std::string header;
+       // The HTTP response header, without the trailing double newline.
+       std::string http_header;
+
+       // The video stream header (if any).
+       std::string stream_header;
+
+       // What encoding we apply to the outgoing data (usually raw, but can also
+       // be Metacube, for reflecting to another Cubemap instance).
+       Encoding encoding;
 
        // The stream data itself, stored in a circular buffer.
        //
@@ -73,6 +83,8 @@ struct Stream {
 
 private:
        Stream(const Stream& other);
+
+       void add_data_raw(const char *data, ssize_t bytes);
 };
 
 #endif  // !defined(_STREAM_H)
index bd83381..ae4f288 100644 (file)
@@ -70,10 +70,9 @@ void UDPInput::construct_header()
                "HTTP/1.0 200 OK\r\n"
                "Content-type: application/octet-stream\r\n"
                "Cache-control: no-cache\r\n"
-               "Server: " SERVER_IDENTIFICATION "\r\n"
-               "\r\n";
+               "Server: " SERVER_IDENTIFICATION "\r\n";
        for (size_t i = 0; i < stream_ids.size(); ++i) {
-               servers->set_header(stream_ids[i], header);
+               servers->set_header(stream_ids[i], header, "");
        }
 }