X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=5bd0d555c18194ca112a04b21cb3d9344272722e;hp=2eb33ed9edcac0ff793f779a7fec21db4cc6b058;hb=9abb89bcf7940e2ada9d708f86a218a56334f68d;hpb=ef7f588a9b7a63ba2153cd06ce5322db4453fa16 diff --git a/server.cpp b/server.cpp index 2eb33ed..5bd0d55 100644 --- a/server.cpp +++ b/server.cpp @@ -46,10 +46,8 @@ Server::Server() Server::~Server() { - for (map::iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { - delete stream_it->second; + for (size_t i = 0; i < streams.size(); ++i) { + delete streams[i]; } safe_close(epoll_fd); @@ -100,11 +98,9 @@ void Server::do_work() process_client(client); } - for (map::iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { + for (size_t i = 0; i < streams.size(); ++i) { vector to_process; - swap(stream_it->second->to_process, to_process); + swap(streams[i]->to_process, to_process); for (size_t i = 0; i < to_process.size(); ++i) { process_client(to_process[i]); } @@ -123,10 +119,8 @@ CubemapStateProto Server::serialize() ++client_it) { serialized.add_clients()->MergeFrom(client_it->second.serialize()); } - for (map::const_iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { - serialized.add_streams()->MergeFrom(stream_it->second->serialize()); + for (size_t i = 0; i < streams.size(); ++i) { + serialized.add_streams()->MergeFrom(streams[i]->serialize()); } return serialized; } @@ -160,11 +154,12 @@ void Server::add_client_from_serialized(const ClientProto &client) { MutexLock lock(&mutex); Stream *stream; - map::iterator stream_it = streams.find(client.stream_id()); - if (stream_it == streams.end()) { + int stream_index = lookup_stream_by_url(client.url()); + if (stream_index == -1) { + assert(client.state() != Client::SENDING_DATA); stream = NULL; } else { - stream = stream_it->second; + stream = streams[stream_index]; } pair::iterator, bool> ret = clients.insert(make_pair(client.sock(), Client(client, stream))); @@ -180,7 +175,6 @@ void Server::add_client_from_serialized(const ClientProto &client) // the sleeping array again soon. ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; } - ev.data.u64 = 0; // Keep Valgrind happy. ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) { log_perror("epoll_ctl(EPOLL_CTL_ADD)"); @@ -195,37 +189,51 @@ void Server::add_client_from_serialized(const ClientProto &client) } } -void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) +int Server::lookup_stream_by_url(const std::string &url) const +{ + map::const_iterator url_it = url_map.find(url); + if (url_it == url_map.end()) { + return -1; + } + return url_it->second; +} + +int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding) { MutexLock lock(&mutex); - streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); + url_map.insert(make_pair(url, streams.size())); + streams.push_back(new Stream(url, backlog_size, encoding)); + return streams.size() - 1; } -void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) +int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { MutexLock lock(&mutex); - streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd))); + url_map.insert(make_pair(stream.url(), streams.size())); + streams.push_back(new Stream(stream, data_fd)); + return streams.size() - 1; } -void Server::set_backlog_size(const string &stream_id, size_t new_size) +void Server::set_backlog_size(int stream_index, size_t new_size) { MutexLock lock(&mutex); - assert(streams.count(stream_id) != 0); - streams[stream_id]->set_backlog_size(new_size); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->set_backlog_size(new_size); } -void Server::set_encoding(const string &stream_id, Stream::Encoding encoding) +void Server::set_encoding(int stream_index, Stream::Encoding encoding) { MutexLock lock(&mutex); - assert(streams.count(stream_id) != 0); - streams[stream_id]->encoding = encoding; + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->encoding = encoding; } -void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header) +void Server::set_header(int stream_index, const string &http_header, const string &stream_header) { MutexLock lock(&mutex); - find_stream(stream_id)->http_header = http_header; - find_stream(stream_id)->stream_header = stream_header; + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->http_header = http_header; + streams[stream_index]->stream_header = stream_header; // If there are clients we haven't sent anything to yet, we should give // them the header, so push back into the SENDING_HEADER state. @@ -240,17 +248,19 @@ void Server::set_header(const string &stream_id, const string &http_header, cons } } -void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool) +void Server::set_mark_pool(int stream_index, MarkPool *mark_pool) { MutexLock lock(&mutex); assert(clients.empty()); - find_stream(stream_id)->mark_pool = mark_pool; + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->mark_pool = mark_pool; } -void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) +void Server::add_data_deferred(int stream_index, const char *data, size_t bytes) { MutexLock lock(&queued_data_mutex); - find_stream(stream_id)->add_data_deferred(data, bytes); + assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); + streams[stream_index]->add_data_deferred(data, bytes); } // See the .h file for postconditions after this function. @@ -444,12 +454,14 @@ int Server::parse_request(Client *client) if (request_tokens[0] != "GET") { return 400; // Should maybe be 405 instead? } - if (streams.count(request_tokens[1]) == 0) { + + map::const_iterator url_map_it = url_map.find(request_tokens[1]); + if (url_map_it == url_map.end()) { return 404; // Not found. } - client->stream_id = request_tokens[1]; - client->stream = find_stream(client->stream_id); + client->url = request_tokens[1]; + client->stream = streams[url_map_it->second]; if (client->stream->mark_pool != NULL) { client->fwmark = client->stream->mark_pool->get_mark(); } else { @@ -467,7 +479,7 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - Stream *stream = find_stream(client->stream_id); + Stream *stream = client->stream; if (stream->encoding == Stream::STREAM_ENCODING_RAW) { client->header_or_error = stream->http_header + "\r\n" + @@ -555,13 +567,6 @@ void Server::close_client(Client *client) clients.erase(client->sock); } -Stream *Server::find_stream(const string &stream_id) -{ - map::iterator it = streams.find(stream_id); - assert(it != streams.end()); - return it->second; -} - void Server::process_queued_data() { MutexLock lock(&queued_data_mutex); @@ -570,10 +575,8 @@ void Server::process_queued_data() add_client(queued_add_clients[i]); } queued_add_clients.clear(); - - for (map::iterator stream_it = streams.begin(); - stream_it != streams.end(); - ++stream_it) { - stream_it->second->process_queued_data(); + + for (size_t i = 0; i < streams.size(); ++i) { + streams[i]->process_queued_data(); } }