Server::~Server()
{
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- delete stream_it->second;
+ for (size_t i = 0; i < streams.size(); ++i) {
+ delete streams[i];
}
safe_close(epoll_fd);
process_client(client);
}
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
+ for (size_t i = 0; i < streams.size(); ++i) {
vector<Client *> to_process;
- swap(stream_it->second->to_process, to_process);
+ swap(streams[i]->to_process, to_process);
for (size_t i = 0; i < to_process.size(); ++i) {
process_client(to_process[i]);
}
++client_it) {
serialized.add_clients()->MergeFrom(client_it->second.serialize());
}
- for (map<string, Stream *>::const_iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- serialized.add_streams()->MergeFrom(stream_it->second->serialize());
+ for (size_t i = 0; i < streams.size(); ++i) {
+ serialized.add_streams()->MergeFrom(streams[i]->serialize());
}
return serialized;
}
{
MutexLock lock(&mutex);
Stream *stream;
- map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
- if (stream_it == streams.end()) {
+ int stream_index = lookup_stream_by_url(client.url());
+ if (stream_index == -1) {
+ assert(client.state() != Client::SENDING_DATA);
stream = NULL;
} else {
- stream = stream_it->second;
+ stream = streams[stream_index];
}
pair<map<int, Client>::iterator, bool> ret =
clients.insert(make_pair(client.sock(), Client(client, stream)));
// the sleeping array again soon.
ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
}
- ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
log_perror("epoll_ctl(EPOLL_CTL_ADD)");
}
}
-void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int Server::lookup_stream_by_url(const std::string &url) const
+{
+ map<string, int>::const_iterator url_it = url_map.find(url);
+ if (url_it == url_map.end()) {
+ return -1;
+ }
+ return url_it->second;
+}
+
+int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
+ url_map.insert(make_pair(url, streams.size()));
+ streams.push_back(new Stream(url, backlog_size, encoding));
+ return streams.size() - 1;
}
-void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
+ url_map.insert(make_pair(stream.url(), streams.size()));
+ streams.push_back(new Stream(stream, data_fd));
+ return streams.size() - 1;
}
-void Server::set_backlog_size(const string &stream_id, size_t new_size)
+void Server::set_backlog_size(int stream_index, size_t new_size)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id]->set_backlog_size(new_size);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->set_backlog_size(new_size);
}
-void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void Server::set_encoding(int stream_index, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id]->encoding = encoding;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->encoding = encoding;
}
-void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
MutexLock lock(&mutex);
- find_stream(stream_id)->http_header = http_header;
- find_stream(stream_id)->stream_header = stream_header;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->http_header = http_header;
+ streams[stream_index]->stream_header = stream_header;
// If there are clients we haven't sent anything to yet, we should give
// them the header, so push back into the SENDING_HEADER state.
}
}
-void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
{
MutexLock lock(&mutex);
assert(clients.empty());
- find_stream(stream_id)->mark_pool = mark_pool;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->mark_pool = mark_pool;
}
-void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
{
MutexLock lock(&queued_data_mutex);
- find_stream(stream_id)->add_data_deferred(data, bytes);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->add_data_deferred(data, bytes);
}
// See the .h file for postconditions after this function.
if (request_tokens[0] != "GET") {
return 400; // Should maybe be 405 instead?
}
- if (streams.count(request_tokens[1]) == 0) {
+
+ map<string, int>::const_iterator url_map_it = url_map.find(request_tokens[1]);
+ if (url_map_it == url_map.end()) {
return 404; // Not found.
}
- client->stream_id = request_tokens[1];
- client->stream = find_stream(client->stream_id);
+ client->url = request_tokens[1];
+ client->stream = streams[url_map_it->second];
if (client->stream->mark_pool != NULL) {
client->fwmark = client->stream->mark_pool->get_mark();
} else {
void Server::construct_header(Client *client)
{
- Stream *stream = find_stream(client->stream_id);
+ Stream *stream = client->stream;
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
client->header_or_error = stream->http_header +
"\r\n" +
clients.erase(client->sock);
}
-Stream *Server::find_stream(const string &stream_id)
-{
- map<string, Stream *>::iterator it = streams.find(stream_id);
- assert(it != streams.end());
- return it->second;
-}
-
void Server::process_queued_data()
{
MutexLock lock(&queued_data_mutex);
add_client(queued_add_clients[i]);
}
queued_add_clients.clear();
-
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- stream_it->second->process_queued_data();
+
+ for (size_t i = 0; i < streams.size(); ++i) {
+ streams[i]->process_queued_data();
}
}