git.sesse.net Git - cubemap/commitdiff
Add support for deduplicating headers/short responses.
author: Steinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 5 Apr 2018 18:21:22 +0000 (20:21 +0200)
committer: Steinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 6 Apr 2018 19:42:12 +0000 (21:42 +0200)
We don't do any deduplication right now, but it becomes relevant once we get
HLS playlists, which we may very well want to share between clients.

client.cpp
client.h
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto

index c2cbcf5033bf76c86ccdf36e28078d525f969841..05b8e7d88fcd0b622e2386e84b83abedf0687a24 100644 (file)
@@ -54,7 +54,7 @@ Client::Client(int sock)
        }
 }
        
-Client::Client(const ClientProto &serialized, Stream *stream)
+Client::Client(const ClientProto &serialized, const vector<shared_ptr<const string>> &short_responses, Stream *stream)
        : sock(serialized.sock()),
          remote_addr(serialized.remote_addr()),
          referer(serialized.referer()),
@@ -63,7 +63,6 @@ Client::Client(const ClientProto &serialized, Stream *stream)
          request(serialized.request()),
          url(serialized.url()),
          stream(stream),
-         header_or_short_response(serialized.header_or_short_response()),
          header_or_short_response_bytes_sent(serialized.header_or_short_response_bytes_sent()),
          stream_pos(serialized.stream_pos()),
          stream_pos_end(serialized.stream_pos_end()),
@@ -78,6 +77,16 @@ Client::Client(const ClientProto &serialized, Stream *stream)
                        }
                }
        }
+
+       if (serialized.has_header_or_short_response_old()) {
+               // Pre-1.4.0.
+               header_or_short_response_holder = serialized.header_or_short_response_old();
+               header_or_short_response = &header_or_short_response_holder;
+       } else if (serialized.has_header_or_short_response_index()) {
+               assert(size_t(serialized.header_or_short_response_index()) < short_responses.size());
+               header_or_short_response_ref = short_responses[serialized.header_or_short_response_index()];
+               header_or_short_response = header_or_short_response_ref.get();
+       }
        connect_time.tv_sec = serialized.connect_time_sec();
        connect_time.tv_nsec = serialized.connect_time_nsec();
 
@@ -106,7 +115,7 @@ Client::Client(const ClientProto &serialized, Stream *stream)
        }
 }
 
-ClientProto Client::serialize() const
+ClientProto Client::serialize(unordered_map<const string *, size_t> *short_response_pool) const
 {
        ClientProto serialized;
        serialized.set_sock(sock);
@@ -118,7 +127,14 @@ ClientProto Client::serialize() const
        serialized.set_state(state);
        serialized.set_request(request);
        serialized.set_url(url);
-       serialized.set_header_or_short_response(header_or_short_response);
+
+       if (header_or_short_response != nullptr) {
+               // See if this string is already in the pool (deduplicated by the pointer); if not, insert it.
+               auto iterator_and_inserted = short_response_pool->emplace(
+                       header_or_short_response, short_response_pool->size());
+               serialized.set_header_or_short_response_index(iterator_and_inserted.first->second);
+       }
+
        serialized.set_header_or_short_response_bytes_sent(serialized.header_or_short_response_bytes_sent());
        serialized.set_stream_pos(stream_pos);
        serialized.set_stream_pos_end(stream_pos_end);
index e558bd0e4e8fa47454bbb7d87bf2802a00973431..8f9f682e268e291935278f91cedda3257dc1bae5 100644 (file)
--- a/client.h
+++ b/client.h
@@ -5,7 +5,11 @@
 
 #include <stddef.h>
 #include <time.h>
+
+#include <memory>
 #include <string>
+#include <unordered_map>
+#include <vector>
 
 #include "tlse.h"
 
@@ -30,8 +34,8 @@ struct Client {
        Client(int sock);
 
        // Serialization/deserialization.
-       Client(const ClientProto &serialized, Stream *stream);
-       ClientProto serialize() const;
+       Client(const ClientProto &serialized, const std::vector<std::shared_ptr<const std::string>> &short_responses, Stream *stream);
+       ClientProto serialize(std::unordered_map<const std::string *, size_t> *short_response_pool) const;
 
        ClientStats get_stats() const;
 
@@ -73,7 +77,20 @@ struct Client {
        //
        // Must start with the string "HTTP/1.0 ", which will be changed to 1.1
        // if relevant.
-       std::string header_or_short_response;
+       const std::string *header_or_short_response = nullptr;
+
+       // <header_or_short_response> can come from two distinct places; it can be
+       // local to the Client object, or it can be shared between many Clients
+       // (typically HLS playlists, that can be so large that they are expensive
+       // to hold in many copies). <header_or_short_response> will point to exactly
+       // one of these, which should be cleared out/dereferenced when it is
+       // no longer needed.
+       //
+       // The use of shared_ptr is somewhat overkill since we don't need
+       // to access the HLS playlists from multiple threads, but it's not a
+       // big deal for us.
+       std::string header_or_short_response_holder;
+       std::shared_ptr<const std::string> header_or_short_response_ref;
 
        // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER
        // or SENDING_SHORT_RESPONSE.
index 858a6d0867000bf9ec9fcb456fa66f621bd4544d..88c49f4513bf60521fe782a387cddaf53999506c 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -496,6 +496,12 @@ start:
                        servers->create_tls_context_for_acceptor(acceptor);
                }
        }
+
+       // Allocate strings for the short responses.
+       vector<shared_ptr<const string>> short_response_pool;
+       for (const ShortResponsePool &str : loaded_state.short_response_pool()) {
+               short_response_pool.emplace_back(new string(str.header_or_short_response()));
+       }
        
        // Put back the existing clients. It doesn't matter which server we
        // allocate them to, so just do round-robin. However, we need to sort them
@@ -507,10 +513,12 @@ start:
                if (all_urls.count(loaded_state.clients(i).url()) == 0) {
                        safe_close(loaded_state.clients(i).sock());
                } else {
-                       servers->add_client_from_serialized(loaded_state.clients(i));
+                       servers->add_client_from_serialized(loaded_state.clients(i), short_response_pool);
                }
        }
        
