stream.prebuffering_bytes = atoi(prebuffer_it->second.c_str());
}
- // Parse encoding.
+ // Parse ouptut encoding.
map<string, string>::const_iterator encoding_parm_it = line.parameters.find("encoding");
if (encoding_parm_it == line.parameters.end() ||
encoding_parm_it->second == "raw") {
return false;
}
+ // Parse input encoding.
+ map<string, string>::const_iterator src_encoding_parm_it = line.parameters.find("src_encoding");
+ if (src_encoding_parm_it == line.parameters.end() ||
+ src_encoding_parm_it->second == "metacube") {
+ stream.src_encoding = StreamConfig::STREAM_ENCODING_METACUBE;
+ } else if (src_encoding_parm_it->second == "raw") {
+ stream.src_encoding = StreamConfig::STREAM_ENCODING_RAW;
+ } else {
+ log(ERROR, "Parameter 'src_encoding' must be either 'raw' or 'metacube' (default)");
+ return false;
+ }
+
// Parse the pacing rate, converting from kilobits to bytes as needed.
map<string, string>::const_iterator pacing_rate_it = line.parameters.find("pacing_rate_kbit");
if (pacing_rate_it == line.parameters.end()) {
}
}
+ // Parse input encoding.
+ map<string, string>::const_iterator src_encoding_parm_it = line.parameters.find("src_encoding");
+ if (src_encoding_parm_it == line.parameters.end() ||
+ src_encoding_parm_it->second == "metacube") {
+ udpstream.src_encoding = StreamConfig::STREAM_ENCODING_METACUBE;
+ } else if (src_encoding_parm_it->second == "raw") {
+ udpstream.src_encoding = StreamConfig::STREAM_ENCODING_RAW;
+ } else {
+ log(ERROR, "Parameter 'src_encoding' must be either 'raw' or 'metacube' (default)");
+ return false;
+ }
+
config->udpstreams.push_back(udpstream);
return true;
}
size_t backlog_size;
size_t prebuffering_bytes;
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
- enum { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE } encoding;
+ enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
+ Encoding encoding;
+ Encoding src_encoding;
};
struct UDPStreamConfig {
uint32_t pacing_rate; // In bytes per second. Default is ~0U (no limit).
int ttl; // Default is -1 (use operating system default).
int multicast_iface_index; // Default is -1 (use operating system default).
+ StreamConfig::Encoding src_encoding;
};
struct Gen204Config {
# for sending on to another Cubemap instance.
stream /test.flv.metacube src=http://gruessi.zrh.sesse.net:4013/test.flv encoding=metacube
+# A stream where the input is _not_ Metacube framed. Note that the stream needs to
+# have no header and be self-synchronizing (like with UDP input below), and most formats
+# are not like this. A typical example, however, is MPEG-TS.
+stream /test.ts src=http://gruessi.zrh.sesse.net:4013/test.ts src_encoding=raw
+
# UDP input. TS is the most common container to use over UDP (you cannot
# take any arbitrary container and expect it to work).
# backlog_size=<number of bytes> overrides the backlog, which is normally 10 MB.
extern ServerPool *servers;
-HTTPInput::HTTPInput(const string &url)
+HTTPInput::HTTPInput(const string &url, Input::Encoding encoding)
: state(NOT_CONNECTED),
url(url),
+ encoding(encoding),
has_metacube_header(false),
sock(-1)
{
HTTPInput::HTTPInput(const InputProto &serialized)
: state(State(serialized.state())),
url(serialized.url()),
+ encoding(serialized.is_metacube_encoded() ?
+ Input::INPUT_ENCODING_METACUBE :
+ Input::INPUT_ENCODING_RAW),
request(serialized.request()),
request_bytes_sent(serialized.request_bytes_sent()),
response(serialized.response()),
serialized.set_bytes_received(stats.bytes_received);
serialized.set_data_bytes_received(stats.data_bytes_received);
serialized.set_connect_time(stats.connect_time);
+ if (encoding == Input::INPUT_ENCODING_METACUBE) {
+ serialized.set_is_metacube_encoded(true);
+ } else {
+ assert(encoding == Input::INPUT_ENCODING_RAW);
+ serialized.set_is_metacube_encoded(false);
+ }
return serialized;
}
process_data(&extra_data[0], extra_data.size());
}
- log(INFO, "[%s] Connected to '%s', receiving data.",
- url.c_str(), url.c_str());
+ if (encoding == Input::INPUT_ENCODING_RAW) {
+ log(INFO, "[%s] Connected to '%s', receiving raw data.",
+ url.c_str(), url.c_str());
+ } else {
+ assert(encoding == Input::INPUT_ENCODING_METACUBE);
+ log(INFO, "[%s] Connected to '%s', receiving data.",
+ url.c_str(), url.c_str());
+ }
state = RECEIVING_DATA;
break;
}
stats.bytes_received += bytes;
}
+ if (encoding == Input::INPUT_ENCODING_RAW) {
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->add_data(stream_indices[i], ptr, bytes, SUITABLE_FOR_STREAM_START);
+ }
+ return;
+ }
+
+ assert(encoding == Input::INPUT_ENCODING_METACUBE);
+
for ( ;; ) {
// If we don't have enough data (yet) for even the Metacube header, just return.
if (pending_data.size() < sizeof(metacube2_block_header)) {
class HTTPInput : public Input {
public:
- HTTPInput(const std::string &url);
+ HTTPInput(const std::string &url, Input::Encoding encoding);
// Serialization/deserialization.
HTTPInput(const InputProto &serialized);
std::string url;
std::string host, port, path;
+ // What the input stream is to be interpreted as (normally Metacube).
+ Input::Encoding encoding;
+
// The HTTP request, with headers and all.
// Only relevant for SENDING_REQUEST.
std::string request;
return true;
}
-Input *create_input(const string &url)
+Input *create_input(const string &url, Input::Encoding encoding)
{
string protocol, user, host, port, path;
if (!parse_url(url, &protocol, &user, &host, &port, &path)) {
return NULL;
}
if (protocol == "http") {
- return new HTTPInput(url);
+ return new HTTPInput(url, encoding);
}
if (protocol == "udp") {
+ if (encoding == Input::INPUT_ENCODING_METACUBE) {
+ return NULL;
+ }
return new UDPInput(url);
}
return NULL;
class Input;
class InputProto;
-// Extremely rudimentary URL parsing.
-bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path);
-
-// Figure out the right type of input based on the URL, and create a new Input of the right type.
-// Will return NULL if unknown.
-Input *create_input(const std::string &url);
-Input *create_input(const InputProto &serialized);
-
// Digested statistics for writing to logs etc.
struct InputStats {
std::string url;
class Input : public Thread {
public:
+ // Must be in sync with StreamConfig::Encoding.
+ enum Encoding { INPUT_ENCODING_RAW = 0, INPUT_ENCODING_METACUBE };
+
virtual ~Input();
virtual InputProto serialize() const = 0;
virtual std::string get_url() const = 0;
virtual InputStats get_stats() const = 0;
};
+// Extremely rudimentary URL parsing.
+bool parse_url(const std::string &url, std::string *protocol, std::string *user, std::string *host, std::string *port, std::string *path);
+
+// Figure out the right type of input based on the URL, and create a new Input of the right type.
+// Will return NULL if unknown.
+Input *create_input(const std::string &url, Input::Encoding encoding);
+Input *create_input(const InputProto &serialized);
+
#endif // !defined(_INPUT_H)
volatile bool hupped = false;
volatile bool stopped = false;
+typedef pair<string, Input::Encoding> InputKey;
+
namespace {
struct OrderByConnectionTime {
CubemapStateProto collect_state(const timespec &serialize_start,
const vector<Acceptor *> acceptors,
- const multimap<string, InputWithRefcount> inputs,
+ const multimap<InputKey, InputWithRefcount> inputs,
ServerPool *servers)
{
CubemapStateProto state = servers->serialize(); // Fills streams() and clients().
state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
}
- for (multimap<string, InputWithRefcount>::const_iterator input_it = inputs.begin();
+ for (multimap<InputKey, InputWithRefcount>::const_iterator input_it = inputs.begin();
input_it != inputs.end();
++input_it) {
state.add_inputs()->MergeFrom(input_it->second.input->serialize());
return acceptors;
}
-void create_config_input(const string &src, multimap<string, InputWithRefcount> *inputs)
+void create_config_input(const string &src, Input::Encoding encoding, multimap<InputKey, InputWithRefcount> *inputs)
{
if (src.empty()) {
return;
}
- if (inputs->count(src) != 0) {
+ InputKey key(src, encoding);
+ if (inputs->count(key) != 0) {
return;
}
InputWithRefcount iwr;
- iwr.input = create_input(src);
+ iwr.input = create_input(src, encoding);
if (iwr.input == NULL) {
- log(ERROR, "did not understand URL '%s', clients will not get any data.",
+ log(ERROR, "did not understand URL '%s' or source encoding was invalid, clients will not get any data.",
src.c_str());
return;
}
iwr.refcount = 0;
- inputs->insert(make_pair(src, iwr));
+ inputs->insert(make_pair(key, iwr));
}
// Find all streams in the configuration file, and create inputs for them.
-void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
+void create_config_inputs(const Config &config, multimap<InputKey, InputWithRefcount> *inputs)
{
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
if (stream_config.src != "delete") {
- create_config_input(stream_config.src, inputs);
+ create_config_input(stream_config.src, Input::Encoding(stream_config.src_encoding), inputs);
}
}
for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
const UDPStreamConfig &udpstream_config = config.udpstreams[i];
- create_config_input(udpstream_config.src, inputs);
+ create_config_input(udpstream_config.src, Input::Encoding(udpstream_config.src_encoding), inputs);
}
}
void create_streams(const Config &config,
const set<string> &deserialized_urls,
- multimap<string, InputWithRefcount> *inputs)
+ multimap<InputKey, InputWithRefcount> *inputs)
{
// HTTP streams.
set<string> expecting_urls = deserialized_urls;
stream_index = servers->add_stream(stream_config.url,
stream_config.backlog_size,
stream_config.prebuffering_bytes,
- Stream::Encoding(stream_config.encoding));
+ Stream::Encoding(stream_config.encoding),
+ Stream::Encoding(stream_config.src_encoding));
} else {
stream_index = servers->lookup_stream_by_url(stream_config.url);
assert(stream_index != -1);
servers->set_prebuffering_bytes(stream_index, stream_config.prebuffering_bytes);
servers->set_encoding(stream_index,
Stream::Encoding(stream_config.encoding));
+ servers->set_src_encoding(stream_index,
+ Stream::Encoding(stream_config.src_encoding));
}
servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
string src = stream_config.src;
+ Input::Encoding src_encoding = Input::Encoding(stream_config.src_encoding);
if (!src.empty()) {
- multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+ multimap<InputKey, InputWithRefcount>::iterator input_it = inputs->find(make_pair(src, src_encoding));
if (input_it != inputs->end()) {
input_it->second.input->add_destination(stream_index);
++input_it->second.refcount;
udpstream_config.multicast_iface_index);
string src = udpstream_config.src;
+ Input::Encoding src_encoding = Input::Encoding(udpstream_config.src_encoding);
if (!src.empty()) {
- multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+ multimap<InputKey, InputWithRefcount>::iterator input_it = inputs->find(make_pair(src, src_encoding));
assert(input_it != inputs->end());
input_it->second.input->add_destination(stream_index);
++input_it->second.refcount;
timespec serialize_start;
set<string> deserialized_urls;
map<sockaddr_in6, Acceptor *, Sockaddr6Compare> deserialized_acceptors;
- multimap<string, InputWithRefcount> inputs; // multimap due to older versions without deduplication.
+ multimap<InputKey, InputWithRefcount> inputs; // multimap due to older versions without deduplication.
if (state_fd != -1) {
log(INFO, "Deserializing state from previous process...");
string serialized;
InputWithRefcount iwr;
iwr.input = create_input(serialized_input);
iwr.refcount = 0;
- inputs.insert(make_pair(serialized_input.url(), iwr));
+
+ Input::Encoding src_encoding = serialized_input.is_metacube_encoded() ?
+ Input::INPUT_ENCODING_METACUBE :
+ Input::INPUT_ENCODING_RAW;
+ InputKey key(serialized_input.url(), src_encoding);
+ inputs.insert(make_pair(key, iwr));
}
// Deserialize the acceptors.
servers->run();
// Now delete all inputs that are longer in use, and start the others.
- for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+ for (multimap<InputKey, InputWithRefcount>::iterator input_it = inputs.begin();
input_it != inputs.end(); ) {
if (input_it->second.refcount == 0) {
- log(WARNING, "Input '%s' no longer in use, closing.",
- input_it->first.c_str());
+ if (input_it->first.second == Input::INPUT_ENCODING_RAW) {
+ log(WARNING, "Raw input '%s' no longer in use, closing.",
+ input_it->first.first.c_str());
+ } else {
+ assert(input_it->first.second == Input::INPUT_ENCODING_METACUBE);
+ log(WARNING, "Metacube input '%s' no longer in use, closing.",
+ input_it->first.first.c_str());
+ }
input_it->second.input->close_socket();
delete input_it->second.input;
inputs.erase(input_it++);
InputStatsThread *input_stats_thread = NULL;
if (!config.input_stats_file.empty()) {
vector<Input*> inputs_no_refcount;
- for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+ for (multimap<InputKey, InputWithRefcount>::iterator input_it = inputs.begin();
input_it != inputs.end(); ++input_it) {
inputs_no_refcount.push_back(input_it->second.input);
}
for (size_t i = 0; i < acceptors.size(); ++i) {
acceptors[i]->stop();
}
- for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+ for (multimap<InputKey, InputWithRefcount>::iterator input_it = inputs.begin();
input_it != inputs.end();
++input_it) {
input_it->second.input->stop();
return stream_url_it->second;
}
-int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
{
MutexLock lock(&mutex);
stream_url_map.insert(make_pair(url, streams.size()));
- streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
+ streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding));
return streams.size() - 1;
}
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
streams[stream_index]->encoding = encoding;
}
+
+void Server::set_src_encoding(int stream_index, Stream::Encoding encoding)
+{
+ MutexLock lock(&mutex);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->src_encoding = encoding;
+}
void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
// at the same time).
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
- int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
int lookup_stream_by_url(const std::string &url) const;
void set_backlog_size(int stream_index, size_t new_size);
void set_prebuffering_bytes(int stream_index, size_t new_amount);
void set_encoding(int stream_index, Stream::Encoding encoding);
+ void set_src_encoding(int stream_index, Stream::Encoding encoding);
void add_gen204(const std::string &url, const std::string &allow_origin);
private:
return servers[0].lookup_stream_by_url(url);
}
-int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
+int ServerPool::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding)
{
// Adding more HTTP streams after UDP streams would cause the UDP stream
// indices to move around, which is obviously not good.
assert(udp_streams.empty());
for (int i = 0; i < num_servers; ++i) {
- int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding);
+ int stream_index = servers[i].add_stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding);
assert(stream_index == num_http_streams);
}
return num_http_streams++;
servers[i].set_encoding(stream_index, encoding);
}
}
+
+void ServerPool::set_src_encoding(int stream_index, Stream::Encoding encoding)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_src_encoding(stream_index, encoding);
+ }
+}
void add_client_from_serialized(const ClientProto &client);
// Adds the given stream to all the servers. Returns the stream index.
- int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
void delete_stream(const std::string &url);
int add_udpstream(const sockaddr_in6 &dst, int pacing_rate, int ttl, int multicast_iface_index);
// Changes the given stream's amount of forced prebuffering on all the servers.
void set_prebuffering_bytes(int stream_index, size_t new_amount);
- // Changes the given stream's encoding type on all the servers.
+ // Changes the given stream's output encoding type on all the servers.
void set_encoding(int stream_index, Stream::Encoding encoding);
+ // Changes the given stream's input encoding type on all the servers.
+ void set_src_encoding(int stream_index, Stream::Encoding encoding);
+
// Adds the given gen204 endpoint to all the servers.
void add_gen204(const std::string &url, const std::string &allow_origin);
optional int64 bytes_received = 11;
optional int64 data_bytes_received = 12;
optional int64 connect_time = 13;
+ optional bool is_metacube_encoded = 15 [default=true];
};
// Corresponds to class Acceptor.
using namespace std;
-Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding)
+Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding)
: url(url),
encoding(encoding),
+ src_encoding(src_encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
prebuffering_bytes(prebuffering_bytes),
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
- Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding);
+ Stream(const std::string &stream_id, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding);
~Stream();
// Serialization/deserialization.
// be Metacube, for reflecting to another Cubemap instance).
Encoding encoding;
+ // What encoding we expect the incoming data to be in (usually Metacube).
+ Encoding src_encoding;
+
// The stream data itself, stored in a circular buffer.
//
// We store our data in a file, so that we can send the data to the