]> git.sesse.net Git - cubemap/commitdiff
Support multiple listening sockets. Actually mostly because it makes the code somewha...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 10 Apr 2013 23:30:23 +0000 (01:30 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 10 Apr 2013 23:30:23 +0000 (01:30 +0200)
acceptor.cpp
acceptor.h
cubemap.config.sample
main.cpp
state.proto

index 7620b7991104d98d19364559b46d6778d4c7919e..87bf47b83ea1b1c09616fbd19c0410c6834bfc20 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "acceptor.h"
 #include "serverpool.h"
 
 #include "acceptor.h"
 #include "serverpool.h"
+#include "state.pb.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -61,12 +62,39 @@ int create_server_socket(int port)
        return server_sock;
 }
        
        return server_sock;
 }
        
-AcceptorThread::AcceptorThread(int server_sock)
-       : server_sock(server_sock)
+Acceptor::Acceptor(int server_sock, int port)
+       : server_sock(server_sock),
+         port(port)
 {
 }
 
 {
 }
 
-void AcceptorThread::do_work()
+Acceptor::Acceptor(const AcceptorProto &serialized)
+       : server_sock(serialized.server_sock()),
+         port(serialized.port())
+{
+}
+
+AcceptorProto Acceptor::serialize() const
+{
+       AcceptorProto serialized;
+       serialized.set_server_sock(server_sock);
+       serialized.set_port(port);
+       return serialized;
+}
+
+void Acceptor::close_socket()
+{
+       int ret;
+       do {
+               ret = close(server_sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               perror("close");
+       }
+}
+
+void Acceptor::do_work()
 {
        while (!hupped) {
                // Since we are non-blocking, we need to wait for the right state first.
 {
        while (!hupped) {
                // Since we are non-blocking, we need to wait for the right state first.
index 97672338a133b0006b9203a8e33903057dd0aa33..caa46474ce48d83185244cc2e977c155ce882e1b 100644 (file)
@@ -5,16 +5,24 @@
 
 int create_server_socket(int port);
 
 
 int create_server_socket(int port);
 
+class AcceptorProto;
+
 // A thread that accepts new connections on a given socket,
 // and hands them off to the server pool.
 // A thread that accepts new connections on a given socket,
 // and hands them off to the server pool.
-class AcceptorThread : public Thread {
+class Acceptor : public Thread {
 public:
 public:
-       AcceptorThread(int server_sock);
+       Acceptor(int server_sock, int port);
+
+       // Serialization/deserialization.
+       Acceptor(const AcceptorProto &serialized);
+       AcceptorProto serialize() const;
+
+       void close_socket();
 
 private:
        virtual void do_work();
 
 
 private:
        virtual void do_work();
 
-       int server_sock;
+       int server_sock, port;
 };
 
 #endif  // !defined(_ACCEPTOR_H)
 };
 
 #endif  // !defined(_ACCEPTOR_H)
index fdd6a067fb4e8072e321b4605c3ca6225db53891..57739273885a704813c901a6acada33372909a96 100644 (file)
@@ -1,4 +1,8 @@
 num_servers 4   # one for each cpu
 num_servers 4   # one for each cpu
+
+#
+# All input ports are treated exactly the same, but you may use multiple ones nevertheless.
+#
 port 9094
 
 stats_file cubemap.stats
 port 9094
 
 stats_file cubemap.stats
index 1f7313ac0eb6c013648d7edafff1e93b99902460..db3be63f2d05a2cbcff27afa620205b97405f524 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -69,8 +69,7 @@ int make_tempfile(const CubemapStateProto &state)
 }
 
 CubemapStateProto collect_state(const timeval &serialize_start,
 }
 
 CubemapStateProto collect_state(const timeval &serialize_start,
-                                int server_sock,
-                                int port,
+                                const vector<Acceptor *> acceptors,
                                 const vector<Input *> inputs,
                                 ServerPool *servers,
                                 int num_servers)
                                 const vector<Input *> inputs,
                                 ServerPool *servers,
                                 int num_servers)
@@ -78,8 +77,10 @@ CubemapStateProto collect_state(const timeval &serialize_start,
        CubemapStateProto state;
        state.set_serialize_start_sec(serialize_start.tv_sec);
        state.set_serialize_start_usec(serialize_start.tv_usec);
        CubemapStateProto state;
        state.set_serialize_start_sec(serialize_start.tv_sec);
        state.set_serialize_start_usec(serialize_start.tv_usec);
-       state.set_server_sock(server_sock);
-       state.set_port(port);
+       
+       for (size_t i = 0; i < acceptors.size(); ++i) {
+               state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
+       }
 
        for (size_t i = 0; i < inputs.size(); ++i) {
                state.add_inputs()->MergeFrom(inputs[i]->serialize());
 
        for (size_t i = 0; i < inputs.size(); ++i) {
                state.add_inputs()->MergeFrom(inputs[i]->serialize());
@@ -185,7 +186,52 @@ MarkPool *parse_mark_pool(map<pair<int, int>, MarkPool *> *mark_pools, const str
 
        return get_mark_pool(mark_pools, from, to);
 }
 
        return get_mark_pool(mark_pools, from, to);
 }
-       
+
+// Find all port statements in the configuration file, and create acceptors for htem.
+vector<Acceptor *> create_acceptors(
+       const vector<ConfigLine> &config,
+       map<int, Acceptor *> *deserialized_acceptors)
+{
+       vector<Acceptor *> acceptors;
+       for (unsigned i = 0; i < config.size(); ++i) {
+               if (config[i].keyword != "port") {
+                       continue;
+               }
+               if (config[i].arguments.size() != 1) {
+                       fprintf(stderr, "ERROR: 'port' takes exactly one argument\n");
+                       exit(1);
+               }
+               int port = atoi(config[i].arguments[0].c_str());
+               if (port < 1 || port >= 65536) {
+                       fprintf(stderr, "WARNING: port %d is out of range (must be [1,65536>), ignoring\n", port);
+                       continue;
+               }
+
+               Acceptor *acceptor = NULL;
+               map<int, Acceptor *>::iterator deserialized_acceptor_it =
+                       deserialized_acceptors->find(port);
+               if (deserialized_acceptor_it != deserialized_acceptors->end()) {
+                       acceptor = deserialized_acceptor_it->second;
+                       deserialized_acceptors->erase(deserialized_acceptor_it);
+               } else {
+                       int server_sock = create_server_socket(port);
+                       acceptor = new Acceptor(server_sock, port);
+               }
+               acceptor->run();
+               acceptors.push_back(acceptor);
+       }
+
+       // Close all acceptors that are no longer in the configuration file.
+       for (map<int, Acceptor *>::iterator acceptor_it = deserialized_acceptors->begin();
+            acceptor_it != deserialized_acceptors->end();
+            ++acceptor_it) {
+               acceptor_it->second->close_socket();
+               delete acceptor_it->second;
+       }
+
+       return acceptors;
+}
+
 // Find all streams in the configuration file, and create inputs for them.
 vector<Input *> create_inputs(const vector<ConfigLine> &config,
                               map<string, Input *> *deserialized_inputs)
 // Find all streams in the configuration file, and create inputs for them.
 vector<Input *> create_inputs(const vector<ConfigLine> &config,
                               map<string, Input *> *deserialized_inputs)
@@ -284,15 +330,14 @@ int main(int argc, char **argv)
        string config_filename = (argc == 1) ? "cubemap.config" : argv[1];
        vector<ConfigLine> config = parse_config(config_filename);
 
        string config_filename = (argc == 1) ? "cubemap.config" : argv[1];
        vector<ConfigLine> config = parse_config(config_filename);
 
-       int port = fetch_config_int(config, "port", 1, 65535, PARAMATER_MANDATORY);
        int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY);  // Insanely high max limit.
 
        servers = new ServerPool(num_servers);
 
        CubemapStateProto loaded_state;
        int num_servers = fetch_config_int(config, "num_servers", 1, 20000, PARAMATER_MANDATORY);  // Insanely high max limit.
 
        servers = new ServerPool(num_servers);
 
        CubemapStateProto loaded_state;
-       int server_sock = -1, old_port = -1;
        set<string> deserialized_stream_ids;
        map<string, Input *> deserialized_inputs;
        set<string> deserialized_stream_ids;
        map<string, Input *> deserialized_inputs;
+       map<int, Acceptor *> deserialized_acceptors;
        if (argc == 4 && strcmp(argv[2], "-state") == 0) {
                is_reexec = true;
 
        if (argc == 4 && strcmp(argv[2], "-state") == 0) {
                is_reexec = true;
 
@@ -316,9 +361,19 @@ int main(int argc, char **argv)
                                new Input(loaded_state.inputs(i))));
                } 
 
                                new Input(loaded_state.inputs(i))));
                } 
 