+       short_response_pool.clear();  // No longer needed; the clients have their own refcounts now.
+
        servers->run();
 
        // Now delete all inputs that are no longer in use, and start the others.
index 3107ead689e0e35b287cf9a0a9a6e65772846e01..6570b5b73d084531d0c7099fd1979422f56616db 100644 (file)
@@ -167,7 +167,7 @@ void Server::do_work()
        }
 }
 
-CubemapStateProto Server::serialize()
+CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short_response_pool)
 {
        // We don't serialize anything queued, so empty the queues.
        process_queued_data();
@@ -185,7 +185,7 @@ CubemapStateProto Server::serialize()
 
        CubemapStateProto serialized;
        for (const auto &fd_and_client : clients) {
-               serialized.add_clients()->MergeFrom(fd_and_client.second.serialize());
+               serialized.add_clients()->MergeFrom(fd_and_client.second.serialize(short_response_pool));
        }
        for (unique_ptr<Stream> &stream : streams) {
                serialized.add_streams()->MergeFrom(stream->serialize());
@@ -248,7 +248,7 @@ void Server::add_client(int sock, Acceptor *acceptor)
        process_client(client_ptr);
 }
 
-void Server::add_client_from_serialized(const ClientProto &client)
+void Server::add_client_from_serialized(const ClientProto &client, const vector<shared_ptr<const string>> &short_responses)
 {
        lock_guard<mutex> lock(mu);
        Stream *stream;
@@ -259,7 +259,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
        } else {
                stream = streams[stream_index].get();
        }
-       auto inserted = clients.insert(make_pair(client.sock(), Client(client, stream)));
+       auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream)));
        assert(inserted.second == true);  // Should not already exist.
        Client *client_ptr = &inserted.first->second;
 
@@ -495,8 +495,8 @@ sending_header_or_short_response_again:
                int ret;
                do {
                        ret = write(client->sock,
-                                   client->header_or_short_response.data() + client->header_or_short_response_bytes_sent,
-                                   client->header_or_short_response.size() - client->header_or_short_response_bytes_sent);
+                                   client->header_or_short_response->data() + client->header_or_short_response_bytes_sent,
+                                   client->header_or_short_response->size() - client->header_or_short_response_bytes_sent);
                } while (ret == -1 && errno == EINTR);
 
                if (ret == -1 && errno == EAGAIN) {
@@ -515,15 +515,17 @@ sending_header_or_short_response_again:
                }
                
                client->header_or_short_response_bytes_sent += ret;
-               assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response.size());
+               assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response->size());
 
