From 061988af511f42da3cd584b4d983177504ddc177 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 5 Apr 2018 20:21:22 +0200 Subject: [PATCH] Add support for deduplicating headers/short responses. We don't do any of it right now, but it is relevant when we get HLS playlists, which we may very well want to share between clients. --- client.cpp | 24 ++++++++++++++--- client.h | 23 +++++++++++++--- main.cpp | 10 ++++++- server.cpp | 72 ++++++++++++++++++++++++++++---------------------- server.h | 4 +-- serverpool.cpp | 15 ++++++++--- serverpool.h | 2 +- state.proto | 9 ++++++- 8 files changed, 113 insertions(+), 46 deletions(-) diff --git a/client.cpp b/client.cpp index c2cbcf5..05b8e7d 100644 --- a/client.cpp +++ b/client.cpp @@ -54,7 +54,7 @@ Client::Client(int sock) } } -Client::Client(const ClientProto &serialized, Stream *stream) +Client::Client(const ClientProto &serialized, const vector> &short_responses, Stream *stream) : sock(serialized.sock()), remote_addr(serialized.remote_addr()), referer(serialized.referer()), @@ -63,7 +63,6 @@ Client::Client(const ClientProto &serialized, Stream *stream) request(serialized.request()), url(serialized.url()), stream(stream), - header_or_short_response(serialized.header_or_short_response()), header_or_short_response_bytes_sent(serialized.header_or_short_response_bytes_sent()), stream_pos(serialized.stream_pos()), stream_pos_end(serialized.stream_pos_end()), @@ -78,6 +77,16 @@ Client::Client(const ClientProto &serialized, Stream *stream) } } } + + if (serialized.has_header_or_short_response_old()) { + // Pre-1.4.0. 
+ header_or_short_response_holder = serialized.header_or_short_response_old(); + header_or_short_response = &header_or_short_response_holder; + } else if (serialized.has_header_or_short_response_index()) { + assert(size_t(serialized.header_or_short_response_index()) < short_responses.size()); + header_or_short_response_ref = short_responses[serialized.header_or_short_response_index()]; + header_or_short_response = header_or_short_response_ref.get(); + } connect_time.tv_sec = serialized.connect_time_sec(); connect_time.tv_nsec = serialized.connect_time_nsec(); @@ -106,7 +115,7 @@ Client::Client(const ClientProto &serialized, Stream *stream) } } -ClientProto Client::serialize() const +ClientProto Client::serialize(unordered_map *short_response_pool) const { ClientProto serialized; serialized.set_sock(sock); @@ -118,7 +127,14 @@ ClientProto Client::serialize() const serialized.set_state(state); serialized.set_request(request); serialized.set_url(url); - serialized.set_header_or_short_response(header_or_short_response); + + if (header_or_short_response != nullptr) { + // See if this string is already in the pool (deduplicated by the pointer); if not, insert it. + auto iterator_and_inserted = short_response_pool->emplace( + header_or_short_response, short_response_pool->size()); + serialized.set_header_or_short_response_index(iterator_and_inserted.first->second); + } + serialized.set_header_or_short_response_bytes_sent(serialized.header_or_short_response_bytes_sent()); serialized.set_stream_pos(stream_pos); serialized.set_stream_pos_end(stream_pos_end); diff --git a/client.h b/client.h index e558bd0..8f9f682 100644 --- a/client.h +++ b/client.h @@ -5,7 +5,11 @@ #include #include + +#include #include +#include +#include #include "tlse.h" @@ -30,8 +34,8 @@ struct Client { Client(int sock); // Serialization/deserialization. 
- Client(const ClientProto &serialized, Stream *stream); - ClientProto serialize() const; + Client(const ClientProto &serialized, const std::vector> &short_responses, Stream *stream); + ClientProto serialize(std::unordered_map *short_response_pool) const; ClientStats get_stats() const; @@ -73,7 +77,20 @@ struct Client { // // Must start with the string "HTTP/1.0 ", which will be changed to 1.1 // if relevant. - std::string header_or_short_response; + const std::string *header_or_short_response = nullptr; + + // header_or_short_response can come from two distinct places; it can be + // local to the Client object, or it can be shared between many Clients + // (typically HLS playlists, that can be so large that they are expensive + // to hold in many copies). header_or_short_response will point to exactly + // one of these, which should be cleared out/dereferenced when it is + // no longer needed. + // + // The use of shared_ptr is somewhat overkill since we don't need + // to access the HLS playlists from multiple threads, but it's not a + // big deal for us. + std::string header_or_short_response_holder; + std::shared_ptr header_or_short_response_ref; // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER // or SENDING_SHORT_RESPONSE. diff --git a/main.cpp b/main.cpp index 858a6d0..88c49f4 100644 --- a/main.cpp +++ b/main.cpp @@ -496,6 +496,12 @@ start: servers->create_tls_context_for_acceptor(acceptor); } } + + // Allocate strings for the short responses. + vector> short_response_pool; + for (const ShortResponsePool &str : loaded_state.short_response_pool()) { + short_response_pool.emplace_back(new string(str.header_or_short_response())); + } // Put back the existing clients. It doesn't matter which server we // allocate them to, so just do round-robin. 
However, we need to sort them @@ -507,10 +513,12 @@ start: if (all_urls.count(loaded_state.clients(i).url()) == 0) { safe_close(loaded_state.clients(i).sock()); } else { - servers->add_client_from_serialized(loaded_state.clients(i)); + servers->add_client_from_serialized(loaded_state.clients(i), short_response_pool); } } + short_response_pool.clear(); // No longer needed; the clients have their own refcounts now. + servers->run(); // Now delete all inputs that are no longer in use, and start the others. diff --git a/server.cpp b/server.cpp index 3107ead..6570b5b 100644 --- a/server.cpp +++ b/server.cpp @@ -167,7 +167,7 @@ void Server::do_work() } } -CubemapStateProto Server::serialize() +CubemapStateProto Server::serialize(unordered_map *short_response_pool) { // We don't serialize anything queued, so empty the queues. process_queued_data(); @@ -185,7 +185,7 @@ CubemapStateProto Server::serialize() CubemapStateProto serialized; for (const auto &fd_and_client : clients) { - serialized.add_clients()->MergeFrom(fd_and_client.second.serialize()); + serialized.add_clients()->MergeFrom(fd_and_client.second.serialize(short_response_pool)); } for (unique_ptr &stream : streams) { serialized.add_streams()->MergeFrom(stream->serialize()); @@ -248,7 +248,7 @@ void Server::add_client(int sock, Acceptor *acceptor) process_client(client_ptr); } -void Server::add_client_from_serialized(const ClientProto &client) +void Server::add_client_from_serialized(const ClientProto &client, const vector> &short_responses) { lock_guard lock(mu); Stream *stream; @@ -259,7 +259,7 @@ void Server::add_client_from_serialized(const ClientProto &client) } else { stream = streams[stream_index].get(); } - auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream))); + auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream))); assert(inserted.second == true); // Should not already exist. 
Client *client_ptr = &inserted.first->second; @@ -495,8 +495,8 @@ sending_header_or_short_response_again: int ret; do { ret = write(client->sock, - client->header_or_short_response.data() + client->header_or_short_response_bytes_sent, - client->header_or_short_response.size() - client->header_or_short_response_bytes_sent); + client->header_or_short_response->data() + client->header_or_short_response_bytes_sent, + client->header_or_short_response->size() - client->header_or_short_response_bytes_sent); } while (ret == -1 && errno == EINTR); if (ret == -1 && errno == EAGAIN) { @@ -515,15 +515,17 @@ sending_header_or_short_response_again: } client->header_or_short_response_bytes_sent += ret; - assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response.size()); + assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response->size()); - if (client->header_or_short_response_bytes_sent < client->header_or_short_response.size()) { + if (client->header_or_short_response_bytes_sent < client->header_or_short_response->size()) { // We haven't sent all yet. Fine; go another round. goto sending_header_or_short_response_again; } // We're done sending the header or error! Clear it to release some memory. 
- client->header_or_short_response.clear(); + client->header_or_short_response = nullptr; + client->header_or_short_response_holder.clear(); + client->header_or_short_response_ref.reset(); if (client->state == Client::SENDING_SHORT_RESPONSE) { if (more_requests(client)) { @@ -983,53 +985,53 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { Stream *stream = client->stream; - client->header_or_short_response = stream->http_header; + string response = stream->http_header; if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) { char buf[64]; snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", stream->stream_header.size()); - client->header_or_short_response.append(buf); + response.append(buf); } else if (client->stream_pos_end != Client::STREAM_POS_NO_END) { char buf[64]; snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", client->stream_pos_end - client->stream_pos); - client->header_or_short_response.append(buf); + response.append(buf); } if (client->http_11) { - assert(client->header_or_short_response.find("HTTP/1.0") == 0); - client->header_or_short_response[7] = '1'; // Change to HTTP/1.1. + assert(response.find("HTTP/1.0") == 0); + response[7] = '1'; // Change to HTTP/1.1. 
if (client->close_after_response) { - client->header_or_short_response.append("Connection: close\r\n"); + response.append("Connection: close\r\n"); } } else { assert(client->close_after_response); } if (stream->encoding == Stream::STREAM_ENCODING_RAW) { - client->header_or_short_response.append("\r\n"); + response.append("\r\n"); } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { - client->header_or_short_response.append( - "Content-encoding: metacube\r\n" - "\r\n"); + response.append("Content-encoding: metacube\r\n\r\n"); if (!stream->stream_header.empty()) { metacube2_block_header hdr; memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync)); hdr.size = htonl(stream->stream_header.size()); hdr.flags = htons(METACUBE_FLAGS_HEADER); hdr.csum = htons(metacube2_compute_crc(&hdr)); - client->header_or_short_response.append( - string(reinterpret_cast(&hdr), sizeof(hdr))); + response.append(string(reinterpret_cast(&hdr), sizeof(hdr))); } } else { assert(false); } if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) { client->state = Client::SENDING_SHORT_RESPONSE; - client->header_or_short_response.append(stream->stream_header); + response.append(stream->stream_header); } else { client->state = Client::SENDING_HEADER; if (client->stream_pos_end == Client::STREAM_POS_NO_END) { // Fragments don't contain stream headers. - client->header_or_short_response.append(stream->stream_header); + response.append(stream->stream_header); } } + client->header_or_short_response_holder = move(response); + client->header_or_short_response = &client->header_or_short_response_holder; + // Switch states. change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP); } @@ -1046,7 +1048,8 @@ void Server::construct_error(Client *client, int error_code) "HTTP/1.%d %d Error\r\nContent-type: text/plain\r\nContent-length: 30\r\n\r\nSomething went wrong. 
Sorry.\r\n", client->http_11, error_code); } - client->header_or_short_response = error; + client->header_or_short_response_holder = error; + client->header_or_short_response = &client->header_or_short_response_holder; // Switch states. client->state = Client::SENDING_SHORT_RESPONSE; @@ -1058,20 +1061,25 @@ void Server::construct_204(Client *client) map::const_iterator ping_url_map_it = ping_url_map.find(client->url); assert(ping_url_map_it != ping_url_map.end()); + string response; if (client->http_11) { - client->header_or_short_response = "HTTP/1.1 204 No Content\r\n"; + response = "HTTP/1.1 204 No Content\r\n"; if (client->close_after_response) { - client->header_or_short_response.append("Connection: close\r\n"); + response.append("Connection: close\r\n"); } } else { - client->header_or_short_response = "HTTP/1.0 204 No Content\r\n"; + response = "HTTP/1.0 204 No Content\r\n"; assert(client->close_after_response); } if (!ping_url_map_it->second.empty()) { - client->header_or_short_response.append("Access-Control-Allow-Origin: "); - client->header_or_short_response.append(ping_url_map_it->second); + response.append("Access-Control-Allow-Origin: "); + response.append(ping_url_map_it->second); + response.append("\r\n"); } - client->header_or_short_response.append("\r\n"); + response.append("\r\n"); + + client->header_or_short_response_holder = move(response); + client->header_or_short_response = &client->header_or_short_response_holder; // Switch states. 
client->state = Client::SENDING_SHORT_RESPONSE; @@ -1133,7 +1141,9 @@ bool Server::more_requests(Client *client) client->state = Client::READING_REQUEST; client->url.clear(); client->stream = NULL; - client->header_or_short_response.clear(); + client->header_or_short_response = nullptr; + client->header_or_short_response_holder.clear(); + client->header_or_short_response_ref.reset(); client->header_or_short_response_bytes_sent = 0; change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP); // No TLS handshake, so no EPOLLOUT needed. diff --git a/server.h b/server.h index 6007626..2fe4f15 100644 --- a/server.h +++ b/server.h @@ -57,8 +57,8 @@ public: // These should not be called while running, since that would violate // threading assumptions (ie., that epoll is only called from one thread // at the same time). - CubemapStateProto serialize(); - void add_client_from_serialized(const ClientProto &client); + CubemapStateProto serialize(std::unordered_map *short_response_pool); + void add_client_from_serialized(const ClientProto &client, const std::vector> &short_responses); int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding); int add_stream_from_serialized(const StreamProto &stream, int data_fd); int lookup_stream_by_url(const std::string &url) const; diff --git a/serverpool.cpp b/serverpool.cpp index a5c8a72..8e233da 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -24,8 +24,10 @@ CubemapStateProto ServerPool::serialize() { CubemapStateProto state; + unordered_map short_response_pool; + for (int i = 0; i < num_servers; ++i) { - CubemapStateProto local_state = servers[i].serialize(); + CubemapStateProto local_state = servers[i].serialize(&short_response_pool); // The stream state should be identical between the servers, so we only store it once, // save for the fds, which we keep around to distribute to the servers after re-exec. 
@@ -43,6 +45,13 @@ CubemapStateProto ServerPool::serialize() } } + for (size_t i = 0; i < short_response_pool.size(); ++i) { + state.mutable_short_response_pool()->Add(); + } + for (const auto &string_and_index : short_response_pool) { + state.mutable_short_response_pool(string_and_index.second)->set_header_or_short_response(*string_and_index.first); + } + return state; } @@ -51,9 +60,9 @@ void ServerPool::add_client(int sock, Acceptor *acceptor) servers[clients_added++ % num_servers].add_client_deferred(sock, acceptor); } -void ServerPool::add_client_from_serialized(const ClientProto &client) +void ServerPool::add_client_from_serialized(const ClientProto &client, const std::vector> &short_responses) { - servers[clients_added++ % num_servers].add_client_from_serialized(client); + servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses); } int ServerPool::lookup_stream_by_url(const string &url) const diff --git a/serverpool.h b/serverpool.h index 5f661df..6167628 100644 --- a/serverpool.h +++ b/serverpool.h @@ -27,7 +27,7 @@ public: // Picks a server (round-robin) and allocates the given client to it. void add_client(int sock, Acceptor *acceptor); - void add_client_from_serialized(const ClientProto &client); + void add_client_from_serialized(const ClientProto &client, const std::vector> &short_responses); // Adds the given stream to all the servers. Returns the stream index. int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding); diff --git a/state.proto b/state.proto index 7643bbf..e9a4f13 100644 --- a/state.proto +++ b/state.proto @@ -9,7 +9,8 @@ message ClientProto { optional int32 state = 2; optional bytes request = 3; optional string url = 4; - optional bytes header_or_short_response = 5; + optional bytes header_or_short_response_old = 5; // Only for pre-1.4.0 servers. Use header_or_short_response_index instead. 
+ optional int64 header_or_short_response_index = 21; // Index into CubemapStateProto.short_response_pool. optional int64 header_or_short_response_bytes_sent = 6; optional int64 stream_pos = 7; optional int64 stream_pos_end = 20 [default=-1]; @@ -64,6 +65,11 @@ message AcceptorProto { optional bytes private_key = 5; }; +// For deduplicating strings in header_or_short_response. +message ShortResponsePool { + optional bytes header_or_short_response = 1; +}; + message CubemapStateProto { optional int64 serialize_start_sec = 6; optional int64 serialize_start_usec = 7; @@ -71,4 +77,5 @@ message CubemapStateProto { repeated StreamProto streams = 2; repeated InputProto inputs = 5; repeated AcceptorProto acceptors = 8; + repeated ShortResponsePool short_response_pool = 9; }; -- 2.39.2