-               // Deserialize the server socket.
-               server_sock = loaded_state.server_sock();
-               old_port = loaded_state.port();
+               // Convert the acceptor from older serialized formats.
+               if (loaded_state.has_server_sock() && loaded_state.has_port()) {
+                       AcceptorProto *acceptor = loaded_state.add_acceptors();
+                       acceptor->set_server_sock(loaded_state.server_sock());
+                       acceptor->set_port(loaded_state.port());
+               }
+
+               // Deserialize the acceptors.
+               for (int i = 0; i < loaded_state.acceptors_size(); ++i) {
+                       deserialized_acceptors.insert(make_pair(
+                               loaded_state.acceptors(i).port(),
+                               new Acceptor(loaded_state.acceptors(i))));
+               }
 
                fprintf(stderr, "done.\n");
        }
 
                fprintf(stderr, "done.\n");
        }
@@ -326,16 +381,6 @@ int main(int argc, char **argv)
        // Find all streams in the configuration file, and create them.
        create_streams(config, deserialized_stream_ids, &deserialized_inputs);
 
        // Find all streams in the configuration file, and create them.
        create_streams(config, deserialized_stream_ids, &deserialized_inputs);
 
-       // Open a new server socket if we do not already have one, or if we changed ports.
-       if (server_sock != -1 && port != old_port) {
-               fprintf(stderr, "NOTE: Port changed from %d to %d; opening new socket.\n", old_port, port);
-               close(server_sock);
-               server_sock = -1;
-       }
-       if (server_sock == -1) {
-               server_sock = create_server_socket(port);
-       }
-
        // See if the user wants stats.
        string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL);
        int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1);
        // See if the user wants stats.
        string stats_file = fetch_config_string(config, "stats_file", PARAMETER_OPTIONAL);
        int stats_interval = fetch_config_int(config, "stats_interval", 1, INT_MAX, PARAMETER_OPTIONAL, -1);
