+// Find all port statements in the configuration file, and create acceptors for htem.
+vector<Acceptor *> create_acceptors(
+ const vector<ConfigLine> &config,
+ map<int, Acceptor *> *deserialized_acceptors)
+{
+ vector<Acceptor *> acceptors;
+ for (unsigned i = 0; i < config.size(); ++i) {
+ if (config[i].keyword != "port") {
+ continue;
+ }
+ if (config[i].arguments.size() != 1) {
+ fprintf(stderr, "ERROR: 'port' takes exactly one argument\n");
+ exit(1);
+ }
+ int port = atoi(config[i].arguments[0].c_str());
+ if (port < 1 || port >= 65536) {
+ fprintf(stderr, "WARNING: port %d is out of range (must be [1,65536>), ignoring\n", port);
+ continue;
+ }
+
+ Acceptor *acceptor = NULL;
+ map<int, Acceptor *>::iterator deserialized_acceptor_it =
+ deserialized_acceptors->find(port);
+ if (deserialized_acceptor_it != deserialized_acceptors->end()) {
+ acceptor = deserialized_acceptor_it->second;
+ deserialized_acceptors->erase(deserialized_acceptor_it);
+ } else {
+ int server_sock = create_server_socket(port);
+ acceptor = new Acceptor(server_sock, port);
+ }
+ acceptor->run();
+ acceptors.push_back(acceptor);
+ }
+
+ // Close all acceptors that are no longer in the configuration file.
+ for (map<int, Acceptor *>::iterator acceptor_it = deserialized_acceptors->begin();
+ acceptor_it != deserialized_acceptors->end();
+ ++acceptor_it) {
+ acceptor_it->second->close_socket();
+ delete acceptor_it->second;
+ }
+
+ return acceptors;
+}
+
+// Find all streams in the configuration file, and create inputs for them.
+vector<Input *> create_inputs(const vector<ConfigLine> &config,
+ map<string, Input *> *deserialized_inputs)
+{
+ vector<Input *> inputs;
+ for (unsigned i = 0; i < config.size(); ++i) {
+ if (config[i].keyword != "stream") {
+ continue;
+ }
+ assert(config[i].arguments.size() == 1);
+ string stream_id = config[i].arguments[0];
+
+ map<string, string>::const_iterator src_it =
+ config[i].parameters.find("src");
+ if (src_it == config[i].parameters.end()) {
+ fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n",
+ stream_id.c_str());
+ continue;
+ }
+
+ string src = src_it->second;
+ Input *input = NULL;
+ map<string, Input *>::iterator deserialized_input_it =
+ deserialized_inputs->find(stream_id);
+ if (deserialized_input_it != deserialized_inputs->end()) {
+ input = deserialized_input_it->second;
+ if (input->get_url() != src) {
+ fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n",
+ stream_id.c_str(), input->get_url().c_str(), src.c_str());
+ delete input;
+ input = NULL;
+ }
+ deserialized_inputs->erase(deserialized_input_it);
+ }
+ if (input == NULL) {
+ input = new HTTPInput(stream_id, src);
+ }
+ input->run();
+ inputs.push_back(input);
+ }
+ return inputs;
+}
+
+void create_streams(const vector<ConfigLine> &config,
+ const set<string> &deserialized_stream_ids,
+ map<string, Input *> *deserialized_inputs)
+{
+ set<string> expecting_stream_ids = deserialized_stream_ids;
+ map<pair<int, int>, MarkPool *> mark_pools;
+ for (unsigned i = 0; i < config.size(); ++i) {
+ if (config[i].keyword != "stream") {
+ continue;
+ }
+ if (config[i].arguments.size() != 1) {
+ fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n");
+ exit(1);
+ }
+ string stream_id = config[i].arguments[0];
+ if (deserialized_stream_ids.count(stream_id) == 0) {
+ servers->add_stream(stream_id);
+ }
+ expecting_stream_ids.erase(stream_id);
+
+ // Set up marks, if so desired.
+ map<string, string>::const_iterator mark_parm_it =
+ config[i].parameters.find("mark");
+ if (mark_parm_it != config[i].parameters.end()) {
+ MarkPool *mark_pool = parse_mark_pool(&mark_pools, mark_parm_it->second);
+ servers->set_mark_pool(stream_id, mark_pool);
+ }
+ }
+
+ // Warn about any servers we've lost.
+ // TODO: Make an option (delete=yes?) to actually shut down streams.
+ for (set<string>::const_iterator stream_it = expecting_stream_ids.begin();
+ stream_it != expecting_stream_ids.end();
+ ++stream_it) {
+ string stream_id = *stream_it;
+ fprintf(stderr, "WARNING: stream '%s' disappeared from the configuration file.\n",
+ stream_id.c_str());
+ fprintf(stderr, " It will not be deleted, but clients will not get any new inputs.\n");
+ if (deserialized_inputs->count(stream_id) != 0) {
+ delete (*deserialized_inputs)[stream_id];
+ deserialized_inputs->erase(stream_id);
+ }
+ }
+}
+