}
}
-Client::Client(const ClientProto &serialized, Stream *stream)
+Client::Client(const ClientProto &serialized, const vector<shared_ptr<const string>> &short_responses, Stream *stream)
: sock(serialized.sock()),
remote_addr(serialized.remote_addr()),
referer(serialized.referer()),
request(serialized.request()),
url(serialized.url()),
stream(stream),
- header_or_short_response(serialized.header_or_short_response()),
header_or_short_response_bytes_sent(serialized.header_or_short_response_bytes_sent()),
stream_pos(serialized.stream_pos()),
stream_pos_end(serialized.stream_pos_end()),
}
}
}
+
+ if (serialized.has_header_or_short_response_old()) {
+ // Pre-1.4.0.
+ header_or_short_response_holder = serialized.header_or_short_response_old();
+ header_or_short_response = &header_or_short_response_holder;
+ } else if (serialized.has_header_or_short_response_index()) {
+ assert(size_t(serialized.header_or_short_response_index()) < short_responses.size());
+ header_or_short_response_ref = short_responses[serialized.header_or_short_response_index()];
+ header_or_short_response = header_or_short_response_ref.get();
+ }
connect_time.tv_sec = serialized.connect_time_sec();
connect_time.tv_nsec = serialized.connect_time_nsec();
}
}
-ClientProto Client::serialize() const
+ClientProto Client::serialize(unordered_map<const string *, size_t> *short_response_pool) const
{
ClientProto serialized;
serialized.set_sock(sock);
serialized.set_state(state);
serialized.set_request(request);
serialized.set_url(url);
- serialized.set_header_or_short_response(header_or_short_response);
+
+ if (header_or_short_response != nullptr) {
+ // See if this string is already in the pool (deduplicated by the pointer); if not, insert it.
+ auto iterator_and_inserted = short_response_pool->emplace(
+ header_or_short_response, short_response_pool->size());
+ serialized.set_header_or_short_response_index(iterator_and_inserted.first->second);
+ }
+
serialized.set_header_or_short_response_bytes_sent(serialized.header_or_short_response_bytes_sent());
serialized.set_stream_pos(stream_pos);
serialized.set_stream_pos_end(stream_pos_end);
#include <stddef.h>
#include <time.h>
+
+#include <memory>
#include <string>
+#include <unordered_map>
+#include <vector>
#include "tlse.h"
Client(int sock);
// Serialization/deserialization.
- Client(const ClientProto &serialized, Stream *stream);
- ClientProto serialize() const;
+ Client(const ClientProto &serialized, const std::vector<std::shared_ptr<const std::string>> &short_responses, Stream *stream);
+ ClientProto serialize(std::unordered_map<const std::string *, size_t> *short_response_pool) const;
ClientStats get_stats() const;
//
// Must start with the string "HTTP/1.0 ", which will be changed to 1.1
// if relevant.
- std::string header_or_short_response;
+ const std::string *header_or_short_response = nullptr;
+
+ // <header_or_short_response> can come from two distinct places; it can be
+ // local to the Client object, or it can be shared between many Clients
+ // (typically HLS playlists, that can be so large that they are expensive
+ // to hold in many copies). <header_or_short_response> will point to exactly
+ // one of these, which should be cleared out/dereferenced when it is
+ // no longer needed.
+ //
+ // The use of shared_ptr is somewhat overkill since we don't need
+ // to access the HLS playlists from multiple threads, but it's not a
+ // big deal for us.
+ std::string header_or_short_response_holder;
+ std::shared_ptr<const std::string> header_or_short_response_ref;
// Number of bytes we've sent of the header. Only relevant for SENDING_HEADER
// or SENDING_SHORT_RESPONSE.
servers->create_tls_context_for_acceptor(acceptor);
}
}
+
+ // Allocate strings for the short responses.
+ vector<shared_ptr<const string>> short_response_pool;
+ for (const ShortResponsePool &str : loaded_state.short_response_pool()) {
+ short_response_pool.emplace_back(new string(str.header_or_short_response()));
+ }
// Put back the existing clients. It doesn't matter which server we
// allocate them to, so just do round-robin. However, we need to sort them
if (all_urls.count(loaded_state.clients(i).url()) == 0) {
safe_close(loaded_state.clients(i).sock());
} else {
- servers->add_client_from_serialized(loaded_state.clients(i));
+ servers->add_client_from_serialized(loaded_state.clients(i), short_response_pool);
}
}
+ short_response_pool.clear(); // No longer needed; the clients have their own refcounts now.
+
servers->run();
// Now delete all inputs that are longer in use, and start the others.
}
}
-CubemapStateProto Server::serialize()
+CubemapStateProto Server::serialize(unordered_map<const string *, size_t> *short_response_pool)
{
// We don't serialize anything queued, so empty the queues.
process_queued_data();
CubemapStateProto serialized;
for (const auto &fd_and_client : clients) {
- serialized.add_clients()->MergeFrom(fd_and_client.second.serialize());
+ serialized.add_clients()->MergeFrom(fd_and_client.second.serialize(short_response_pool));
}
for (unique_ptr<Stream> &stream : streams) {
serialized.add_streams()->MergeFrom(stream->serialize());
process_client(client_ptr);
}
-void Server::add_client_from_serialized(const ClientProto &client)
+void Server::add_client_from_serialized(const ClientProto &client, const vector<shared_ptr<const string>> &short_responses)
{
lock_guard<mutex> lock(mu);
Stream *stream;
} else {
stream = streams[stream_index].get();
}
- auto inserted = clients.insert(make_pair(client.sock(), Client(client, stream)));
+ auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream)));
assert(inserted.second == true); // Should not already exist.
Client *client_ptr = &inserted.first->second;
int ret;
do {
ret = write(client->sock,
- client->header_or_short_response.data() + client->header_or_short_response_bytes_sent,
- client->header_or_short_response.size() - client->header_or_short_response_bytes_sent);
+ client->header_or_short_response->data() + client->header_or_short_response_bytes_sent,
+ client->header_or_short_response->size() - client->header_or_short_response_bytes_sent);
} while (ret == -1 && errno == EINTR);
if (ret == -1 && errno == EAGAIN) {
}
client->header_or_short_response_bytes_sent += ret;
- assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response.size());
+ assert(client->header_or_short_response_bytes_sent <= client->header_or_short_response->size());
- if (client->header_or_short_response_bytes_sent < client->header_or_short_response.size()) {
+ if (client->header_or_short_response_bytes_sent < client->header_or_short_response->size()) {
// We haven't sent all yet. Fine; go another round.
goto sending_header_or_short_response_again;
}
// We're done sending the header or error! Clear it to release some memory.
- client->header_or_short_response.clear();
+ client->header_or_short_response = nullptr;
+ client->header_or_short_response_holder.clear();
+ client->header_or_short_response_ref.reset();
if (client->state == Client::SENDING_SHORT_RESPONSE) {
if (more_requests(client)) {
void Server::construct_header(Client *client)
{
Stream *stream = client->stream;
- client->header_or_short_response = stream->http_header;
+ string response = stream->http_header;
if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
char buf[64];
snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", stream->stream_header.size());
- client->header_or_short_response.append(buf);
+ response.append(buf);
} else if (client->stream_pos_end != Client::STREAM_POS_NO_END) {
char buf[64];
snprintf(buf, sizeof(buf), "Content-length: %zu\r\n", client->stream_pos_end - client->stream_pos);
- client->header_or_short_response.append(buf);
+ response.append(buf);
}
if (client->http_11) {
- assert(client->header_or_short_response.find("HTTP/1.0") == 0);
- client->header_or_short_response[7] = '1'; // Change to HTTP/1.1.
+ assert(response.find("HTTP/1.0") == 0);
+ response[7] = '1'; // Change to HTTP/1.1.
if (client->close_after_response) {
- client->header_or_short_response.append("Connection: close\r\n");
+ response.append("Connection: close\r\n");
}
} else {
assert(client->close_after_response);
}
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
- client->header_or_short_response.append("\r\n");
+ response.append("\r\n");
} else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
- client->header_or_short_response.append(
- "Content-encoding: metacube\r\n"
- "\r\n");
+ response.append("Content-encoding: metacube\r\n\r\n");
if (!stream->stream_header.empty()) {
metacube2_block_header hdr;
memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
hdr.size = htonl(stream->stream_header.size());
hdr.flags = htons(METACUBE_FLAGS_HEADER);
hdr.csum = htons(metacube2_compute_crc(&hdr));
- client->header_or_short_response.append(
- string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+ response.append(string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
}
} else {
assert(false);
}
if (client->stream_pos == Client::STREAM_POS_HEADER_ONLY) {
client->state = Client::SENDING_SHORT_RESPONSE;
- client->header_or_short_response.append(stream->stream_header);
+ response.append(stream->stream_header);
} else {
client->state = Client::SENDING_HEADER;
if (client->stream_pos_end == Client::STREAM_POS_NO_END) { // Fragments don't contain stream headers.
- client->header_or_short_response.append(stream->stream_header);
+ response.append(stream->stream_header);
}
}
+ client->header_or_short_response_holder = move(response);
+ client->header_or_short_response = &client->header_or_short_response_holder;
+
// Switch states.
change_epoll_events(client, EPOLLOUT | EPOLLET | EPOLLRDHUP);
}
"HTTP/1.%d %d Error\r\nContent-type: text/plain\r\nContent-length: 30\r\n\r\nSomething went wrong. Sorry.\r\n",
client->http_11, error_code);
}
- client->header_or_short_response = error;
+ client->header_or_short_response_holder = error;
+ client->header_or_short_response = &client->header_or_short_response_holder;
// Switch states.
client->state = Client::SENDING_SHORT_RESPONSE;
map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
assert(ping_url_map_it != ping_url_map.end());
+ string response;
if (client->http_11) {
- client->header_or_short_response = "HTTP/1.1 204 No Content\r\n";
+ response = "HTTP/1.1 204 No Content\r\n";
if (client->close_after_response) {
- client->header_or_short_response.append("Connection: close\r\n");
+ response.append("Connection: close\r\n");
}
} else {
- client->header_or_short_response = "HTTP/1.0 204 No Content\r\n";
+ response = "HTTP/1.0 204 No Content\r\n";
assert(client->close_after_response);
}
if (!ping_url_map_it->second.empty()) {
- client->header_or_short_response.append("Access-Control-Allow-Origin: ");
- client->header_or_short_response.append(ping_url_map_it->second);
+ response.append("Access-Control-Allow-Origin: ");
+ response.append(ping_url_map_it->second);
+ response.append("\r\n");
}
- client->header_or_short_response.append("\r\n");
+ response.append("\r\n");
+
+ client->header_or_short_response_holder = move(response);
+ client->header_or_short_response = &client->header_or_short_response_holder;
// Switch states.
client->state = Client::SENDING_SHORT_RESPONSE;
client->state = Client::READING_REQUEST;
client->url.clear();
client->stream = NULL;
- client->header_or_short_response.clear();
+ client->header_or_short_response = nullptr;
+ client->header_or_short_response_holder.clear();
+ client->header_or_short_response_ref.reset();
client->header_or_short_response_bytes_sent = 0;
change_epoll_events(client, EPOLLIN | EPOLLET | EPOLLRDHUP); // No TLS handshake, so no EPOLLOUT needed.
// These should not be called while running, since that would violate
// threading assumptions (ie., that epoll is only called from one thread
// at the same time).
- CubemapStateProto serialize();
- void add_client_from_serialized(const ClientProto &client);
+ CubemapStateProto serialize(std::unordered_map<const std::string *, size_t> *short_response_pool);
+ void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
int add_stream_from_serialized(const StreamProto &stream, int data_fd);
int lookup_stream_by_url(const std::string &url) const;
{
CubemapStateProto state;
+ unordered_map<const string *, size_t> short_response_pool;
+
for (int i = 0; i < num_servers; ++i) {
- CubemapStateProto local_state = servers[i].serialize();
+ CubemapStateProto local_state = servers[i].serialize(&short_response_pool);
// The stream state should be identical between the servers, so we only store it once,
// save for the fds, which we keep around to distribute to the servers after re-exec.
}
}
+ for (size_t i = 0; i < short_response_pool.size(); ++i) {
+ state.mutable_short_response_pool()->Add();
+ }
+ for (const auto &string_and_index : short_response_pool) {
+ state.mutable_short_response_pool(string_and_index.second)->set_header_or_short_response(*string_and_index.first);
+ }
+
return state;
}
servers[clients_added++ % num_servers].add_client_deferred(sock, acceptor);
}
-void ServerPool::add_client_from_serialized(const ClientProto &client)
+void ServerPool::add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses)
{
- servers[clients_added++ % num_servers].add_client_from_serialized(client);
+ servers[clients_added++ % num_servers].add_client_from_serialized(client, short_responses);
}
int ServerPool::lookup_stream_by_url(const string &url) const
// Picks a server (round-robin) and allocates the given client to it.
void add_client(int sock, Acceptor *acceptor);
- void add_client_from_serialized(const ClientProto &client);
+ void add_client_from_serialized(const ClientProto &client, const std::vector<std::shared_ptr<const std::string>> &short_responses);
// Adds the given stream to all the servers. Returns the stream index.
int add_stream(const std::string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
optional int32 state = 2;
optional bytes request = 3;
optional string url = 4;
- optional bytes header_or_short_response = 5;
+ optional bytes header_or_short_response_old = 5; // Only for pre-1.4.0 servers. Use header_or_short_response_index instead.
+ optional int64 header_or_short_response_index = 21; // Index into CubemapStateProto.short_response_pool.
optional int64 header_or_short_response_bytes_sent = 6;
optional int64 stream_pos = 7;
optional int64 stream_pos_end = 20 [default=-1];
optional bytes private_key = 5;
};
+// For deduplicating strings in header_or_short_response.
+message ShortResponsePool {
+ optional bytes header_or_short_response = 1;
+};
+
message CubemapStateProto {
optional int64 serialize_start_sec = 6;
optional int64 serialize_start_usec = 7;
repeated StreamProto streams = 2;
repeated InputProto inputs = 5;
repeated AcceptorProto acceptors = 8;
+ repeated ShortResponsePool short_response_pool = 9;
};