X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=88819129d45a4d1a85fdd86f156bc4617b61f716;hp=d8c68fc4f5117df388d9d9c852f351ce92f8d330;hb=3fdf2e48bca3edcb0de00e0dbd0d0aae81ba9aa9;hpb=4fcea4a90506f45311daf49a58b02a3723e6a5ff diff --git a/server.cpp b/server.cpp index d8c68fc..8881912 100644 --- a/server.cpp +++ b/server.cpp @@ -1,20 +1,23 @@ #include #include #include +#include #include #include -#include #include #include #include #include #include +#include +#include #include #include #include #include #include +#include "markpool.h" #include "metacube.h" #include "server.h" #include "mutexlock.h" @@ -25,16 +28,37 @@ using namespace std; Client::Client(int sock) : sock(sock), + fwmark(0), + connect_time(time(NULL)), state(Client::READING_REQUEST), stream(NULL), header_or_error_bytes_sent(0), bytes_sent(0) { request.reserve(1024); + + // Find the remote address, and convert it to ASCII. + sockaddr_in6 addr; + socklen_t addr_len = sizeof(addr); + + if (getpeername(sock, reinterpret_cast(&addr), &addr_len) == -1) { + perror("getpeername"); + remote_addr = ""; + } else { + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) { + perror("inet_ntop"); + remote_addr = ""; + } else { + remote_addr = buf; + } + } } Client::Client(const ClientProto &serialized, Stream *stream) : sock(serialized.sock()), + remote_addr(serialized.remote_addr()), + connect_time(serialized.connect_time()), state(State(serialized.state())), request(serialized.request()), stream_id(serialized.stream_id()), @@ -43,12 +67,24 @@ Client::Client(const ClientProto &serialized, Stream *stream) header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), bytes_sent(serialized.bytes_sent()) { + if (stream->mark_pool != NULL) { + fwmark = stream->mark_pool->get_mark(); + } else { + fwmark = 0; // No mark. + } + if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { + if (fwmark != 0) { + perror("setsockopt(SO_MARK)"); + } + } } ClientProto Client::serialize() const { ClientProto serialized; serialized.set_sock(sock); + serialized.set_remote_addr(remote_addr); + serialized.set_connect_time(connect_time); serialized.set_state(state); serialized.set_request(request); serialized.set_stream_id(stream_id); @@ -57,11 +93,22 @@ ClientProto Client::serialize() const serialized.set_bytes_sent(bytes_sent); return serialized; } + +ClientStats Client::get_stats() const +{ + ClientStats stats; + stats.stream_id = stream_id; + stats.remote_addr = remote_addr; + stats.connect_time = connect_time; + stats.bytes_sent = bytes_sent; + return stats; +} Stream::Stream(const string &stream_id) : stream_id(stream_id), data(new char[BACKLOG_SIZE]), - data_size(0) + data_size(0), + mark_pool(NULL) { memset(data, 0, BACKLOG_SIZE); } @@ -75,7 +122,8 @@ Stream::Stream(const StreamProto &serialized) : stream_id(serialized.stream_id()), header(serialized.header()), data(new char[BACKLOG_SIZE]), - data_size(serialized.data_size()) + data_size(serialized.data_size()), + mark_pool(NULL) { assert(serialized.data().size() == BACKLOG_SIZE); memcpy(data, serialized.data().data(), BACKLOG_SIZE); @@ -109,6 +157,7 @@ void Stream::wake_up_all_clients() Server::Server() { pthread_mutex_init(&mutex, NULL); + pthread_mutex_init(&queued_data_mutex, NULL); epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { @@ -129,35 +178,17 @@ Server::~Server() } } -void Server::run() +vector Server::get_client_stats() const { - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Server::do_work_thunk, this); -} - -void Server::stop() -{ - { - MutexLock lock(&mutex); - should_stop = true; - } + vector ret; - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); + MutexLock lock(&mutex); + for (map::const_iterator client_it = clients.begin(); + client_it != clients.end(); + ++client_it) { + ret.push_back(client_it->second.get_stats()); } -} - -void *Server::do_work_thunk(void *arg) -{ - Server *server = static_cast(arg); - server->do_work(); - return NULL; + return ret; } void Server::do_work() @@ -165,6 +196,9 @@ void Server::do_work() for ( ;; ) { int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS); if (nfds == -1 && errno == EINTR) { + if (should_stop) { + return; + } continue; } if (nfds == -1) { @@ -174,9 +208,7 @@ void Server::do_work() MutexLock lock(&mutex); // We release the mutex between iterations. - if (should_stop) { - return; - } + process_queued_data(); for (int i = 0; i < nfds; ++i) { int fd = events[i].data.fd; @@ -194,17 +226,24 @@ void Server::do_work() for (map::iterator stream_it = streams.begin(); stream_it != streams.end(); ++stream_it) { - Stream *stream = stream_it->second; - for (size_t i = 0; i < stream->to_process.size(); ++i) { - process_client(stream->to_process[i]); + vector to_process; + swap(stream_it->second->to_process, to_process); + for (size_t i = 0; i < to_process.size(); ++i) { + process_client(to_process[i]); } - stream->to_process.clear(); + } + + if (should_stop) { + return; } } } -CubemapStateProto Server::serialize() const +CubemapStateProto Server::serialize() { + // We don't serialize anything queued, so empty the queues. + process_queued_data(); + CubemapStateProto serialized; for (map::const_iterator client_it = clients.begin(); client_it != clients.end(); @@ -219,9 +258,14 @@ CubemapStateProto Server::serialize() const return serialized; } +void Server::add_client_deferred(int sock) +{ + MutexLock lock(&queued_data_mutex); + queued_add_clients.push_back(sock); +} + void Server::add_client(int sock) { - MutexLock lock(&mutex); clients.insert(make_pair(sock, Client(sock))); // Start listening on data from this socket. @@ -298,13 +342,21 @@ void Server::set_header(const string &stream_id, const string &header) } } -void Server::add_data(const string &stream_id, const char *data, size_t bytes) +void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool) { - if (bytes == 0) { - return; - } - MutexLock lock(&mutex); + assert(clients.empty()); + find_stream(stream_id)->mark_pool = mark_pool; +} + +void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes) +{ + MutexLock lock(&queued_data_mutex); + queued_data[stream_id].append(string(data, data + bytes)); +} + +void Server::add_data(const string &stream_id, const char *data, size_t bytes) +{ Stream *stream = find_stream(stream_id); size_t pos = stream->data_size % BACKLOG_SIZE; stream->data_size += bytes; @@ -350,34 +402,27 @@ read_request_again: return; } - // Guard against overlong requests gobbling up all of our space. - if (client->request.size() + ret > MAX_CLIENT_REQUEST) { + RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret); + + switch (status) { + case RP_OUT_OF_SPACE: fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock); close_client(client); return; - } - - // See if we have \r\n\r\n anywhere in the request. We start three bytes - // before what we just appended, in case we just got the final character. - size_t existing_req_bytes = client->request.size(); - client->request.append(string(buf, buf + ret)); - - size_t start_at = (existing_req_bytes >= 3 ? existing_req_bytes - 3 : 0); - const char *ptr = reinterpret_cast( - memmem(client->request.data() + start_at, client->request.size() - start_at, - "\r\n\r\n", 4)); - if (ptr == NULL) { + case RP_NOT_FINISHED_YET: // OK, we don't have the entire header yet. Fine; we'll get it later. // See if there's more data for us. goto read_request_again; - } - - if (ptr != client->request.data() + client->request.size() - 4) { + case RP_EXTRA_DATA: fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock); close_client(client); return; + case RP_FINISHED: + break; } + assert(status == RP_FINISHED); + int error_code = parse_request(client); if (error_code == 200) { construct_header(client); @@ -527,6 +572,16 @@ int Server::parse_request(Client *client) client->stream_id = request_tokens[1]; client->stream = find_stream(client->stream_id); + if (client->stream->mark_pool != NULL) { + client->fwmark = client->stream->mark_pool->get_mark(); + } else { + client->fwmark = 0; // No mark. + } + if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) { + if (client->fwmark != 0) { + perror("setsockopt(SO_MARK)"); + } + } client->request.clear(); return 200; // OK! @@ -534,8 +589,7 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - client->header_or_error = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" + - find_stream(client->stream_id)->header; + client->header_or_error = find_stream(client->stream_id)->header; // Switch states. client->state = Client::SENDING_HEADER; @@ -571,6 +625,13 @@ void Server::construct_error(Client *client, int error_code) exit(1); } } + +template +void delete_from(vector *v, T elem) +{ + typename vector::iterator new_end = remove(v->begin(), v->end(), elem); + v->erase(new_end, v->end()); +} void Server::close_client(Client *client) { @@ -581,14 +642,14 @@ void Server::close_client(Client *client) // This client could be sleeping, so we'll need to fix that. (Argh, O(n).) if (client->stream != NULL) { - vector::iterator new_end = - remove(client->stream->sleeping_clients.begin(), - client->stream->sleeping_clients.end(), - client); - client->stream->sleeping_clients.erase( - new_end, client->stream->sleeping_clients.end()); + delete_from(&client->stream->sleeping_clients, client); + delete_from(&client->stream->to_process, client); + if (client->stream->mark_pool != NULL) { + int fwmark = client->fwmark; + client->stream->mark_pool->release_mark(fwmark); + } } - + // Bye-bye! int ret; do { @@ -608,3 +669,20 @@ Stream *Server::find_stream(const string &stream_id) assert(it != streams.end()); return it->second; } + +void Server::process_queued_data() +{ + MutexLock lock(&queued_data_mutex); + + for (size_t i = 0; i < queued_add_clients.size(); ++i) { + add_client(queued_add_clients[i]); + } + queued_add_clients.clear(); + + for (map::iterator queued_it = queued_data.begin(); + queued_it != queued_data.end(); + ++queued_it) { + add_data(queued_it->first, queued_it->second.data(), queued_it->second.size()); + } + queued_data.clear(); +}