Refer to streams internally mostly by an index, not the stream_id.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 20 Apr 2013 23:51:52 +0000 (01:51 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 20 Apr 2013 23:54:30 +0000 (01:54 +0200)
Inputs no longer know about stream_id, only the stream_index.
This is faster (we used maybe 5% of our time in find_stream),
and will make it easier to add non-HTTP outputs in the future.

Within (HTTP) clients, stream_id also no longer exists, but is replaced
by "url", which really works exactly the same way. url is mostly used during
re-exec, however, so it's less important than it used to be.

19 files changed:
accesslog.cpp
client.cpp
client.h
config.cpp
config.h
httpinput.cpp
httpinput.h
input.h
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stats.cpp
stream.cpp
stream.h
udpinput.cpp
udpinput.h

index 81eff90..410b3b6 100644 (file)
@@ -58,7 +58,7 @@ void AccessLogThread::do_work()
                                fprintf(logfp, "%llu %s %s %d %llu %llu %llu\n",
                                        (long long unsigned)(writes[i].connect_time),
                                        writes[i].remote_addr.c_str(),
-                                       writes[i].stream_id.c_str(),
+                                       writes[i].url.c_str(),
                                        int(now - writes[i].connect_time),
                                        (long long unsigned)(writes[i].bytes_sent),
                                        (long long unsigned)(writes[i].bytes_lost),
index 04518c8..f6361f3 100644 (file)
@@ -60,7 +60,7 @@ Client::Client(const ClientProto &serialized, Stream *stream)
          connect_time(serialized.connect_time()),
          state(State(serialized.state())),
          request(serialized.request()),
-         stream_id(serialized.stream_id()),
+         url(serialized.url()),
          stream(stream),
          header_or_error(serialized.header_or_error()),
          header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()),
@@ -90,7 +90,7 @@ ClientProto Client::serialize() const
        serialized.set_connect_time(connect_time);
        serialized.set_state(state);
        serialized.set_request(request);
-       serialized.set_stream_id(stream_id);
+       serialized.set_url(url);
        serialized.set_header_or_error(header_or_error);
        serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent());
        serialized.set_stream_pos(stream_pos);
