- set<string> expecting_stream_ids = deserialized_stream_ids;
- map<pair<int, int>, MarkPool *> mark_pools;
- for (unsigned i = 0; i < config.size(); ++i) {
- if (config[i].keyword != "stream") {
- continue;
- }
- if (config[i].arguments.size() != 1) {
- fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n");
- exit(1);
- }
- string stream_id = config[i].arguments[0];
- if (deserialized_stream_ids.count(stream_id) == 0) {
- servers->add_stream(stream_id);
- }
- expecting_stream_ids.erase(stream_id);
-
- // Set up marks, if so desired.
- if (config[i].parameters.count("mark")) {
- string mark_str = config[i].parameters["mark"];
- size_t split = mark_str.find_first_of('-');
- if (split == string::npos) {
- fprintf(stderr, "WARNING: Invalid mark specification '%s' (expected 'X-Y'), ignoring.\n",
- mark_str.c_str());
- continue;
- }
-
- string from_str(mark_str.begin(), mark_str.begin() + split);
- string to_str(mark_str.begin() + split + 1, mark_str.end());
- int from = atoi(from_str.c_str());
- int to = atoi(to_str.c_str());
-
- if (from <= 0 || from >= 65536 || to <= 0 || to >= 65536) {
- fprintf(stderr, "WARNING: Mark pool range %d-%d is outside legal range [1,65536>, ignoring.\n",
- from, to);
- continue;
- }
-
- MarkPool *mark_pool = get_mark_pool(&mark_pools, from, to);
- servers->set_mark_pool(stream_id, mark_pool);
- }
- }
-
- // Warn about any servers we've lost.
- // TODO: Make an option (delete=yes?) to actually shut down streams.
- for (set<string>::const_iterator stream_it = expecting_stream_ids.begin();
- stream_it != expecting_stream_ids.end();
- ++stream_it) {
- string stream_id = *stream_it;
- fprintf(stderr, "WARNING: stream '%s' disappeared from the configuration file.\n",
- stream_id.c_str());
- fprintf(stderr, " It will not be deleted, but clients will not get any new inputs.\n");
- if (deserialized_inputs.count(stream_id) != 0) {
- delete deserialized_inputs[stream_id];
- deserialized_inputs.erase(stream_id);
- }
- }
-
- // Open a new server socket if we do not already have one, or if we changed ports.
- if (server_sock != -1 && port != old_port) {
- fprintf(stderr, "NOTE: Port changed from %d to %d; opening new socket.\n", old_port, port);
- close(server_sock);
- server_sock = -1;
- }
- if (server_sock == -1) {
- server_sock = create_server_socket(port);
- }
-
- // See if the user wants stats.
- string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL);
- int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1);
- if (stats_interval != -1 && stats_file.empty()) {
- fprintf(stderr, "WARNING: 'stats_interval' given, but no 'stats_file'. No statistics will be written.\n");
- }
- StatsThread *stats_thread = NULL;
- if (!stats_file.empty()) {
- stats_thread = new StatsThread(stats_file, stats_interval);
- }
-
- servers->run();
-
- pthread_t acceptor_thread;
- pthread_create(&acceptor_thread, NULL, acceptor_thread_run, reinterpret_cast<void *>(server_sock));
-
- // Find all streams in the configuration file, and create inputs for them.
- vector<Input *> inputs;
- for (unsigned i = 0; i < config.size(); ++i) {
- if (config[i].keyword != "stream") {
- continue;
- }
- assert(config[i].arguments.size() == 1);
- string stream_id = config[i].arguments[0];