X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=e91ba395ed6e2efa3c106fabf98ffa9756e159ef;hp=d8c68fc4f5117df388d9d9c852f351ce92f8d330;hb=9b565a9e6e66f076abb7266b2c2f015f585fa9cb;hpb=4fcea4a90506f45311daf49a58b02a3723e6a5ff

diff --git a/server.cpp b/server.cpp
index d8c68fc..e91ba39 100644
--- a/server.cpp
+++ b/server.cpp
@@ -3,7 +3,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -109,6 +108,7 @@ void Stream::wake_up_all_clients()
 Server::Server()
 {
 	pthread_mutex_init(&mutex, NULL);
+	pthread_mutex_init(&queued_data_mutex, NULL);
 
 	epoll_fd = epoll_create(1024);  // Size argument is ignored.
 	if (epoll_fd == -1) {
@@ -178,6 +178,8 @@ void Server::do_work()
 			return;
 		}
 
+		process_queued_data();
+
 		for (int i = 0; i < nfds; ++i) {
 			int fd = events[i].data.fd;
 			assert(clients.count(fd) != 0);
@@ -194,17 +196,20 @@ void Server::do_work()
 		for (map<string, Stream *>::iterator stream_it = streams.begin();
 		     stream_it != streams.end();
 		     ++stream_it) {
-			Stream *stream = stream_it->second;
-			for (size_t i = 0; i < stream->to_process.size(); ++i) {
-				process_client(stream->to_process[i]);
+			vector<Client *> to_process;
+			swap(stream_it->second->to_process, to_process);
+			for (size_t i = 0; i < to_process.size(); ++i) {
+				process_client(to_process[i]);
 			}
-			stream->to_process.clear();
 		}
 	}
 }
 
-CubemapStateProto Server::serialize() const
+CubemapStateProto Server::serialize()
 {
+	// We don't serialize anything queued, so empty the queues.
+	process_queued_data();
+
 	CubemapStateProto serialized;
 	for (map<int, Client>::const_iterator client_it = clients.begin();
 	     client_it != clients.end();
@@ -219,9 +224,14 @@ CubemapStateProto Server::serialize() const
 	return serialized;
 }
 
+void Server::add_client_deferred(int sock)
+{
+	MutexLock lock(&queued_data_mutex);
+	queued_add_clients.push_back(sock);
+}
+
 void Server::add_client(int sock)
 {
-	MutexLock lock(&mutex);
 	clients.insert(make_pair(sock, Client(sock)));
 
 	// Start listening on data from this socket.
@@ -297,14 +307,15 @@ void Server::set_header(const string &stream_id, const string &header)
 		}
 	}
 }
-
-void Server::add_data(const string &stream_id, const char *data, size_t bytes)
+
+void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
 {
-	if (bytes == 0) {
-		return;
-	}
+	MutexLock lock(&queued_data_mutex);
+	queued_data[stream_id].append(string(data, data + bytes));
+}
 
-	MutexLock lock(&mutex);
+void Server::add_data(const string &stream_id, const char *data, size_t bytes)
+{
 	Stream *stream = find_stream(stream_id);
 	size_t pos = stream->data_size % BACKLOG_SIZE;
 	stream->data_size += bytes;
@@ -350,34 +361,27 @@ read_request_again:
 			return;
 		}
 
-		// Guard against overlong requests gobbling up all of our space.
-		if (client->request.size() + ret > MAX_CLIENT_REQUEST) {
+		RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret);
+
+		switch (status) {
+		case RP_OUT_OF_SPACE:
 			fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock);
 			close_client(client);
 			return;
-		}
-
-		// See if we have \r\n\r\n anywhere in the request. We start three bytes
-		// before what we just appended, in case we just got the final character.
-		size_t existing_req_bytes = client->request.size();
-		client->request.append(string(buf, buf + ret));
-
-		size_t start_at = (existing_req_bytes >= 3 ? existing_req_bytes - 3 : 0);
-		const char *ptr = reinterpret_cast<const char *>(
-			memmem(client->request.data() + start_at, client->request.size() - start_at,
-			       "\r\n\r\n", 4));
-		if (ptr == NULL) {
+		case RP_NOT_FINISHED_YET:
 			// OK, we don't have the entire header yet. Fine; we'll get it later.
 			// See if there's more data for us.
 			goto read_request_again;
-		}
-
-		if (ptr != client->request.data() + client->request.size() - 4) {
+		case RP_EXTRA_DATA:
 			fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock);
 			close_client(client);
 			return;
+		case RP_FINISHED:
+			break;
 		}
+		assert(status == RP_FINISHED);
+
 		int error_code = parse_request(client);
 		if (error_code == 200) {
 			construct_header(client);
@@ -534,8 +538,7 @@ int Server::parse_request(Client *client)
 
 void Server::construct_header(Client *client)
 {
-	client->header_or_error = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" +
-		find_stream(client->stream_id)->header;
+	client->header_or_error = find_stream(client->stream_id)->header;
 
 	// Switch states.
 	client->state = Client::SENDING_HEADER;
@@ -571,6 +574,13 @@ void Server::construct_error(Client *client, int error_code)
 		exit(1);
 	}
 }
+
+template<class T>
+void delete_from(vector<T> *v, T elem)
+{
+	typename vector<T>::iterator new_end = remove(v->begin(), v->end(), elem);
+	v->erase(new_end, v->end());
+}
 
 void Server::close_client(Client *client)
 {
@@ -581,12 +591,8 @@ void Server::close_client(Client *client)
 
 	// This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
 	if (client->stream != NULL) {
-		vector<Client *>::iterator new_end =
-			remove(client->stream->sleeping_clients.begin(),
-			       client->stream->sleeping_clients.end(),
-			       client);
-		client->stream->sleeping_clients.erase(
-			new_end, client->stream->sleeping_clients.end());
+		delete_from(&client->stream->sleeping_clients, client);
+		delete_from(&client->stream->to_process, client);
	}
 
 	// Bye-bye!
@@ -608,3 +614,20 @@ Stream *Server::find_stream(const string &stream_id)
 	assert(it != streams.end());
 	return it->second;
 }
+
+void Server::process_queued_data()
+{
+	MutexLock lock(&queued_data_mutex);
+
+	for (size_t i = 0; i < queued_add_clients.size(); ++i) {
+		add_client(queued_add_clients[i]);
+	}
+	queued_add_clients.clear();
+
+	for (map<string, string>::iterator queued_it = queued_data.begin();
+	     queued_it != queued_data.end();
+	     ++queued_it) {
+		add_data(queued_it->first, queued_it->second.data(), queued_it->second.size());
+	}
+	queued_data.clear();
+}
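
For readers skimming the patch: the new add_client_deferred()/add_data_deferred() entry points only append to queues protected by queued_data_mutex, and the epoll thread empties those queues via process_queued_data() early in each do_work() iteration (and before serialize()), so producer threads no longer have to touch the client and stream structures directly. Below is a minimal, self-contained sketch of that hand-off pattern, assuming nothing beyond what the patch shows; it is illustrative only, and names such as DeferredQueue and drain() are hypothetical rather than part of cubemap.

// Illustrative sketch only (not cubemap code): producer threads enqueue work
// under a small dedicated mutex; the single worker thread swaps the queues
// out so the critical section stays tiny and the queues are left empty.
#include <pthread.h>
#include <stddef.h>
#include <map>
#include <string>
#include <vector>

using namespace std;

class DeferredQueue {
public:
	DeferredQueue() { pthread_mutex_init(&mutex, NULL); }

	// Safe to call from any thread.
	void add_client_deferred(int sock)
	{
		pthread_mutex_lock(&mutex);
		queued_add_clients.push_back(sock);
		pthread_mutex_unlock(&mutex);
	}
	void add_data_deferred(const string &stream_id, const char *data, size_t bytes)
	{
		pthread_mutex_lock(&mutex);
		queued_data[stream_id].append(data, bytes);
		pthread_mutex_unlock(&mutex);
	}

	// Called by the worker thread once per loop iteration; the caller then
	// processes the swapped-out contents without holding the producers' lock.
	void drain(vector<int> *add_clients, map<string, string> *data)
	{
		pthread_mutex_lock(&mutex);
		add_clients->swap(queued_add_clients);
		data->swap(queued_data);
		pthread_mutex_unlock(&mutex);
	}

private:
	pthread_mutex_t mutex;
	vector<int> queued_add_clients;
	map<string, string> queued_data;
};

The patch's own process_queued_data() keeps things even simpler than the drain() variant above: it calls add_client()/add_data() directly while still holding queued_data_mutex, which producers only ever take for a quick push_back or append.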