extern ServerPool *servers;
-HTTPInput::HTTPInput(const string &stream_id, const string &url)
+HTTPInput::HTTPInput(const string &url)
: state(NOT_CONNECTED),
- stream_id(stream_id),
url(url),
has_metacube_header(false),
sock(-1)
HTTPInput::HTTPInput(const InputProto &serialized)
: state(State(serialized.state())),
- stream_id(serialized.stream_id()),
url(serialized.url()),
request(serialized.request()),
request_bytes_sent(serialized.request_bytes_sent()),
{
InputProto serialized;
serialized.set_state(state);
- serialized.set_stream_id(stream_id);
serialized.set_url(url);
serialized.set_request(request);
serialized.set_request_bytes_sent(request_bytes_sent);
int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai);
if (err == -1) {
log(WARNING, "[%s] Lookup of '%s' failed (%s).",
- stream_id.c_str(), host.c_str(), gai_strerror(err));
+ url.c_str(), host.c_str(), gai_strerror(err));
freeaddrinfo(ai);
return -1;
}
// Give the last one as error.
log(WARNING, "[%s] Connect to '%s' failed (%s)",
- stream_id.c_str(), host.c_str(), strerror(errno));
+ url.c_str(), host.c_str(), strerror(errno));
freeaddrinfo(base_ai);
return -1;
}
{
vector<string> lines = split_lines(response);
if (lines.empty()) {
- log(WARNING, "[%s] Empty HTTP response from input.", stream_id.c_str());
+ log(WARNING, "[%s] Empty HTTP response from input.", url.c_str());
return false;
}
vector<string> first_line_tokens = split_tokens(lines[0]);
if (first_line_tokens.size() < 2) {
log(WARNING, "[%s] Malformed response line '%s' from input.",
- stream_id.c_str(), lines[0].c_str());
+ url.c_str(), lines[0].c_str());
return false;
}
int response = atoi(first_line_tokens[1].c_str());
if (response != 200) {
log(WARNING, "[%s] Non-200 response '%s' from input.",
- stream_id.c_str(), lines[0].c_str());
+ url.c_str(), lines[0].c_str());
return false;
}
size_t split = lines[i].find(":");
if (split == string::npos) {
log(WARNING, "[%s] Ignoring malformed HTTP response line '%s'",
- stream_id.c_str(), lines[i].c_str());
+ url.c_str(), lines[i].c_str());
continue;
}
++it) {
http_header.append(it->first + ": " + it->second + "\r\n");
}
- http_header.append("\r\n");
- servers->set_header(stream_id, http_header);
+ http_header.append("\r\n");
+
+ for (size_t i = 0; i < stream_ids.size(); ++i) {
+ servers->set_header(stream_ids[i], http_header);
+ }
return true;
}
request_bytes_sent = 0;
response.clear();
pending_data.clear();
- servers->set_header(stream_id, "");
+ for (size_t i = 0; i < stream_ids.size(); ++i) {
+ servers->set_header(stream_ids[i], "");
+ }
{
string protocol; // Thrown away.
if (!parse_url(url, &protocol, &host, &port, &path)) {
- log(WARNING, "[%s] Failed to parse URL '%s'", stream_id.c_str(), url.c_str());
+ log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str());
break;
}
}
if (ret == 0) {
// This really shouldn't happen...
log(ERROR, "[%s] Socket unexpectedly closed while reading header",
- stream_id.c_str());
+ url.c_str());
state = CLOSING_SOCKET;
continue;
}
RequestParseStatus status = wait_for_double_newline(&response, buf, ret);
if (status == RP_OUT_OF_SPACE) {
- log(WARNING, "[%s] Sever sent overlong HTTP response!", stream_id.c_str());
+ log(WARNING, "[%s] Sever sent overlong HTTP response!", url.c_str());
state = CLOSING_SOCKET;
continue;
} else if (status == RP_NOT_FINISHED_YET) {
}
log(INFO, "[%s] Connected to '%s', receiving data.",
- stream_id.c_str(), url.c_str());
+ url.c_str(), url.c_str());
state = RECEIVING_DATA;
break;
}
if (ret == 0) {
// This really shouldn't happen...
log(ERROR, "[%s] Socket unexpectedly closed while reading header",
- stream_id.c_str());
+ url.c_str());
state = CLOSING_SOCKET;
continue;
}
// or the connection just got closed.
// The earlier steps have already given the error message, if any.
if (state == NOT_CONNECTED && !should_stop) {
- log(INFO, "[%s] Waiting 0.2 second and restarting...", stream_id.c_str());
+ log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
usleep(200000);
}
}
char *inner_data = pending_data.data() + sizeof(metacube_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
string header(inner_data, inner_data + size);
- servers->set_header(stream_id, http_header + header);
+ for (size_t i = 0; i < stream_ids.size(); ++i) {
+ servers->set_header(stream_ids[i], http_header + header);
+ }
} else {
- servers->add_data(stream_id, inner_data, size);
+ for (size_t i = 0; i < stream_ids.size(); ++i) {
+ servers->add_data(stream_ids[i], inner_data, size);
+ }
}
// Consume the block. This isn't the most efficient way of dealing with things
return;
}
log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?",
- stream_id.c_str(), (long long)num_bytes);
+ url.c_str(), (long long)num_bytes);
pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
}
class HTTPInput : public Input {
public:
- HTTPInput(const std::string &stream_id, const std::string &url);
+ HTTPInput(const std::string &url);
// Serialization/deserialization.
HTTPInput(const InputProto &serialized);
virtual std::string get_url() const { return url; }
+ virtual void add_destination(const std::string &stream_id)
+ {
+ stream_ids.push_back(stream_id);
+ }
+
private:
// Actually does the download.
virtual void do_work();
};
State state;
- std::string stream_id;
+ std::vector<std::string> stream_ids;
// The URL and its parsed components.
std::string url;
return true;
}
-Input *create_input(const std::string &stream_id, const std::string &url)
+Input *create_input(const std::string &url)
{
string protocol, host, port, path;
if (!parse_url(url, &protocol, &host, &port, &path)) {
return NULL;
}
if (protocol == "http") {
- return new HTTPInput(stream_id, url);
+ return new HTTPInput(url);
}
if (protocol == "udp") {
- return new UDPInput(stream_id, url);
+ return new UDPInput(url);
}
return NULL;
}
// Figure out the right type of input based on the URL, and create a new Input of the right type.
// Will return NULL if unknown.
-Input *create_input(const std::string &stream_id, const std::string &url);
+Input *create_input(const std::string &url);
Input *create_input(const InputProto &serialized);
class Input : public Thread {
virtual InputProto serialize() const = 0;
virtual std::string get_url() const = 0;
virtual void close_socket() = 0;
+ virtual void add_destination(const std::string &stream_id) = 0;
};
#endif // !defined(_INPUT_H)
volatile bool hupped = false;
volatile bool stopped = false;
+struct InputWithRefcount {
+ Input *input;
+ int refcount;
+};
+
void hup(int signum)
{
hupped = true;
CubemapStateProto collect_state(const timeval &serialize_start,
const vector<Acceptor *> acceptors,
- const vector<Input *> inputs,
+ const multimap<string, InputWithRefcount> inputs,
ServerPool *servers)
{
CubemapStateProto state = servers->serialize(); // Fills streams() and clients().
state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
}
- for (size_t i = 0; i < inputs.size(); ++i) {
- state.add_inputs()->MergeFrom(inputs[i]->serialize());
+ for (multimap<string, InputWithRefcount>::const_iterator input_it = inputs.begin();
+ input_it != inputs.end();
+ ++input_it) {
+ state.add_inputs()->MergeFrom(input_it->second.input->serialize());
}
return state;
}
// Find all streams in the configuration file, and create inputs for them.
-vector<Input *> create_inputs(const Config &config,
- map<string, Input *> *deserialized_inputs)
+void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
{
- vector<Input *> inputs;
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
if (stream_config.src.empty()) {
continue;
}
- string stream_id = stream_config.stream_id;
string src = stream_config.src;
-
- Input *input = NULL;
- map<string, Input *>::iterator deserialized_input_it =
- deserialized_inputs->find(stream_id);
- if (deserialized_input_it != deserialized_inputs->end()) {
- input = deserialized_input_it->second;
- if (input->get_url() != src) {
- log(INFO, "Stream '%s' has changed URL from '%s' to '%s', restarting input.",
- stream_id.c_str(), input->get_url().c_str(), src.c_str());
- input->close_socket();
- delete input;
- input = NULL;
- }
- deserialized_inputs->erase(deserialized_input_it);
+ if (inputs->count(src) != 0) {
+ continue;
}
- if (input == NULL) {
- input = create_input(stream_id, src);
- if (input == NULL) {
- log(ERROR, "did not understand URL '%s', clients will not get any data.",
- src.c_str());
- continue;
- }
+
+ InputWithRefcount iwr;
+ iwr.input = create_input(src);
+ if (iwr.input == NULL) {
+ log(ERROR, "did not understand URL '%s', clients will not get any data.",
+ src.c_str());
+ continue;
}
- input->run();
- inputs.push_back(input);
+ iwr.refcount = 0;
+ inputs->insert(make_pair(src, iwr));
}
- return inputs;
}
void create_streams(const Config &config,
const set<string> &deserialized_stream_ids,
- map<string, Input *> *deserialized_inputs)
+ multimap<string, InputWithRefcount> *inputs)
{
for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
const MarkPoolConfig &mp_config = config.mark_pools[i];
servers->set_mark_pool(stream_config.stream_id,
mark_pools[stream_config.mark_pool]);
}
+
+ string src = stream_config.src;
+ if (!src.empty()) {
+ multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
+ assert(input_it != inputs->end());
+ input_it->second.input->add_destination(stream_config.stream_id);
+ ++input_it->second.refcount;
+ }
}
// Warn about any servers we've lost.
log(WARNING, "stream '%s' disappeared from the configuration file. "
"It will not be deleted, but clients will not get any new inputs.",
stream_id.c_str());
- if (deserialized_inputs->count(stream_id) != 0) {
- delete (*deserialized_inputs)[stream_id];
- deserialized_inputs->erase(stream_id);
- }
}
}
CubemapStateProto loaded_state;
struct timeval serialize_start;
set<string> deserialized_stream_ids;
- map<string, Input *> deserialized_inputs;
map<int, Acceptor *> deserialized_acceptors;
+ multimap<string, InputWithRefcount> inputs; // multimap due to older versions without deduplication.
if (state_fd != -1) {
log(INFO, "Deserializing state from previous process...");
string serialized;
deserialized_stream_ids.insert(loaded_state.streams(i).stream_id());
}
- // Deserialize the inputs. Note that we don't actually add them to any state yet.
+ // Deserialize the inputs. Note that we don't actually add them to any stream yet.
for (int i = 0; i < loaded_state.inputs_size(); ++i) {
- deserialized_inputs.insert(make_pair(
- loaded_state.inputs(i).stream_id(),
- create_input(loaded_state.inputs(i))));
+ InputWithRefcount iwr;
+ iwr.input = create_input(loaded_state.inputs(i));
+ iwr.refcount = 0;
+ inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr));
}
// Deserialize the acceptors.
log(INFO, "Deserialization done.");
}
- // Find all streams in the configuration file, and create them.
- create_streams(config, deserialized_stream_ids, &deserialized_inputs);
-
- vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
- vector<Input *> inputs = create_inputs(config, &deserialized_inputs);
+ // Add any new inputs coming from the config.
+ create_config_inputs(config, &inputs);
- // All deserialized inputs should now have been taken care of, one way or the other.
- assert(deserialized_inputs.empty());
+ // Find all streams in the configuration file, create them, and connect to the inputs.
+ create_streams(config, deserialized_stream_ids, &inputs);
+ vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
// Put back the existing clients. It doesn't matter which server we
// allocate them to, so just do round-robin. However, we need to add
servers->run();
+ // Now delete all inputs that are longer in use, and start the others.
+ for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+ input_it != inputs.end(); ) {
+ if (input_it->second.refcount == 0) {
+ log(WARNING, "Input '%s' no longer in use, closing.",
+ input_it->first.c_str());
+ delete input_it->second.input;
+ inputs.erase(input_it++);
+ } else {
+ input_it->second.input->run();
+ ++input_it;
+ }
+ }
+
// Start writing statistics.
StatsThread *stats_thread = NULL;
if (!config.stats_file.empty()) {
for (size_t i = 0; i < acceptors.size(); ++i) {
acceptors[i]->stop();
}
- for (size_t i = 0; i < inputs.size(); ++i) {
- inputs[i]->stop();
+ for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
+ input_it != inputs.end();
+ ++input_it) {
+ input_it->second.input->stop();
}
servers->stop();
// Corresponds to class Input.
message InputProto {
optional int32 state = 1;
- optional string stream_id = 2;
optional string url = 3;
optional bytes request = 4;
optional int32 request_bytes_sent = 5;
extern ServerPool *servers;
-UDPInput::UDPInput(const string &stream_id, const string &url)
- : stream_id(stream_id),
- url(url),
+UDPInput::UDPInput(const string &url)
+ : url(url),
sock(-1)
{
// Should be verified by the caller.
}
UDPInput::UDPInput(const InputProto &serialized)
- : stream_id(serialized.stream_id()),
- url(serialized.url()),
+ : url(serialized.url()),
sock(serialized.sock())
{
// Should be verified by the caller.
InputProto UDPInput::serialize() const
{
InputProto serialized;
- serialized.set_stream_id(stream_id);
serialized.set_url(url);
serialized.set_sock(sock);
return serialized;
"Cache-control: no-cache\r\n"
"Server: " SERVER_IDENTIFICATION "\r\n"
"\r\n";
- servers->set_header(stream_id, header);
+ for (size_t i = 0; i < stream_ids.size(); ++i) {
+ servers->set_header(stream_ids[i], header);
+ }
}
void UDPInput::do_work()
sock = create_server_socket(port_num, UDP_SOCKET);
if (sock == -1) {
log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
- stream_id.c_str());
+ url.c_str());
usleep(200000);
continue;
}
continue;
}
- servers->add_data(stream_id, buf, ret);
+ for (size_t i = 0; i < stream_ids.size(); ++i) {
+ servers->add_data(stream_ids[i], buf, ret);
+ }
}
}
class UDPInput : public Input {
public:
- UDPInput(const std::string &stream_id, const std::string &url);
+ UDPInput(const std::string &url);
// Serialization/deserialization.
UDPInput(const InputProto &serialized);
virtual std::string get_url() const { return url; }
virtual void close_socket();
+ virtual void add_destination(const std::string &stream_id)
+ {
+ stream_ids.push_back(stream_id);
+ }
+
private:
// Actually gets the packets.
virtual void do_work();
// Create the HTTP header.
void construct_header();
- std::string stream_id;
+ std::vector<std::string> stream_ids;
// The URL and its parsed components.
std::string url;