From e20ad47985bdda71b7b58c26932dad9a3a50c066 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Tue, 9 Apr 2013 01:11:06 +0200 Subject: [PATCH] Deserialize/serialize inputs. Woo, totally glitch-free restarts! --- cubemap.cpp | 44 +++++++++++++++++++++++++++++++++++++------- input.cpp | 32 ++++++++++++++++++++++++++++++++ input.h | 8 ++++++++ state.proto | 14 ++++++++++++++ 4 files changed, 91 insertions(+), 7 deletions(-) diff --git a/cubemap.cpp b/cubemap.cpp index b471afb..1bcc07b 100644 --- a/cubemap.cpp +++ b/cubemap.cpp @@ -208,6 +208,7 @@ int main(int argc, char **argv) int server_sock = -1, old_port = -1; set deserialized_stream_ids; + map deserialized_inputs; if (argc == 4 && strcmp(argv[2], "-state") == 0) { fprintf(stderr, "Deserializing state from previous process... "); int state_fd = atoi(argv[3]); @@ -225,6 +226,13 @@ int main(int argc, char **argv) servers->add_client_from_serialized(loaded_state.clients(i)); } + // Deserialize the inputs. Note that we don't actually add them to any state yet. + for (int i = 0; i < loaded_state.inputs_size(); ++i) { + deserialized_inputs.insert(make_pair( + loaded_state.inputs(i).stream_id(), + new Input(loaded_state.inputs(i)))); + } + // Deserialize the server socket. server_sock = loaded_state.server_sock(); old_port = loaded_state.port(); @@ -254,9 +262,14 @@ int main(int argc, char **argv) for (set::const_iterator stream_it = expecting_stream_ids.begin(); stream_it != expecting_stream_ids.end(); ++stream_it) { + string stream_id = *stream_it; fprintf(stderr, "WARNING: stream '%s' disappeared from the configuration file.\n", - stream_it->c_str()); + stream_id.c_str()); fprintf(stderr, " It will not be deleted, but clients will not get any new inputs.\n"); + if (deserialized_inputs.count(stream_id) != 0) { + delete deserialized_inputs[stream_id]; + deserialized_inputs.erase(stream_id); + } } // Open a new server socket if we do not already have one, or if we changed ports. @@ -290,11 +303,27 @@ int main(int argc, char **argv) } string src = config[i].parameters["src"]; - Input *input = new Input(stream_id, src); + Input *input = NULL; + if (deserialized_inputs.count(stream_id) != 0) { + input = deserialized_inputs[stream_id]; + if (input->get_url() != src) { + fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n", + stream_id.c_str(), input->get_url().c_str(), src.c_str()); + delete input; + input = NULL; + } + deserialized_inputs.erase(stream_id); + } + if (input == NULL) { + input = new Input(stream_id, src); + } input->run(); inputs.push_back(input); } + // All deserialized inputs should now have been taken care of, one way or the other. + assert(deserialized_inputs.empty()); + signal(SIGHUP, hup); while (!hupped) { @@ -302,11 +331,6 @@ int main(int argc, char **argv) } // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i]->stop(); - delete inputs[i]; // TODO: Serialize. - } - if (pthread_join(acceptor_thread, NULL) == -1) { perror("pthread_join"); exit(1); @@ -315,6 +339,12 @@ int main(int argc, char **argv) CubemapStateProto state; state.set_server_sock(server_sock); state.set_port(port); + + for (size_t i = 0; i < inputs.size(); ++i) { + inputs[i]->stop(); + state.add_inputs()->MergeFrom(inputs[i]->serialize()); + } + for (int i = 0; i < num_servers; ++i) { servers->get_server(i)->stop(); diff --git a/input.cpp b/input.cpp index 8307819..58d319f 100644 --- a/input.cpp +++ b/input.cpp @@ -22,6 +22,7 @@ #include "server.h" #include "serverpool.h" #include "parse.h" +#include "state.pb.h" using namespace std; @@ -79,6 +80,37 @@ Input::Input(const string &stream_id, const string &url) { } +Input::Input(const InputProto &serialized) + : state(State(serialized.state())), + stream_id(serialized.stream_id()), + url(serialized.url()), + request(serialized.request()), + request_bytes_sent(serialized.request_bytes_sent()), + response(serialized.response()), + has_metacube_header(serialized.has_metacube_header()), + sock(serialized.sock()) +{ + pending_data.resize(serialized.pending_data().size()); + memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); + + parse_url(url, &host, &port, &path); // Don't care if it fails. +} + +InputProto Input::serialize() const +{ + InputProto serialized; + serialized.set_state(state); + serialized.set_stream_id(stream_id); + serialized.set_url(url); + serialized.set_request(request); + serialized.set_request_bytes_sent(request_bytes_sent); + serialized.set_response(response); + serialized.set_pending_data(string(pending_data.begin(), pending_data.end())); + serialized.set_has_metacube_header(has_metacube_header); + serialized.set_sock(sock); + return serialized; +} + void Input::run() { should_stop = false; diff --git a/input.h b/input.h index 8a26ce6..8b536c4 100644 --- a/input.h +++ b/input.h @@ -4,16 +4,24 @@ #include #include +class InputProto; + class Input { public: Input(const std::string &stream_id, const std::string &url); + // Serialization/deserialization. + Input(const InputProto &serialized); + InputProto serialize() const; + // Connect to the given URL and start streaming. void run(); // Stops the streaming, but lets the file descriptor stay open. void stop(); + std::string get_url() const { return url; } + private: // Recovers the this pointer and calls do_work(). static void *do_work_thunk(void *arg); diff --git a/state.proto b/state.proto index 7d114da..8fc6321 100644 --- a/state.proto +++ b/state.proto @@ -17,9 +17,23 @@ message StreamProto { optional string stream_id = 4; }; +// Corresponds to class Input. +message InputProto { + optional int32 state = 1; + optional string stream_id = 2; + optional string url = 3; + optional bytes request = 4; + optional int32 request_bytes_sent = 5; + optional bytes response = 6; + optional bytes pending_data = 7; + optional bool has_metacube_header = 8; + optional int32 sock = 9; +}; + message CubemapStateProto { repeated ClientProto clients = 1; repeated StreamProto streams = 2; + repeated InputProto inputs = 5; optional int32 server_sock = 3; optional int32 port = 4; }; -- 2.39.2