From: Steinar H. Gunderson Date: Fri, 16 Aug 2013 17:11:03 +0000 (+0200) Subject: Make an option to properly delete streams. X-Git-Tag: 1.0.0~15 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=4ec6a7f04455b023d3ca91d4e84a19993ec4d34e;ds=sidebyside Make an option to properly delete streams. --- diff --git a/main.cpp b/main.cpp index cf728ca..3f36f32 100644 --- a/main.cpp +++ b/main.cpp @@ -135,7 +135,9 @@ void create_config_inputs(const Config &config, multimapadd_stream(stream_config.url, stream_config.backlog_size, @@ -168,7 +179,6 @@ void create_streams(const Config &config, servers->set_encoding(stream_index, Stream::Encoding(stream_config.encoding)); } - expecting_urls.erase(stream_config.url); if (stream_config.mark_pool != -1) { servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); @@ -184,14 +194,14 @@ void create_streams(const Config &config, } } - // Warn about any HTTP servers we've lost. - // TODO: Make an option (delete=yes?) to actually shut down streams. + // Warn about any streams servers we've lost. for (set::const_iterator stream_it = expecting_urls.begin(); stream_it != expecting_urls.end(); ++stream_it) { string url = *stream_it; log(WARNING, "stream '%s' disappeared from the configuration file. " - "It will not be deleted, but clients will not get any new inputs.", + "It will not be deleted, but clients will not get any new inputs. " + "If you really meant to delete it, set src=delete and reload.", url.c_str()); } @@ -269,6 +279,17 @@ bool dry_run_config(const std::string &argv0, const std::string &config_filename return (WIFEXITED(status) && WEXITSTATUS(status) == 0); } +void find_deleted_streams(const Config &config, set *deleted_urls) +{ + for (unsigned i = 0; i < config.streams.size(); ++i) { + const StreamConfig &stream_config = config.streams[i]; + if (stream_config.src == "delete") { + log(INFO, "Deleting stream '%s'.", stream_config.url.c_str()); + deleted_urls->insert(stream_config.url); + } + } +} + int main(int argc, char **argv) { signal(SIGHUP, hup); @@ -356,6 +377,10 @@ start: servers = new ServerPool(config.num_servers); + // Find all the streams that are to be deleted. + set deleted_urls; + find_deleted_streams(config, &deleted_urls); + CubemapStateProto loaded_state; struct timeval serialize_start; set deserialized_urls; @@ -379,19 +404,26 @@ start: for (int i = 0; i < loaded_state.streams_size(); ++i) { const StreamProto &stream = loaded_state.streams(i); - vector data_fds; - for (int j = 0; j < stream.data_fds_size(); ++j) { - data_fds.push_back(stream.data_fds(j)); + if (deleted_urls.count(stream.url()) != 0) { + // Delete the stream backlogs. + for (int j = 0; j < stream.data_fds_size(); ++j) { + safe_close(stream.data_fds(j)); + } + } else { + vector data_fds; + for (int j = 0; j < stream.data_fds_size(); ++j) { + data_fds.push_back(stream.data_fds(j)); + } + + // Older versions stored the data once in the protobuf instead of + // sending around file descriptors. + if (data_fds.empty() && stream.has_data()) { + data_fds.push_back(make_tempfile(stream.data())); + } + + servers->add_stream_from_serialized(stream, data_fds); + deserialized_urls.insert(stream.url()); } - - // Older versions stored the data once in the protobuf instead of - // sending around file descriptors. - if (data_fds.empty() && stream.has_data()) { - data_fds.push_back(make_tempfile(stream.data())); - } - - servers->add_stream_from_serialized(stream, data_fds); - deserialized_urls.insert(stream.url()); } // Deserialize the inputs. Note that we don't actually add them to any stream yet. @@ -423,7 +455,11 @@ start: // allocate them to, so just do round-robin. However, we need to add // them after the mark pools have been set up. for (int i = 0; i < loaded_state.clients_size(); ++i) { - servers->add_client_from_serialized(loaded_state.clients(i)); + if (deleted_urls.count(loaded_state.clients(i).url()) != 0) { + safe_close(loaded_state.clients(i).sock()); + } else { + servers->add_client_from_serialized(loaded_state.clients(i)); + } } servers->run(); diff --git a/serverpool.h b/serverpool.h index 13211f9..b67fa48 100644 --- a/serverpool.h +++ b/serverpool.h @@ -32,6 +32,7 @@ public: // Adds the given stream to all the servers. Returns the stream index. int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); + void delete_stream(const std::string &url); int add_udpstream(const sockaddr_in6 &dst, MarkPool *mark_pool); // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.