Refer to streams internally mostly by an index, not the stream_id.
[cubemap] / server.cpp
index 2eb33ed..5bd0d55 100644 (file)
@@ -46,10 +46,8 @@ Server::Server()
 
 Server::~Server()
 {
-       for (map<string, Stream *>::iterator stream_it = streams.begin();
-            stream_it != streams.end();
-            ++stream_it) {
-               delete stream_it->second;
+       for (size_t i = 0; i < streams.size(); ++i) {   
+               delete streams[i];
        }
 
        safe_close(epoll_fd);
@@ -100,11 +98,9 @@ void Server::do_work()
                        process_client(client);
                }
 
-               for (map<string, Stream *>::iterator stream_it = streams.begin();
-                    stream_it != streams.end();
-                    ++stream_it) {
+               for (size_t i = 0; i < streams.size(); ++i) {   
                        vector<Client *> to_process;
-                       swap(stream_it->second->to_process, to_process);
+                       swap(streams[i]->to_process, to_process);
                        for (size_t i = 0; i < to_process.size(); ++i) {
                                process_client(to_process[i]);
                        }
@@ -123,10 +119,8 @@ CubemapStateProto Server::serialize()
             ++client_it) {
                serialized.add_clients()->MergeFrom(client_it->second.serialize());
        }
-       for (map<string, Stream *>::const_iterator stream_it = streams.begin();
-            stream_it != streams.end();
-            ++stream_it) {
-               serialized.add_streams()->MergeFrom(stream_it->second->serialize());
+       for (size_t i = 0; i < streams.size(); ++i) {   
+               serialized.add_streams()->MergeFrom(streams[i]->serialize());
        }
        return serialized;
 }
@@ -160,11 +154,12 @@ void Server::add_client_from_serialized(const ClientProto &client)
 {
        MutexLock lock(&mutex);
        Stream *stream;
-       map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
-       if (stream_it == streams.end()) {
+       int stream_index = lookup_stream_by_url(client.url());
+       if (stream_index == -1) {
+               assert(client.state() != Client::SENDING_DATA);
                stream = NULL;
        } else {
-               stream = stream_it->second;
+               stream = streams[stream_index];
        }
        pair<map<int, Client>::iterator, bool> ret =
                clients.insert(make_pair(client.sock(), Client(client, stream)));
@@ -180,7 +175,6 @@ void Server::add_client_from_serialized(const ClientProto &client)
                // the sleeping array again soon.
                ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        }
-       ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_ADD)");
@@ -195,37 +189,51 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 }
 
-void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int Server::lookup_stream_by_url(const std::string &url) const
+{
+       map<string, int>::const_iterator url_it = url_map.find(url);
+       if (url_it == url_map.end()) {
+               return -1;
+       }
+       return url_it->second;
+}
+
+int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
+       url_map.insert(make_pair(url, streams.size()));
+       streams.push_back(new Stream(url, backlog_size, encoding));
+       return streams.size() - 1;
 }
 
-void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
+       url_map.insert(make_pair(stream.url(), streams.size()));
+       streams.push_back(new Stream(stream, data_fd));
+       return streams.size() - 1;
 }
        
-void Server::set_backlog_size(const string &stream_id, size_t new_size)
+void Server::set_backlog_size(int stream_index, size_t new_size)
 {
        MutexLock lock(&mutex);
-       assert(streams.count(stream_id) != 0);
-       streams[stream_id]->set_backlog_size(new_size);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->set_backlog_size(new_size);
 }
        
-void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void Server::set_encoding(int stream_index, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       assert(streams.count(stream_id) != 0);
-       streams[stream_id]->encoding = encoding;
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->encoding = encoding;
 }
        
-void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
 {
        MutexLock lock(&mutex);
-       find_stream(stream_id)->http_header = http_header;
-       find_stream(stream_id)->stream_header = stream_header;
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->http_header = http_header;
+       streams[stream_index]->stream_header = stream_header;
 
        // If there are clients we haven't sent anything to yet, we should give
        // them the header, so push back into the SENDING_HEADER state.
@@ -240,17 +248,19 @@ void Server::set_header(const string &stream_id, const string &http_header, cons
        }
 }
        
-void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
 {
        MutexLock lock(&mutex);
        assert(clients.empty());
-       find_stream(stream_id)->mark_pool = mark_pool;
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->mark_pool = mark_pool;
 }
 
-void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
 {
        MutexLock lock(&queued_data_mutex);
-       find_stream(stream_id)->add_data_deferred(data, bytes);
+       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+       streams[stream_index]->add_data_deferred(data, bytes);
 }
 
 // See the .h file for postconditions after this function.     
@@ -444,12 +454,14 @@ int Server::parse_request(Client *client)
        if (request_tokens[0] != "GET") {
                return 400;  // Should maybe be 405 instead?
        }
-       if (streams.count(request_tokens[1]) == 0) {
+
+       map<string, int>::const_iterator url_map_it = url_map.find(request_tokens[1]);
+       if (url_map_it == url_map.end()) {
                return 404;  // Not found.
        }
 
-       client->stream_id = request_tokens[1];
-       client->stream = find_stream(client->stream_id);
+       client->url = request_tokens[1];
+       client->stream = streams[url_map_it->second];
        if (client->stream->mark_pool != NULL) {
                client->fwmark = client->stream->mark_pool->get_mark();
        } else {
@@ -467,7 +479,7 @@ int Server::parse_request(Client *client)
 
 void Server::construct_header(Client *client)
 {
-       Stream *stream = find_stream(client->stream_id);
+       Stream *stream = client->stream;
        if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
                client->header_or_error = stream->http_header +
                        "\r\n" +
@@ -555,13 +567,6 @@ void Server::close_client(Client *client)
        clients.erase(client->sock);
 }
        
-Stream *Server::find_stream(const string &stream_id)
-{
-       map<string, Stream *>::iterator it = streams.find(stream_id);
-       assert(it != streams.end());
-       return it->second;
-}
-
 void Server::process_queued_data()
 {
        MutexLock lock(&queued_data_mutex);
@@ -570,10 +575,8 @@ void Server::process_queued_data()
                add_client(queued_add_clients[i]);
        }
        queued_add_clients.clear();     
-       
-       for (map<string, Stream *>::iterator stream_it = streams.begin();
-            stream_it != streams.end();
-            ++stream_it) {
-               stream_it->second->process_queued_data();
+
+       for (size_t i = 0; i < streams.size(); ++i) {   
+               streams[i]->process_queued_data();
        }
 }