-               if (client->header_or_short_response_bytes_sent < client->header_or_short_response.size()) {
+               if (client->header_or_short_response_bytes_sent < client->header_or_short_response->size()) {
                        // We haven't sent all yet. Fine; go another round.
                        goto sending_header_or_short_response_again;
                }
 
                // We're done sending the header or error! Clear it to release some memory.
-               client->header_or_short_response.clear();
+               client->header_or_short_response = nullptr;
+               client->header_or_short_response_holder.clear();
+               client->header_or_short_response_ref.reset();
 
                if (client->state == Client::SENDING_SHORT_RESPONSE) {
                        if (more_requests(client)) {
@@ -983,53 +985,53 @@ int Server::parse_request(Client *client)
 void Server::construct_header(Client *client)
 {
        Stream *stream = client->stream;
-       client->header_or_short_response = stream->http_header;
+       string response = stream->http_header;
        if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
                char buf[64];
                snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", stream->stream_header.size());
-               client->header_or_short_response.append(buf);
+               response.append(buf);
        } else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
                char buf[64];
                snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", client->stream_pos_end - client->stream_pos);
-               client->header_or_short_response.append(buf);
+               response.append(buf);
        }
        if (client->http_11) {
-               assert(client->header_or_short_response.find("HTTP/1.0") == 0);
-               client->header_or_short_response[7] = '1';  // Change to HTTP/1.1.
+               assert(response.find("HTTP/1.0") == 0);
+               response[7] = '1';  // Change to HTTP/1.1.
                if (client->close_after_response) {
-                       client->header_or_short_response.append("Connection: close\r\n");
+                       response.append("Connection: close\r\n");
                }
        } else {
                assert(client->close_after_response);
        }
        if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
-               client->header_or_short_response.append("\r\n");
+               response.append("\r\n");
        } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
-               client->header_or_short_response.append(
-                       "Content-encoding: metacube\r\n"
-                       "\r\n");
+               response.append("Content-encoding: metacube\r\n\r\n");
                if (!stream->stream_header.empty()) {
                        metacube2_block_header hdr;
                        memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
                        hdr.size = htonl(stream->stream_header.size());
                        hdr.flags = htons(METACUBE_FLAGS_HEADER);
                        hdr.csum = htons(metacube2_compute_crc(&hdr));
-                       client->header_or_short_response.append(
-                               string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+                       response.append(string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
                }
        } else {
                assert(false);
        }
        if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
                client->state = Client::SENDING_SHORT_RESPONSE;
-               client->header_or_short_response.append(stream->stream_header);
+               response.append(stream->stream_header);
        } else {
                client->state = Client::SENDING_HEADER;
                if (client->stream_pos_end == Client::STREAM_POS_NO_END) {  // Fragments don't contain stream headers.
-                       client->header_or_short_response.append(stream->stream_header);
+                       response.append(stream->stream_header);
                }
        }
 
+       client->header_or_short_response_holder = move(response);
+       client->header_or_short_response = &client->header_or_short_response_holder;
+
        // Switch states.
        change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
 }
@@ -1046,7 +1048,8 @@ void Server::construct_error(Client *client, int error_code)
                        "HTTP/1.%d %d Error\r\nContent-type: text/plain\r\nContent-length: 30\r\n\r\nSomething went wrong. Sorry.\r\n",
                        client->http_11, error_code);
        }
-       client->header_or_short_response = error;
+       client->header_or_short_response_holder = error;
+       client->header_or_short_response = &client->header_or_short_response_holder;
 
        // Switch states.
        client->state = Client::SENDING_SHORT_RESPONSE;
@@ -1058,20 +1061,25 @@ void Server::construct_204(Client *client)
        map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
        assert(ping_url_map_it != ping_url_map.end());
 
+       string response;
        if (client->http_11) {
-               client->header_or_short_response = "HTTP/1.1 204 No Content\r\n";
+               response = "HTTP/1.1 204 No Content\r\n";
                if (client->close_after_response) {
-                       client->header_or_short_response.append("Connection: close\r\n");
+                       response.append("Connection: close\r\n");
                }
        } else {
-               client->header_or_short_response = "HTTP/1.0 204 No Content\r\n";
+               response = "HTTP/1.0 204 No Content\r\n";
                assert(client->close_after_response);
        }
        if (!ping_url_map_it->second.empty()) {
-               client->header_or_short_response.append("Access-Control-Allow-Origin: ");
-               client->header_or_short_response.append(ping_url_map_it->second);
+               response.append("Access-Control-Allow-Origin: ");
+               response.append(ping_url_map_it->second);
+               response.append("\r\n");
        }
-       client->header_or_short_response.append("\r\n");
+       response.append("\r\n");
+
+       client->header_or_short_response_holder = move(response);
+       client->header_or_short_response = &client->header_or_short_response_holder;
 
        // Switch states.
        client->state = Client::SENDING_SHORT_RESPONSE;
@@ -1133,7 +1141,9 @@ bool Server::more_requests(Client *client)
        client->state = Client::READING_REQUEST;
        client->url.clear();
        client->stream = NULL;
