Support multiple listening sockets. Actually mostly because it makes the code somewha...
[cubemap] / main.cpp
index 1f7313a..db3be63 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -69,8 +69,7 @@ int make_tempfile(const CubemapStateProto &state)
 }
 
 CubemapStateProto collect_state(const timeval &serialize_start,
-                                int server_sock,
-                                int port,
+                                const vector<Acceptor *> acceptors,
                                 const vector<Input *> inputs,
                                 ServerPool *servers,
                                 int num_servers)
@@ -78,8 +77,10 @@ CubemapStateProto collect_state(const timeval &serialize_start,
        CubemapStateProto state;
        state.set_serialize_start_sec(serialize_start.tv_sec);
        state.set_serialize_start_usec(serialize_start.tv_usec);
-       state.set_server_sock(server_sock);
-       state.set_port(port);
+       
+       for (size_t i = 0; i < acceptors.size(); ++i) {
+               state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
+       }
 
        for (size_t i = 0; i < inputs.size(); ++i) {
                state.add_inputs()->MergeFrom(inputs[i]->serialize());
@@ -185,7 +186,52 @@ MarkPool *parse_mark_pool(map<pair<int, int>, MarkPool *> *mark_pools, const str
 
        return get_mark_pool(mark_pools, from, to);
 }
-       
+
+// Find all port statements in the configuration file, and create acceptors for htem.
+vector<Acceptor *> create_acceptors(
+       const vector<ConfigLine> &config,
+       map<int, Acceptor *> *deserialized_acceptors)
+{
+       vector<Acceptor *> acceptors;
+       for (unsigned i = 0; i < config.size(); ++i) {
+               if (config[i].keyword != "port") {
+                       continue;
+               }
+               if (config[i].arguments.size() != 1) {
+                       fprintf(stderr, "ERROR: 'port' takes exactly one argument\n");
+                       exit(1);
+               }
+               int port = atoi(config[i].arguments[0].c_str());
+               if (port < 1 || port >= 65536) {
+                       fprintf(stderr, "WARNING: port %d is out of range (must be [1,65536>), ignoring\n", port);
+                       continue;
+               }
+
+               Acceptor *acceptor = NULL;
+               map<int, Acceptor *>::iterator deserialized_acceptor_it =
+                       deserialized_acceptors->find(port);
+               if (deserialized_acceptor_it != deserialized_acceptors->end()) {
+                       acceptor = deserialized_acceptor_it->second;
+                       deserialized_acceptors->erase(deserialized_acceptor_it);
+               } else {
+                       int server_sock = create_server_socket(port);
+                       acceptor = new Acceptor(server_sock, port);
+               }
+               acceptor->run();
+               acceptors.push_back(acceptor);
+       }
+
+       // Close all acceptors that are no longer in the configuration file.
+       for (map<int, Acceptor *>::iterator acceptor_it = deserialized_acceptors->begin();
+            acceptor_it != deserialized_acceptors->end();
+            ++acceptor_it) {
+               acceptor_it->second->close_socket();
+               delete acceptor_it->second;
+       }
+
+       return acceptors;
+}
+
 // Find all streams in the configuration file, and create inputs for them.
 vector<Input *> create_inputs(const vector<ConfigLine> &config,
                               map<string, Input *> *deserialized_inputs)
@@ -284,15 +330,14 @@ int main(int argc, char **argv)
        string config_filename = (argc == 1) ? "cubemap.config" : argv[1];
        vector<ConfigLine> config = parse_config(config_filename);
 
-       int port = fetch_config_int(config, "port", 1, 65535, PARAMATER_MANDATORY);
        int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY);  // Insanely high max limit.
 
        servers = new ServerPool(num_servers);
 
        CubemapStateProto loaded_state;
-       int server_sock = -1, old_port = -1;
        set<string> deserialized_stream_ids;
        map<string, Input *> deserialized_inputs;
+       map<int, Acceptor *> deserialized_acceptors;
        if (argc == 4 && strcmp(argv[2], "-state") == 0) {
                is_reexec = true;
 
@@ -316,9 +361,19 @@ int main(int argc, char **argv)
                                new Input(loaded_state.inputs(i))));
                } 
 
-               // Deserialize the server socket.
-               server_sock = loaded_state.server_sock();
-               old_port = loaded_state.port();
+               // Convert the acceptor from older serialized formats.
+               if (loaded_state.has_server_sock() && loaded_state.has_port()) {
+                       AcceptorProto *acceptor = loaded_state.add_acceptors();
+                       acceptor->set_server_sock(loaded_state.server_sock());
+                       acceptor->set_port(loaded_state.port());
+               }
+
+               // Deserialize the acceptors.
+               for (int i = 0; i < loaded_state.acceptors_size(); ++i) {
+                       deserialized_acceptors.insert(make_pair(
+                               loaded_state.acceptors(i).port(),
+                               new Acceptor(loaded_state.acceptors(i))));
+               }
 
                fprintf(stderr, "done.\n");
        }
@@ -326,16 +381,6 @@ int main(int argc, char **argv)
        // Find all streams in the configuration file, and create them.
        create_streams(config, deserialized_stream_ids, &deserialized_inputs);
 
-       // Open a new server socket if we do not already have one, or if we changed ports.
-       if (server_sock != -1 && port != old_port) {
-               fprintf(stderr, "NOTE: Port changed from %d to %d; opening new socket.\n", old_port, port);
-               close(server_sock);
-               server_sock = -1;
-       }
-       if (server_sock == -1) {
-               server_sock = create_server_socket(port);
-       }
-
        // See if the user wants stats.
        string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL);
        int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1);
@@ -345,9 +390,7 @@ int main(int argc, char **argv)
 
        servers->run();
 
-       AcceptorThread acceptor_thread(server_sock);
-       acceptor_thread.run();
-
+       vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
        vector<Input *> inputs = create_inputs(config, &deserialized_inputs);
        
        // All deserialized inputs should now have been taken care of, one way or the other.
@@ -392,7 +435,9 @@ int main(int argc, char **argv)
        if (stats_thread != NULL) {
                stats_thread->stop();
        }
-       acceptor_thread.stop();
+       for (size_t i = 0; i < acceptors.size(); ++i) {
+               acceptors[i]->stop();
+       }
        for (size_t i = 0; i < inputs.size(); ++i) {
                inputs[i]->stop();
        }
@@ -400,7 +445,7 @@ int main(int argc, char **argv)
 
        fprintf(stderr, "Serializing state and re-execing...\n");
        int state_fd = make_tempfile(collect_state(
-               serialize_start, server_sock, port, inputs, servers, num_servers));
+               serialize_start, acceptors, inputs, servers, num_servers));
        delete servers;
         
        char buf[16];