From 4934a0983fee26765a3c1a5b6bf5834ba6e7e52c Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sun, 16 Aug 2015 00:50:05 +0200 Subject: [PATCH] Add a simple HTTP endpoint that returns a very short string. The intention is for clients to be able to probe the endpoint to figure out which server is the fastest. To this end, it supports CORS headers so that XHR is allowed to differentiate servers that are down from servers that respond properly. --- config.cpp | 24 +++++++++++++++ config.h | 6 ++++ cubemap.config.sample | 10 ++++++ main.cpp | 6 ++++ server.cpp | 71 ++++++++++++++++++++++++++++++++++++------- server.h | 12 ++++++-- serverpool.cpp | 7 +++++ serverpool.h | 3 ++ 8 files changed, 126 insertions(+), 13 deletions(-) diff --git a/config.cpp b/config.cpp index aa0f670..9ff0a03 100644 --- a/config.cpp +++ b/config.cpp @@ -322,6 +322,26 @@ bool parse_udpstream(const ConfigLine &line, Config *config) return true; } +bool parse_ping(const ConfigLine &line, Config *config) +{ + if (line.arguments.size() != 1) { + log(ERROR, "'ping' takes exactly one argument"); + return false; + } + + PingConfig ping; + ping.url = line.arguments[0]; + + // Parse the CORS origin, if it exists. + map::const_iterator allow_origin_it = line.parameters.find("allow_origin"); + if (allow_origin_it != line.parameters.end()) { + ping.allow_origin = allow_origin_it->second; + } + + config->pings.push_back(ping); + return true; +} + bool parse_error_log(const ConfigLine &line, Config *config) { if (line.arguments.size() != 0) { @@ -423,6 +443,10 @@ bool parse_config(const string &filename, Config *config) if (!parse_udpstream(line, config)) { return false; } + } else if (line.keyword == "ping") { + if (!parse_ping(line, config)) { + return false; + } } else if (line.keyword == "error_log") { if (!parse_error_log(line, config)) { return false; diff --git a/config.h b/config.h index e2dcb43..5a006ca 100644 --- a/config.h +++ b/config.h @@ -26,6 +26,11 @@ struct UDPStreamConfig { int multicast_iface_index; // Default is -1 (use operating system default). }; +struct PingConfig { + std::string url; // As seen by the client. + std::string allow_origin; // Can be empty. +}; + struct AcceptorConfig { sockaddr_in6 addr; }; @@ -40,6 +45,7 @@ struct Config { int num_servers; std::vector streams; std::vector udpstreams; + std::vector pings; std::vector acceptors; std::vector log_destinations; diff --git a/cubemap.config.sample b/cubemap.config.sample index d15e861..b03ea65 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -91,3 +91,13 @@ udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts # IPv4 multicast output, to the given group. You can explicitly set the TTL # and/or multicast output interface, if the defaults do not suit you. udpstream 233.252.0.1:5002 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube ttl=32 multicast_output_interface=eth1 + +# A type of HTTP resource that is not a stream, but rather just a very simple +# document that contains “pong” and nothing else. allow_origin= is optional; +# if it is set, the response will contain an Access-Control-Allow-Origin header +# with the given value, allowing the ping response to be read (and +# differentiated from an error) from a remote domain using XHR. +# +# If you have a stream and a ping endpoint with the same URL, the stream takes +# precedence and the ping endpoint is silently ignored. +ping /ping allow_origin=* diff --git a/main.cpp b/main.cpp index eccf6d7..faf4be8 100644 --- a/main.cpp +++ b/main.cpp @@ -231,6 +231,12 @@ void create_streams(const Config &config, ++input_it->second.refcount; } } + + // HTTP ping endpoints. + for (unsigned i = 0; i < config.pings.size(); ++i) { + const PingConfig &ping_config = config.pings[i]; + servers->add_ping(ping_config.url, ping_config.allow_origin); + } } void open_logs(const vector &log_destinations) diff --git a/server.cpp b/server.cpp index 615e8fb..9d15e08 100644 --- a/server.cpp +++ b/server.cpp @@ -288,17 +288,17 @@ void Server::add_client_from_serialized(const ClientProto &client) int Server::lookup_stream_by_url(const string &url) const { - map::const_iterator url_it = url_map.find(url); - if (url_it == url_map.end()) { + map::const_iterator stream_url_it = stream_url_map.find(url); + if (stream_url_it == stream_url_map.end()) { return -1; } - return url_it->second; + return stream_url_it->second; } int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding) { MutexLock lock(&mutex); - url_map.insert(make_pair(url, streams.size())); + stream_url_map.insert(make_pair(url, streams.size())); streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding)); return streams.size() - 1; } @@ -306,7 +306,7 @@ int Server::add_stream(const string &url, size_t backlog_size, size_t prebufferi int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { MutexLock lock(&mutex); - url_map.insert(make_pair(stream.url(), streams.size())); + stream_url_map.insert(make_pair(stream.url(), streams.size())); streams.push_back(new Stream(stream, data_fd)); return streams.size() - 1; } @@ -359,6 +359,13 @@ void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate) streams[stream_index]->pacing_rate = pacing_rate; } +void Server::add_ping(const std::string &url, const std::string &allow_origin) +{ + MutexLock lock(&mutex); + assert(clients.empty()); + ping_url_map[url] = allow_origin; +} + void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start) { assert(stream_index >= 0 && stream_index < ssize_t(streams.size())); @@ -418,6 +425,8 @@ read_request_again: int error_code = parse_request(client); if (error_code == 200) { construct_header(client); + } else if (error_code == -200) { + construct_pong(client); } else { construct_error(client, error_code); } @@ -636,6 +645,7 @@ int Server::parse_request(Client *client) } string url = request_tokens[1]; + client->url = url; if (url.find("?backlog") == url.size() - 8) { client->stream_pos = -2; url = url.substr(0, url.size() - 8); @@ -643,18 +653,21 @@ int Server::parse_request(Client *client) client->stream_pos = -1; } - map::const_iterator url_map_it = url_map.find(url); - if (url_map_it == url_map.end()) { - return 404; // Not found. + map::const_iterator stream_url_map_it = stream_url_map.find(url); + if (stream_url_map_it == stream_url_map.end()) { + map::const_iterator ping_url_map_it = ping_url_map.find(url); + if (ping_url_map_it == ping_url_map.end()) { + return 404; // Not found. + } else { + return -200; // Special internal error code for pong. + } } - Stream *stream = streams[url_map_it->second]; + Stream *stream = streams[stream_url_map_it->second]; if (stream->http_header.empty()) { return 503; // Service unavailable. } - client->url = request_tokens[1]; - client->stream = stream; if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) { if (client->stream->pacing_rate != ~0U) { @@ -724,6 +737,42 @@ void Server::construct_error(Client *client, int error_code) } } +void Server::construct_pong(Client *client) +{ + map::const_iterator ping_url_map_it = ping_url_map.find(client->url); + assert(ping_url_map_it != ping_url_map.end()); + + if (ping_url_map_it->second.empty()) { + client->header_or_short_response = + "HTTP/1.0 200 OK\r\n" + "Content-type: text/plain\r\n" + "\r\n" + "Pong!\r\n"; + } else { + char pong[256]; + snprintf(pong, 256, + "HTTP/1.0 200 OK\r\n" + "Content-type: text/plain\r\n" + "Access-Control-Allow-Origin: %s\r\n" + "\r\n" + "Pong!\r\n", + ping_url_map_it->second.c_str()); + client->header_or_short_response = pong; + } + + // Switch states. + client->state = Client::SENDING_SHORT_RESPONSE; + + epoll_event ev; + ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP; + ev.data.u64 = reinterpret_cast(client); + + if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { + log_perror("epoll_ctl(EPOLL_CTL_MOD)"); + exit(1); + } +} + template void delete_from(vector *v, T elem) { diff --git a/server.h b/server.h index cfc9d25..7297fd7 100644 --- a/server.h +++ b/server.h @@ -61,6 +61,7 @@ public: void set_backlog_size(int stream_index, size_t new_size); void set_prebuffering_bytes(int stream_index, size_t new_amount); void set_encoding(int stream_index, Stream::Encoding encoding); + void add_ping(const std::string &url, const std::string &allow_origin); private: // Mutex protecting queued_add_clients. @@ -87,7 +88,10 @@ private: std::vector streams; // Map from URL to index into . - std::map url_map; + std::map stream_url_map; + + // Map from URL to CORS Allow-Origin header (or empty string). + std::map ping_url_map; // Map from file descriptor to client. std::map clients; @@ -130,7 +134,8 @@ private: // Close a given client socket, and clean up after it. void close_client(Client *client); - // Parse the HTTP request. Returns a HTTP status code (200/400/404). + // Parse the HTTP request. Returns a HTTP status code (200/400/404), + // or -200 for a pong (which should be answered with 200). int parse_request(Client *client); // Construct the HTTP header, and set the client into @@ -141,6 +146,9 @@ private: // the SENDING_SHORT_RESPONSE state. void construct_error(Client *client, int error_code); + // Construct a pong, and set the client into the SENDING_SHORT_RESPONSE state. + void construct_pong(Client *client); + void process_queued_data(); void skip_lost_data(Client *client); diff --git a/serverpool.cpp b/serverpool.cpp index a598ecd..3159f33 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -163,6 +163,13 @@ void ServerPool::add_data(int stream_index, const char *data, size_t bytes, Stre } } +void ServerPool::add_ping(const std::string &url, const std::string &allow_origin) +{ + for (int i = 0; i < num_servers; ++i) { + servers[i].add_ping(url, allow_origin); + } +} + void ServerPool::run() { for (int i = 0; i < num_servers; ++i) { diff --git a/serverpool.h b/serverpool.h index e0c59cd..cc37e17 100644 --- a/serverpool.h +++ b/serverpool.h @@ -55,6 +55,9 @@ public: // Changes the given stream's encoding type on all the servers. void set_encoding(int stream_index, Stream::Encoding encoding); + // Adds the given ping endpoint to all the servers. + void add_ping(const std::string &url, const std::string &allow_origin); + // Starts all the servers. void run(); -- 2.39.2