stream.backlog_size = atoi(backlog_it->second.c_str());
}
+ // Parse encoding.
+ map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
+ if (encoding_parm_it == line.parameters.end() ||
+ encoding_parm_it->second == "raw") {
+ stream.encoding = StreamConfig::STREAM_ENCODING_RAW;
+ } else if (encoding_parm_it->second == "metacube") {
+ stream.encoding = StreamConfig::STREAM_ENCODING_METACUBE;
+ } else {
+ log(ERROR, "Parameter 'encoding' must be either 'raw' (default) or 'metacube'");
+ return false;
+ }
+
// Parse marks, if so desired.
map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
if (mark_parm_it == line.parameters.end()) {
std::string src; // Can be empty.
size_t backlog_size;
int mark_pool; // -1 for none.
+ enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
};
struct AcceptorConfig {
# now the streams!
#
stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
+stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
stream /udp.ts src=udp://@:1234 backlog_size=1048576
string protocol;
parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails.
+
+ // Older versions stored the extra \r\n in the HTTP header.
+ // Strip it if we find it.
+ if (http_header.size() >= 4 &&
+ memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) {
+ http_header.resize(http_header.size() - 2);
+ }
}
void HTTPInput::close_socket()
++it) {
http_header.append(it->first + ": " + it->second + "\r\n");
}
- http_header.append("\r\n");
for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header);
+ servers->set_header(stream_ids[i], http_header, "");
}
return true;
response.clear();
pending_data.clear();
for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], "");
+ servers->set_header(stream_ids[i], "", "");
}
{
if (flags & METACUBE_FLAGS_HEADER) {
string header(inner_data, inner_data + size);
for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header + header);
+ servers->set_header(stream_ids[i], http_header, header);
}
} else {
for (size_t i = 0; i < stream_ids.size(); ++i) {
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
- servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
+ servers->add_stream(stream_config.stream_id,
+ stream_config.backlog_size,
+ Stream::Encoding(stream_config.encoding));
} else {
servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
+ servers->set_encoding(stream_config.stream_id,
+ Stream::Encoding(stream_config.encoding));
}
expecting_stream_ids.erase(stream_config.stream_id);
+#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include "accesslog.h"
#include "log.h"
#include "markpool.h"
+#include "metacube.h"
#include "mutexlock.h"
#include "parse.h"
#include "server.h"
}
}
-void Server::add_stream(const string &stream_id, size_t backlog_size)
+void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size)));
+ streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
}
void Server::add_stream_from_serialized(const StreamProto &stream)
streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
}
-void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+void Server::set_backlog_size(const string &stream_id, size_t new_size)
{
MutexLock lock(&mutex);
assert(streams.count(stream_id) != 0);
streams[stream_id]->set_backlog_size(new_size);
}
-void Server::set_header(const string &stream_id, const string &header)
+void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- find_stream(stream_id)->header = header;
+ assert(streams.count(stream_id) != 0);
+ streams[stream_id]->encoding = encoding;
+}
+
+void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+{
+ MutexLock lock(&mutex);
+ find_stream(stream_id)->http_header = http_header;
+ find_stream(stream_id)->stream_header = stream_header;
// If there are clients we haven't sent anything to yet, we should give
// them the header, so push back into the SENDING_HEADER state.
}
}
-void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
{
MutexLock lock(&mutex);
assert(clients.empty());
void Server::construct_header(Client *client)
{
- client->header_or_error = find_stream(client->stream_id)->header;
+ Stream *stream = find_stream(client->stream_id);
+ if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
+ client->header_or_error = stream->http_header +
+ "\r\n" +
+ stream->stream_header;
+ } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+ metacube_block_header hdr;
+ memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(stream->stream_header.size());
+ hdr.flags = htonl(METACUBE_FLAGS_HEADER);
+
+ client->header_or_error = stream->http_header +
+ "Content-encoding: metacube\r\n" +
+ "\r\n" +
+ string(reinterpret_cast<char *>(&hdr), sizeof(hdr)) +
+ stream->stream_header;
+ } else {
+ assert(false);
+ }
// Switch states.
client->state = Client::SENDING_HEADER;
#include <vector>
#include "client.h"
+#include "stream.h"
#include "thread.h"
class ClientProto;
std::vector<ClientStats> get_client_stats() const;
// Set header (both HTTP header and any stream headers) for the given stream.
- void set_header(const std::string &stream_id, const std::string &header);
+ void set_header(const std::string &stream_id,
+ const std::string &http_header,
+ const std::string &stream_header);
// Set that the given stream should use the given mark pool from now on.
// NOTE: This should be set before any clients are connected!
// at the same time).
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
- void add_stream(const std::string &stream_id, size_t bytes_received);
+ void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding);
void add_stream_from_serialized(const StreamProto &stream);
void set_backlog_size(const std::string &stream_id, size_t new_size);
+ void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
private:
// Mutex protecting queued_data only. Note that if you want to hold both this
servers[clients_added++ % num_servers].add_client_from_serialized(client);
}
-void ServerPool::add_stream(const std::string &stream_id, size_t backlog_size)
+void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_stream(stream_id, backlog_size);
+ servers[i].add_stream(stream_id, backlog_size, encoding);
}
}
}
}
-void ServerPool::set_header(const std::string &stream_id, const std::string &header)
+void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].set_header(stream_id, header);
+ servers[i].set_header(stream_id, http_header, stream_header);
}
}
-void ServerPool::add_data(const std::string &stream_id, const char *data, size_t bytes)
+void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
{
for (int i = 0; i < num_servers; ++i) {
servers[i].add_data_deferred(stream_id, data, bytes);
return ret;
}
-void ServerPool::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
+void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
{
for (int i = 0; i < num_servers; ++i) {
servers[i].set_mark_pool(stream_id, mark_pool);
}
}
-void ServerPool::set_backlog_size(const std::string &stream_id, size_t new_size)
+void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
{
for (int i = 0; i < num_servers; ++i) {
servers[i].set_backlog_size(stream_id, new_size);
}
}
+
+void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_encoding(stream_id, encoding);
+ }
+}
void add_client_from_serialized(const ClientProto &client);
// Adds the given stream to all the servers.
- void add_stream(const std::string &stream_id, size_t backlog_size);
+ void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding);
void add_stream_from_serialized(const StreamProto &stream);
// Adds the given data to all the servers.
- void set_header(const std::string &stream_id, const std::string &header);
+ void set_header(const std::string &stream_id,
+ const std::string &http_header,
+ const std::string &stream_header);
void add_data(const std::string &stream_id, const char *data, size_t bytes);
// Connects the given stream to the given mark pool for all the servers.
// Changes the given stream's backlog size on all the servers.
void set_backlog_size(const std::string &stream_id, size_t new_size);
+ // Changes the given stream's encoding type on all the servers.
+ void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
+
// Starts all the servers.
void run();
// Corresponds to struct Stream.
message StreamProto {
- optional bytes header = 1;
+ optional bytes http_header = 6;
+ optional bytes stream_header = 7;
optional bytes data = 2;
optional int64 backlog_size = 5 [default=1048576];
optional int64 bytes_received = 3;
optional string stream_id = 4;
+
+ // Older versions stored the HTTP and video headers together in this field.
+ optional bytes header = 1;
};
// Corresponds to class Input.
+#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include "state.pb.h"
#include "log.h"
+#include "metacube.h"
#include "stream.h"
#include "util.h"
using namespace std;
-Stream::Stream(const string &stream_id, size_t backlog_size)
+Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
: stream_id(stream_id),
+ encoding(encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
bytes_received(0),
Stream::Stream(const StreamProto &serialized)
: stream_id(serialized.stream_id()),
- header(serialized.header()),
+ http_header(serialized.http_header()),
+ stream_header(serialized.stream_header()),
+ encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
data_fd(make_tempfile(serialized.data())),
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
if (data_fd == -1) {
exit(1);
}
+
+ // Split old-style headers into HTTP and video headers.
+ if (!serialized.header().empty()) {
+ string header = serialized.header();
+ size_t split = header.find("\r\n\r\n");
+ if (split == string::npos) {
+ http_header = header;
+ stream_header = "";
+ } else {
+ http_header = header.substr(0, split + 2); // Split off the second \r\n.
+ stream_header = header.substr(split, string::npos);
+ }
+ }
}
StreamProto Stream::serialize()
{
StreamProto serialized;
- serialized.set_header(header);
+ serialized.set_http_header(http_header);
+ serialized.set_stream_header(stream_header);
if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd.
exit(1);
}
}
void Stream::add_data(const char *data, ssize_t bytes)
+{
+ if (encoding == Stream::STREAM_ENCODING_RAW) {
+ add_data_raw(data, bytes);
+ } else if (encoding == STREAM_ENCODING_METACUBE) {
+ metacube_block_header hdr;
+ memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(bytes);
+ hdr.flags = htonl(0);
+
+ char *block = new char[bytes + sizeof(hdr)];
+ memcpy(block, &hdr, sizeof(hdr));
+ memcpy(block + sizeof(hdr), data, bytes);
+ add_data_raw(block, bytes + sizeof(hdr));
+ delete[] block;
+ } else {
+ assert(false);
+ }
+}
+
+void Stream::add_data_raw(const char *data, ssize_t bytes)
{
size_t pos = bytes_received % backlog_size;
bytes_received += bytes;
struct Client;
struct Stream {
- Stream(const std::string &stream_id, size_t backlog_size);
+ // Must be in sync with StreamConfig::Encoding.
+ enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
+
+ Stream(const std::string &stream_id, size_t backlog_size, Encoding encoding);
~Stream();
// Serialization/deserialization.
std::string stream_id;
- // The HTTP response header, plus the video stream header (if any).
- std::string header;
+ // The HTTP response header, without the trailing double newline.
+ std::string http_header;
+
+ // The video stream header (if any).
+ std::string stream_header;
+
+ // What encoding we apply to the outgoing data (usually raw, but can also
+ // be Metacube, for reflecting to another Cubemap instance).
+ Encoding encoding;
// The stream data itself, stored in a circular buffer.
//
private:
Stream(const Stream& other);
+
+ void add_data_raw(const char *data, ssize_t bytes);
};
#endif // !defined(_STREAM_H)
"HTTP/1.0 200 OK\r\n"
"Content-type: application/octet-stream\r\n"
"Cache-control: no-cache\r\n"
- "Server: " SERVER_IDENTIFICATION "\r\n"
- "\r\n";
+ "Server: " SERVER_IDENTIFICATION "\r\n";
for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], header);
+ servers->set_header(stream_ids[i], header, "");
}
}