X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=server.cpp;h=a0a32f8e704a716cad99563cd75b4bc47cfeb41b;hb=ba03a3ed19cc8b02d4bb198edfd1c8bee9f28e35;hp=46bda5b21935ff36d0aca3be838885c5e4b7b7ba;hpb=8cc780cf37063ce29f13380976a54dd8302fe3a9;p=cubemap diff --git a/server.cpp b/server.cpp index 46bda5b..a0a32f8 100644 --- a/server.cpp +++ b/server.cpp @@ -1,8 +1,11 @@ #include #include +#include #include +#include #include #include +#include #include #include #include @@ -14,16 +17,21 @@ #include #include +#include "accesslog.h" #include "log.h" #include "markpool.h" +#include "metacube.h" #include "mutexlock.h" #include "parse.h" #include "server.h" #include "state.pb.h" #include "stream.h" +#include "util.h" using namespace std; +extern AccessLogThread *access_log; + Server::Server() { pthread_mutex_init(&mutex, NULL); @@ -38,14 +46,13 @@ Server::Server() Server::~Server() { - int ret; - do { - ret = close(epoll_fd); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close(epoll_fd)"); + for (map::iterator stream_it = streams.begin(); + stream_it != streams.end(); + ++stream_it) { + delete stream_it->second; } + + safe_close(epoll_fd); } vector Server::get_client_stats() const @@ -63,15 +70,11 @@ vector Server::get_client_stats() const void Server::do_work() { - for ( ;; ) { - int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS); - if (nfds == -1 && errno == EINTR) { - if (should_stop) { - return; - } - continue; - } - if (nfds == -1) { + while (!should_stop()) { + // Wait until there's activity on at least one of the fds, + // or we are waken up due to new queued clients or data. + int nfds = epoll_pwait(epoll_fd, events, EPOLL_MAX_EVENTS, -1, &sigset_without_usr1_block); + if (nfds == -1 && errno != EINTR) { log_perror("epoll_wait"); exit(1); } @@ -100,10 +103,6 @@ void Server::do_work() process_client(to_process[i]); } } - - if (should_stop) { - return; - } } } @@ -130,22 +129,26 @@ void Server::add_client_deferred(int sock) { MutexLock lock(&queued_data_mutex); queued_add_clients.push_back(sock); + wakeup(); } void Server::add_client(int sock) { - clients.insert(make_pair(sock, Client(sock))); + pair::iterator, bool> ret = + clients.insert(make_pair(sock, Client(sock))); + assert(ret.second == true); // Should not already exist. + Client *client_ptr = &ret.first->second; // Start listening on data from this socket. epoll_event ev; ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - ev.data.u64 = reinterpret_cast(&clients[sock]); + ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } - process_client(&clients[sock]); + process_client(client_ptr); } void Server::add_client_from_serialized(const ClientProto &client) @@ -158,8 +161,10 @@ void Server::add_client_from_serialized(const ClientProto &client) } else { stream = stream_it->second; } - clients.insert(make_pair(client.sock(), Client(client, stream))); - Client *client_ptr = &clients[client.sock()]; + pair::iterator, bool> ret = + clients.insert(make_pair(client.sock(), Client(client, stream))); + assert(ret.second == true); // Should not already exist. + Client *client_ptr = &ret.first->second; // Start listening on data from this socket. epoll_event ev; @@ -185,29 +190,37 @@ void Server::add_client_from_serialized(const ClientProto &client) } } -void Server::add_stream(const string &stream_id, size_t backlog_size) +void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) { MutexLock lock(&mutex); - streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size))); + streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); } -void Server::add_stream_from_serialized(const StreamProto &stream) +void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { MutexLock lock(&mutex); - streams.insert(make_pair(stream.stream_id(), new Stream(stream))); + streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd))); } -void Server::set_backlog_size(const std::string &stream_id, size_t new_size) +void Server::set_backlog_size(const string &stream_id, size_t new_size) { MutexLock lock(&mutex); assert(streams.count(stream_id) != 0); streams[stream_id]->set_backlog_size(new_size); } -void Server::set_header(const string &stream_id, const string &header) +void Server::set_encoding(const string &stream_id, Stream::Encoding encoding) +{ + MutexLock lock(&mutex); + assert(streams.count(stream_id) != 0); + streams[stream_id]->encoding = encoding; +} + +void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header) { MutexLock lock(&mutex); - find_stream(stream_id)->header = header; + find_stream(stream_id)->http_header = http_header; + find_stream(stream_id)->stream_header = stream_header; // If there are clients we haven't sent anything to yet, we should give // them the header, so push back into the SENDING_HEADER state. @@ -222,7 +235,7 @@ void Server::set_header(const string &stream_id, const string &header) } } -void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool) +void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool) { MutexLock lock(&mutex); assert(clients.empty()); @@ -233,6 +246,7 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t { MutexLock lock(&queued_data_mutex); queued_data[stream_id].append(string(data, data + bytes)); + wakeup(); } // See the .h file for postconditions after this function. @@ -268,7 +282,7 @@ read_request_again: switch (status) { case RP_OUT_OF_SPACE: - log(WARNING, "fd %d sent overlong request!", client->sock); + log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str()); close_client(client); return; case RP_NOT_FINISHED_YET: @@ -276,7 +290,7 @@ read_request_again: // See if there's more data for us. goto read_request_again; case RP_EXTRA_DATA: - log(WARNING, "fd %d had junk data after request!", client->sock); + log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str()); close_client(client); return; case RP_FINISHED: @@ -357,13 +371,17 @@ sending_data_again: return; } if (bytes_to_send > stream->backlog_size) { - log(WARNING, "fd %d lost %lld bytes, maybe too slow connection", - client->sock, - (long long int)(bytes_to_send - stream->backlog_size)); + size_t bytes_lost = bytes_to_send - stream->backlog_size; client->stream_pos = stream->bytes_received - stream->backlog_size; - client->bytes_lost += bytes_to_send - stream->backlog_size; + client->bytes_lost += bytes_lost; ++client->num_loss_events; bytes_to_send = stream->backlog_size; + + double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent); + log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection", + client->remote_addr.c_str(), + (long long int)(bytes_lost), + 100.0 * loss_fraction); } // See if we need to split across the circular buffer. @@ -445,7 +463,27 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - client->header_or_error = find_stream(client->stream_id)->header; + Stream *stream = find_stream(client->stream_id); + if (stream->encoding == Stream::STREAM_ENCODING_RAW) { + client->header_or_error = stream->http_header + + "\r\n" + + stream->stream_header; + } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { + client->header_or_error = stream->http_header + + "Content-encoding: metacube\r\n" + + "\r\n"; + if (!stream->stream_header.empty()) { + metacube_block_header hdr; + memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(stream->stream_header.size()); + hdr.flags = htonl(METACUBE_FLAGS_HEADER); + client->header_or_error.append( + string(reinterpret_cast(&hdr), sizeof(hdr))); + } + client->header_or_error.append(stream->stream_header); + } else { + assert(false); + } // Switch states. client->state = Client::SENDING_HEADER; @@ -504,15 +542,11 @@ void Server::close_client(Client *client) } } - // Bye-bye! - int ret; - do { - ret = close(client->sock); - } while (ret == -1 && errno == EINTR); + // Log to access_log. + access_log->write(client->get_stats()); - if (ret == -1) { - log_perror("close"); - } + // Bye-bye! + safe_close(client->sock); clients.erase(client->sock); }