-       client->header_or_short_response.clear();
+       client->header_or_short_response = nullptr;
+       client->header_or_short_response_holder.clear();
+       client->header_or_short_response_ref.reset();
        client->header_or_short_response_bytes_sent = 0;
 
        change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP);  // No TLS handshake, so no EPOLLOUT needed.
index 6007626dd57d5e14953b4f9b905e94fbc08d74c9..2fe4f15184a62699c6bcb8c89606ffa95f9559e8 100644 (file)
--- a/server.h
+++ b/server.h
@@ -57,8 +57,8 @@ public:
        // These should not be called while running, since that would violate
        // threading assumptions (ie., that epoll is only called from one thread
        // at the same time).
-       CubemapStateProto serialize();
-       void add_client_from_serialized(const ClientProto &client);
+       CubemapStateProto serialize(std::unordered_map<const std::string *, size_t> *short_response_pool);
+       void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
        int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
        int add_stream_from_serialized(const StreamProto &stream, int data_fd);
        int lookup_stream_by_url(const std::string &url) const;
index a5c8a72ffc0ae7726a1413d3f071cb06666bf6d3..8e233dafe8d9fb319bb29e09e6b4974b4ddedaad 100644 (file)
@@ -24,8 +24,10 @@ CubemapStateProto ServerPool::serialize()
 {
        CubemapStateProto state;
 
+       unordered_map<const string *, size_t> short_response_pool;
+
        for (int i = 0; i < num_servers; ++i) {
-                CubemapStateProto local_state = servers[i].serialize();
+                CubemapStateProto local_state = servers[i].serialize(&short_response_pool);
 
                // The stream state should be identical between the servers, so we only store it once,
                // save for the fds, which we keep around to distribute to the servers after re-exec.
@@ -43,6 +45,13 @@ CubemapStateProto ServerPool::serialize()
                }
         }
 
+       for (size_t i = 0; i < short_response_pool.size(); ++i) {
+               state.mutable_short_response_pool()->Add();
+       }
+       for (const auto &string_and_index : short_response_pool) {
+               state.mutable_short_response_pool(string_and_index.second)->set_header_or_short_response(*string_and_index.first);
+       }
+
        return state;
 }
 
@@ -51,9 +60,9 @@ void ServerPool::add_client(int sock, Acceptor *acceptor)
        servers[clients_added++ % num_servers].add_client_deferred(sock, acceptor);
 }
 
-void ServerPool::add_client_from_serialized(const ClientProto &client)
+void ServerPool::add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses)
 {
-       servers[clients_added++ % num_servers].add_client_from_serialized(client);
+       servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
 }
 
 int ServerPool::lookup_stream_by_url(const string &url) const
index 5f661df2dc175046f6197fdf89177341f704dd5b..61676281840f070bd2c70a722dd92f2a92b9d327 100644 (file)
@@ -27,7 +27,7 @@ public:
 
        // Picks a server (round-robin) and allocates the given client to it.
        void add_client(int sock, Acceptor *acceptor);
-       void add_client_from_serialized(const ClientProto &client);
+       void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
 
        // Adds the given stream to all the servers. Returns the stream index.
        int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
index 7643bbf53703c95a9938f6e5b18d36f24d774c20..e9a4f13293c617b09e21b923cca4d6eecae816d6 100644 (file)
@@ -9,7 +9,8 @@ message ClientProto {
        optional int32 state = 2;
        optional bytes request = 3;
        optional string url = 4;
-       optional bytes header_or_short_response = 5;
+       optional bytes header_or_short_response_old = 5;  // Only for pre-1.4.0 servers. Use header_or_short_response_index instead.
+       optional int64 header_or_short_response_index = 21;  // Index into CubemapStateProto.short_response_pool.
        optional int64 header_or_short_response_bytes_sent = 6;
        optional int64 stream_pos = 7;
        optional int64 stream_pos_end = 20 [default=-1];
@@ -64,6 +65,11 @@ message AcceptorProto {
        optional bytes private_key = 5;
 };
 
+// For deduplicating strings in header_or_short_response.
+message ShortResponsePool {
+       optional bytes header_or_short_response = 1;
+};
+
 message CubemapStateProto {
        optional int64 serialize_start_sec = 6;
        optional int64 serialize_start_usec = 7;
@@ -71,4 +77,5 @@ message CubemapStateProto {
        repeated StreamProto streams = 2;
        repeated InputProto inputs = 5;
        repeated AcceptorProto acceptors = 8;
+       repeated ShortResponsePool short_response_pool = 9;
 };