@@ -345,9 +390,7 @@ int main(int argc, char **argv)
 
        servers->run();
 
 
        servers->run();
 
-       AcceptorThread acceptor_thread(server_sock);
-       acceptor_thread.run();
-
+       vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
        vector<Input *> inputs = create_inputs(config, &deserialized_inputs);
        
        // All deserialized inputs should now have been taken care of, one way or the other.
        vector<Input *> inputs = create_inputs(config, &deserialized_inputs);
        
        // All deserialized inputs should now have been taken care of, one way or the other.
@@ -392,7 +435,9 @@ int main(int argc, char **argv)
        if (stats_thread != NULL) {
                stats_thread->stop();
        }
        if (stats_thread != NULL) {
                stats_thread->stop();
        }
-       acceptor_thread.stop();
+       for (size_t i = 0; i < acceptors.size(); ++i) {
+               acceptors[i]->stop();
+       }
        for (size_t i = 0; i < inputs.size(); ++i) {
                inputs[i]->stop();
        }
        for (size_t i = 0; i < inputs.size(); ++i) {
                inputs[i]->stop();
        }
@@ -400,7 +445,7 @@ int main(int argc, char **argv)
 
        fprintf(stderr, "Serializing state and re-execing...\n");
        int state_fd = make_tempfile(collect_state(
 
        fprintf(stderr, "Serializing state and re-execing...\n");
        int state_fd = make_tempfile(collect_state(
-               serialize_start, server_sock, port, inputs, servers, num_servers));
+               serialize_start, acceptors, inputs, servers, num_servers));
        delete servers;
         
        char buf[16];
        delete servers;
         
        char buf[16];
index 1d5cf1702ee3081ef1e9a94e611256221979067a..cfad404da66eca1cc4d77b8f476b5d9416ebff90 100644 (file)
@@ -33,12 +33,21 @@ message InputProto {
        optional int32 sock = 9;
 };
 
        optional int32 sock = 9;
 };
 
+// Corresponds to class Acceptor.
+message AcceptorProto {
+       optional int32 server_sock = 1;
+       optional int32 port = 2;
+};
+
 message CubemapStateProto {
        optional int64 serialize_start_sec = 6;
        optional int64 serialize_start_usec = 7;
        repeated ClientProto clients = 1;
        repeated StreamProto streams = 2;
        repeated InputProto inputs = 5;
 message CubemapStateProto {
        optional int64 serialize_start_sec = 6;
        optional int64 serialize_start_usec = 7;
        repeated ClientProto clients = 1;
        repeated StreamProto streams = 2;
        repeated InputProto inputs = 5;
+       repeated AcceptorProto acceptors = 8;
+
+       // Deprecated. Use Acceptor instead.
        optional int32 server_sock = 3;
        optional int32 port = 4;
 };
        optional int32 server_sock = 3;
        optional int32 port = 4;
 };