From 9abb89bcf7940e2ada9d708f86a218a56334f68d Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sun, 21 Apr 2013 01:51:52 +0200 Subject: [PATCH 1/1] Refer to streams internally mostly by an index, not the stream_id. Inputs no longer know about stream_id, only the stream_index. This is faster (we used maybe 5% of our time in find_stream), and will make it easier to add non-HTTP outputs in the future. Within (HTTP) clients, stream_id also no longer exists, but is replaced by "url", which really works exactly the same way. url is mostly used during re-exec, however, so it's less important than it used to be. --- accesslog.cpp | 2 +- client.cpp | 10 ++--- client.h | 4 +- config.cpp | 4 +- config.h | 2 +- httpinput.cpp | 16 ++++---- httpinput.h | 6 +-- input.h | 2 +- main.cpp | 40 ++++++++++---------- server.cpp | 101 +++++++++++++++++++++++++------------------------ server.h | 25 ++++++------ serverpool.cpp | 51 ++++++++++++++++++------- serverpool.h | 19 ++++++---- state.proto | 4 +- stats.cpp | 2 +- stream.cpp | 8 ++-- stream.h | 2 +- udpinput.cpp | 10 ++--- udpinput.h | 4 +- 19 files changed, 172 insertions(+), 140 deletions(-) diff --git a/accesslog.cpp b/accesslog.cpp index 81eff90..410b3b6 100644 --- a/accesslog.cpp +++ b/accesslog.cpp @@ -58,7 +58,7 @@ void AccessLogThread::do_work() fprintf(logfp, "%llu %s %s %d %llu %llu %llu\n", (long long unsigned)(writes[i].connect_time), writes[i].remote_addr.c_str(), - writes[i].stream_id.c_str(), + writes[i].url.c_str(), int(now - writes[i].connect_time), (long long unsigned)(writes[i].bytes_sent), (long long unsigned)(writes[i].bytes_lost), diff --git a/client.cpp b/client.cpp index 04518c8..f6361f3 100644 --- a/client.cpp +++ b/client.cpp @@ -60,7 +60,7 @@ Client::Client(const ClientProto &serialized, Stream *stream) connect_time(serialized.connect_time()), state(State(serialized.state())), request(serialized.request()), - stream_id(serialized.stream_id()), + url(serialized.url()), stream(stream), header_or_error(serialized.header_or_error()), header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), @@ -90,7 +90,7 @@ ClientProto Client::serialize() const serialized.set_connect_time(connect_time); serialized.set_state(state); serialized.set_request(request); - serialized.set_stream_id(stream_id); + serialized.set_url(url); serialized.set_header_or_error(header_or_error); serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()); serialized.set_stream_pos(stream_pos); @@ -103,10 +103,10 @@ ClientProto Client::serialize() const ClientStats Client::get_stats() const { ClientStats stats; - if (stream_id.empty()) { - stats.stream_id = "-"; + if (url.empty()) { + stats.url = "-"; } else { - stats.stream_id = stream_id; + stats.url = url; } stats.sock = sock; stats.fwmark = fwmark; diff --git a/client.h b/client.h index 9a189c0..c73810d 100644 --- a/client.h +++ b/client.h @@ -12,7 +12,7 @@ struct Stream; // Digested statistics for writing to logs etc. struct ClientStats { - std::string stream_id; + std::string url; int sock; int fwmark; std::string remote_addr; @@ -50,7 +50,7 @@ struct Client { // What stream we're connecting to; parsed from . // Not relevant for READING_REQUEST. - std::string stream_id; + std::string url; Stream *stream; // The header we want to send. This is nominally a copy of Stream::header, diff --git a/config.cpp b/config.cpp index c5547bd..4050338 100644 --- a/config.cpp +++ b/config.cpp @@ -189,12 +189,12 @@ bool parse_stream(const ConfigLine &line, Config *config) } StreamConfig stream; - stream.stream_id = line.arguments[0]; + stream.url = line.arguments[0]; map::const_iterator src_it = line.parameters.find("src"); if (src_it == line.parameters.end()) { log(WARNING, "stream '%s' has no src= attribute, clients will not get any data.", - stream.stream_id.c_str()); + stream.url.c_str()); } else { stream.src = src_it->second; // TODO: Verify that the URL is parseable? diff --git a/config.h b/config.h index 5395d5a..d79d3d7 100644 --- a/config.h +++ b/config.h @@ -12,7 +12,7 @@ struct MarkPoolConfig { }; struct StreamConfig { - std::string stream_id; + std::string url; // As seen by the client. std::string src; // Can be empty. size_t backlog_size; int mark_pool; // -1 for none. diff --git a/httpinput.cpp b/httpinput.cpp index f92ac13..c30651e 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -240,8 +240,8 @@ bool HTTPInput::parse_response(const std::string &request) http_header.append(it->first + ": " + it->second + "\r\n"); } - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header, ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, ""); } return true; @@ -264,8 +264,8 @@ void HTTPInput::do_work() request_bytes_sent = 0; response.clear(); pending_data.clear(); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], "", ""); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], "", ""); } { @@ -466,12 +466,12 @@ void HTTPInput::process_data(char *ptr, size_t bytes) char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->set_header(stream_ids[i], http_header, header); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->set_header(stream_indices[i], http_header, header); } } else { - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->add_data(stream_ids[i], inner_data, size); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->add_data(stream_indices[i], inner_data, size); } } diff --git a/httpinput.h b/httpinput.h index 9a27871..fc39b30 100644 --- a/httpinput.h +++ b/httpinput.h @@ -21,9 +21,9 @@ public: virtual std::string get_url() const { return url; } - virtual void add_destination(const std::string &stream_id) + virtual void add_destination(int stream_index) { - stream_ids.push_back(stream_id); + stream_indices.push_back(stream_index); } private: @@ -53,7 +53,7 @@ private: }; State state; - std::vector stream_ids; + std::vector stream_indices; // The URL and its parsed components. std::string url; diff --git a/input.h b/input.h index e0bd54b..76e1b66 100644 --- a/input.h +++ b/input.h @@ -22,7 +22,7 @@ public: virtual InputProto serialize() const = 0; virtual std::string get_url() const = 0; virtual void close_socket() = 0; - virtual void add_destination(const std::string &stream_id) = 0; + virtual void add_destination(int stream_index) = 0; }; #endif // !defined(_INPUT_H) diff --git a/main.cpp b/main.cpp index c89ec72..5ec0cb4 100644 --- a/main.cpp +++ b/main.cpp @@ -136,7 +136,7 @@ void create_config_inputs(const Config &config, multimap &deserialized_stream_ids, + const set &deserialized_urls, multimap *inputs) { for (unsigned i = 0; i < config.mark_pools.size(); ++i) { @@ -144,43 +144,45 @@ void create_streams(const Config &config, mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); } - set expecting_stream_ids = deserialized_stream_ids; + set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (deserialized_stream_ids.count(stream_config.stream_id) == 0) { - servers->add_stream(stream_config.stream_id, - stream_config.backlog_size, - Stream::Encoding(stream_config.encoding)); + int stream_index; + if (deserialized_urls.count(stream_config.url) == 0) { + stream_index = servers->add_stream(stream_config.url, + stream_config.backlog_size, + Stream::Encoding(stream_config.encoding)); } else { - servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size); - servers->set_encoding(stream_config.stream_id, + stream_index = servers->lookup_stream_by_url(stream_config.url); + assert(stream_index != -1); + servers->set_backlog_size(stream_index, stream_config.backlog_size); + servers->set_encoding(stream_index, Stream::Encoding(stream_config.encoding)); } - expecting_stream_ids.erase(stream_config.stream_id); + expecting_urls.erase(stream_config.url); if (stream_config.mark_pool != -1) { - servers->set_mark_pool(stream_config.stream_id, - mark_pools[stream_config.mark_pool]); + servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); } string src = stream_config.src; if (!src.empty()) { multimap::iterator input_it = inputs->find(src); assert(input_it != inputs->end()); - input_it->second.input->add_destination(stream_config.stream_id); + input_it->second.input->add_destination(stream_index); ++input_it->second.refcount; } } // Warn about any servers we've lost. // TODO: Make an option (delete=yes?) to actually shut down streams. - for (set::const_iterator stream_it = expecting_stream_ids.begin(); - stream_it != expecting_stream_ids.end(); + for (set::const_iterator stream_it = expecting_urls.begin(); + stream_it != expecting_urls.end(); ++stream_it) { - string stream_id = *stream_it; + string url = *stream_it; log(WARNING, "stream '%s' disappeared from the configuration file. " "It will not be deleted, but clients will not get any new inputs.", - stream_id.c_str()); + url.c_str()); } } @@ -328,7 +330,7 @@ start: CubemapStateProto loaded_state; struct timeval serialize_start; - set deserialized_stream_ids; + set deserialized_urls; map deserialized_acceptors; multimap inputs; // multimap due to older versions without deduplication. if (state_fd != -1) { @@ -361,7 +363,7 @@ start: } servers->add_stream_from_serialized(stream, data_fds); - deserialized_stream_ids.insert(stream.stream_id()); + deserialized_urls.insert(stream.url()); } // Deserialize the inputs. Note that we don't actually add them to any stream yet. @@ -386,7 +388,7 @@ start: create_config_inputs(config, &inputs); // Find all streams in the configuration file, create them, and connect to the inputs. - create_streams(config, deserialized_stream_ids, &inputs); + create_streams(config, deserialized_urls, &inputs); vector acceptors = create_acceptors(config, &deserialized_acceptors); // Put back the existing clients. It doesn't matter which server we diff --git a/server.cpp b/server.cpp index 2eb33ed..5bd0d55 100644 --- a/server.cpp +++ b/server.cpp @@ -46,10 +46,8 @@ Server::Server() Server::~Server() { - for (map::iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { - delete stream_it->second; + for (size_t i = 0; i < streams.size(); ++i) { + delete streams[i]; } safe_close(epoll_fd); @@ -100,11 +98,9 @@ void Server::do_work() process_client(client); } - for (map::iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { + for (size_t i = 0; i < streams.size(); ++i) { vector to_process; - swap(stream_it->second->to_process, to_process); + swap(streams[i]->to_process, to_process); for (size_t i = 0; i < to_process.size(); ++i) { process_client(to_process[i]); } @@ -123,10 +119,8 @@ CubemapStateProto Server::serialize() ++client_it) { serialized.add_clients()->MergeFrom(client_it->second.serialize()); } - for (map::const_iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { - serialized.add_streams()->MergeFrom(stream_it->second->serialize()); + for (size_t i = 0; i < streams.size(); ++i) { + serialized.add_streams()->MergeFrom(streams[i]->serialize()); } return serialized; } @@ -160,11 +154,12 @@ void Server::add_client_from_serialized(const ClientProto &client) { MutexLock lock(&mutex); Stream *stream; - map::iterator stream_it = streams.find(client.stream_id()); - if (stream_it == streams.end()) { + int stream_index = lookup_stream_by_url(client.url()); + if (stream_index == -1) { + assert(client.state() != Client::SENDING_DATA); stream = NULL; } else { - stream = stream_it->second; + stream = streams[stream_index]; } pair::iterator, bool> ret = clients.insert(make_pair(client.sock(), Client(client, stream))); @@ -180,7 +175,6 @@ void Server::add_client_from_serialized(const ClientProto &client) // the sleeping array again soon. ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; } - ev.data.u64 = 0; // Keep Valgrind happy. ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) { log_perror("epoll_ctl(EPOLL_CTL_ADD)"); @@ -195,37 +189,51 @@ void Server::add_client_from_serialized(const ClientProto &client) } } -void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) +int Server::lookup_stream_by_url(const std::string &url) const +{ + map::const_iterator url_it = url_map.find(url); + if (url_it == url_map.end()) { + return -1; + } + return url_it->second; +} + +int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) { MutexLock lock(&mutex); - streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); + url_map.insert(make_pair(url, streams.size())); + streams.push_back(new Stream(url, backlog_size, encoding)); + return streams.size() - 1; } -void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) +int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { MutexLock lock(&mutex); - streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd))); + url_map.insert(make_pair(stream.url(), streams.size())); + streams.push_back(new Stream(stream, data_fd)); + return streams.size() - 1; } -void Server::set_backlog_size(const string &stream_id, size_t new_size) +void Server::set_backlog_size(int stream_index, size_t new_size) { MutexLock lock(&mutex); - assert(streams.count(stream_id) != 0); - streams[stream_id]->set_backlog_size(new_size); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->set_backlog_size(new_size); } -void Server::set_encoding(const string &stream_id, Stream::Encoding encoding) +void Server::set_encoding(int stream_index, Stream::Encoding encoding) { MutexLock lock(&mutex); - assert(streams.count(stream_id) != 0); - streams[stream_id]->encoding = encoding; + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->encoding = encoding; } -void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header) +void Server::set_header(int stream_index, const string &http_header, const string &stream_header) { MutexLock lock(&mutex); - find_stream(stream_id)->http_header = http_header; - find_stream(stream_id)->stream_header = stream_header; + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->http_header = http_header; + streams[stream_index]->stream_header = stream_header; // If there are clients we haven't sent anything to yet, we should give // them the header, so push back into the SENDING_HEADER state. @@ -240,17 +248,19 @@ void Server::set_header(const string &stream_id, const string &http_header, cons } } -void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool) +void Server::set_mark_pool(int stream_index, MarkPool *mark_pool) { MutexLock lock(&mutex); assert(clients.empty()); - find_stream(stream_id)->mark_pool = mark_pool; + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->mark_pool = mark_pool; } -void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) +void Server::add_data_deferred(int stream_index, const char *data, size_t bytes) { MutexLock lock(&queued_data_mutex); - find_stream(stream_id)->add_data_deferred(data, bytes); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->add_data_deferred(data, bytes); } // See the .h file for postconditions after this function. @@ -444,12 +454,14 @@ int Server::parse_request(Client *client) if (request_tokens[0] != "GET") { return 400; // Should maybe be 405 instead? } - if (streams.count(request_tokens[1]) == 0) { + + map::const_iterator url_map_it = url_map.find(request_tokens[1]); + if (url_map_it == url_map.end()) { return 404; // Not found. } - client->stream_id = request_tokens[1]; - client->stream = find_stream(client->stream_id); + client->url = request_tokens[1]; + client->stream = streams[url_map_it->second]; if (client->stream->mark_pool != NULL) { client->fwmark = client->stream->mark_pool->get_mark(); } else { @@ -467,7 +479,7 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - Stream *stream = find_stream(client->stream_id); + Stream *stream = client->stream; if (stream->encoding == Stream::STREAM_ENCODING_RAW) { client->header_or_error = stream->http_header + "\r\n" + @@ -555,13 +567,6 @@ void Server::close_client(Client *client) clients.erase(client->sock); } -Stream *Server::find_stream(const string &stream_id) -{ - map::iterator it = streams.find(stream_id); - assert(it != streams.end()); - return it->second; -} - void Server::process_queued_data() { MutexLock lock(&queued_data_mutex); @@ -570,10 +575,8 @@ void Server::process_queued_data() add_client(queued_add_clients[i]); } queued_add_clients.clear(); - - for (map::iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { - stream_it->second->process_queued_data(); + + for (size_t i = 0; i < streams.size(); ++i) { + streams[i]->process_queued_data(); } } diff --git a/server.h b/server.h index edcdc83..476e790 100644 --- a/server.h +++ b/server.h @@ -35,29 +35,30 @@ public: std::vector get_client_stats() const; // Set header (both HTTP header and any stream headers) for the given stream. - void set_header(const std::string &stream_id, + void set_header(int stream_index, const std::string &http_header, const std::string &stream_header); // Set that the given stream should use the given mark pool from now on. // NOTE: This should be set before any clients are connected! - void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool); + void set_mark_pool(int stream_index, MarkPool *mark_pool); // These will be deferred until the next time an iteration in do_work() happens, // and the order between them are undefined. // XXX: header should ideally be ordered with respect to data. void add_client_deferred(int sock); - void add_data_deferred(const std::string &stream_id, const char *data, size_t bytes); + void add_data_deferred(int stream_index, const char *data, size_t bytes); // These should not be called while running, since that would violate // threading assumptions (ie., that epoll is only called from one thread // at the same time). CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); - void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding); - void add_stream_from_serialized(const StreamProto &stream, int data_fd); - void set_backlog_size(const std::string &stream_id, size_t new_size); - void set_encoding(const std::string &stream_id, Stream::Encoding encoding); + int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding); + int add_stream_from_serialized(const StreamProto &stream, int data_fd); + int lookup_stream_by_url(const std::string &url) const; + void set_backlog_size(int stream_index, size_t new_size); + void set_encoding(int stream_index, Stream::Encoding encoding); private: // Mutex protecting queued_add_clients and streams[..]->queued_data. @@ -80,8 +81,11 @@ private: // All variables below this line are protected by the mutex. mutable pthread_mutex_t mutex; - // Map from stream ID to stream. - std::map streams; + // All streams. + std::vector streams; + + // Map from URL to index into . + std::map url_map; // Map from file descriptor to client. std::map clients; @@ -122,9 +126,6 @@ private: // the SENDING_ERROR state. void construct_error(Client *client, int error_code); - // TODO: This function should probably die. - Stream *find_stream(const std::string &stream_id); - void process_queued_data(); void add_client(int sock); diff --git a/serverpool.cpp b/serverpool.cpp index e309ab9..fb8ca45 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -61,17 +61,32 @@ void ServerPool::add_client_from_serialized(const ClientProto &client) servers[clients_added++ % num_servers].add_client_from_serialized(client); } -void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) +int ServerPool::lookup_stream_by_url(const std::string &url) const { + assert(servers != NULL); + return servers[0].lookup_stream_by_url(url); +} + +int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) +{ + int stream_index = -1; for (int i = 0; i < num_servers; ++i) { - servers[i].add_stream(stream_id, backlog_size, encoding); + int stream_index2 = servers[i].add_stream(url, backlog_size, encoding); + if (i == 0) { + stream_index = stream_index2; + } else { + // Verify that all servers have this under the same stream index. + assert(stream_index == stream_index2); + } } + return stream_index; } -void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector &data_fds) +int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector &data_fds) { assert(!data_fds.empty()); string contents; + int stream_index = -1; for (int i = 0; i < num_servers; ++i) { int data_fd; if (i < int(data_fds.size())) { @@ -87,26 +102,34 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vec data_fd = make_tempfile(contents); } - servers[i].add_stream_from_serialized(stream, data_fd); + int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd); + if (i == 0) { + stream_index = stream_index2; + } else { + // Verify that all servers have this under the same stream index. + assert(stream_index == stream_index2); + } } // Close and delete any leftovers, if the number of servers was reduced. for (size_t i = num_servers; i < data_fds.size(); ++i) { safe_close(data_fds[i]); // Implicitly deletes the file. } + + return stream_index; } -void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header) +void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header) { for (int i = 0; i < num_servers; ++i) { - servers[i].set_header(stream_id, http_header, stream_header); + servers[i].set_header(stream_index, http_header, stream_header); } } -void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes) +void ServerPool::add_data(int stream_index, const char *data, size_t bytes) { for (int i = 0; i < num_servers; ++i) { - servers[i].add_data_deferred(stream_id, data, bytes); + servers[i].add_data_deferred(stream_index, data, bytes); } } @@ -134,23 +157,23 @@ vector ServerPool::get_client_stats() const return ret; } -void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool) +void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool) { for (int i = 0; i < num_servers; ++i) { - servers[i].set_mark_pool(stream_id, mark_pool); + servers[i].set_mark_pool(stream_index, mark_pool); } } -void ServerPool::set_backlog_size(const string &stream_id, size_t new_size) +void ServerPool::set_backlog_size(int stream_index, size_t new_size) { for (int i = 0; i < num_servers; ++i) { - servers[i].set_backlog_size(stream_id, new_size); + servers[i].set_backlog_size(stream_index, new_size); } } -void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding) +void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding) { for (int i = 0; i < num_servers; ++i) { - servers[i].set_encoding(stream_id, encoding); + servers[i].set_encoding(stream_index, encoding); } } diff --git a/serverpool.h b/serverpool.h index 6a2fcb5..5f5f6f8 100644 --- a/serverpool.h +++ b/serverpool.h @@ -26,24 +26,27 @@ public: void add_client(int sock); void add_client_from_serialized(const ClientProto &client); - // Adds the given stream to all the servers. - void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding); - void add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); + // Adds the given stream to all the servers. Returns the stream index. + int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding); + int add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); + + // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure. + int lookup_stream_by_url(const std::string &url) const; // Adds the given data to all the servers. - void set_header(const std::string &stream_id, + void set_header(int stream_index, const std::string &http_header, const std::string &stream_header); - void add_data(const std::string &stream_id, const char *data, size_t bytes); + void add_data(int stream_index, const char *data, size_t bytes); // Connects the given stream to the given mark pool for all the servers. - void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool); + void set_mark_pool(int stream_index, MarkPool *mark_pool); // Changes the given stream's backlog size on all the servers. - void set_backlog_size(const std::string &stream_id, size_t new_size); + void set_backlog_size(int stream_index, size_t new_size); // Changes the given stream's encoding type on all the servers. - void set_encoding(const std::string &stream_id, Stream::Encoding encoding); + void set_encoding(int stream_index, Stream::Encoding encoding); // Starts all the servers. void run(); diff --git a/state.proto b/state.proto index 88ad2d8..1776280 100644 --- a/state.proto +++ b/state.proto @@ -5,7 +5,7 @@ message ClientProto { optional int64 connect_time = 9; optional int32 state = 2; optional bytes request = 3; - optional string stream_id = 4; + optional string url = 4; optional bytes header_or_error = 5; optional int64 header_or_error_bytes_sent = 6; optional int64 stream_pos = 7; @@ -21,7 +21,7 @@ message StreamProto { repeated int32 data_fds = 8; optional int64 backlog_size = 5 [default=1048576]; optional int64 bytes_received = 3; - optional string stream_id = 4; + optional string url = 4; // Older versions stored the HTTP and video headers together in this field. optional bytes header = 1; diff --git a/stats.cpp b/stats.cpp index 280f019..6dffa58 100644 --- a/stats.cpp +++ b/stats.cpp @@ -60,7 +60,7 @@ void StatsThread::do_work() client_stats[i].remote_addr.c_str(), client_stats[i].sock, client_stats[i].fwmark, - client_stats[i].stream_id.c_str(), + client_stats[i].url.c_str(), int(now - client_stats[i].connect_time), (long long unsigned)(client_stats[i].bytes_sent), (long long unsigned)(client_stats[i].bytes_lost), diff --git a/stream.cpp b/stream.cpp index 37961fa..9922655 100644 --- a/stream.cpp +++ b/stream.cpp @@ -15,8 +15,8 @@ using namespace std; -Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding) - : stream_id(stream_id), +Stream::Stream(const string &url, size_t backlog_size, Encoding encoding) + : url(url), encoding(encoding), data_fd(make_tempfile("")), backlog_size(backlog_size), @@ -36,7 +36,7 @@ Stream::~Stream() } Stream::Stream(const StreamProto &serialized, int data_fd) - : stream_id(serialized.stream_id()), + : url(serialized.url()), http_header(serialized.http_header()), stream_header(serialized.stream_header()), encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later. @@ -71,7 +71,7 @@ StreamProto Stream::serialize() serialized.add_data_fds(data_fd); serialized.set_backlog_size(backlog_size); serialized.set_bytes_received(bytes_received); - serialized.set_stream_id(stream_id); + serialized.set_url(url); data_fd = -1; return serialized; } diff --git a/stream.h b/stream.h index 33fb55e..d762869 100644 --- a/stream.h +++ b/stream.h @@ -29,7 +29,7 @@ struct Stream { // Changes the backlog size, restructuring the data as needed. void set_backlog_size(size_t new_size); - std::string stream_id; + std::string url; // The HTTP response header, without the trailing double newline. std::string http_header; diff --git a/udpinput.cpp b/udpinput.cpp index 07bf63d..db54485 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -67,10 +67,10 @@ void UDPInput::construct_header() "Connection: close\r\n"; } -void UDPInput::add_destination(const string &stream_id) +void UDPInput::add_destination(int stream_index) { - stream_ids.push_back(stream_id); - servers->set_header(stream_id, http_header, ""); + stream_indices.push_back(stream_index); + servers->set_header(stream_index, http_header, ""); } void UDPInput::do_work() @@ -106,8 +106,8 @@ void UDPInput::do_work() continue; } - for (size_t i = 0; i < stream_ids.size(); ++i) { - servers->add_data(stream_ids[i], buf, ret); + for (size_t i = 0; i < stream_indices.size(); ++i) { + servers->add_data(stream_indices[i], buf, ret); } } } diff --git a/udpinput.h b/udpinput.h index 481b006..e41266e 100644 --- a/udpinput.h +++ b/udpinput.h @@ -19,7 +19,7 @@ public: virtual std::string get_url() const { return url; } virtual void close_socket(); - virtual void add_destination(const std::string &stream_id); + virtual void add_destination(int stream_index); private: // Actually gets the packets. @@ -28,7 +28,7 @@ private: // Create the HTTP header. void construct_header(); - std::vector stream_ids; + std::vector stream_indices; // The URL and its parsed components. std::string url; -- 2.39.2