From 8f44468bfe4a1d1607b0ab7044c3071605ae1fa7 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 18 Apr 2013 01:02:28 +0200 Subject: [PATCH] Support Metacube _output_. Required splitting HTTP headers from stream headers, which was a good thing anyway, but a bit painful in upgrades. --- config.cpp | 12 ++++++++++++ config.h | 1 + cubemap.config.sample | 1 + httpinput.cpp | 14 ++++++++++---- main.cpp | 6 +++++- server.cpp | 42 +++++++++++++++++++++++++++++++++------- server.h | 8 ++++++-- serverpool.cpp | 21 +++++++++++++------- serverpool.h | 9 +++++++-- state.proto | 6 +++++- stream.cpp | 45 ++++++++++++++++++++++++++++++++++++++++--- stream.h | 18 ++++++++++++++--- udpinput.cpp | 5 ++--- 13 files changed, 155 insertions(+), 33 deletions(-) diff --git a/config.cpp b/config.cpp index fad29a5..c5547bd 100644 --- a/config.cpp +++ b/config.cpp @@ -207,6 +207,18 @@ bool parse_stream(const ConfigLine &line, Config *config) stream.backlog_size = atoi(backlog_it->second.c_str()); } + // Parse encoding. + map::const_iterator encoding_parm_it = line.parameters.find("encoding"); + if (encoding_parm_it == line.parameters.end() || + encoding_parm_it->second == "raw") { + stream.encoding = StreamConfig::STREAM_ENCODING_RAW; + } else if (encoding_parm_it->second == "metacube") { + stream.encoding = StreamConfig::STREAM_ENCODING_METACUBE; + } else { + log(ERROR, "Parameter 'encoding' must be either 'raw' (default) or 'metacube'"); + return false; + } + // Parse marks, if so desired. map::const_iterator mark_parm_it = line.parameters.find("mark"); if (mark_parm_it == line.parameters.end()) { diff --git a/config.h b/config.h index 47dabe9..5395d5a 100644 --- a/config.h +++ b/config.h @@ -16,6 +16,7 @@ struct StreamConfig { std::string src; // Can be empty. size_t backlog_size; int mark_pool; // -1 for none. + enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding; }; struct AcceptorConfig { diff --git a/cubemap.config.sample b/cubemap.config.sample index 8504d2a..f3e2e0c 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -26,4 +26,5 @@ error_log type=console # now the streams! # stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000 +stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube stream /udp.ts src=udp://@:1234 backlog_size=1048576 diff --git a/httpinput.cpp b/httpinput.cpp index 51b4f41..457c216 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -50,6 +50,13 @@ HTTPInput::HTTPInput(const InputProto &serialized) string protocol; parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails. + + // Older versions stored the extra \r\n in the HTTP header. + // Strip it if we find it. + if (http_header.size() >= 4 && + memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) { + http_header.resize(http_header.size() - 2); + } } void HTTPInput::close_socket() @@ -200,10 +207,9 @@ bool HTTPInput::parse_response(const std::string &request) ++it) { http_header.append(it->first + ": " + it->second + "\r\n"); } - http_header.append("\r\n"); for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header); + servers->set_header(stream_ids[i], http_header, ""); } return true; @@ -237,7 +243,7 @@ void HTTPInput::do_work() response.clear(); pending_data.clear(); for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], ""); + servers->set_header(stream_ids[i], "", ""); } { @@ -444,7 +450,7 @@ void HTTPInput::process_data(char *ptr, size_t bytes) if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header + header); + servers->set_header(stream_ids[i], http_header, header); } } else { for (size_t i = 0; i < stream_ids.size(); ++i) { diff --git a/main.cpp b/main.cpp index fa63549..b45713d 100644 --- a/main.cpp +++ b/main.cpp @@ -143,9 +143,13 @@ void create_streams(const Config &config, for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; if (deserialized_stream_ids.count(stream_config.stream_id) == 0) { - servers->add_stream(stream_config.stream_id, stream_config.backlog_size); + servers->add_stream(stream_config.stream_id, + stream_config.backlog_size, + Stream::Encoding(stream_config.encoding)); } else { servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size); + servers->set_encoding(stream_config.stream_id, + Stream::Encoding(stream_config.encoding)); } expecting_stream_ids.erase(stream_config.stream_id); diff --git a/server.cpp b/server.cpp index 8c3382d..e4ea647 100644 --- a/server.cpp +++ b/server.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -17,6 +18,7 @@ #include "accesslog.h" #include "log.h" #include "markpool.h" +#include "metacube.h" #include "mutexlock.h" #include "parse.h" #include "server.h" @@ -194,10 +196,10 @@ void Server::add_client_from_serialized(const ClientProto &client) } } -void Server::add_stream(const string &stream_id, size_t backlog_size) +void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) { MutexLock lock(&mutex); - streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size))); + streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); } void Server::add_stream_from_serialized(const StreamProto &stream) @@ -206,17 +208,25 @@ void Server::add_stream_from_serialized(const StreamProto &stream) streams.insert(make_pair(stream.stream_id(), new Stream(stream))); } -void Server::set_backlog_size(const std::string &stream_id, size_t new_size) +void Server::set_backlog_size(const string &stream_id, size_t new_size) { MutexLock lock(&mutex); assert(streams.count(stream_id) != 0); streams[stream_id]->set_backlog_size(new_size); } -void Server::set_header(const string &stream_id, const string &header) +void Server::set_encoding(const string &stream_id, Stream::Encoding encoding) { MutexLock lock(&mutex); - find_stream(stream_id)->header = header; + assert(streams.count(stream_id) != 0); + streams[stream_id]->encoding = encoding; +} + +void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header) +{ + MutexLock lock(&mutex); + find_stream(stream_id)->http_header = http_header; + find_stream(stream_id)->stream_header = stream_header; // If there are clients we haven't sent anything to yet, we should give // them the header, so push back into the SENDING_HEADER state. @@ -231,7 +241,7 @@ void Server::set_header(const string &stream_id, const string &header) } } -void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool) +void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool) { MutexLock lock(&mutex); assert(clients.empty()); @@ -454,7 +464,25 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - client->header_or_error = find_stream(client->stream_id)->header; + Stream *stream = find_stream(client->stream_id); + if (stream->encoding == Stream::STREAM_ENCODING_RAW) { + client->header_or_error = stream->http_header + + "\r\n" + + stream->stream_header; + } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { + metacube_block_header hdr; + memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(stream->stream_header.size()); + hdr.flags = htonl(METACUBE_FLAGS_HEADER); + + client->header_or_error = stream->http_header + + "Content-encoding: metacube\r\n" + + "\r\n" + + string(reinterpret_cast(&hdr), sizeof(hdr)) + + stream->stream_header; + } else { + assert(false); + } // Switch states. client->state = Client::SENDING_HEADER; diff --git a/server.h b/server.h index 2d227b6..523f5c5 100644 --- a/server.h +++ b/server.h @@ -12,6 +12,7 @@ #include #include "client.h" +#include "stream.h" #include "thread.h" class ClientProto; @@ -34,7 +35,9 @@ public: std::vector get_client_stats() const; // Set header (both HTTP header and any stream headers) for the given stream. - void set_header(const std::string &stream_id, const std::string &header); + void set_header(const std::string &stream_id, + const std::string &http_header, + const std::string &stream_header); // Set that the given stream should use the given mark pool from now on. // NOTE: This should be set before any clients are connected! @@ -51,9 +54,10 @@ public: // at the same time). CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); - void add_stream(const std::string &stream_id, size_t bytes_received); + void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding); void add_stream_from_serialized(const StreamProto &stream); void set_backlog_size(const std::string &stream_id, size_t new_size); + void set_encoding(const std::string &stream_id, Stream::Encoding encoding); private: // Mutex protecting queued_data only. Note that if you want to hold both this diff --git a/serverpool.cpp b/serverpool.cpp index 352f719..60a913d 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -48,10 +48,10 @@ void ServerPool::add_client_from_serialized(const ClientProto &client) servers[clients_added++ % num_servers].add_client_from_serialized(client); } -void ServerPool::add_stream(const std::string &stream_id, size_t backlog_size) +void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) { for (int i = 0; i < num_servers; ++i) { - servers[i].add_stream(stream_id, backlog_size); + servers[i].add_stream(stream_id, backlog_size, encoding); } } @@ -62,14 +62,14 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream) } } -void ServerPool::set_header(const std::string &stream_id, const std::string &header) +void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header) { for (int i = 0; i < num_servers; ++i) { - servers[i].set_header(stream_id, header); + servers[i].set_header(stream_id, http_header, stream_header); } } -void ServerPool::add_data(const std::string &stream_id, const char *data, size_t bytes) +void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes) { for (int i = 0; i < num_servers; ++i) { servers[i].add_data_deferred(stream_id, data, bytes); @@ -100,16 +100,23 @@ vector ServerPool::get_client_stats() const return ret; } -void ServerPool::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool) +void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool) { for (int i = 0; i < num_servers; ++i) { servers[i].set_mark_pool(stream_id, mark_pool); } } -void ServerPool::set_backlog_size(const std::string &stream_id, size_t new_size) +void ServerPool::set_backlog_size(const string &stream_id, size_t new_size) { for (int i = 0; i < num_servers; ++i) { servers[i].set_backlog_size(stream_id, new_size); } } + +void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding) +{ + for (int i = 0; i < num_servers; ++i) { + servers[i].set_encoding(stream_id, encoding); + } +} diff --git a/serverpool.h b/serverpool.h index ba60d7c..84a201f 100644 --- a/serverpool.h +++ b/serverpool.h @@ -26,11 +26,13 @@ public: void add_client_from_serialized(const ClientProto &client); // Adds the given stream to all the servers. - void add_stream(const std::string &stream_id, size_t backlog_size); + void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding); void add_stream_from_serialized(const StreamProto &stream); // Adds the given data to all the servers. - void set_header(const std::string &stream_id, const std::string &header); + void set_header(const std::string &stream_id, + const std::string &http_header, + const std::string &stream_header); void add_data(const std::string &stream_id, const char *data, size_t bytes); // Connects the given stream to the given mark pool for all the servers. @@ -39,6 +41,9 @@ public: // Changes the given stream's backlog size on all the servers. void set_backlog_size(const std::string &stream_id, size_t new_size); + // Changes the given stream's encoding type on all the servers. + void set_encoding(const std::string &stream_id, Stream::Encoding encoding); + // Starts all the servers. void run(); diff --git a/state.proto b/state.proto index 87b028c..5d38a6b 100644 --- a/state.proto +++ b/state.proto @@ -16,11 +16,15 @@ message ClientProto { // Corresponds to struct Stream. message StreamProto { - optional bytes header = 1; + optional bytes http_header = 6; + optional bytes stream_header = 7; optional bytes data = 2; optional int64 backlog_size = 5 [default=1048576]; optional int64 bytes_received = 3; optional string stream_id = 4; + + // Older versions stored the HTTP and video headers together in this field. + optional bytes header = 1; }; // Corresponds to class Input. diff --git a/stream.cpp b/stream.cpp index 2b9a695..e61428a 100644 --- a/stream.cpp +++ b/stream.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -7,13 +8,15 @@ #include "state.pb.h" #include "log.h" +#include "metacube.h" #include "stream.h" #include "util.h" using namespace std; -Stream::Stream(const string &stream_id, size_t backlog_size) +Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding) : stream_id(stream_id), + encoding(encoding), data_fd(make_tempfile("")), backlog_size(backlog_size), bytes_received(0), @@ -39,7 +42,9 @@ Stream::~Stream() Stream::Stream(const StreamProto &serialized) : stream_id(serialized.stream_id()), - header(serialized.header()), + http_header(serialized.http_header()), + stream_header(serialized.stream_header()), + encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later. data_fd(make_tempfile(serialized.data())), backlog_size(serialized.backlog_size()), bytes_received(serialized.bytes_received()), @@ -48,12 +53,26 @@ Stream::Stream(const StreamProto &serialized) if (data_fd == -1) { exit(1); } + + // Split old-style headers into HTTP and video headers. + if (!serialized.header().empty()) { + string header = serialized.header(); + size_t split = header.find("\r\n\r\n"); + if (split == string::npos) { + http_header = header; + stream_header = ""; + } else { + http_header = header.substr(0, split + 2); // Split off the second \r\n. + stream_header = header.substr(split, string::npos); + } + } } StreamProto Stream::serialize() { StreamProto serialized; - serialized.set_header(header); + serialized.set_http_header(http_header); + serialized.set_stream_header(stream_header); if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd. exit(1); } @@ -105,6 +124,26 @@ void Stream::put_client_to_sleep(Client *client) } void Stream::add_data(const char *data, ssize_t bytes) +{ + if (encoding == Stream::STREAM_ENCODING_RAW) { + add_data_raw(data, bytes); + } else if (encoding == STREAM_ENCODING_METACUBE) { + metacube_block_header hdr; + memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(bytes); + hdr.flags = htonl(0); + + char *block = new char[bytes + sizeof(hdr)]; + memcpy(block, &hdr, sizeof(hdr)); + memcpy(block + sizeof(hdr), data, bytes); + add_data_raw(block, bytes + sizeof(hdr)); + delete[] block; + } else { + assert(false); + } +} + +void Stream::add_data_raw(const char *data, ssize_t bytes) { size_t pos = bytes_received % backlog_size; bytes_received += bytes; diff --git a/stream.h b/stream.h index a4aa642..22a162d 100644 --- a/stream.h +++ b/stream.h @@ -15,7 +15,10 @@ class StreamProto; struct Client; struct Stream { - Stream(const std::string &stream_id, size_t backlog_size); + // Must be in sync with StreamConfig::Encoding. + enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE }; + + Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding); ~Stream(); // Serialization/deserialization. @@ -27,8 +30,15 @@ struct Stream { std::string stream_id; - // The HTTP response header, plus the video stream header (if any). - std::string header; + // The HTTP response header, without the trailing double newline. + std::string http_header; + + // The video stream header (if any). + std::string stream_header; + + // What encoding we apply to the outgoing data (usually raw, but can also + // be Metacube, for reflecting to another Cubemap instance). + Encoding encoding; // The stream data itself, stored in a circular buffer. // @@ -73,6 +83,8 @@ struct Stream { private: Stream(const Stream& other); + + void add_data_raw(const char *data, ssize_t bytes); }; #endif // !defined(_STREAM_H) diff --git a/udpinput.cpp b/udpinput.cpp index bd83381..ae4f288 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -70,10 +70,9 @@ void UDPInput::construct_header() "HTTP/1.0 200 OK\r\n" "Content-type: application/octet-stream\r\n" "Cache-control: no-cache\r\n" - "Server: " SERVER_IDENTIFICATION "\r\n" - "\r\n"; + "Server: " SERVER_IDENTIFICATION "\r\n"; for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], header); + servers->set_header(stream_ids[i], header, ""); } } -- 2.39.2