X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=c9b942f2c686e19a5d06682eb79075f7e59defad;hp=6e1005e1090ccd851c5690f3f6f4174c6c00f8ab;hb=c5cdeee3a4c99abbf7303b47634b0a748e50bcb4;hpb=5ab36b04b0c12058394335e891398c494df513d2 diff --git a/server.cpp b/server.cpp index 6e1005e..c9b942f 100644 --- a/server.cpp +++ b/server.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -8,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,16 +18,19 @@ #include #include +#include "markpool.h" #include "metacube.h" #include "server.h" #include "mutexlock.h" #include "parse.h" +#include "util.h" #include "state.pb.h" using namespace std; Client::Client(int sock) : sock(sock), + fwmark(0), connect_time(time(NULL)), state(Client::READING_REQUEST), stream(NULL), @@ -64,6 +69,16 @@ Client::Client(const ClientProto &serialized, Stream *stream) header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), bytes_sent(serialized.bytes_sent()) { + if (stream->mark_pool != NULL) { + fwmark = stream->mark_pool->get_mark(); + } else { + fwmark = 0; // No mark. + } + if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { + if (fwmark != 0) { + perror("setsockopt(SO_MARK)"); + } + } } ClientProto Client::serialize() const @@ -93,34 +108,50 @@ ClientStats Client::get_stats() const Stream::Stream(const string &stream_id) : stream_id(stream_id), - data(new char[BACKLOG_SIZE]), - data_size(0) + data_fd(make_tempfile("")), + data_size(0), + mark_pool(NULL) { - memset(data, 0, BACKLOG_SIZE); + if (data_fd == -1) { + exit(1); + } } Stream::~Stream() { - delete[] data; + if (data_fd != -1) { + int ret; + do { + ret = close(data_fd); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + perror("close"); + } + } } Stream::Stream(const StreamProto &serialized) : stream_id(serialized.stream_id()), header(serialized.header()), - data(new char[BACKLOG_SIZE]), - data_size(serialized.data_size()) + data_fd(make_tempfile(serialized.data())), + data_size(serialized.data_size()), + mark_pool(NULL) { - assert(serialized.data().size() == BACKLOG_SIZE); - memcpy(data, serialized.data().data(), BACKLOG_SIZE); + if (data_fd == -1) { + exit(1); + } } -StreamProto Stream::serialize() const +StreamProto Stream::serialize() { StreamProto serialized; serialized.set_header(header); - serialized.set_data(string(data, data + BACKLOG_SIZE)); + if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd. + exit(1); + } serialized.set_data_size(data_size); serialized.set_stream_id(stream_id); + data_fd = -1; return serialized; } @@ -163,31 +194,6 @@ Server::~Server() } } -void Server::run() -{ - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Server::do_work_thunk, this); -} - -void Server::stop() -{ - { - MutexLock lock(&mutex); - should_stop = true; - } - - pthread_kill(worker_thread, SIGHUP); - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); - } -} - vector Server::get_client_stats() const { vector ret; @@ -201,13 +207,6 @@ vector Server::get_client_stats() const return ret; } -void *Server::do_work_thunk(void *arg) -{ - Server *server = static_cast(arg); - server->do_work(); - return NULL; -} - void Server::do_work() { for ( ;; ) { @@ -358,6 +357,13 @@ void Server::set_header(const string &stream_id, const string &header) } } } + +void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool) +{ + MutexLock lock(&mutex); + assert(clients.empty()); + find_stream(stream_id)->mark_pool = mark_pool; +} void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) { @@ -365,21 +371,47 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t queued_data[stream_id].append(string(data, data + bytes)); } -void Server::add_data(const string &stream_id, const char *data, size_t bytes) +void Server::add_data(const string &stream_id, const char *data, ssize_t bytes) { Stream *stream = find_stream(stream_id); size_t pos = stream->data_size % BACKLOG_SIZE; stream->data_size += bytes; if (pos + bytes > BACKLOG_SIZE) { - size_t to_copy = BACKLOG_SIZE - pos; - memcpy(stream->data + pos, data, to_copy); - data += to_copy; - bytes -= to_copy; + ssize_t to_copy = BACKLOG_SIZE - pos; + while (to_copy > 0) { + int ret = pwrite(stream->data_fd, data, to_copy, pos); + if (ret == -1 && errno == EINTR) { + continue; + } + if (ret == -1) { + perror("pwrite"); + // Dazed and confused, but trying to continue... + break; + } + pos += ret; + data += ret; + to_copy -= ret; + bytes -= ret; + } pos = 0; } - memcpy(stream->data + pos, data, bytes); + while (bytes > 0) { + int ret = pwrite(stream->data_fd, data, bytes, pos); + if (ret == -1 && errno == EINTR) { + continue; + } + if (ret == -1) { + perror("pwrite"); + // Dazed and confused, but trying to continue... + break; + } + pos += ret; + data += ret; + bytes -= ret; + } + stream->wake_up_all_clients(); } @@ -496,6 +528,7 @@ sending_header_or_error_again: return; } case Client::SENDING_DATA: { +sending_data_again: // See if there's some data we've lost. Ideally, we should drop to a block boundary, // but resync will be the mux's problem. Stream *stream = client->stream; @@ -512,27 +545,18 @@ sending_header_or_error_again: } // See if we need to split across the circular buffer. - ssize_t ret; + bool more_data = false; if ((client->bytes_sent % BACKLOG_SIZE) + bytes_to_send > BACKLOG_SIZE) { - size_t bytes_first_part = BACKLOG_SIZE - (client->bytes_sent % BACKLOG_SIZE); - - iovec iov[2]; - iov[0].iov_base = const_cast(stream->data + (client->bytes_sent % BACKLOG_SIZE)); - iov[0].iov_len = bytes_first_part; + bytes_to_send = BACKLOG_SIZE - (client->bytes_sent % BACKLOG_SIZE); + more_data = true; + } - iov[1].iov_base = const_cast(stream->data); - iov[1].iov_len = bytes_to_send - bytes_first_part; + ssize_t ret; + do { + loff_t offset = client->bytes_sent % BACKLOG_SIZE; + ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send); + } while (ret == -1 && errno == EINTR); - do { - ret = writev(client->sock, iov, 2); - } while (ret == -1 && errno == EINTR); - } else { - do { - ret = write(client->sock, - stream->data + (client->bytes_sent % BACKLOG_SIZE), - bytes_to_send); - } while (ret == -1 && errno == EINTR); - } if (ret == -1 && errno == EAGAIN) { // We're out of socket space, so return; epoll will wake us up // when there is more room. @@ -541,7 +565,7 @@ sending_header_or_error_again: } if (ret == -1) { // Error, close; postcondition #1. - perror("write/writev"); + perror("sendfile"); close_client(client); return; } @@ -551,9 +575,8 @@ sending_header_or_error_again: // We don't have any more data for this client, so put it to sleep. // This is postcondition #3. stream->put_client_to_sleep(client); - } else { - // XXX: Do we need to go another round here to explicitly - // get the EAGAIN? + } else if (more_data) { + goto sending_data_again; } break; } @@ -582,6 +605,16 @@ int Server::parse_request(Client *client) client->stream_id = request_tokens[1]; client->stream = find_stream(client->stream_id); + if (client->stream->mark_pool != NULL) { + client->fwmark = client->stream->mark_pool->get_mark(); + } else { + client->fwmark = 0; // No mark. + } + if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) { + if (client->fwmark != 0) { + perror("setsockopt(SO_MARK)"); + } + } client->request.clear(); return 200; // OK! @@ -644,8 +677,12 @@ void Server::close_client(Client *client) if (client->stream != NULL) { delete_from(&client->stream->sleeping_clients, client); delete_from(&client->stream->to_process, client); + if (client->stream->mark_pool != NULL) { + int fwmark = client->fwmark; + client->stream->mark_pool->release_mark(fwmark); + } } - + // Bye-bye! int ret; do {