#include "server.h"
#include "state.pb.h"
#include "stream.h"
+#include "util.h"
using namespace std;
Server::~Server()
{
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- delete stream_it->second;
+ for (size_t i = 0; i < streams.size(); ++i) {
+ delete streams[i];
}
- int ret;
- do {
- ret = close(epoll_fd);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close(epoll_fd)");
- }
+ safe_close(epoll_fd);
}
vector<ClientStats> Server::get_client_stats() const
void Server::do_work()
{
- for ( ;; ) {
- int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
- if (nfds == -1 && errno == EINTR) {
- if (should_stop) {
- return;
- }
- continue;
- }
- if (nfds == -1) {
+ while (!should_stop()) {
+ // Wait until there's activity on at least one of the fds,
+ // or 20 ms (about one frame at 50 fps) has elapsed.
+ //
+ // We could in theory wait forever and rely on wakeup()
+ // from add_client_deferred() and add_data_deferred(),
+ // but wakeup is a pretty expensive operation, and the
+ // two threads might end up fighting over a lock, so it's
+ // seemingly (much) more efficient to just have a timeout here.
+ int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS, &sigset_without_usr1_block);
+ if (nfds == -1 && errno != EINTR) {
log_perror("epoll_wait");
exit(1);
}
process_client(client);
}
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
+ for (size_t i = 0; i < streams.size(); ++i) {
vector<Client *> to_process;
- swap(stream_it->second->to_process, to_process);
+ swap(streams[i]->to_process, to_process);
for (size_t i = 0; i < to_process.size(); ++i) {
process_client(to_process[i]);
}
}
-
- if (should_stop) {
- return;
- }
}
}
++client_it) {
serialized.add_clients()->MergeFrom(client_it->second.serialize());
}
- for (map<string, Stream *>::const_iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- serialized.add_streams()->MergeFrom(stream_it->second->serialize());
+ for (size_t i = 0; i < streams.size(); ++i) {
+ serialized.add_streams()->MergeFrom(streams[i]->serialize());
}
return serialized;
}
void Server::add_client(int sock)
{
- clients.insert(make_pair(sock, Client(sock)));
+ pair<map<int, Client>::iterator, bool> ret =
+ clients.insert(make_pair(sock, Client(sock)));
+ assert(ret.second == true); // Should not already exist.
+ Client *client_ptr = &ret.first->second;
// Start listening on data from this socket.
epoll_event ev;
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
- ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
+ ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
log_perror("epoll_ctl(EPOLL_CTL_ADD)");
exit(1);
}
- process_client(&clients[sock]);
+ process_client(client_ptr);
}
void Server::add_client_from_serialized(const ClientProto &client)
{
MutexLock lock(&mutex);
Stream *stream;
- map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
- if (stream_it == streams.end()) {
+ int stream_index = lookup_stream_by_url(client.url());
+ if (stream_index == -1) {
+ assert(client.state() != Client::SENDING_DATA);
stream = NULL;
} else {
- stream = stream_it->second;
+ stream = streams[stream_index];
}
- clients.insert(make_pair(client.sock(), Client(client, stream)));
- Client *client_ptr = &clients[client.sock()];
+ pair<map<int, Client>::iterator, bool> ret =
+ clients.insert(make_pair(client.sock(), Client(client, stream)));
+ assert(ret.second == true); // Should not already exist.
+ Client *client_ptr = &ret.first->second;
// Start listening on data from this socket.
epoll_event ev;
// the sleeping array again soon.
ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
}
- ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
log_perror("epoll_ctl(EPOLL_CTL_ADD)");
}
}
-void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int Server::lookup_stream_by_url(const std::string &url) const
+{
+ map<string, int>::const_iterator url_it = url_map.find(url);
+ if (url_it == url_map.end()) {
+ return -1;
+ }
+ return url_it->second;
+}
+
+int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
+ url_map.insert(make_pair(url, streams.size()));
+ streams.push_back(new Stream(url, backlog_size, encoding));
+ return streams.size() - 1;
}
-void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
+ url_map.insert(make_pair(stream.url(), streams.size()));
+ streams.push_back(new Stream(stream, data_fd));
+ return streams.size() - 1;
}
-void Server::set_backlog_size(const string &stream_id, size_t new_size)
+void Server::set_backlog_size(int stream_index, size_t new_size)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id]->set_backlog_size(new_size);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->set_backlog_size(new_size);
}
-void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void Server::set_encoding(int stream_index, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id]->encoding = encoding;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->encoding = encoding;
}
-void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
MutexLock lock(&mutex);
- find_stream(stream_id)->http_header = http_header;
- find_stream(stream_id)->stream_header = stream_header;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->http_header = http_header;
+ streams[stream_index]->stream_header = stream_header;
// If there are clients we haven't sent anything to yet, we should give
// them the header, so push back into the SENDING_HEADER state.
}
}
-void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
{
MutexLock lock(&mutex);
assert(clients.empty());
- find_stream(stream_id)->mark_pool = mark_pool;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->mark_pool = mark_pool;
}
-void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
{
MutexLock lock(&queued_data_mutex);
- queued_data[stream_id].append(string(data, data + bytes));
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->add_data_deferred(data, bytes);
}
// See the .h file for postconditions after this function.
if (request_tokens[0] != "GET") {
return 400; // Should maybe be 405 instead?
}
- if (streams.count(request_tokens[1]) == 0) {
+
+ map<string, int>::const_iterator url_map_it = url_map.find(request_tokens[1]);
+ if (url_map_it == url_map.end()) {
return 404; // Not found.
}
- client->stream_id = request_tokens[1];
- client->stream = find_stream(client->stream_id);
+ client->url = request_tokens[1];
+ client->stream = streams[url_map_it->second];
if (client->stream->mark_pool != NULL) {
client->fwmark = client->stream->mark_pool->get_mark();
} else {
void Server::construct_header(Client *client)
{
- Stream *stream = find_stream(client->stream_id);
+ Stream *stream = client->stream;
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
client->header_or_error = stream->http_header +
"\r\n" +
access_log->write(client->get_stats());
// Bye-bye!
- int ret;
- do {
- ret = close(client->sock);
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(client->sock);
clients.erase(client->sock);
}
-Stream *Server::find_stream(const string &stream_id)
-{
- map<string, Stream *>::iterator it = streams.find(stream_id);
- assert(it != streams.end());
- return it->second;
-}
-
void Server::process_queued_data()
{
MutexLock lock(&queued_data_mutex);
add_client(queued_add_clients[i]);
}
queued_add_clients.clear();
-
- for (map<string, string>::iterator queued_it = queued_data.begin();
- queued_it != queued_data.end();
- ++queued_it) {
- Stream *stream = find_stream(queued_it->first);
- stream->add_data(queued_it->second.data(), queued_it->second.size());
- stream->wake_up_all_clients();
+
+ for (size_t i = 0; i < streams.size(); ++i) {
+ streams[i]->process_queued_data();
}
- queued_data.clear();
}