@@ -103,10 +103,10 @@ ClientProto Client::serialize() const
 ClientStats Client::get_stats() const
 {
        ClientStats stats;
-       if (stream_id.empty()) {
-               stats.stream_id = "-";
+       if (url.empty()) {
+               stats.url = "-";
        } else {
-               stats.stream_id = stream_id;
+               stats.url = url;
        }
        stats.sock = sock;
        stats.fwmark = fwmark;
index 9a189c0..c73810d 100644 (file)
--- a/client.h
+++ b/client.h
@@ -12,7 +12,7 @@ struct Stream;
 
 // Digested statistics for writing to logs etc.
 struct ClientStats {
-       std::string stream_id;
+       std::string url;
        int sock;
        int fwmark;
        std::string remote_addr;
@@ -50,7 +50,7 @@ struct Client {
 
        // What stream we're connecting to; parsed from <request>.
        // Not relevant for READING_REQUEST.
-       std::string stream_id;
+       std::string url;
        Stream *stream;
 
        // The header we want to send. This is nominally a copy of Stream::header,
index c5547bd..4050338 100644 (file)
@@ -189,12 +189,12 @@ bool parse_stream(const ConfigLine &line, Config *config)
        }
 
        StreamConfig stream;
-       stream.stream_id = line.arguments[0];
+       stream.url = line.arguments[0];
 
        map<string, string>::const_iterator src_it = line.parameters.find("src");
        if (src_it == line.parameters.end()) {
                log(WARNING, "stream '%s' has no src= attribute, clients will not get any data.",
-                       stream.stream_id.c_str());
+                       stream.url.c_str());
        } else {
                stream.src = src_it->second;
                // TODO: Verify that the URL is parseable?
index 5395d5a..d79d3d7 100644 (file)
--- a/config.h
+++ b/config.h
@@ -12,7 +12,7 @@ struct MarkPoolConfig {
 };
 
 struct StreamConfig {
-       std::string stream_id;
+       std::string url;  // As seen by the client.
        std::string src;  // Can be empty.
        size_t backlog_size;
        int mark_pool;  // -1 for none.
index f92ac13..c30651e 100644 (file)
@@ -240,8 +240,8 @@ bool HTTPInput::parse_response(const std::string &request)
                http_header.append(it->first + ": " + it->second + "\r\n");
        }
 
-       for (size_t i = 0; i < stream_ids.size(); ++i) {
-               servers->set_header(stream_ids[i], http_header, "");
+       for (size_t i = 0; i < stream_indices.size(); ++i) {
+               servers->set_header(stream_indices[i], http_header, "");
        }
 
        return true;
@@ -264,8 +264,8 @@ void HTTPInput::do_work()
                        request_bytes_sent = 0;
                        response.clear();
                        pending_data.clear();
-                       for (size_t i = 0; i < stream_ids.size(); ++i) {
-                               servers->set_header(stream_ids[i], "", "");
+                       for (size_t i = 0; i < stream_indices.size(); ++i) {
+                               servers->set_header(stream_indices[i], "", "");
                        }
 
                        {
@@ -466,12 +466,12 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                char *inner_data = pending_data.data() + sizeof(metacube_block_header);
                if (flags & METACUBE_FLAGS_HEADER) {
                        string header(inner_data, inner_data + size);
-                       for (size_t i = 0; i < stream_ids.size(); ++i) {
-                               servers->set_header(stream_ids[i], http_header, header);
+                       for (size_t i = 0; i < stream_indices.size(); ++i) {
+                               servers->set_header(stream_indices[i], http_header, header);
                        }
                } else { 
-                       for (size_t i = 0; i < stream_ids.size(); ++i) {
-                               servers->add_data(stream_ids[i], inner_data, size);
+                       for (size_t i = 0; i < stream_indices.size(); ++i) {
+                               servers->add_data(stream_indices[i], inner_data, size);
                        }
                }
 
index 9a27871..fc39b30 100644 (file)
@@ -21,9 +21,9 @@ public:
 
        virtual std::string get_url() const { return url; }
 
-       virtual void add_destination(const std::string &stream_id)
+       virtual void add_destination(int stream_index)
        {
-               stream_ids.push_back(stream_id);
+               stream_indices.push_back(stream_index);
        }
 
 private:
@@ -53,7 +53,7 @@ private:
        };
        State state;
 
-       std::vector<std::string> stream_ids;
+       std::vector<int> stream_indices;
 
        // The URL and its parsed components.
        std::string url;
diff --git a/input.h b/input.h
index e0bd54b..76e1b66 100644 (file)
--- a/input.h
+++ b/input.h
@@ -22,7 +22,7 @@ public:
        virtual InputProto serialize() const = 0;
        virtual std::string get_url() const = 0;
        virtual void close_socket() = 0;
-       virtual void add_destination(const std::string &stream_id) = 0;
+       virtual void add_destination(int stream_index) = 0;
 };
 
 #endif  // !defined(_INPUT_H)
index c89ec72..5ec0cb4 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -136,7 +136,7 @@ void create_config_inputs(const Config &config, multimap<string, InputWithRefcou
 }
 
 void create_streams(const Config &config,
-                    const set<string> &deserialized_stream_ids,
+                    const set<string> &deserialized_urls,
                     multimap<string, InputWithRefcount> *inputs)
 {
        for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
@@ -144,43 +144,45 @@ void create_streams(const Config &config,
                mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
        }
 
-       set<string> expecting_stream_ids = deserialized_stream_ids;
+       set<string> expecting_urls = deserialized_urls;
        for (unsigned i = 0; i < config.streams.size(); ++i) {
                const StreamConfig &stream_config = config.streams[i];
-               if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
-                       servers->add_stream(stream_config.stream_id,
-                                           stream_config.backlog_size,
-                                           Stream::Encoding(stream_config.encoding));
+               int stream_index;
+               if (deserialized_urls.count(stream_config.url) == 0) {
+                       stream_index = servers->add_stream(stream_config.url,
+                                                          stream_config.backlog_size,
+                                                          Stream::Encoding(stream_config.encoding));
                } else {
-                       servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
-                       servers->set_encoding(stream_config.stream_id,
+                       stream_index = servers->lookup_stream_by_url(stream_config.url);
+                       assert(stream_index != -1);
+                       servers->set_backlog_size(stream_index, stream_config.backlog_size);
+                       servers->set_encoding(stream_index,
                                              Stream::Encoding(stream_config.encoding));
                }
-               expecting_stream_ids.erase(stream_config.stream_id);
+               expecting_urls.erase(stream_config.url);
 
                if (stream_config.mark_pool != -1) {
-                       servers->set_mark_pool(stream_config.stream_id,
-                                              mark_pools[stream_config.mark_pool]);
+                       servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
                }
 
                string src = stream_config.src;
                if (!src.empty()) {
                        multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
                        assert(input_it != inputs->end());
-                       input_it->second.input->add_destination(stream_config.stream_id);
+                       input_it->second.input->add_destination(stream_index);
                        ++input_it->second.refcount;
                }
        }
 
        // Warn about any servers we've lost.
        // TODO: Make an option (delete=yes?) to actually shut down streams.
-       for (set<string>::const_iterator stream_it = expecting_stream_ids.begin();
-            stream_it != expecting_stream_ids.end();
+       for (set<string>::const_iterator stream_it = expecting_urls.begin();
+            stream_it != expecting_urls.end();
             ++stream_it) {
-               string stream_id = *stream_it;
+               string url = *stream_it;
                log(WARNING, "stream '%s' disappeared from the configuration file. "
                             "It will not be deleted, but clients will not get any new inputs.",
-                            stream_id.c_str());
+                            url.c_str());
        }
 }
        
@@ -328,7 +330,7 @@ start:
 
        CubemapStateProto loaded_state;
        struct timeval serialize_start;
-       set<string> deserialized_stream_ids;
+       set<string> deserialized_urls;
        map<int, Acceptor *> deserialized_acceptors;
        multimap<string, InputWithRefcount> inputs;  // multimap due to older versions without deduplication.
        if (state_fd != -1) {
@@ -361,7 +363,7 @@ start:
                        }
 
                        servers->add_stream_from_serialized(stream, data_fds);
-                       deserialized_stream_ids.insert(stream.stream_id());
+                       deserialized_urls.insert(stream.url());
                }
 
                // Deserialize the inputs. Note that we don't actually add them to any stream yet.
@@ -386,7 +388,7 @@ start:
        create_config_inputs(config, &inputs);
        
        // Find all streams in the configuration file, create them, and connect to the inputs.
-       create_streams(config, deserialized_stream_ids, &inputs);
+       create_streams(config, deserialized_urls, &inputs);
        vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
        
        // Put back the existing clients. It doesn't matter which server we
index 2eb33ed..5bd0d55 100644 (file)
@@ -46,10 +46,8 @@ Server::Server()
 
 Server::~Server()
 {
-       for (map<string, Stream *>::iterator stream_it = streams.begin();
-            stream_it != streams.end();
-            ++stream_it) {
-               delete stream_it->second;
+       for (size_t i = 0; i < streams.size(); ++i) {   
+               delete streams[i];
        }
 
        safe_close(epoll_fd);
@@ -100,11 +98,9 @@ void Server::do_work()
                        process_client(client);
                }
 
-               for (map<string, Stream *>::iterator stream_it = streams.begin();
-                    stream_it != streams.end();
-                    ++stream_it) {
+               for (size_t i = 0; i < streams.size(); ++i) {   
                        vector<Client *> to_process;
-                       swap(stream_it->second->to_process, to_process);
+                       swap(streams[i]->to_process, to_process);
                        for (size_t i = 0; i < to_process.size(); ++i) {
                                process_client(to_process[i]);
                        }
@@ -123,10 +119,8 @@ CubemapStateProto Server::serialize()
             ++client_it) {
                serialized.add_clients()->MergeFrom(client_it->second.serialize());
        }
-       for (map<string, Stream *>::const_iterator stream_it = streams.begin();
-            stream_it != streams.end();
-            ++stream_it) {
-               serialized.add_streams()->MergeFrom(stream_it->second->serialize());
+       for (size_t i = 0; i < streams.size(); ++i) {   
+               serialized.add_streams()->MergeFrom(streams[i]->serialize());
        }
        return serialized;
 }
@@ -160,11 +154,12 @@ void Server::add_client_from_serialized(const ClientProto &client)
 {
        MutexLock lock(&mutex);
        Stream *stream;
-       map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
-       if (stream_it == streams.end()) {
+       int stream_index = lookup_stream_by_url(client.url());
+       if (stream_index == -1) {
+               assert(client.state() != Client::SENDING_DATA);
                stream = NULL;
        } else {
-               stream = stream_it->second;
+               stream = streams[stream_index];
        }
        pair<map<int, Client>::iterator, bool> ret =
                clients.insert(make_pair(client.sock(), Client(client, stream)));
@@ -180,7 +175,6 @@ void Server::add_client_from_serialized(const ClientProto &client)
                // the sleeping array again soon.
                ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        }
-       ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_ADD)");
@@ -195,37 +189,51 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 }
 
-void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int Server::lookup_stream_by_url(const std::string &url) const
+{
+       map<string, int>::const_iterator url_it = url_map.find(url);
+       if (url_it == url_map.end()) {
+               return -1;
+       }
+       return url_it->second;
+}
+
+int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
+       url_map.insert(make_pair(url, streams.size()));
+       streams.push_back(new Stream(url, backlog_size, encoding));
+       return streams.size() - 1;
 }
 
-void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
+       url_map.insert(make_pair(stream.url(), streams.size()));
+       streams.push_back(new Stream(stream, data_fd));
+       return streams.size() - 1;
 }
        
-void Server::set_backlog_size(const string &stream_id, size_t new_size)
+void Server::set_backlog_size(int stream_index, size_t new_size)
 {
        MutexLock lock(&mutex);
-       assert(streams.count(stream_id) != 0);
-       streams[stream_id]->set_backlog_size(new_size);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->set_backlog_size(new_size);
 }
        
-void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void Server::set_encoding(int stream_index, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       assert(streams.count(stream_id) != 0);
-       streams[stream_id]->encoding = encoding;
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->encoding = encoding;
 }
        
-void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
 {
        MutexLock lock(&mutex);
-       find_stream(stream_id)->http_header = http_header;
-       find_stream(stream_id)->stream_header = stream_header;
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->http_header = http_header;
+       streams[stream_index]->stream_header = stream_header;
 
        // If there are clients we haven't sent anything to yet, we should give
        // them the header, so push back into the SENDING_HEADER state.
@@ -240,17 +248,19 @@ void Server::set_header(const string &stream_id, const string &http_header, cons
        }
 }
        
-void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
 {
        MutexLock lock(&mutex);
        assert(clients.empty());
-       find_stream(stream_id)->mark_pool = mark_pool;
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->mark_pool = mark_pool;
 }
 
-void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
 {
        MutexLock lock(&queued_data_mutex);
-       find_stream(stream_id)->add_data_deferred(data, bytes);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->add_data_deferred(data, bytes);
 }
 
 // See the .h file for postconditions after this function.     
@@ -444,12 +454,14 @@ int Server::parse_request(Client *client)
        if (request_tokens[0] != "GET") {
                return 400;  // Should maybe be 405 instead?
        }
-       if (streams.count(request_tokens[1]) == 0) {
+
+       map<string, int>::const_iterator url_map_it = url_map.find(request_tokens[1]);
+       if (url_map_it == url_map.end()) {
                return 404;  // Not found.
        }
 
-       client->stream_id = request_tokens[1];
-       client->stream = find_stream(client->stream_id);
+       client->url = request_tokens[1];
+       client->stream = streams[url_map_it->second];
        if (client->stream->mark_pool != NULL) {
                client->fwmark = client->stream->mark_pool->get_mark();
        } else {
@@ -467,7 +479,7 @@ int Server::parse_request(Client *client)
 
 void Server::construct_header(Client *client)
 {
-       Stream *stream = find_stream(client->stream_id);
+       Stream *stream = client->stream;
        if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
                client->header_or_error = stream->http_header +
                        "\r\n" +
@@ -555,13 +567,6 @@ void Server::close_client(Client *client)
        clients.erase(client->sock);
 }
        
-Stream *Server::find_stream(const string &stream_id)
-{
-       map<string, Stream *>::iterator it = streams.find(stream_id);
-       assert(it != streams.end());
-       return it->second;
-}
-
 void Server::process_queued_data()
 {
        MutexLock lock(&queued_data_mutex);
@@ -570,10 +575,8 @@ void Server::process_queued_data()
                add_client(queued_add_clients[i]);
        }
        queued_add_clients.clear();     
-       
-       for (map<string, Stream *>::iterator stream_it = streams.begin();
-            stream_it != streams.end();
-            ++stream_it) {
-               stream_it->second->process_queued_data();
+
+       for (size_t i = 0; i < streams.size(); ++i) {   
+               streams[i]->process_queued_data();
        }
 }
index edcdc83..476e790 100644 (file)
--- a/server.h
+++ b/server.h
@@ -35,29 +35,30 @@ public:
        std::vector<ClientStats> get_client_stats() const;
 
        // Set header (both HTTP header and any stream headers) for the given stream.
-       void set_header(const std::string &stream_id,
+       void set_header(int stream_index,
                        const std::string &http_header,
                        const std::string &stream_header);
 
        // Set that the given stream should use the given mark pool from now on.
        // NOTE: This should be set before any clients are connected!
-       void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
+       void set_mark_pool(int stream_index, MarkPool *mark_pool);
 
        // These will be deferred until the next time an iteration in do_work() happens,
        // and the order between them are undefined.
        // XXX: header should ideally be ordered with respect to data.
        void add_client_deferred(int sock);
-       void add_data_deferred(const std::string &stream_id, const char *data, size_t bytes);
+       void add_data_deferred(int stream_index, const char *data, size_t bytes);
 
        // These should not be called while running, since that would violate
        // threading assumptions (ie., that epoll is only called from one thread
        // at the same time).
        CubemapStateProto serialize();
        void add_client_from_serialized(const ClientProto &client);
-       void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding);
-       void add_stream_from_serialized(const StreamProto &stream, int data_fd);
-       void set_backlog_size(const std::string &stream_id, size_t new_size);
-       void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
+       int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding);
+       int add_stream_from_serialized(const StreamProto &stream, int data_fd);
+       int lookup_stream_by_url(const std::string &url) const;
+       void set_backlog_size(int stream_index, size_t new_size);
+       void set_encoding(int stream_index, Stream::Encoding encoding);
 
 private:
        // Mutex protecting queued_add_clients and streams[..]->queued_data.
@@ -80,8 +81,11 @@ private:
        // All variables below this line are protected by the mutex.
        mutable pthread_mutex_t mutex;
 
-       // Map from stream ID to stream.
-       std::map<std::string, Stream *> streams;
+       // All streams.
+       std::vector<Stream *> streams;
+
+       // Map from URL to index into <streams>.
+       std::map<std::string, int> url_map;
 
        // Map from file descriptor to client.
        std::map<int, Client> clients;
@@ -122,9 +126,6 @@ private:
        // the SENDING_ERROR state.
        void construct_error(Client *client, int error_code);
 
-       // TODO: This function should probably die.
-       Stream *find_stream(const std::string &stream_id);
-
        void process_queued_data();
 
        void add_client(int sock);
index e309ab9..fb8ca45 100644 (file)
@@ -61,17 +61,32 @@ void ServerPool::add_client_from_serialized(const ClientProto &client)
        servers[clients_added++ % num_servers].add_client_from_serialized(client);
 }
 
-void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int ServerPool::lookup_stream_by_url(const std::string &url) const
 {
+       assert(servers != NULL);
+       return servers[0].lookup_stream_by_url(url);
+}
+
+int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+{
+       int stream_index = -1;
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_stream(stream_id, backlog_size, encoding);
+               int stream_index2 = servers[i].add_stream(url, backlog_size, encoding);
+               if (i == 0) {
+                       stream_index = stream_index2;
+               } else {
+                       // Verify that all servers have this under the same stream index.
+                       assert(stream_index == stream_index2);
+               }
        }
+       return stream_index;
 }
 
-void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
+int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
 {
        assert(!data_fds.empty());
        string contents;
+       int stream_index = -1;
        for (int i = 0; i < num_servers; ++i) {
                int data_fd;
                if (i < int(data_fds.size())) {
@@ -87,26 +102,34 @@ void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vec
                        data_fd = make_tempfile(contents);
                }
 
-               servers[i].add_stream_from_serialized(stream, data_fd);
+               int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd);
+               if (i == 0) {
+                       stream_index = stream_index2;
+               } else {
+                       // Verify that all servers have this under the same stream index.
+                       assert(stream_index == stream_index2);
+               }
        }
 
        // Close and delete any leftovers, if the number of servers was reduced.
        for (size_t i = num_servers; i < data_fds.size(); ++i) {
                safe_close(data_fds[i]);  // Implicitly deletes the file.
        }
+
+       return stream_index;
 }
 
-void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].set_header(stream_id, http_header, stream_header);
+               servers[i].set_header(stream_index, http_header, stream_header);
        }
 }
 
