From 742f56c25bdd54dc438b21cb5e422de520bc7231 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 11 Apr 2013 21:08:14 +0200 Subject: [PATCH 01/16] Move version identification into a common place. --- httpinput.cpp | 5 +++-- main.cpp | 3 ++- version.h | 9 +++++++++ 3 files changed, 14 insertions(+), 3 deletions(-) create mode 100644 version.h diff --git a/httpinput.cpp b/httpinput.cpp index 1db4f91..012f5bc 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -23,6 +23,7 @@ #include "server.h" #include "serverpool.h" #include "parse.h" +#include "version.h" #include "state.pb.h" using namespace std; @@ -163,7 +164,7 @@ bool HTTPInput::parse_response(const std::string &request) // TODO: Make case-insensitive. // XXX: Use a Via: instead? if (parameters.count("Server") == 0) { - parameters.insert(make_pair("Server", "metacube/0.1")); + parameters.insert(make_pair("Server", SERVER_IDENTIFICATION)); } else { for (multimap::iterator it = parameters.begin(); it != parameters.end(); @@ -171,7 +172,7 @@ bool HTTPInput::parse_response(const std::string &request) if (it->first != "Server") { continue; } - it->second = "metacube/0.1 (reflecting: " + it->second + ")"; + it->second = SERVER_IDENTIFICATION " (reflecting: " + it->second + ")"; } } diff --git a/main.cpp b/main.cpp index 04d0d3d..e258c86 100644 --- a/main.cpp +++ b/main.cpp @@ -27,6 +27,7 @@ #include "input.h" #include "httpinput.h" #include "stats.h" +#include "version.h" #include "state.pb.h" using namespace std; @@ -323,7 +324,7 @@ void create_streams(const vector &config, int main(int argc, char **argv) { - fprintf(stderr, "\nCubemap starting.\n"); + fprintf(stderr, "\nCubemap " SERVER_VERSION " starting.\n"); struct timeval serialize_start; bool is_reexec = false; diff --git a/version.h b/version.h new file mode 100644 index 0000000..5b5596a --- /dev/null +++ b/version.h @@ -0,0 +1,9 @@ +#ifndef _VERSION_H +#define _VERSION_H + +// Version number. Don't expect this to change all that often. + +#define SERVER_VERSION "0.1" +#define SERVER_IDENTIFICATION "Cubemap/" SERVER_VERSION + +#endif // !defined(_VERSION_H) -- 2.39.2 From 7c4707af4ea7af05e34bc0414d18d9fd54e4f23e Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 11 Apr 2013 21:31:11 +0200 Subject: [PATCH 02/16] More support for multiple input types. --- input.cpp | 26 ++++++++++++++++++++++++++ input.h | 6 ++++++ main.cpp | 5 ++--- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/input.cpp b/input.cpp index 1e61780..00b6547 100644 --- a/input.cpp +++ b/input.cpp @@ -1,7 +1,9 @@ #include #include +#include "httpinput.h" #include "input.h" +#include "state.pb.h" using namespace std; @@ -50,5 +52,29 @@ bool parse_url(const string &url, string *protocol, string *host, string *port, return true; } +Input *create_input(const std::string &stream_id, const std::string &url) +{ + string protocol, host, port, path; + if (!parse_url(url, &protocol, &host, &port, &path)) { + return NULL; + } + if (protocol == "http") { + return new HTTPInput(stream_id, url); + } + return NULL; +} + +Input *create_input(const InputProto &serialized) +{ + string protocol, host, port, path; + if (!parse_url(serialized.url(), &protocol, &host, &port, &path)) { + return NULL; + } + if (protocol == "http") { + return new HTTPInput(serialized); + } + return NULL; +} + Input::~Input() {} diff --git a/input.h b/input.h index 3e86510..50cfeaa 100644 --- a/input.h +++ b/input.h @@ -5,11 +5,17 @@ #include "thread.h" +class Input; class InputProto; // Extremely rudimentary URL parsing. bool parse_url(const std::string &url, std::string *protocol, std::string *host, std::string *port, std::string *path); +// Figure out the right type of input based on the URL, and create a new Input of the right type. +// Will return NULL if unknown. +Input *create_input(const std::string &stream_id, const std::string &url); +Input *create_input(const InputProto &serialized); + class Input : public Thread { public: virtual ~Input(); diff --git a/main.cpp b/main.cpp index e258c86..03ef0d7 100644 --- a/main.cpp +++ b/main.cpp @@ -25,7 +25,6 @@ #include "server.h" #include "serverpool.h" #include "input.h" -#include "httpinput.h" #include "stats.h" #include "version.h" #include "state.pb.h" @@ -269,7 +268,7 @@ vector create_inputs(const vector &config, deserialized_inputs->erase(deserialized_input_it); } if (input == NULL) { - input = new HTTPInput(stream_id, src); + input = create_input(stream_id, src); } input->run(); inputs.push_back(input); @@ -360,7 +359,7 @@ int main(int argc, char **argv) for (int i = 0; i < loaded_state.inputs_size(); ++i) { deserialized_inputs.insert(make_pair( loaded_state.inputs(i).stream_id(), - new HTTPInput(loaded_state.inputs(i)))); + create_input(loaded_state.inputs(i)))); } // Convert the acceptor from older serialized formats. -- 2.39.2 From d60b2c17baf9b71e4a70a499fa1d34753821c822 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 11 Apr 2013 21:36:55 +0200 Subject: [PATCH 03/16] Do not keep pending data across HTTP connections. --- httpinput.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/httpinput.cpp b/httpinput.cpp index 012f5bc..46f1358 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -215,6 +215,7 @@ void HTTPInput::do_work() request.clear(); request_bytes_sent = 0; response.clear(); + pending_data.clear(); { string protocol; // Thrown away. -- 2.39.2 From f583e4d329222d8ce2a11524e924c02139b4f28d Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 11 Apr 2013 21:46:10 +0200 Subject: [PATCH 04/16] When create_input() fails, give an error message instead of crashing. --- main.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/main.cpp b/main.cpp index 03ef0d7..25f82eb 100644 --- a/main.cpp +++ b/main.cpp @@ -269,6 +269,11 @@ vector create_inputs(const vector &config, } if (input == NULL) { input = create_input(stream_id, src); + if (input == NULL) { + fprintf(stderr, "ERROR: did not understand URL '%s', clients will not get any data.\n", + src.c_str()); + continue; + } } input->run(); inputs.push_back(input); -- 2.39.2 From ca9624c43b968a0f29ea44e47851ff686bb64bb6 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Thu, 11 Apr 2013 23:34:03 +0200 Subject: [PATCH 05/16] Support UDP input. Also fix some issues with socket closing. --- Makefile | 2 +- acceptor.cpp | 18 +++++-- acceptor.h | 6 ++- cubemap.config.sample | 1 + httpinput.cpp | 12 +++++ httpinput.h | 2 + input.cpp | 21 +++++--- input.h | 1 + main.cpp | 3 +- udpinput.cpp | 121 ++++++++++++++++++++++++++++++++++++++++++ udpinput.h | 39 ++++++++++++++ 11 files changed, 211 insertions(+), 15 deletions(-) create mode 100644 udpinput.cpp create mode 100644 udpinput.h diff --git a/Makefile b/Makefile index 5367daa..bc3511a 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g LDLIBS=-lpthread -lprotobuf -OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o +OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o all: cubemap diff --git a/acceptor.cpp b/acceptor.cpp index 87bf47b..fbc5b04 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -17,9 +17,15 @@ using namespace std; extern ServerPool *servers; extern volatile bool hupped; -int create_server_socket(int port) +int create_server_socket(int port, SocketType socket_type) { - int server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + int server_sock; + if (socket_type == TCP_SOCKET) { + server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + } else { + assert(socket_type == UDP_SOCKET); + server_sock = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP); + } if (server_sock == -1) { perror("socket"); exit(1); @@ -54,9 +60,11 @@ int create_server_socket(int port) exit(1); } - if (listen(server_sock, 128) == -1) { - perror("listen"); - exit(1); + if (socket_type == TCP_SOCKET) { + if (listen(server_sock, 128) == -1) { + perror("listen"); + exit(1); + } } return server_sock; diff --git a/acceptor.h b/acceptor.h index caa4647..e0a1bd5 100644 --- a/acceptor.h +++ b/acceptor.h @@ -3,7 +3,11 @@ #include "thread.h" -int create_server_socket(int port); +enum SocketType { + TCP_SOCKET, + UDP_SOCKET, +}; +int create_server_socket(int port, SocketType socket_type); class AcceptorProto; diff --git a/cubemap.config.sample b/cubemap.config.sample index 5773927..852e0ca 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -12,3 +12,4 @@ stats_interval 60 # now the streams! # stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000 +stream /udp.ts src=udp://@:1234 diff --git a/httpinput.cpp b/httpinput.cpp index 46f1358..354e4b9 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -57,6 +57,18 @@ HTTPInput::HTTPInput(const InputProto &serialized) parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails. } +void HTTPInput::close_socket() +{ + int ret; + do { + ret = close(sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("close()"); + } +} + InputProto HTTPInput::serialize() const { InputProto serialized; diff --git a/httpinput.h b/httpinput.h index 894d8e9..1714b71 100644 --- a/httpinput.h +++ b/httpinput.h @@ -15,6 +15,8 @@ public: // Serialization/deserialization. HTTPInput(const InputProto &serialized); virtual InputProto serialize() const; + + virtual void close_socket(); virtual std::string get_url() const { return url; } diff --git a/input.cpp b/input.cpp index 00b6547..3f6a9d7 100644 --- a/input.cpp +++ b/input.cpp @@ -2,6 +2,7 @@ #include #include "httpinput.h" +#include "udpinput.h" #include "input.h" #include "state.pb.h" @@ -10,18 +11,18 @@ using namespace std; // Extremely rudimentary URL parsing. bool parse_url(const string &url, string *protocol, string *host, string *port, string *path) { - if (url.find("http://") != 0) { + size_t split = url.find("://"); + if (split == string::npos) { return false; } + *protocol = string(url.begin(), url.begin() + split); - *protocol = "http"; - - string rest = url.substr(strlen("http://")); - size_t split = rest.find_first_of(":/"); + string rest = string(url.begin() + split + 3, url.end()); + split = rest.find_first_of(":/"); if (split == string::npos) { // http://foo *host = rest; - *port = "http"; + *port = *protocol; *path = "/"; return true; } @@ -47,7 +48,7 @@ bool parse_url(const string &url, string *protocol, string *host, string *port, } // http://foo/bar - *port = "http"; + *port = *protocol; *path = rest; return true; } @@ -61,6 +62,9 @@ Input *create_input(const std::string &stream_id, const std::string &url) if (protocol == "http") { return new HTTPInput(stream_id, url); } + if (protocol == "udp") { + return new UDPInput(stream_id, url); + } return NULL; } @@ -73,6 +77,9 @@ Input *create_input(const InputProto &serialized) if (protocol == "http") { return new HTTPInput(serialized); } + if (protocol == "udp") { + return new UDPInput(serialized); + } return NULL; } diff --git a/input.h b/input.h index 50cfeaa..79cfc54 100644 --- a/input.h +++ b/input.h @@ -21,6 +21,7 @@ public: virtual ~Input(); virtual InputProto serialize() const = 0; virtual std::string get_url() const = 0; + virtual void close_socket() = 0; }; #endif // !defined(_INPUT_H) diff --git a/main.cpp b/main.cpp index 25f82eb..d77fb2e 100644 --- a/main.cpp +++ b/main.cpp @@ -215,7 +215,7 @@ vector create_acceptors( acceptor = deserialized_acceptor_it->second; deserialized_acceptors->erase(deserialized_acceptor_it); } else { - int server_sock = create_server_socket(port); + int server_sock = create_server_socket(port, TCP_SOCKET); acceptor = new Acceptor(server_sock, port); } acceptor->run(); @@ -262,6 +262,7 @@ vector create_inputs(const vector &config, if (input->get_url() != src) { fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n", stream_id.c_str(), input->get_url().c_str(), src.c_str()); + input->close_socket(); delete input; input = NULL; } diff --git a/udpinput.cpp b/udpinput.cpp new file mode 100644 index 0000000..cd66f16 --- /dev/null +++ b/udpinput.cpp @@ -0,0 +1,121 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "acceptor.h" +#include "udpinput.h" +#include "serverpool.h" +#include "version.h" +#include "state.pb.h" + +using namespace std; + +extern ServerPool *servers; + +UDPInput::UDPInput(const string &stream_id, const string &url) + : stream_id(stream_id), + url(url), + sock(-1) +{ + // Should be verified by the caller. + string protocol; + bool ok = parse_url(url, &protocol, &host, &port, &path); + assert(ok); + + construct_header(); +} + +UDPInput::UDPInput(const InputProto &serialized) + : stream_id(serialized.stream_id()), + url(serialized.url()), + sock(serialized.sock()) +{ + // Should be verified by the caller. + string protocol; + bool ok = parse_url(url, &protocol, &host, &port, &path); + assert(ok); + + construct_header(); +} + +InputProto UDPInput::serialize() const +{ + InputProto serialized; + serialized.set_url(url); + serialized.set_sock(sock); + return serialized; +} + +void UDPInput::close_socket() +{ + int ret; + do { + ret = close(sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("close()"); + } + + sock = -1; +} + +void UDPInput::construct_header() +{ + string header = + "HTTP/1.0 200 OK\r\n" + "Content-type: application/octet-stream\r\n" + "Cache-control: no-cache\r\n" + "Server: " SERVER_IDENTIFICATION "\r\n" + "\r\n"; + servers->set_header(stream_id, header); +} + +void UDPInput::do_work() +{ + while (!should_stop) { + if (sock == -1) { + int port_num = atoi(port.c_str()); + sock = create_server_socket(port_num, UDP_SOCKET); + if (sock == -1) { + fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n"); + usleep(200000); + continue; + } + } + + // Since we are non-blocking, we need to wait for the right state first. + // Wait up to 50 ms, then check should_stop. + pollfd pfd; + pfd.fd = sock; + pfd.events = POLLIN; + + int nfds = poll(&pfd, 1, 50); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + continue; + } + if (nfds == -1) { + perror("poll"); + close_socket(); + continue; + } + + char buf[4096]; + int ret; + do { + ret = recv(sock, buf, sizeof(buf), 0); + } while (ret == -1 && errno == EINTR); + + if (ret <= 0) { + perror("recv"); + close_socket(); + continue; + } + + servers->add_data(stream_id, buf, ret); + } +} diff --git a/udpinput.h b/udpinput.h new file mode 100644 index 0000000..0f89d68 --- /dev/null +++ b/udpinput.h @@ -0,0 +1,39 @@ +#ifndef _UDPINPUT_H +#define _UDPINPUT_H 1 + +#include +#include + +#include "input.h" + +class InputProto; + +class UDPInput : public Input { +public: + UDPInput(const std::string &stream_id, const std::string &url); + + // Serialization/deserialization. + UDPInput(const InputProto &serialized); + virtual InputProto serialize() const; + + virtual std::string get_url() const { return url; } + virtual void close_socket(); + +private: + // Actually gets the packets. + virtual void do_work(); + + // Create the HTTP header. + void construct_header(); + + std::string stream_id; + + // The URL and its parsed components. + std::string url; + std::string host, port, path; + + // The socket we are receiving on (or -1). + int sock; +}; + +#endif // !defined(_UDPINPUT_H) -- 2.39.2 From aeda7b341454f243ff8a3d742222c24bf75c338d Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 01:52:29 +0200 Subject: [PATCH 06/16] Split config parsing out of parse.h. --- config.cpp | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++ config.h | 32 ++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 config.cpp create mode 100644 config.h diff --git a/config.cpp b/config.cpp new file mode 100644 index 0000000..b6ad99b --- /dev/null +++ b/config.cpp @@ -0,0 +1,126 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include "parse.h" + +using namespace std; + +vector parse_config(const string &filename) +{ + vector ret; + + FILE *fp = fopen(filename.c_str(), "r"); + if (fp == NULL) { + perror(filename.c_str()); + exit(1); + } + + char buf[4096]; + while (!feof(fp)) { + if (fgets(buf, sizeof(buf), fp) == NULL) { + break; + } + + // Chop off the string at the first #, \r or \n. + buf[strcspn(buf, "#\r\n")] = 0; + + // Remove all whitespace from the end of the string. + size_t len = strlen(buf); + while (len > 0 && isspace(buf[len - 1])) { + buf[--len] = 0; + } + + // If the line is now all blank, ignore it. + if (len == 0) { + continue; + } + + vector tokens = split_tokens(buf); + assert(!tokens.empty()); + + ConfigLine line; + line.keyword = tokens[0]; + + for (size_t i = 1; i < tokens.size(); ++i) { + // foo=bar is a parameter; anything else is an argument. + size_t equals_pos = tokens[i].find_first_of('='); + if (equals_pos == string::npos) { + line.arguments.push_back(tokens[i]); + } else { + string key = tokens[i].substr(0, equals_pos); + string value = tokens[i].substr(equals_pos + 1, string::npos); + line.parameters.insert(make_pair(key, value)); + } + } + + ret.push_back(line); + } + + fclose(fp); + return ret; +} + +string fetch_config_string(const vector &config, const string &keyword, + ParameterType parameter_type, const string &default_value) +{ + assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); + for (unsigned i = 0; i < config.size(); ++i) { + if (config[i].keyword != keyword) { + continue; + } + if (config[i].parameters.size() > 0 || + config[i].arguments.size() != 1) { + fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); + exit(1); + } + return config[i].arguments[0]; + } + if (parameter_type == PARAMATER_MANDATORY) { + fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", + keyword.c_str()); + exit(1); + } else { + return default_value; + } +} + +int fetch_config_int(const std::vector &config, const std::string &keyword, + int min_limit, int max_limit, + ParameterType parameter_type, int default_value) +{ + assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); + bool value_found = false; + int value = -1; + for (unsigned i = 0; i < config.size(); ++i) { + if (config[i].keyword != keyword) { + continue; + } + if (config[i].parameters.size() > 0 || + config[i].arguments.size() != 1) { + fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); + exit(1); + } + value_found = true; + value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. + } + if (!value_found) { + if (parameter_type == PARAMETER_OPTIONAL) { + return default_value; + } + fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", + keyword.c_str()); + exit(1); + } + if (value < min_limit || value > max_limit) { + fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n", + keyword.c_str(), value, min_limit, max_limit); + exit(1); + } + return value; +} diff --git a/config.h b/config.h new file mode 100644 index 0000000..0600083 --- /dev/null +++ b/config.h @@ -0,0 +1,32 @@ +#ifndef _CONFIG_H +#define _CONFIG_H + +// Various routines that deal with parsing the configuration file. + +#include +#include +#include + +struct ConfigLine { + std::string keyword; + std::vector arguments; + std::map parameters; +}; + +// Parse the configuration file. +std::vector parse_config(const std::string &filename); + +enum ParameterType { + PARAMETER_OPTIONAL, + PARAMATER_MANDATORY, +}; + +std::string fetch_config_string(const std::vector &config, const std::string &keyword, + ParameterType parameter_type, const std::string &default_value = ""); + +// Note: Limits are inclusive. +int fetch_config_int(const std::vector &config, const std::string &keyword, + int min_limit, int max_limit, + ParameterType parameter_type, int default_value = -1); + +#endif // !defined(_CONFIG_H) -- 2.39.2 From 32d84e70aa1f1c9cd8334aceac0a617b4a7b4b40 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 01:56:12 +0200 Subject: [PATCH 07/16] Add rudimentary README. --- Makefile | 2 +- README | 7 ++++ main.cpp | 1 + parse.cpp | 114 ------------------------------------------------------ parse.h | 25 +----------- 5 files changed, 10 insertions(+), 139 deletions(-) create mode 100644 README diff --git a/Makefile b/Makefile index bc3511a..f99e226 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g LDLIBS=-lpthread -lprotobuf -OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o +OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o thread.o state.pb.o all: cubemap diff --git a/README b/README new file mode 100644 index 0000000..06c1c64 --- /dev/null +++ b/README @@ -0,0 +1,7 @@ +Cubemap is a high-performance, high-availaility video reflector, +specifically made for use with VLC. + +Copyright 2013 Steinar H. Gunderson . +Licensed under the GNU GPL, version 2. + +More information to come here, later. :-) diff --git a/main.cpp b/main.cpp index d77fb2e..907be07 100644 --- a/main.cpp +++ b/main.cpp @@ -19,6 +19,7 @@ #include #include "acceptor.h" +#include "config.h" #include "markpool.h" #include "metacube.h" #include "parse.h" diff --git a/parse.cpp b/parse.cpp index d8706d3..eed7d19 100644 --- a/parse.cpp +++ b/parse.cpp @@ -58,120 +58,6 @@ vector split_lines(const string &str) return ret; } -vector parse_config(const string &filename) -{ - vector ret; - - FILE *fp = fopen(filename.c_str(), "r"); - if (fp == NULL) { - perror(filename.c_str()); - exit(1); - } - - char buf[4096]; - while (!feof(fp)) { - if (fgets(buf, sizeof(buf), fp) == NULL) { - break; - } - - // Chop off the string at the first #, \r or \n. - buf[strcspn(buf, "#\r\n")] = 0; - - // Remove all whitespace from the end of the string. - size_t len = strlen(buf); - while (len > 0 && isspace(buf[len - 1])) { - buf[--len] = 0; - } - - // If the line is now all blank, ignore it. - if (len == 0) { - continue; - } - - vector tokens = split_tokens(buf); - assert(!tokens.empty()); - - ConfigLine line; - line.keyword = tokens[0]; - - for (size_t i = 1; i < tokens.size(); ++i) { - // foo=bar is a parameter; anything else is an argument. - size_t equals_pos = tokens[i].find_first_of('='); - if (equals_pos == string::npos) { - line.arguments.push_back(tokens[i]); - } else { - string key = tokens[i].substr(0, equals_pos); - string value = tokens[i].substr(equals_pos + 1, string::npos); - line.parameters.insert(make_pair(key, value)); - } - } - - ret.push_back(line); - } - - fclose(fp); - return ret; -} - -string fetch_config_string(const vector &config, const string &keyword, - ParameterType parameter_type, const string &default_value) -{ - assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); - for (unsigned i = 0; i < config.size(); ++i) { - if (config[i].keyword != keyword) { - continue; - } - if (config[i].parameters.size() > 0 || - config[i].arguments.size() != 1) { - fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); - exit(1); - } - return config[i].arguments[0]; - } - if (parameter_type == PARAMATER_MANDATORY) { - fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", - keyword.c_str()); - exit(1); - } else { - return default_value; - } -} - -int fetch_config_int(const std::vector &config, const std::string &keyword, - int min_limit, int max_limit, - ParameterType parameter_type, int default_value) -{ - assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); - bool value_found = false; - int value = -1; - for (unsigned i = 0; i < config.size(); ++i) { - if (config[i].keyword != keyword) { - continue; - } - if (config[i].parameters.size() > 0 || - config[i].arguments.size() != 1) { - fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); - exit(1); - } - value_found = true; - value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. - } - if (!value_found) { - if (parameter_type == PARAMETER_OPTIONAL) { - return default_value; - } - fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", - keyword.c_str()); - exit(1); - } - if (value < min_limit || value > max_limit) { - fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n", - keyword.c_str(), value, min_limit, max_limit); - exit(1); - } - return value; -} - #define MAX_REQUEST_SIZE 16384 /* 16 kB. */ RequestParseStatus wait_for_double_newline(string *existing_data, const char *new_data, size_t new_data_size) diff --git a/parse.h b/parse.h index 3cfa091..6912dd9 100644 --- a/parse.h +++ b/parse.h @@ -1,40 +1,17 @@ #ifndef _PARSE_H #define _PARSE_H -// Various routines that deal with parsing; both configuration files and HTTP requests. +// Various routines that deal with parsing; both HTTP requests and more generic text. -#include #include #include -struct ConfigLine { - std::string keyword; - std::vector arguments; - std::map parameters; -}; - // Split a line on whitespace, e.g. "foo bar baz" -> {"foo", "bar", "baz"}. std::vector split_tokens(const std::string &line); // Split a string on \n or \r\n, e.g. "foo\nbar\r\n\nbaz\r\n\r\n" -> {"foo", "bar", "baz"}. std::vector split_lines(const std::string &str); -// Parse the configuration file. -std::vector parse_config(const std::string &filename); - -enum ParameterType { - PARAMETER_OPTIONAL, - PARAMATER_MANDATORY, -}; - -std::string fetch_config_string(const std::vector &config, const std::string &keyword, - ParameterType parameter_type, const std::string &default_value = ""); - -// Note: Limits are inclusive. -int fetch_config_int(const std::vector &config, const std::string &keyword, - int min_limit, int max_limit, - ParameterType parameter_type, int default_value = -1); - // Add the new data to an existing string, looking for \r\n\r\n // (typical of HTTP requests and/or responses). Will return one // of the given statuses. -- 2.39.2 From 3845193b930d998630bb53b8d4f05a2b54aefd83 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 22:08:09 +0200 Subject: [PATCH 08/16] Fix UDPInput serialization. --- udpinput.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/udpinput.cpp b/udpinput.cpp index cd66f16..bed7b44 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -45,6 +45,7 @@ UDPInput::UDPInput(const InputProto &serialized) InputProto UDPInput::serialize() const { InputProto serialized; + serialized.set_stream_id(stream_id); serialized.set_url(url); serialized.set_sock(sock); return serialized; -- 2.39.2 From 2c6cd9718a5baf5a2ed5be73ec3c525d4873f45a Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 22:09:24 +0200 Subject: [PATCH 09/16] Split configuration parsing out cleanly from initialization. This mostly is required for -configtest later, but also generally splits things out in cleaner and nicer ways than before. --- config.cpp | 212 +++++++++++++++++++++++++++++++++++++++++++---------- config.h | 37 ++++++---- main.cpp | 156 ++++++++++----------------------------- 3 files changed, 234 insertions(+), 171 deletions(-) diff --git a/config.cpp b/config.cpp index b6ad99b..4897113 100644 --- a/config.cpp +++ b/config.cpp @@ -1,8 +1,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -11,14 +13,18 @@ using namespace std; -vector parse_config(const string &filename) -{ - vector ret; +struct ConfigLine { + string keyword; + vector arguments; + map parameters; +}; +bool read_config(const string &filename, vector *lines) +{ FILE *fp = fopen(filename.c_str(), "r"); if (fp == NULL) { perror(filename.c_str()); - exit(1); + return false; } char buf[4096]; @@ -59,17 +65,15 @@ vector parse_config(const string &filename) } } - ret.push_back(line); + lines->push_back(line); } fclose(fp); - return ret; + return true; } -string fetch_config_string(const vector &config, const string &keyword, - ParameterType parameter_type, const string &default_value) +bool fetch_config_string(const vector &config, const string &keyword, string *value) { - assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); for (unsigned i = 0; i < config.size(); ++i) { if (config[i].keyword != keyword) { continue; @@ -77,26 +81,16 @@ string fetch_config_string(const vector &config, const string &keywo if (config[i].parameters.size() > 0 || config[i].arguments.size() != 1) { fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); - exit(1); + return false; } - return config[i].arguments[0]; - } - if (parameter_type == PARAMATER_MANDATORY) { - fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", - keyword.c_str()); - exit(1); - } else { - return default_value; + *value = config[i].arguments[0]; + return true; } + return false; } -int fetch_config_int(const std::vector &config, const std::string &keyword, - int min_limit, int max_limit, - ParameterType parameter_type, int default_value) +bool fetch_config_int(const vector &config, const string &keyword, int *value) { - assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL); - bool value_found = false; - int value = -1; for (unsigned i = 0; i < config.size(); ++i) { if (config[i].keyword != keyword) { continue; @@ -104,23 +98,165 @@ int fetch_config_int(const std::vector &config, const std::string &k if (config[i].parameters.size() > 0 || config[i].arguments.size() != 1) { fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); - exit(1); + return false; + } + *value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. + return true; + } + return false; +} + +bool parse_port(const ConfigLine &line, Config *config) +{ + if (line.arguments.size() != 1) { + fprintf(stderr, "ERROR: 'port' takes exactly one argument\n"); + return false; + } + + AcceptorConfig acceptor; + acceptor.port = atoi(line.arguments[0].c_str()); + if (acceptor.port < 1 || acceptor.port >= 65536) { + fprintf(stderr, "ERROR: port %d is out of range (must be [1,65536>).\n", acceptor.port); + return false; + } + + config->acceptors.push_back(acceptor); + return true; +} + +int allocate_mark_pool(int from, int to, Config *config) +{ + int pool_index = -1; + + // Reuse mark pools if an identical one exists. + // Otherwise, check if we're overlapping some other mark pool. + for (size_t i = 0; i < config->mark_pools.size(); ++i) { + const MarkPoolConfig &pool = config->mark_pools[i]; + if (from == pool.from && to == pool.to) { + pool_index = i; + } else if ((from >= pool.from && from < pool.to) || + (to >= pool.from && to < pool.to)) { + fprintf(stderr, "WARNING: Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks.\n", + from, to, pool.from, pool.to); + fprintf(stderr, " Mark pools must either be completely disjunct, or completely overlapping.\n"); } - value_found = true; - value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. } - if (!value_found) { - if (parameter_type == PARAMETER_OPTIONAL) { - return default_value; + + if (pool_index != -1) { + return pool_index; + } + + // No match to existing pools. + MarkPoolConfig pool; + pool.from = from; + pool.to = to; + config->mark_pools.push_back(pool); + + return config->mark_pools.size() - 1; +} + +bool parse_mark_pool(const string &mark_str, int *from, int *to) +{ + size_t split = mark_str.find_first_of('-'); + if (split == string::npos) { + fprintf(stderr, "ERROR: Invalid mark specification '%s' (expected 'X-Y').\n", + mark_str.c_str()); + return false; + } + + string from_str(mark_str.begin(), mark_str.begin() + split); + string to_str(mark_str.begin() + split + 1, mark_str.end()); + *from = atoi(from_str.c_str()); + *to = atoi(to_str.c_str()); + + if (*from <= 0 || *from >= 65536 || *to <= 0 || *to >= 65536) { + fprintf(stderr, "ERROR: Mark pool range %d-%d is outside legal range [1,65536>.\n", + *from, *to); + return false; + } + + return true; +} + +bool parse_stream(const ConfigLine &line, Config *config) +{ + if (line.arguments.size() != 1) { + fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n"); + return false; + } + + StreamConfig stream; + stream.stream_id = line.arguments[0]; + + map::const_iterator src_it = line.parameters.find("src"); + if (src_it == line.parameters.end()) { + fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n", + stream.stream_id.c_str()); + } else { + stream.src = src_it->second; + // TODO: Verify that the URL is parseable? + } + + // Parse marks, if so desired. + map::const_iterator mark_parm_it = line.parameters.find("mark"); + if (mark_parm_it == line.parameters.end()) { + stream.mark_pool = -1; + } else { + int from, to; + if (!parse_mark_pool(mark_parm_it->second, &from, &to)) { + return false; } - fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", - keyword.c_str()); - exit(1); + stream.mark_pool = allocate_mark_pool(from, to, config); + } + + config->streams.push_back(stream); + return true; +} + +bool parse_config(const string &filename, Config *config) +{ + vector lines; + if (!read_config(filename, &lines)) { + return false; } - if (value < min_limit || value > max_limit) { - fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n", - keyword.c_str(), value, min_limit, max_limit); - exit(1); + + if (!fetch_config_int(lines, "num_servers", &config->num_servers)) { + fprintf(stderr, "ERROR: Missing 'num_servers' statement in config file.\n"); + return false; + } + if (config->num_servers < 1 || config->num_servers >= 20000) { // Insanely high max limit. + fprintf(stderr, "ERROR: 'num_servers' is %d, needs to be in [1, 20000>.\n", config->num_servers); + return false; + } + + // See if the user wants stats. + config->stats_interval = 60; + bool has_stats_file = fetch_config_string(lines, "stats_file", &config->stats_file); + bool has_stats_interval = fetch_config_int(lines, "stats_interval", &config->stats_interval); + if (has_stats_interval && !has_stats_file) { + fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n"); } - return value; + + for (size_t i = 0; i < lines.size(); ++i) { + const ConfigLine &line = lines[i]; + if (line.keyword == "num_servers" || + line.keyword == "stats_file" || + line.keyword == "stats_interval") { + // Already taken care of, above. + } else if (line.keyword == "port") { + if (!parse_port(line, config)) { + return false; + } + } else if (line.keyword == "stream") { + if (!parse_stream(line, config)) { + return false; + } + } else { + fprintf(stderr, "ERROR: Unknown configuration keyword '%s'.\n", + line.keyword.c_str()); + return false; + } + } + + return true; } diff --git a/config.h b/config.h index 0600083..d45ab08 100644 --- a/config.h +++ b/config.h @@ -3,30 +3,35 @@ // Various routines that deal with parsing the configuration file. -#include #include #include -struct ConfigLine { - std::string keyword; - std::vector arguments; - std::map parameters; +struct MarkPoolConfig { + int from, to; }; -// Parse the configuration file. -std::vector parse_config(const std::string &filename); +struct StreamConfig { + std::string stream_id; + std::string src; // Can be empty. + int mark_pool; // -1 for none. +}; -enum ParameterType { - PARAMETER_OPTIONAL, - PARAMATER_MANDATORY, +struct AcceptorConfig { + int port; }; -std::string fetch_config_string(const std::vector &config, const std::string &keyword, - ParameterType parameter_type, const std::string &default_value = ""); +struct Config { + int num_servers; + std::vector mark_pools; + std::vector streams; + std::vector acceptors; + + std::string stats_file; // Empty means no stats file. + int stats_interval; +}; -// Note: Limits are inclusive. -int fetch_config_int(const std::vector &config, const std::string &keyword, - int min_limit, int max_limit, - ParameterType parameter_type, int default_value = -1); +// Parse and validate configuration. Returns false on error. +// is taken to be empty (uninitialized) on entry. +bool parse_config(const std::string &filename, Config *config); #endif // !defined(_CONFIG_H) diff --git a/main.cpp b/main.cpp index 907be07..c5640e0 100644 --- a/main.cpp +++ b/main.cpp @@ -139,85 +139,23 @@ CubemapStateProto read_tempfile(int state_fd) return state; } -// Reuse mark pools if one already exists. -MarkPool *get_mark_pool(map, MarkPool *> *mark_pools, int from, int to) -{ - pair mark_range(from, to); - if (mark_pools->count(mark_range) != 0) { - return (*mark_pools)[mark_range]; - } - - // Check if we're overlapping some other mark pool. - for (map, MarkPool *>::const_iterator mp_it = mark_pools->begin(); - mp_it != mark_pools->end(); - ++mp_it) { - int other_from = mp_it->first.first; - int other_to = mp_it->first.second; - if ((from >= other_from && from < other_to) || - (to >= other_from && to < other_to)) { - fprintf(stderr, "WARNING: Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks.\n", - from, to, other_from, other_to); - fprintf(stderr, " Mark pools must either be completely disjunct, or completely overlapping.\n"); - } - } - - MarkPool *mark_pool = new MarkPool(from, to); - mark_pools->insert(make_pair(mark_range, mark_pool)); - return mark_pool; -} - -MarkPool *parse_mark_pool(map, MarkPool *> *mark_pools, const string &mark_str) -{ - size_t split = mark_str.find_first_of('-'); - if (split == string::npos) { - fprintf(stderr, "WARNING: Invalid mark specification '%s' (expected 'X-Y'), ignoring.\n", - mark_str.c_str()); - return NULL; - } - - string from_str(mark_str.begin(), mark_str.begin() + split); - string to_str(mark_str.begin() + split + 1, mark_str.end()); - int from = atoi(from_str.c_str()); - int to = atoi(to_str.c_str()); - - if (from <= 0 || from >= 65536 || to <= 0 || to >= 65536) { - fprintf(stderr, "WARNING: Mark pool range %d-%d is outside legal range [1,65536>, ignoring.\n", - from, to); - return NULL; - } - - return get_mark_pool(mark_pools, from, to); -} - // Find all port statements in the configuration file, and create acceptors for htem. vector create_acceptors( - const vector &config, + const Config &config, map *deserialized_acceptors) { vector acceptors; - for (unsigned i = 0; i < config.size(); ++i) { - if (config[i].keyword != "port") { - continue; - } - if (config[i].arguments.size() != 1) { - fprintf(stderr, "ERROR: 'port' takes exactly one argument\n"); - exit(1); - } - int port = atoi(config[i].arguments[0].c_str()); - if (port < 1 || port >= 65536) { - fprintf(stderr, "WARNING: port %d is out of range (must be [1,65536>), ignoring\n", port); - continue; - } - + for (unsigned i = 0; i < config.acceptors.size(); ++i) { + const AcceptorConfig &acceptor_config = config.acceptors[i]; Acceptor *acceptor = NULL; map::iterator deserialized_acceptor_it = - deserialized_acceptors->find(port); + deserialized_acceptors->find(acceptor_config.port); if (deserialized_acceptor_it != deserialized_acceptors->end()) { acceptor = deserialized_acceptor_it->second; deserialized_acceptors->erase(deserialized_acceptor_it); } else { - int server_sock = create_server_socket(port, TCP_SOCKET); - acceptor = new Acceptor(server_sock, port); + int server_sock = create_server_socket(acceptor_config.port, TCP_SOCKET); + acceptor = new Acceptor(server_sock, acceptor_config.port); } acceptor->run(); acceptors.push_back(acceptor); @@ -235,26 +173,19 @@ vector create_acceptors( } // Find all streams in the configuration file, and create inputs for them. -vector create_inputs(const vector &config, +vector create_inputs(const Config &config, map *deserialized_inputs) { vector inputs; - for (unsigned i = 0; i < config.size(); ++i) { - if (config[i].keyword != "stream") { - continue; - } - assert(config[i].arguments.size() == 1); - string stream_id = config[i].arguments[0]; - - map::const_iterator src_it = - config[i].parameters.find("src"); - if (src_it == config[i].parameters.end()) { - fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n", - stream_id.c_str()); + for (unsigned i = 0; i < config.streams.size(); ++i) { + const StreamConfig &stream_config = config.streams[i]; + if (stream_config.src.empty()) { continue; } - string src = src_it->second; + string stream_id = stream_config.stream_id; + string src = stream_config.src; + Input *input = NULL; map::iterator deserialized_input_it = deserialized_inputs->find(stream_id); @@ -283,32 +214,27 @@ vector create_inputs(const vector &config, return inputs; } -void create_streams(const vector &config, +void create_streams(const Config &config, const set &deserialized_stream_ids, map *deserialized_inputs) { + vector mark_pools; // FIXME: leak + for (unsigned i = 0; i < config.mark_pools.size(); ++i) { + const MarkPoolConfig &mp_config = config.mark_pools[i]; + mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); + } + set expecting_stream_ids = deserialized_stream_ids; - map, MarkPool *> mark_pools; - for (unsigned i = 0; i < config.size(); ++i) { - if (config[i].keyword != "stream") { - continue; - } - if (config[i].arguments.size() != 1) { - fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n"); - exit(1); + for (unsigned i = 0; i < config.streams.size(); ++i) { + const StreamConfig &stream_config = config.streams[i]; + if (deserialized_stream_ids.count(stream_config.stream_id) == 0) { + servers->add_stream(stream_config.stream_id); } - string stream_id = config[i].arguments[0]; - if (deserialized_stream_ids.count(stream_id) == 0) { - servers->add_stream(stream_id); - } - expecting_stream_ids.erase(stream_id); - - // Set up marks, if so desired. - map::const_iterator mark_parm_it = - config[i].parameters.find("mark"); - if (mark_parm_it != config[i].parameters.end()) { - MarkPool *mark_pool = parse_mark_pool(&mark_pools, mark_parm_it->second); - servers->set_mark_pool(stream_id, mark_pool); + expecting_stream_ids.erase(stream_config.stream_id); + + if (stream_config.mark_pool != -1) { + servers->set_mark_pool(stream_config.stream_id, + mark_pools[stream_config.mark_pool]); } } @@ -336,11 +262,12 @@ int main(int argc, char **argv) bool is_reexec = false; string config_filename = (argc == 1) ? "cubemap.config" : argv[1]; - vector config = parse_config(config_filename); - - int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY); // Insanely high max limit. + Config config; + if (!parse_config(config_filename, &config)) { + exit(1); + } - servers = new ServerPool(num_servers); + servers = new ServerPool(config.num_servers); CubemapStateProto loaded_state; set deserialized_stream_ids; @@ -389,13 +316,6 @@ int main(int argc, char **argv) // Find all streams in the configuration file, and create them. create_streams(config, deserialized_stream_ids, &deserialized_inputs); - // See if the user wants stats. - string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL); - int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1); - if (stats_interval != -1 && stats_file.empty()) { - fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n"); - } - servers->run(); vector acceptors = create_acceptors(config, &deserialized_acceptors); @@ -415,9 +335,11 @@ int main(int argc, char **argv) // Start writing statistics. StatsThread *stats_thread = NULL; - if (!stats_file.empty()) { - stats_thread = new StatsThread(stats_file, stats_interval); + if (!config.stats_file.empty()) { + stats_thread = new StatsThread(config.stats_file, config.stats_interval); stats_thread->run(); + } else if (config.stats_interval != -1) { + fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n"); } signal(SIGHUP, hup); @@ -453,7 +375,7 @@ int main(int argc, char **argv) fprintf(stderr, "Serializing state and re-execing...\n"); int state_fd = make_tempfile(collect_state( - serialize_start, acceptors, inputs, servers, num_servers)); + serialize_start, acceptors, inputs, servers, config.num_servers)); delete servers; char buf[16]; -- 2.39.2 From ce95fbe3011d2e5be2da737b05bca9af8db8a646 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 22:31:05 +0200 Subject: [PATCH 10/16] Move some serialization logic into ServerPool, where it belongs. --- main.cpp | 19 +++---------------- serverpool.cpp | 20 ++++++++++++++++++++ serverpool.h | 5 ++--- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/main.cpp b/main.cpp index c5640e0..eed486e 100644 --- a/main.cpp +++ b/main.cpp @@ -73,10 +73,9 @@ int make_tempfile(const CubemapStateProto &state) CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, const vector inputs, - ServerPool *servers, - int num_servers) + ServerPool *servers) { - CubemapStateProto state; + CubemapStateProto state = servers->serialize(); // Fills streams() and clients(). state.set_serialize_start_sec(serialize_start.tv_sec); state.set_serialize_start_usec(serialize_start.tv_usec); @@ -88,18 +87,6 @@ CubemapStateProto collect_state(const timeval &serialize_start, state.add_inputs()->MergeFrom(inputs[i]->serialize()); } - for (int i = 0; i < num_servers; ++i) { - CubemapStateProto local_state = servers->get_server(i)->serialize(); - - // The stream state should be identical between the servers, so we only store it once. - if (i == 0) { - state.mutable_streams()->MergeFrom(local_state.streams()); - } - for (int j = 0; j < local_state.clients_size(); ++j) { - state.add_clients()->MergeFrom(local_state.clients(j)); - } - } - return state; } @@ -375,7 +362,7 @@ int main(int argc, char **argv) fprintf(stderr, "Serializing state and re-execing...\n"); int state_fd = make_tempfile(collect_state( - serialize_start, acceptors, inputs, servers, config.num_servers)); + serialize_start, acceptors, inputs, servers)); delete servers; char buf[16]; diff --git a/serverpool.cpp b/serverpool.cpp index cf6933c..9f1a728 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -1,4 +1,5 @@ #include "serverpool.h" +#include "state.pb.h" using namespace std; @@ -13,6 +14,25 @@ ServerPool::~ServerPool() { delete[] servers; } + +CubemapStateProto ServerPool::serialize() +{ + CubemapStateProto state; + + for (int i = 0; i < num_servers; ++i) { + CubemapStateProto local_state = servers[i].serialize(); + + // The stream state should be identical between the servers, so we only store it once. + if (i == 0) { + state.mutable_streams()->MergeFrom(local_state.streams()); + } + for (int j = 0; j < local_state.clients_size(); ++j) { + state.add_clients()->MergeFrom(local_state.clients(j)); + } + } + + return state; +} void ServerPool::add_client(int sock) { diff --git a/serverpool.h b/serverpool.h index 5dee5f7..559f76a 100644 --- a/serverpool.h +++ b/serverpool.h @@ -13,9 +13,8 @@ public: ServerPool(int num_servers); ~ServerPool(); - // Accessor. Only to be used in rare situations, really. - // The ServerPool retains ownership. - Server *get_server(int num) { return &servers[num]; } + // Fills streams() and clients(). + CubemapStateProto serialize(); // Picks a server (round-robin) and allocates the given client to it. void add_client(int sock); -- 2.39.2 From 36bf9e6f11b86bb15007993ac7df5e2a651eb3e1 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 23:04:01 +0200 Subject: [PATCH 11/16] Switch option parsing to getopt. --- main.cpp | 54 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/main.cpp b/main.cpp index eed486e..64ee29f 100644 --- a/main.cpp +++ b/main.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -245,14 +246,38 @@ int main(int argc, char **argv) { fprintf(stderr, "\nCubemap " SERVER_VERSION " starting.\n"); + // Parse options. + int state_fd = -1; + for ( ;; ) { + static const option long_options[] = { + { "state", required_argument, 0, 's' }, + }; + int option_index = 0; + int c = getopt_long (argc, argv, "s:", long_options, &option_index); + + if (c == -1) { + break; + } + switch (c) { + case 's': + state_fd = atoi(optarg); + break; + default: + assert(false); + } + } + + string config_filename = "cubemap.config"; + if (optind < argc) { + config_filename = argv[optind++]; + } + struct timeval serialize_start; - bool is_reexec = false; - string config_filename = (argc == 1) ? "cubemap.config" : argv[1]; Config config; if (!parse_config(config_filename, &config)) { exit(1); - } + } servers = new ServerPool(config.num_servers); @@ -260,11 +285,8 @@ int main(int argc, char **argv) set deserialized_stream_ids; map deserialized_inputs; map deserialized_acceptors; - if (argc == 4 && strcmp(argv[2], "-state") == 0) { - is_reexec = true; - + if (state_fd != -1) { fprintf(stderr, "Deserializing state from previous process... "); - int state_fd = atoi(argv[3]); loaded_state = read_tempfile(state_fd); serialize_start.tv_sec = loaded_state.serialize_start_sec(); @@ -311,13 +333,11 @@ int main(int argc, char **argv) // All deserialized inputs should now have been taken care of, one way or the other. assert(deserialized_inputs.empty()); - if (is_reexec) { - // Put back the existing clients. It doesn't matter which server we - // allocate them to, so just do round-robin. However, we need to add - // them after the mark pools have been set up. - for (int i = 0; i < loaded_state.clients_size(); ++i) { - servers->add_client_from_serialized(loaded_state.clients(i)); - } + // Put back the existing clients. It doesn't matter which server we + // allocate them to, so just do round-robin. However, we need to add + // them after the mark pools have been set up. + for (int i = 0; i < loaded_state.clients_size(); ++i) { + servers->add_client_from_serialized(loaded_state.clients(i)); } // Start writing statistics. @@ -333,7 +353,7 @@ int main(int argc, char **argv) struct timeval server_start; gettimeofday(&server_start, NULL); - if (is_reexec) { + if (state_fd != -1) { // Measure time from we started deserializing (below) to now, when basically everything // is up and running. This is, in other words, a conservative estimate of how long our // “glitch” period was, not counting of course reconnects if the configuration changed. @@ -361,7 +381,7 @@ int main(int argc, char **argv) servers->stop(); fprintf(stderr, "Serializing state and re-execing...\n"); - int state_fd = make_tempfile(collect_state( + state_fd = make_tempfile(collect_state( serialize_start, acceptors, inputs, servers)); delete servers; @@ -369,7 +389,7 @@ int main(int argc, char **argv) sprintf(buf, "%d", state_fd); for ( ;; ) { - execlp(argv[0], argv[0], config_filename.c_str(), "-state", buf, NULL); + execlp(argv[0], argv[0], config_filename.c_str(), "--state", buf, NULL); perror("execlp"); fprintf(stderr, "PANIC: re-exec of %s failed. Waiting 0.2 seconds and trying again...\n", argv[0]); usleep(200000); -- 2.39.2 From 9ca9a70e1d0d675a99399de8a5f88fcf5009ef1d Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 23:04:50 +0200 Subject: [PATCH 12/16] Since we just broke upgrade compatibility, kill some older stuff in the state protos. --- main.cpp | 7 ------- state.proto | 4 ---- 2 files changed, 11 deletions(-) diff --git a/main.cpp b/main.cpp index 64ee29f..03938c3 100644 --- a/main.cpp +++ b/main.cpp @@ -305,13 +305,6 @@ int main(int argc, char **argv) create_input(loaded_state.inputs(i)))); } - // Convert the acceptor from older serialized formats. - if (loaded_state.has_server_sock() && loaded_state.has_port()) { - AcceptorProto *acceptor = loaded_state.add_acceptors(); - acceptor->set_server_sock(loaded_state.server_sock()); - acceptor->set_port(loaded_state.port()); - } - // Deserialize the acceptors. for (int i = 0; i < loaded_state.acceptors_size(); ++i) { deserialized_acceptors.insert(make_pair( diff --git a/state.proto b/state.proto index cfad404..eb4660f 100644 --- a/state.proto +++ b/state.proto @@ -46,8 +46,4 @@ message CubemapStateProto { repeated StreamProto streams = 2; repeated InputProto inputs = 5; repeated AcceptorProto acceptors = 8; - - // Deprecated. Use Acceptor instead. - optional int32 server_sock = 3; - optional int32 port = 4; }; -- 2.39.2 From 08d2e990ae182810f2c1b4d4cf9ce23b2701707a Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 12 Apr 2013 23:07:01 +0200 Subject: [PATCH 13/16] Implement --test-config. --- main.cpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/main.cpp b/main.cpp index 03938c3..f220cfb 100644 --- a/main.cpp +++ b/main.cpp @@ -244,16 +244,16 @@ void create_streams(const Config &config, int main(int argc, char **argv) { - fprintf(stderr, "\nCubemap " SERVER_VERSION " starting.\n"); - // Parse options. int state_fd = -1; + bool test_config = false; for ( ;; ) { static const option long_options[] = { { "state", required_argument, 0, 's' }, + { "test-config", no_argument, 0, 't' }, }; int option_index = 0; - int c = getopt_long (argc, argv, "s:", long_options, &option_index); + int c = getopt_long (argc, argv, "s:t", long_options, &option_index); if (c == -1) { break; @@ -262,6 +262,9 @@ int main(int argc, char **argv) case 's': state_fd = atoi(optarg); break; + case 't': + test_config = true; + break; default: assert(false); } @@ -278,7 +281,11 @@ int main(int argc, char **argv) if (!parse_config(config_filename, &config)) { exit(1); } + if (test_config) { + exit(0); + } + fprintf(stderr, "\nCubemap " SERVER_VERSION " starting.\n"); servers = new ServerPool(config.num_servers); CubemapStateProto loaded_state; -- 2.39.2 From 7fc0624f07b0451f8a8c997e18c29da10076aa38 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 13 Apr 2013 00:24:36 +0200 Subject: [PATCH 14/16] When re-execing, try with --test-config first, and revert if it fails. --- main.cpp | 51 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/main.cpp b/main.cpp index f220cfb..afc3edf 100644 --- a/main.cpp +++ b/main.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -241,6 +243,45 @@ void create_streams(const Config &config, } } } + +bool dry_run_config(const std::string &argv0, const std::string &config_filename) +{ + char *argv0_copy = strdup(argv0.c_str()); + char *config_filename_copy = strdup(config_filename.c_str()); + + pid_t pid = fork(); + switch (pid) { + case -1: + perror("fork()"); + free(argv0_copy); + free(config_filename_copy); + return false; + case 0: + // Child. + execlp(argv0_copy, argv0_copy, "--test-config", config_filename_copy, NULL); + perror(argv0_copy); + _exit(1); + default: + // Parent. + break; + } + + free(argv0_copy); + free(config_filename_copy); + + int status; + pid_t err; + do { + err = waitpid(pid, &status, 0); + } while (err == -1 && errno == EINTR); + + if (err == -1) { + perror("waitpid()"); + return false; + } + + return (WIFEXITED(status) && WEXITSTATUS(status) == 0); +} int main(int argc, char **argv) { @@ -275,8 +316,6 @@ int main(int argc, char **argv) config_filename = argv[optind++]; } - struct timeval serialize_start; - Config config; if (!parse_config(config_filename, &config)) { exit(1); @@ -285,10 +324,12 @@ int main(int argc, char **argv) exit(0); } +start: fprintf(stderr, "\nCubemap " SERVER_VERSION " starting.\n"); servers = new ServerPool(config.num_servers); CubemapStateProto loaded_state; + struct timeval serialize_start; set deserialized_stream_ids; map deserialized_inputs; map deserialized_acceptors; @@ -384,6 +425,12 @@ int main(int argc, char **argv) state_fd = make_tempfile(collect_state( serialize_start, acceptors, inputs, servers)); delete servers; + + if (!dry_run_config(argv[0], config_filename)) { + fprintf(stderr, "ERROR: %s --test-config failed. Restarting old version instead of new.\n", argv[0]); + hupped = false; + goto start; + } char buf[16]; sprintf(buf, "%d", state_fd); -- 2.39.2 From cd2eb6436f31c562174d9ee055aef3481bf752a3 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 13 Apr 2013 01:14:32 +0200 Subject: [PATCH 15/16] Fix a missing #include. --- main.cpp | 1 + server.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/main.cpp b/main.cpp index afc3edf..2771f39 100644 --- a/main.cpp +++ b/main.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include diff --git a/server.cpp b/server.cpp index d3fbaaf..8881912 100644 --- a/server.cpp +++ b/server.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include -- 2.39.2 From 94561762b1294b76508eb5554aedbb56fcaececc Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 13 Apr 2013 01:15:24 +0200 Subject: [PATCH 16/16] Thread has virtual member functions, so it should have a virtual destructor. --- thread.cpp | 2 ++ thread.h | 1 + 2 files changed, 3 insertions(+) diff --git a/thread.cpp b/thread.cpp index 03bfd9d..7aded3b 100644 --- a/thread.cpp +++ b/thread.cpp @@ -3,6 +3,8 @@ #include #include "thread.h" + +Thread::~Thread() {} void Thread::run() { diff --git a/thread.h b/thread.h index d9b9535..9c9bdc6 100644 --- a/thread.h +++ b/thread.h @@ -10,6 +10,7 @@ class Thread { public: + virtual ~Thread(); void run(); void stop(); -- 2.39.2