#include <stdio.h>
#include <string.h>
#include <stdlib.h>
+#include <limits.h>
#include <ctype.h>
#include <assert.h>
+#include <map>
#include <vector>
#include <string>
using namespace std;
-vector<ConfigLine> parse_config(const string &filename)
-{
- vector<ConfigLine> ret;
+struct ConfigLine {
+ string keyword;
+ vector<string> arguments;
+ map<string, string> parameters;
+};
+bool read_config(const string &filename, vector<ConfigLine> *lines)
+{
FILE *fp = fopen(filename.c_str(), "r");
if (fp == NULL) {
perror(filename.c_str());
- exit(1);
+ return false;
}
char buf[4096];
}
}
- ret.push_back(line);
+ lines->push_back(line);
}
fclose(fp);
- return ret;
+ return true;
}
-string fetch_config_string(const vector<ConfigLine> &config, const string &keyword,
- ParameterType parameter_type, const string &default_value)
+bool fetch_config_string(const vector<ConfigLine> &config, const string &keyword, string *value)
{
- assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL);
for (unsigned i = 0; i < config.size(); ++i) {
if (config[i].keyword != keyword) {
continue;
if (config[i].parameters.size() > 0 ||
config[i].arguments.size() != 1) {
fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
- exit(1);
+ return false;
}
- return config[i].arguments[0];
- }
- if (parameter_type == PARAMATER_MANDATORY) {
- fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n",
- keyword.c_str());
- exit(1);
- } else {
- return default_value;
+ *value = config[i].arguments[0];
+ return true;
}
+ return false;
}
-int fetch_config_int(const std::vector<ConfigLine> &config, const std::string &keyword,
- int min_limit, int max_limit,
- ParameterType parameter_type, int default_value)
+bool fetch_config_int(const vector<ConfigLine> &config, const string &keyword, int *value)
{
- assert(parameter_type == PARAMATER_MANDATORY || parameter_type == PARAMETER_OPTIONAL);
- bool value_found = false;
- int value = -1;
for (unsigned i = 0; i < config.size(); ++i) {
if (config[i].keyword != keyword) {
continue;
if (config[i].parameters.size() > 0 ||
config[i].arguments.size() != 1) {
fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
- exit(1);
+ return false;
+ }
+ *value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity.
+ return true;
+ }
+ return false;
+}
+
+bool parse_port(const ConfigLine &line, Config *config)
+{
+ if (line.arguments.size() != 1) {
+ fprintf(stderr, "ERROR: 'port' takes exactly one argument\n");
+ return false;
+ }
+
+ AcceptorConfig acceptor;
+ acceptor.port = atoi(line.arguments[0].c_str());
+ if (acceptor.port < 1 || acceptor.port >= 65536) {
+ fprintf(stderr, "ERROR: port %d is out of range (must be [1,65536>).\n", acceptor.port);
+ return false;
+ }
+
+ config->acceptors.push_back(acceptor);
+ return true;
+}
+
+int allocate_mark_pool(int from, int to, Config *config)
+{
+ int pool_index = -1;
+
+ // Reuse mark pools if an identical one exists.
+ // Otherwise, check if we're overlapping some other mark pool.
+ for (size_t i = 0; i < config->mark_pools.size(); ++i) {
+ const MarkPoolConfig &pool = config->mark_pools[i];
+ if (from == pool.from && to == pool.to) {
+ pool_index = i;
+ } else if ((from >= pool.from && from < pool.to) ||
+ (to >= pool.from && to < pool.to)) {
+ fprintf(stderr, "WARNING: Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks.\n",
+ from, to, pool.from, pool.to);
+ fprintf(stderr, " Mark pools must either be completely disjunct, or completely overlapping.\n");
}
- value_found = true;
- value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity.
}
- if (!value_found) {
- if (parameter_type == PARAMETER_OPTIONAL) {
- return default_value;
+
+ if (pool_index != -1) {
+ return pool_index;
+ }
+
+ // No match to existing pools.
+ MarkPoolConfig pool;
+ pool.from = from;
+ pool.to = to;
+ config->mark_pools.push_back(pool);
+
+ return config->mark_pools.size() - 1;
+}
+
+bool parse_mark_pool(const string &mark_str, int *from, int *to)
+{
+ size_t split = mark_str.find_first_of('-');
+ if (split == string::npos) {
+ fprintf(stderr, "ERROR: Invalid mark specification '%s' (expected 'X-Y').\n",
+ mark_str.c_str());
+ return false;
+ }
+
+ string from_str(mark_str.begin(), mark_str.begin() + split);
+ string to_str(mark_str.begin() + split + 1, mark_str.end());
+ *from = atoi(from_str.c_str());
+ *to = atoi(to_str.c_str());
+
+ if (*from <= 0 || *from >= 65536 || *to <= 0 || *to >= 65536) {
+ fprintf(stderr, "ERROR: Mark pool range %d-%d is outside legal range [1,65536>.\n",
+ *from, *to);
+ return false;
+ }
+
+ return true;
+}
+
+bool parse_stream(const ConfigLine &line, Config *config)
+{
+ if (line.arguments.size() != 1) {
+ fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n");
+ return false;
+ }
+
+ StreamConfig stream;
+ stream.stream_id = line.arguments[0];
+
+ map<string, string>::const_iterator src_it = line.parameters.find("src");
+ if (src_it == line.parameters.end()) {
+ fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n",
+ stream.stream_id.c_str());
+ } else {
+ stream.src = src_it->second;
+ // TODO: Verify that the URL is parseable?
+ }
+
+ // Parse marks, if so desired.
+ map<string, string>::const_iterator mark_parm_it = line.parameters.find("mark");
+ if (mark_parm_it == line.parameters.end()) {
+ stream.mark_pool = -1;
+ } else {
+ int from, to;
+ if (!parse_mark_pool(mark_parm_it->second, &from, &to)) {
+ return false;
}
- fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n",
- keyword.c_str());
- exit(1);
+ stream.mark_pool = allocate_mark_pool(from, to, config);
+ }
+
+ config->streams.push_back(stream);
+ return true;
+}
+
+bool parse_config(const string &filename, Config *config)
+{
+ vector<ConfigLine> lines;
+ if (!read_config(filename, &lines)) {
+ return false;
}
- if (value < min_limit || value > max_limit) {
- fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n",
- keyword.c_str(), value, min_limit, max_limit);
- exit(1);
+
+ if (!fetch_config_int(lines, "num_servers", &config->num_servers)) {
+ fprintf(stderr, "ERROR: Missing 'num_servers' statement in config file.\n");
+ return false;
+ }
+ if (config->num_servers < 1 || config->num_servers >= 20000) { // Insanely high max limit.
+ fprintf(stderr, "ERROR: 'num_servers' is %d, needs to be in [1, 20000>.\n", config->num_servers);
+ return false;
+ }
+
+ // See if the user wants stats.
+ config->stats_interval = 60;
+ bool has_stats_file = fetch_config_string(lines, "stats_file", &config->stats_file);
+ bool has_stats_interval = fetch_config_int(lines, "stats_interval", &config->stats_interval);
+ if (has_stats_interval && !has_stats_file) {
+ fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n");
}
- return value;
+
+ for (size_t i = 0; i < lines.size(); ++i) {
+ const ConfigLine &line = lines[i];
+ if (line.keyword == "num_servers" ||
+ line.keyword == "stats_file" ||
+ line.keyword == "stats_interval") {
+ // Already taken care of, above.
+ } else if (line.keyword == "port") {
+ if (!parse_port(line, config)) {
+ return false;
+ }
+ } else if (line.keyword == "stream") {
+ if (!parse_stream(line, config)) {
+ return false;
+ }
+ } else {
+ fprintf(stderr, "ERROR: Unknown configuration keyword '%s'.\n",
+ line.keyword.c_str());
+ return false;
+ }
+ }
+
+ return true;
}
return state;
}
-// Reuse mark pools if one already exists.
-MarkPool *get_mark_pool(map<pair<int, int>, MarkPool *> *mark_pools, int from, int to)
-{
- pair<int, int> mark_range(from, to);
- if (mark_pools->count(mark_range) != 0) {
- return (*mark_pools)[mark_range];
- }
-
- // Check if we're overlapping some other mark pool.
- for (map<pair<int, int>, MarkPool *>::const_iterator mp_it = mark_pools->begin();
- mp_it != mark_pools->end();
- ++mp_it) {
- int other_from = mp_it->first.first;
- int other_to = mp_it->first.second;
- if ((from >= other_from && from < other_to) ||
- (to >= other_from && to < other_to)) {
- fprintf(stderr, "WARNING: Mark pool %d-%d partially overlaps with %d-%d, you may get duplicate marks.\n",
- from, to, other_from, other_to);
- fprintf(stderr, " Mark pools must either be completely disjunct, or completely overlapping.\n");
- }
- }
-
- MarkPool *mark_pool = new MarkPool(from, to);
- mark_pools->insert(make_pair(mark_range, mark_pool));
- return mark_pool;
-}
-
-MarkPool *parse_mark_pool(map<pair<int, int>, MarkPool *> *mark_pools, const string &mark_str)
-{
- size_t split = mark_str.find_first_of('-');
- if (split == string::npos) {
- fprintf(stderr, "WARNING: Invalid mark specification '%s' (expected 'X-Y'), ignoring.\n",
- mark_str.c_str());
- return NULL;
- }
-
- string from_str(mark_str.begin(), mark_str.begin() + split);
- string to_str(mark_str.begin() + split + 1, mark_str.end());
- int from = atoi(from_str.c_str());
- int to = atoi(to_str.c_str());
-
- if (from <= 0 || from >= 65536 || to <= 0 || to >= 65536) {
- fprintf(stderr, "WARNING: Mark pool range %d-%d is outside legal range [1,65536>, ignoring.\n",
- from, to);
- return NULL;
- }
-
- return get_mark_pool(mark_pools, from, to);
-}
-
// Find all port statements in the configuration file, and create acceptors for htem.
vector<Acceptor *> create_acceptors(
- const vector<ConfigLine> &config,
+ const Config &config,
map<int, Acceptor *> *deserialized_acceptors)
{
vector<Acceptor *> acceptors;
- for (unsigned i = 0; i < config.size(); ++i) {
- if (config[i].keyword != "port") {
- continue;
- }
- if (config[i].arguments.size() != 1) {
- fprintf(stderr, "ERROR: 'port' takes exactly one argument\n");
- exit(1);
- }
- int port = atoi(config[i].arguments[0].c_str());
- if (port < 1 || port >= 65536) {
- fprintf(stderr, "WARNING: port %d is out of range (must be [1,65536>), ignoring\n", port);
- continue;
- }
-
+ for (unsigned i = 0; i < config.acceptors.size(); ++i) {
+ const AcceptorConfig &acceptor_config = config.acceptors[i];
Acceptor *acceptor = NULL;
map<int, Acceptor *>::iterator deserialized_acceptor_it =
- deserialized_acceptors->find(port);
+ deserialized_acceptors->find(acceptor_config.port);
if (deserialized_acceptor_it != deserialized_acceptors->end()) {
acceptor = deserialized_acceptor_it->second;
deserialized_acceptors->erase(deserialized_acceptor_it);
} else {
- int server_sock = create_server_socket(port, TCP_SOCKET);
- acceptor = new Acceptor(server_sock, port);
+ int server_sock = create_server_socket(acceptor_config.port, TCP_SOCKET);
+ acceptor = new Acceptor(server_sock, acceptor_config.port);
}
acceptor->run();
acceptors.push_back(acceptor);
}
// Find all streams in the configuration file, and create inputs for them.
-vector<Input *> create_inputs(const vector<ConfigLine> &config,
+vector<Input *> create_inputs(const Config &config,
map<string, Input *> *deserialized_inputs)
{
vector<Input *> inputs;
- for (unsigned i = 0; i < config.size(); ++i) {
- if (config[i].keyword != "stream") {
- continue;
- }
- assert(config[i].arguments.size() == 1);
- string stream_id = config[i].arguments[0];
-
- map<string, string>::const_iterator src_it =
- config[i].parameters.find("src");
- if (src_it == config[i].parameters.end()) {
- fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n",
- stream_id.c_str());
+ for (unsigned i = 0; i < config.streams.size(); ++i) {
+ const StreamConfig &stream_config = config.streams[i];
+ if (stream_config.src.empty()) {
continue;
}
- string src = src_it->second;
+ string stream_id = stream_config.stream_id;
+ string src = stream_config.src;
+
Input *input = NULL;
map<string, Input *>::iterator deserialized_input_it =
deserialized_inputs->find(stream_id);
return inputs;
}
-void create_streams(const vector<ConfigLine> &config,
+void create_streams(const Config &config,
const set<string> &deserialized_stream_ids,
map<string, Input *> *deserialized_inputs)
{
+ vector<MarkPool *> mark_pools; // FIXME: leak
+ for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
+ const MarkPoolConfig &mp_config = config.mark_pools[i];
+ mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
+ }
+
set<string> expecting_stream_ids = deserialized_stream_ids;
- map<pair<int, int>, MarkPool *> mark_pools;
- for (unsigned i = 0; i < config.size(); ++i) {
- if (config[i].keyword != "stream") {
- continue;
- }
- if (config[i].arguments.size() != 1) {
- fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n");
- exit(1);
+ for (unsigned i = 0; i < config.streams.size(); ++i) {
+ const StreamConfig &stream_config = config.streams[i];
+ if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
+ servers->add_stream(stream_config.stream_id);
}
- string stream_id = config[i].arguments[0];
- if (deserialized_stream_ids.count(stream_id) == 0) {
- servers->add_stream(stream_id);
- }
- expecting_stream_ids.erase(stream_id);
-
- // Set up marks, if so desired.
- map<string, string>::const_iterator mark_parm_it =
- config[i].parameters.find("mark");
- if (mark_parm_it != config[i].parameters.end()) {
- MarkPool *mark_pool = parse_mark_pool(&mark_pools, mark_parm_it->second);
- servers->set_mark_pool(stream_id, mark_pool);
+ expecting_stream_ids.erase(stream_config.stream_id);
+
+ if (stream_config.mark_pool != -1) {
+ servers->set_mark_pool(stream_config.stream_id,
+ mark_pools[stream_config.mark_pool]);
}
}
bool is_reexec = false;
string config_filename = (argc == 1) ? "cubemap.config" : argv[1];
- vector<ConfigLine> config = parse_config(config_filename);
-
- int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY); // Insanely high max limit.
+ Config config;
+ if (!parse_config(config_filename, &config)) {
+ exit(1);
+ }
- servers = new ServerPool(num_servers);
+ servers = new ServerPool(config.num_servers);
CubemapStateProto loaded_state;
set<string> deserialized_stream_ids;
// Find all streams in the configuration file, and create them.
create_streams(config, deserialized_stream_ids, &deserialized_inputs);
- // See if the user wants stats.
- string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL);
- int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1);
- if (stats_interval != -1 && stats_file.empty()) {
- fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n");
- }
-
servers->run();
vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
// Start writing statistics.
StatsThread *stats_thread = NULL;
- if (!stats_file.empty()) {
- stats_thread = new StatsThread(stats_file, stats_interval);
+ if (!config.stats_file.empty()) {
+ stats_thread = new StatsThread(config.stats_file, config.stats_interval);
stats_thread->run();
+ } else if (config.stats_interval != -1) {
+ fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n");
}
signal(SIGHUP, hup);
fprintf(stderr, "Serializing state and re-execing...\n");
int state_fd = make_tempfile(collect_state(
- serialize_start, acceptors, inputs, servers, num_servers));
+ serialize_start, acceptors, inputs, servers, config.num_servers));
delete servers;
char buf[16];