return true;
}
+bool parse_ping(const ConfigLine &line, Config *config)
+{
+ if (line.arguments.size() != 1) {
+ log(ERROR, "'ping' takes exactly one argument");
+ return false;
+ }
+
+ PingConfig ping;
+ ping.url = line.arguments[0];
+
+ // Parse the CORS origin, if it exists.
+ map<string, string>::const_iterator allow_origin_it = line.parameters.find("allow_origin");
+ if (allow_origin_it != line.parameters.end()) {
+ ping.allow_origin = allow_origin_it->second;
+ }
+
+ config->pings.push_back(ping);
+ return true;
+}
+
bool parse_error_log(const ConfigLine &line, Config *config)
{
if (line.arguments.size() != 0) {
if (!parse_udpstream(line, config)) {
return false;
}
+ } else if (line.keyword == "ping") {
+ if (!parse_ping(line, config)) {
+ return false;
+ }
} else if (line.keyword == "error_log") {
if (!parse_error_log(line, config)) {
return false;
int multicast_iface_index; // Default is -1 (use operating system default).
};
+struct PingConfig {
+ std::string url; // As seen by the client.
+ std::string allow_origin; // Can be empty.
+};
+
struct AcceptorConfig {
sockaddr_in6 addr;
};
int num_servers;
std::vector<StreamConfig> streams;
std::vector<UDPStreamConfig> udpstreams;
+ std::vector<PingConfig> pings;
std::vector<AcceptorConfig> acceptors;
std::vector<LogConfig> log_destinations;
# IPv4 multicast output, to the given group. You can explicitly set the TTL
# and/or multicast output interface, if the defaults do not suit you.
udpstream 233.252.0.1:5002 src=http://pannekake.samfundet.no:9094/frikanalen.ts.metacube ttl=32 multicast_output_interface=eth1
+
+# A type of HTTP resource that is not a stream, but rather just a very simple
+# document that contains “pong” and nothing else. allow_origin= is optional;
+# if it is set, the response will contain an Access-Control-Allow-Origin header
+# with the given value, allowing the ping response to be read (and
+# differentiated from an error) from a remote domain using XHR.
+#
+# If you have a stream and a ping endpoint with the same URL, the stream takes
+# precedence and the ping endpoint is silently ignored.
+ping /ping allow_origin=*
++input_it->second.refcount;
}
}
+
+ // HTTP ping endpoints.
+ for (unsigned i = 0; i < config.pings.size(); ++i) {
+ const PingConfig &ping_config = config.pings[i];
+ servers->add_ping(ping_config.url, ping_config.allow_origin);
+ }
}
void open_logs(const vector<LogConfig> &log_destinations)
int Server::lookup_stream_by_url(const string &url) const
{
- map<string, int>::const_iterator url_it = url_map.find(url);
- if (url_it == url_map.end()) {
+ map<string, int>::const_iterator stream_url_it = stream_url_map.find(url);
+ if (stream_url_it == stream_url_map.end()) {
return -1;
}
- return url_it->second;
+ return stream_url_it->second;
}
int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- url_map.insert(make_pair(url, streams.size()));
+ stream_url_map.insert(make_pair(url, streams.size()));
streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
return streams.size() - 1;
}
int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- url_map.insert(make_pair(stream.url(), streams.size()));
+ stream_url_map.insert(make_pair(stream.url(), streams.size()));
streams.push_back(new Stream(stream, data_fd));
return streams.size() - 1;
}
streams[stream_index]->pacing_rate = pacing_rate;
}
+void Server::add_ping(const std::string &url, const std::string &allow_origin)
+{
+ MutexLock lock(&mutex);
+ assert(clients.empty());
+ ping_url_map[url] = allow_origin;
+}
+
void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
int error_code = parse_request(client);
if (error_code == 200) {
construct_header(client);
+ } else if (error_code == -200) {
+ construct_pong(client);
} else {
construct_error(client, error_code);
}
}
string url = request_tokens[1];
+ client->url = url;
if (url.find("?backlog") == url.size() - 8) {
client->stream_pos = -2;
url = url.substr(0, url.size() - 8);
client->stream_pos = -1;
}
- map<string, int>::const_iterator url_map_it = url_map.find(url);
- if (url_map_it == url_map.end()) {
- return 404; // Not found.
+ map<string, int>::const_iterator stream_url_map_it = stream_url_map.find(url);
+ if (stream_url_map_it == stream_url_map.end()) {
+ map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(url);
+ if (ping_url_map_it == ping_url_map.end()) {
+ return 404; // Not found.
+ } else {
+ return -200; // Special internal error code for pong.
+ }
}
- Stream *stream = streams[url_map_it->second];
+ Stream *stream = streams[stream_url_map_it->second];
if (stream->http_header.empty()) {
return 503; // Service unavailable.
}
- client->url = request_tokens[1];
-
client->stream = stream;
if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
if (client->stream->pacing_rate != ~0U) {
}
}
+void Server::construct_pong(Client *client)
+{
+ map<string, string>::const_iterator ping_url_map_it = ping_url_map.find(client->url);
+ assert(ping_url_map_it != ping_url_map.end());
+
+ if (ping_url_map_it->second.empty()) {
+ client->header_or_short_response =
+ "HTTP/1.0 200 OK\r\n"
+ "Content-type: text/plain\r\n"
+ "\r\n"
+ "Pong!\r\n";
+ } else {
+ char pong[256];
+ snprintf(pong, 256,
+ "HTTP/1.0 200 OK\r\n"
+ "Content-type: text/plain\r\n"
+ "Access-Control-Allow-Origin: %s\r\n"
+ "\r\n"
+ "Pong!\r\n",
+ ping_url_map_it->second.c_str());
+ client->header_or_short_response = pong;
+ }
+
+ // Switch states.
+ client->state = Client::SENDING_SHORT_RESPONSE;
+
+ epoll_event ev;
+ ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
+ ev.data.u64 = reinterpret_cast<uint64_t>(client);
+
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
+ log_perror("epoll_ctl(EPOLL_CTL_MOD)");
+ exit(1);
+ }
+}
+
template<class T>
void delete_from(vector<T> *v, T elem)
{
void set_backlog_size(int stream_index, size_t new_size);
void set_prebuffering_bytes(int stream_index, size_t new_amount);
void set_encoding(int stream_index, Stream::Encoding encoding);
+ void add_ping(const std::string &url, const std::string &allow_origin);
private:
// Mutex protecting queued_add_clients.
std::vector<Stream *> streams;
// Map from URL to index into <streams>.
- std::map<std::string, int> url_map;
+ std::map<std::string, int> stream_url_map;
+
+ // Map from URL to CORS Allow-Origin header (or empty string).
+ std::map<std::string, std::string> ping_url_map;
// Map from file descriptor to client.
std::map<int, Client> clients;
// Close a given client socket, and clean up after it.
void close_client(Client *client);
- // Parse the HTTP request. Returns a HTTP status code (200/400/404).
+ // Parse the HTTP request. Returns a HTTP status code (200/400/404),
+ // or -200 for a pong (which should be answered with 200).
int parse_request(Client *client);
// Construct the HTTP header, and set the client into
// the SENDING_SHORT_RESPONSE state.
void construct_error(Client *client, int error_code);
+ // Construct a pong, and set the client into the SENDING_SHORT_RESPONSE state.
+ void construct_pong(Client *client);
+
void process_queued_data();
void skip_lost_data(Client *client);
}
}
+void ServerPool::add_ping(const std::string &url, const std::string &allow_origin)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].add_ping(url, allow_origin);
+ }
+}
+
void ServerPool::run()
{
for (int i = 0; i < num_servers; ++i) {
// Changes the given stream's encoding type on all the servers.
void set_encoding(int stream_index, Stream::Encoding encoding);
+ // Adds the given ping endpoint to all the servers.
+ void add_ping(const std::string &url, const std::string &allow_origin);
+
// Starts all the servers.
void run();