X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=24aa84e357a129db9fcea63b43cec986b114407a;hp=0c36e49363178f1ccd0a4955ef119a5ea6ebe9bd;hb=9abaf929b9da3158e9b7f799775ac56e88d28740;hpb=f51b3892514540ff3f08ab052296091f3a6f7a93 diff --git a/server.cpp b/server.cpp index 0c36e49..24aa84e 100644 --- a/server.cpp +++ b/server.cpp @@ -1,177 +1,31 @@ -#include -#include -#include -#include #include -#include -#include +#include #include -#include -#include +#include +#include #include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include #include +#include +#include +#include +#include +#include "accesslog.h" +#include "log.h" #include "markpool.h" -#include "metacube.h" -#include "server.h" #include "mutexlock.h" #include "parse.h" -#include "util.h" +#include "server.h" #include "state.pb.h" +#include "stream.h" using namespace std; -Client::Client(int sock) - : sock(sock), - fwmark(0), - connect_time(time(NULL)), - state(Client::READING_REQUEST), - stream(NULL), - header_or_error_bytes_sent(0), - bytes_sent(0) -{ - request.reserve(1024); - - // Find the remote address, and convert it to ASCII. - sockaddr_in6 addr; - socklen_t addr_len = sizeof(addr); - - if (getpeername(sock, reinterpret_cast(&addr), &addr_len) == -1) { - perror("getpeername"); - remote_addr = ""; - } else { - char buf[INET6_ADDRSTRLEN]; - if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) { - perror("inet_ntop"); - remote_addr = ""; - } else { - remote_addr = buf; - } - } -} - -Client::Client(const ClientProto &serialized, Stream *stream) - : sock(serialized.sock()), - remote_addr(serialized.remote_addr()), - connect_time(serialized.connect_time()), - state(State(serialized.state())), - request(serialized.request()), - stream_id(serialized.stream_id()), - stream(stream), - header_or_error(serialized.header_or_error()), - header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), - bytes_sent(serialized.bytes_sent()) -{ - if (stream->mark_pool != NULL) { - fwmark = stream->mark_pool->get_mark(); - } else { - fwmark = 0; // No mark. - } - if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { - if (fwmark != 0) { - perror("setsockopt(SO_MARK)"); - } - } -} - -ClientProto Client::serialize() const -{ - ClientProto serialized; - serialized.set_sock(sock); - serialized.set_remote_addr(remote_addr); - serialized.set_connect_time(connect_time); - serialized.set_state(state); - serialized.set_request(request); - serialized.set_stream_id(stream_id); - serialized.set_header_or_error(header_or_error); - serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()); - serialized.set_bytes_sent(bytes_sent); - return serialized; -} - -ClientStats Client::get_stats() const -{ - ClientStats stats; - stats.stream_id = stream_id; - stats.remote_addr = remote_addr; - stats.connect_time = connect_time; - stats.bytes_sent = bytes_sent; - return stats; -} - -Stream::Stream(const string &stream_id, size_t backlog_size) - : stream_id(stream_id), - data_fd(make_tempfile("")), - backlog_size(backlog_size), - bytes_received(0), - mark_pool(NULL) -{ - if (data_fd == -1) { - exit(1); - } -} - -Stream::~Stream() -{ - if (data_fd != -1) { - int ret; - do { - ret = close(data_fd); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - perror("close"); - } - } -} - -Stream::Stream(const StreamProto &serialized) - : stream_id(serialized.stream_id()), - header(serialized.header()), - data_fd(make_tempfile(serialized.data())), - backlog_size(serialized.backlog_size()), - bytes_received(serialized.bytes_received()), - mark_pool(NULL) -{ - if (data_fd == -1) { - exit(1); - } -} - -StreamProto Stream::serialize() -{ - StreamProto serialized; - serialized.set_header(header); - if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd. - exit(1); - } - serialized.set_backlog_size(backlog_size); - serialized.set_bytes_received(bytes_received); - serialized.set_stream_id(stream_id); - data_fd = -1; - return serialized; -} - -void Stream::put_client_to_sleep(Client *client) -{ - sleeping_clients.push_back(client); -} - -void Stream::wake_up_all_clients() -{ - if (to_process.empty()) { - swap(sleeping_clients, to_process); - } else { - to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end()); - sleeping_clients.clear(); - } -} +extern AccessLogThread *access_log; Server::Server() { @@ -180,7 +34,7 @@ Server::Server() epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { - perror("epoll_fd"); + log_perror("epoll_fd"); exit(1); } } @@ -193,7 +47,7 @@ Server::~Server() } while (ret == -1 && errno == EINTR); if (ret == -1) { - perror("close(epoll_fd)"); + log_perror("close(epoll_fd)"); } } @@ -221,7 +75,7 @@ void Server::do_work() continue; } if (nfds == -1) { - perror("epoll_wait"); + log_perror("epoll_wait"); exit(1); } @@ -230,9 +84,7 @@ void Server::do_work() process_queued_data(); for (int i = 0; i < nfds; ++i) { - int fd = events[i].data.fd; - assert(clients.count(fd) != 0); - Client *client = &clients[fd]; + Client *client = reinterpret_cast(events[i].data.u64); if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) { close_client(client); @@ -290,10 +142,9 @@ void Server::add_client(int sock) // Start listening on data from this socket. epoll_event ev; ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = sock; + ev.data.u64 = reinterpret_cast(&clients[sock]); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_ADD)"); + log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } @@ -303,7 +154,13 @@ void Server::add_client(int sock) void Server::add_client_from_serialized(const ClientProto &client) { MutexLock lock(&mutex); - Stream *stream = find_stream(client.stream_id()); + Stream *stream; + map::iterator stream_it = streams.find(client.stream_id()); + if (stream_it == streams.end()) { + stream = NULL; + } else { + stream = stream_it->second; + } clients.insert(make_pair(client.sock(), Client(client, stream))); Client *client_ptr = &clients[client.sock()]; @@ -317,14 +174,14 @@ void Server::add_client_from_serialized(const ClientProto &client) ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; } ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = client.sock(); + ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_ADD)"); + log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } if (client_ptr->state == Client::SENDING_DATA && - client_ptr->bytes_sent == client_ptr->stream->bytes_received) { + client_ptr->stream_pos == client_ptr->stream->bytes_received) { client_ptr->stream->put_client_to_sleep(client_ptr); } else { process_client(client_ptr); @@ -343,6 +200,13 @@ void Server::add_stream_from_serialized(const StreamProto &stream) streams.insert(make_pair(stream.stream_id(), new Stream(stream))); } +void Server::set_backlog_size(const std::string &stream_id, size_t new_size) +{ + MutexLock lock(&mutex); + assert(streams.count(stream_id) != 0); + streams[stream_id]->set_backlog_size(new_size); +} + void Server::set_header(const string &stream_id, const string &header) { MutexLock lock(&mutex); @@ -355,7 +219,7 @@ void Server::set_header(const string &stream_id, const string &header) ++client_it) { Client *client = &client_it->second; if (client->state == Client::SENDING_DATA && - client->bytes_sent == 0) { + client->stream_pos == 0) { construct_header(client); } } @@ -374,50 +238,6 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t queued_data[stream_id].append(string(data, data + bytes)); } -void Server::add_data(const string &stream_id, const char *data, ssize_t bytes) -{ - Stream *stream = find_stream(stream_id); - size_t pos = stream->bytes_received % stream->backlog_size; - stream->bytes_received += bytes; - - if (pos + bytes > stream->backlog_size) { - ssize_t to_copy = stream->backlog_size - pos; - while (to_copy > 0) { - int ret = pwrite(stream->data_fd, data, to_copy, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - to_copy -= ret; - bytes -= ret; - } - pos = 0; - } - - while (bytes > 0) { - int ret = pwrite(stream->data_fd, data, bytes, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - bytes -= ret; - } - - stream->wake_up_all_clients(); -} - // See the .h file for postconditions after this function. void Server::process_client(Client *client) { @@ -437,7 +257,7 @@ read_request_again: return; } if (ret == -1) { - perror("read"); + log_perror("read"); close_client(client); return; } @@ -451,7 +271,7 @@ read_request_again: switch (status) { case RP_OUT_OF_SPACE: - fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock); + log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str()); close_client(client); return; case RP_NOT_FINISHED_YET: @@ -459,7 +279,7 @@ read_request_again: // See if there's more data for us. goto read_request_again; case RP_EXTRA_DATA: - fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock); + log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str()); close_client(client); return; case RP_FINISHED: @@ -499,7 +319,7 @@ sending_header_or_error_again: if (ret == -1) { // Error! Postcondition #1. - perror("write"); + log_perror("write"); close_client(client); return; } @@ -526,7 +346,7 @@ sending_header_or_error_again: // but we'll start sending immediately as we get data. // This is postcondition #3. client->state = Client::SENDING_DATA; - client->bytes_sent = client->stream->bytes_received; + client->stream_pos = client->stream->bytes_received; client->stream->put_client_to_sleep(client); return; } @@ -535,28 +355,30 @@ sending_data_again: // See if there's some data we've lost. Ideally, we should drop to a block boundary, // but resync will be the mux's problem. Stream *stream = client->stream; - size_t bytes_to_send = stream->bytes_received - client->bytes_sent; + size_t bytes_to_send = stream->bytes_received - client->stream_pos; if (bytes_to_send == 0) { return; } if (bytes_to_send > stream->backlog_size) { - fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n", - client->sock, + log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection", + client->remote_addr.c_str(), (long long int)(bytes_to_send - stream->backlog_size)); - client->bytes_sent = stream->bytes_received - stream->backlog_size; + client->stream_pos = stream->bytes_received - stream->backlog_size; + client->bytes_lost += bytes_to_send - stream->backlog_size; + ++client->num_loss_events; bytes_to_send = stream->backlog_size; } // See if we need to split across the circular buffer. bool more_data = false; - if ((client->bytes_sent % stream->backlog_size) + bytes_to_send > stream->backlog_size) { - bytes_to_send = stream->backlog_size - (client->bytes_sent % stream->backlog_size); + if ((client->stream_pos % stream->backlog_size) + bytes_to_send > stream->backlog_size) { + bytes_to_send = stream->backlog_size - (client->stream_pos % stream->backlog_size); more_data = true; } ssize_t ret; do { - loff_t offset = client->bytes_sent % stream->backlog_size; + loff_t offset = client->stream_pos % stream->backlog_size; ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send); } while (ret == -1 && errno == EINTR); @@ -568,17 +390,18 @@ sending_data_again: } if (ret == -1) { // Error, close; postcondition #1. - perror("sendfile"); + log_perror("sendfile"); close_client(client); return; } + client->stream_pos += ret; client->bytes_sent += ret; - if (client->bytes_sent == stream->bytes_received) { + if (client->stream_pos == stream->bytes_received) { // We don't have any more data for this client, so put it to sleep. // This is postcondition #3. stream->put_client_to_sleep(client); - } else if (more_data) { + } else if (more_data && size_t(ret) == bytes_to_send) { goto sending_data_again; } break; @@ -615,7 +438,7 @@ int Server::parse_request(Client *client) } if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) { if (client->fwmark != 0) { - perror("setsockopt(SO_MARK)"); + log_perror("setsockopt(SO_MARK)"); } } client->request.clear(); @@ -632,11 +455,10 @@ void Server::construct_header(Client *client) epoll_event ev; ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; - ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = client->sock; + ev.data.u64 = reinterpret_cast(client); if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_MOD)"); + log_perror("epoll_ctl(EPOLL_CTL_MOD)"); exit(1); } } @@ -653,11 +475,10 @@ void Server::construct_error(Client *client, int error_code) epoll_event ev; ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; - ev.data.u64 = 0; // Keep Valgrind happy. - ev.data.fd = client->sock; + ev.data.u64 = reinterpret_cast(client); if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_MOD)"); + log_perror("epoll_ctl(EPOLL_CTL_MOD)"); exit(1); } } @@ -672,7 +493,7 @@ void delete_from(vector *v, T elem) void Server::close_client(Client *client) { if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) { - perror("epoll_ctl(EPOLL_CTL_DEL)"); + log_perror("epoll_ctl(EPOLL_CTL_DEL)"); exit(1); } @@ -686,6 +507,9 @@ void Server::close_client(Client *client) } } + // Log to access_log. + access_log->write(client->get_stats()); + // Bye-bye! int ret; do { @@ -693,7 +517,7 @@ void Server::close_client(Client *client) } while (ret == -1 && errno == EINTR); if (ret == -1) { - perror("close"); + log_perror("close"); } clients.erase(client->sock); @@ -718,7 +542,9 @@ void Server::process_queued_data() for (map::iterator queued_it = queued_data.begin(); queued_it != queued_data.end(); ++queued_it) { - add_data(queued_it->first, queued_it->second.data(), queued_it->second.size()); + Stream *stream = find_stream(queued_it->first); + stream->add_data(queued_it->second.data(), queued_it->second.size()); + stream->wake_up_all_clients(); } queued_data.clear(); }