-void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_data_deferred(stream_id, data, bytes);
+               servers[i].add_data_deferred(stream_index, data, bytes);
        }
 }
 
@@ -134,23 +157,23 @@ vector<ClientStats> ServerPool::get_client_stats() const
        return ret;
 }
        
-void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].set_mark_pool(stream_id, mark_pool);
+               servers[i].set_mark_pool(stream_index, mark_pool);
        }       
 }
 
-void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
+void ServerPool::set_backlog_size(int stream_index, size_t new_size)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].set_backlog_size(stream_id, new_size);
+               servers[i].set_backlog_size(stream_index, new_size);
        }       
 }
 
-void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding)
 {
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].set_encoding(stream_id, encoding);
+               servers[i].set_encoding(stream_index, encoding);
        }       
 }
index 6a2fcb5..5f5f6f8 100644 (file)
@@ -26,24 +26,27 @@ public:
        void add_client(int sock);
        void add_client_from_serialized(const ClientProto &client);
 
-       // Adds the given stream to all the servers.
-       void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding);
-       void add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
+       // Adds the given stream to all the servers. Returns the stream index.
+       int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
+       int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
+
+       // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
+       int lookup_stream_by_url(const std::string &url) const;
 
        // Adds the given data to all the servers.
-       void set_header(const std::string &stream_id,
+       void set_header(int stream_index,
                        const std::string &http_header,
                        const std::string &stream_header);
-       void add_data(const std::string &stream_id, const char *data, size_t bytes);
+       void add_data(int stream_index, const char *data, size_t bytes);
 
        // Connects the given stream to the given mark pool for all the servers.
-       void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
+       void set_mark_pool(int stream_index, MarkPool *mark_pool);
 
        // Changes the given stream's backlog size on all the servers.
-       void set_backlog_size(const std::string &stream_id, size_t new_size);
+       void set_backlog_size(int stream_index, size_t new_size);
 
        // Changes the given stream's encoding type on all the servers.
-       void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
+       void set_encoding(int stream_index, Stream::Encoding encoding);
 
        // Starts all the servers.
        void run();
index 88ad2d8..1776280 100644 (file)
@@ -5,7 +5,7 @@ message ClientProto {
        optional int64 connect_time = 9;
        optional int32 state = 2;
        optional bytes request = 3;
-       optional string stream_id = 4;
+       optional string url = 4;
        optional bytes header_or_error = 5;
        optional int64 header_or_error_bytes_sent = 6;
        optional int64 stream_pos = 7;
@@ -21,7 +21,7 @@ message StreamProto {
        repeated int32 data_fds = 8;
        optional int64 backlog_size = 5 [default=1048576];
        optional int64 bytes_received = 3;
-       optional string stream_id = 4;
+       optional string url = 4;
 
        // Older versions stored the HTTP and video headers together in this field.
        optional bytes header = 1;
index 280f019..6dffa58 100644 (file)
--- a/stats.cpp
+++ b/stats.cpp
@@ -60,7 +60,7 @@ void StatsThread::do_work()
                                client_stats[i].remote_addr.c_str(),
                                client_stats[i].sock,
                                client_stats[i].fwmark,
-                               client_stats[i].stream_id.c_str(),
+                               client_stats[i].url.c_str(),
                                int(now - client_stats[i].connect_time),
                                (long long unsigned)(client_stats[i].bytes_sent),
                                (long long unsigned)(client_stats[i].bytes_lost),
index 37961fa..9922655 100644 (file)
@@ -15,8 +15,8 @@
 
 using namespace std;
 
-Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
-       : stream_id(stream_id),
+Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+       : url(url),
          encoding(encoding),
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
@@ -36,7 +36,7 @@ Stream::~Stream()
 }
 
 Stream::Stream(const StreamProto &serialized, int data_fd)
-       : stream_id(serialized.stream_id()),
+       : url(serialized.url()),
          http_header(serialized.http_header()),
          stream_header(serialized.stream_header()),
          encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
@@ -71,7 +71,7 @@ StreamProto Stream::serialize()
        serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
        serialized.set_bytes_received(bytes_received);
-       serialized.set_stream_id(stream_id);
+       serialized.set_url(url);
        data_fd = -1;
        return serialized;
 }
index 33fb55e..d762869 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -29,7 +29,7 @@ struct Stream {
        // Changes the backlog size, restructuring the data as needed.
        void set_backlog_size(size_t new_size);
 
-       std::string stream_id;
+       std::string url;
 
        // The HTTP response header, without the trailing double newline.
        std::string http_header;
index 07bf63d..db54485 100644 (file)
@@ -67,10 +67,10 @@ void UDPInput::construct_header()
                "Connection: close\r\n";
 }
        
-void UDPInput::add_destination(const string &stream_id)
+void UDPInput::add_destination(int stream_index)
 {
-       stream_ids.push_back(stream_id);
-       servers->set_header(stream_id, http_header, "");
+       stream_indices.push_back(stream_index);
+       servers->set_header(stream_index, http_header, "");
 }
 
 void UDPInput::do_work()
@@ -106,8 +106,8 @@ void UDPInput::do_work()
                        continue;
                }
                
-               for (size_t i = 0; i < stream_ids.size(); ++i) {
-                       servers->add_data(stream_ids[i], buf, ret);
+               for (size_t i = 0; i < stream_indices.size(); ++i) {
+                       servers->add_data(stream_indices[i], buf, ret);
                }
        }
 }
index 481b006..e41266e 100644 (file)
@@ -19,7 +19,7 @@ public:
        virtual std::string get_url() const { return url; }
        virtual void close_socket();
 
-       virtual void add_destination(const std::string &stream_id);
+       virtual void add_destination(int stream_index);
 
 private:
        // Actually gets the packets.
@@ -28,7 +28,7 @@ private:
        // Create the HTTP header.
        void construct_header();
 
-       std::vector<std::string> stream_ids;
+       std::vector<int> stream_indices;
 
        // The URL and its parsed components.
        std::string url;