Add a simple HTTP endpoint that returns a very short string.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 15 Aug 2015 22:50:05 +0000 (00:50 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 15 Aug 2015 22:50:05 +0000 (00:50 +0200)
The intention is for clients to be able to probe the endpoint
to figure out which server is the fastest. To this end,
it supports CORS headers so that XHR is allowed to differentiate
servers that are down from servers that respond properly.

config.cpp
config.h
cubemap.config.sample
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h

index aa0f670..9ff0a03 100644 (file)
@@ -322,6 +322,26 @@ bool parse_udpstream(const ConfigLine &line, Config *config)
        return true;
 }
 
+bool parse_ping(const ConfigLine &line, Config *config)
+{
+       if (line.arguments.size() != 1) {
+               log(ERROR, "'ping' takes exactly one argument");
+               return false;
+       }
+
+       PingConfig ping;
+       ping.url = line.arguments[0];
+
+       // Parse the CORS origin, if it exists.
+       map<string, string>::const_iterator allow_origin_it = line.parameters.find("allow_origin");
+       if (allow_origin_it != line.parameters.end()) {
+               ping.allow_origin = allow_origin_it->second;
+       }
+
+       config->pings.push_back(ping);
+       return true;
+}
+
 bool parse_error_log(const ConfigLine &line, Config *config)
 {
        if (line.arguments.size() != 0) {
@@ -423,6 +443,10 @@ bool parse_config(const string &filename, Config *config)
                        if (!parse_udpstream(line, config)) {
                                return false;
                        }
+               } else if (line.keyword == "ping") {
+                       if (!parse_ping(line, config)) {
+                               return false;
+                       }
                } else if (line.keyword == "error_log") {
                        if (!parse_error_log(line, config)) {
                                return false;
index e2dcb43..5a006ca 100644 (file)
--- a/config.h
+++ b/config.h
@@ -26,6 +26,11 @@ struct UDPStreamConfig {
        int multicast_iface_index;  // Default is -1 (use operating system default).
 };
 
+struct PingConfig {
+       std::string url;  // As seen by the client.
+       std::string allow_origin;  // Can be empty.
+};
+
 struct AcceptorConfig {
        sockaddr_in6 addr;
 };
@@ -40,6 +45,7 @@ struct Config {
        int num_servers;
        std::vector<StreamConfig> streams;
        std::vector<UDPStreamConfig> udpstreams;
+       std::vector<PingConfig> pings;
        std::vector<AcceptorConfig> acceptors;
        std::vector<LogConfig> log_destinations;
 
index d15e861..b03ea65 100644 (file)
@@ -91,3 +91,13 @@ udpstream 193.35.52.50:5001 src=http://pannekake.samfundet.no:9094/frikanalen.ts
 # IPv4 multicast output, to the given group. You can explicitly set the TTL
 # and/or multicast output interface, if the defaults do not suit you.
 udpstream 233.252.0.1:5002 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube ttl=32 multicast_output_interface=eth1
+
+# A type of HTTP resource that is not a stream, but rather just a very simple
+# document that contains “pong” and nothing else. allow_origin= is optional;
+# if it is set, the response will contain an Access-Control-Allow-Origin header
+# with the given value, allowing the ping response to be read (and
+# differentiated from an error) from a remote domain using XHR.
+#
+# If you have a stream and a ping endpoint with the same URL, the stream takes
+# precedence and the ping endpoint is silently ignored.
+ping /ping allow_origin=*
index eccf6d7..faf4be8 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -231,6 +231,12 @@ void create_streams(const Config &config,
                        ++input_it->second.refcount;
                }
        }
+
+       // HTTP ping endpoints.
+       for (unsigned i = 0; i < config.pings.size(); ++i) {
+               const PingConfig &ping_config = config.pings[i];
+               servers->add_ping(ping_config.url, ping_config.allow_origin);
+       }
 }
        
 void open_logs(const vector<LogConfig> &log_destinations)
index 615e8fb..9d15e08 100644 (file)
@@ -288,17 +288,17 @@ void Server::add_client_from_serialized(const ClientProto &client)
 
 int Server::lookup_stream_by_url(const string &url) const
 {
-       map<string, int>::const_iterator url_it = url_map.find(url);
-       if (url_it == url_map.end()) {
+       map<string, int>::const_iterator stream_url_it = stream_url_map.find(url);
+       if (stream_url_it == stream_url_map.end()) {
                return -1;
        }
-       return url_it->second;
+       return stream_url_it->second;
 }
 
 int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       url_map.insert(make_pair(url, streams.size()));
+       stream_url_map.insert(make_pair(url, streams.size()));
        streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
        return streams.size() - 1;
 }
@@ -306,7 +306,7 @@ int Server::add_stream(const string &url, size_t backlog_size, size_t prebufferi
 int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       url_map.insert(make_pair(stream.url(), streams.size()));
+       stream_url_map.insert(make_pair(stream.url(), streams.size()));
        streams.push_back(new Stream(stream, data_fd));
        return streams.size() - 1;
 }
@@ -359,6 +359,13 @@ void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
        streams[stream_index]->pacing_rate = pacing_rate;
 }
 
+void Server::add_ping(const std::string &url, const std::string &allow_origin)
+{
+       MutexLock lock(&mutex);
+       assert(clients.empty());
+       ping_url_map[url] = allow_origin;
+}
+
 void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
 {
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
@@ -418,6 +425,8 @@ read_request_again:
                int error_code = parse_request(client);
                if (error_code == 200) {
                        construct_header(client);
+               } else if (error_code == -200) {
+                       construct_pong(client);
                } else {
                        construct_error(client, error_code);
                }
@@ -636,6 +645,7 @@ int Server::parse_request(Client *client)
        }
 
        string url = request_tokens[1];
+       client->url = url;
        if (url.find("?backlog") == url.size() - 8) {
                client->stream_pos = -2;
                url = url.substr(0, url.size() - 8);
@@ -643,18 +653,21 @@ int Server::parse_request(Client *client)
                client->stream_pos = -1;
        }
 
-       map<string, int>::const_iterator url_map_it = url_map.find(url);
-       if (url_map_it == url_map.end()) {
-               return 404;  // Not found.
+       map<string, int>::const_iterator stream_url_map_it = stream_url_map.find(url);
+       if (stream_url_map_it == stream_url_map.end()) {
+               map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
+               if (ping_url_map_it == ping_url_map.end()) {
+                       return 404;  // Not found.
+               } else {
+                       return -200;  // Special internal error code for pong.
+               }
        }
 
-       Stream *stream = streams[url_map_it->second];
+       Stream *stream = streams[stream_url_map_it->second];
        if (stream->http_header.empty()) {
                return 503;  // Service unavailable.
        }
 
-       client->url = request_tokens[1];
-
        client->stream = stream;
        if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
                if (client->stream->pacing_rate != ~0U) {
@@ -724,6 +737,42 @@ void Server::construct_error(Client *client, int error_code)
        }
 }
 
+void Server::construct_pong(Client *client)
+{
+       map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
+       assert(ping_url_map_it != ping_url_map.end());
+
+       if (ping_url_map_it->second.empty()) {
+               client->header_or_short_response =
+                       "HTTP/1.0 200 OK\r\n"
+                       "Content-type: text/plain\r\n"
+                       "\r\n"
+                       "Pong!\r\n";
+       } else {
+               char pong[256];
+               snprintf(pong, 256,
+                        "HTTP/1.0 200 OK\r\n"
+                        "Content-type: text/plain\r\n"
+                        "Access-Control-Allow-Origin: %s\r\n"
+                        "\r\n"
+                        "Pong!\r\n",
+                        ping_url_map_it->second.c_str());
+               client->header_or_short_response = pong;
+       }
+
+       // Switch states.
+       client->state = Client::SENDING_SHORT_RESPONSE;
+
+       epoll_event ev;
+       ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
+       ev.data.u64 = reinterpret_cast<uint64_t>(client);
+
+       if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
+               log_perror("epoll_ctl(EPOLL_CTL_MOD)");
+               exit(1);
+       }
+}
+
 template<class T>
 void delete_from(vector<T> *v, T elem)
 {
index cfc9d25..7297fd7 100644 (file)
--- a/server.h
+++ b/server.h
@@ -61,6 +61,7 @@ public:
        void set_backlog_size(int stream_index, size_t new_size);
        void set_prebuffering_bytes(int stream_index, size_t new_amount);
        void set_encoding(int stream_index, Stream::Encoding encoding);
+       void add_ping(const std::string &url, const std::string &allow_origin);
 
 private:
        // Mutex protecting queued_add_clients.
@@ -87,7 +88,10 @@ private:
        std::vector<Stream *> streams;
 
        // Map from URL to index into <streams>.
-       std::map<std::string, int> url_map;
+       std::map<std::string, int> stream_url_map;
+
+       // Map from URL to CORS Allow-Origin header (or empty string).
+       std::map<std::string, std::string> ping_url_map;
 
        // Map from file descriptor to client.
        std::map<int, Client> clients;
@@ -130,7 +134,8 @@ private:
        // Close a given client socket, and clean up after it.
        void close_client(Client *client);
 
-       // Parse the HTTP request. Returns a HTTP status code (200/400/404).
+       // Parse the HTTP request. Returns a HTTP status code (200/400/404),
+       // or -200 for a pong (which should be answered with 200).
        int parse_request(Client *client);
 
        // Construct the HTTP header, and set the client into
@@ -141,6 +146,9 @@ private:
        // the SENDING_SHORT_RESPONSE state.
        void construct_error(Client *client, int error_code);
 
+       // Construct a pong, and set the client into the SENDING_SHORT_RESPONSE state.
+       void construct_pong(Client *client);
+
        void process_queued_data();
        void skip_lost_data(Client *client);
 
index a598ecd..3159f33 100644 (file)
@@ -163,6 +163,13 @@ void ServerPool::add_data(int stream_index, const char *data, size_t bytes, Stre
        }
 }
 
+void ServerPool::add_ping(const std::string &url, const std::string &allow_origin)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].add_ping(url, allow_origin);
+       }
+}
+
 void ServerPool::run()
 {
        for (int i = 0; i < num_servers; ++i) {
index e0c59cd..cc37e17 100644 (file)
@@ -55,6 +55,9 @@ public:
        // Changes the given stream's encoding type on all the servers.
        void set_encoding(int stream_index, Stream::Encoding encoding);
 
+       // Adds the given ping endpoint to all the servers.
+       void add_ping(const std::string &url, const std::string &allow_origin);
+
        // Starts all the servers.
        void run();