X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=cubemap.cpp;h=1bcc07bd1e612573e85176749febdb204e9979f3;hp=b471afb254adf483175d4ff631cc171275726797;hb=9b565a9e6e66f076abb7266b2c2f015f585fa9cb;hpb=c2c9f6441f9ae8091a39aea0340417d5915e1ac9 diff --git a/cubemap.cpp b/cubemap.cpp index b471afb..1bcc07b 100644 --- a/cubemap.cpp +++ b/cubemap.cpp @@ -208,6 +208,7 @@ int main(int argc, char **argv) int server_sock = -1, old_port = -1; set deserialized_stream_ids; + map deserialized_inputs; if (argc == 4 && strcmp(argv[2], "-state") == 0) { fprintf(stderr, "Deserializing state from previous process... "); int state_fd = atoi(argv[3]); @@ -225,6 +226,13 @@ int main(int argc, char **argv) servers->add_client_from_serialized(loaded_state.clients(i)); } + // Deserialize the inputs. Note that we don't actually add them to any state yet. + for (int i = 0; i < loaded_state.inputs_size(); ++i) { + deserialized_inputs.insert(make_pair( + loaded_state.inputs(i).stream_id(), + new Input(loaded_state.inputs(i)))); + } + // Deserialize the server socket. server_sock = loaded_state.server_sock(); old_port = loaded_state.port(); @@ -254,9 +262,14 @@ int main(int argc, char **argv) for (set::const_iterator stream_it = expecting_stream_ids.begin(); stream_it != expecting_stream_ids.end(); ++stream_it) { + string stream_id = *stream_it; fprintf(stderr, "WARNING: stream '%s' disappeared from the configuration file.\n", - stream_it->c_str()); + stream_id.c_str()); fprintf(stderr, " It will not be deleted, but clients will not get any new inputs.\n"); + if (deserialized_inputs.count(stream_id) != 0) { + delete deserialized_inputs[stream_id]; + deserialized_inputs.erase(stream_id); + } } // Open a new server socket if we do not already have one, or if we changed ports. @@ -290,11 +303,27 @@ int main(int argc, char **argv) } string src = config[i].parameters["src"]; - Input *input = new Input(stream_id, src); + Input *input = NULL; + if (deserialized_inputs.count(stream_id) != 0) { + input = deserialized_inputs[stream_id]; + if (input->get_url() != src) { + fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n", + stream_id.c_str(), input->get_url().c_str(), src.c_str()); + delete input; + input = NULL; + } + deserialized_inputs.erase(stream_id); + } + if (input == NULL) { + input = new Input(stream_id, src); + } input->run(); inputs.push_back(input); } + // All deserialized inputs should now have been taken care of, one way or the other. + assert(deserialized_inputs.empty()); + signal(SIGHUP, hup); while (!hupped) { @@ -302,11 +331,6 @@ int main(int argc, char **argv) } // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i]->stop(); - delete inputs[i]; // TODO: Serialize. - } - if (pthread_join(acceptor_thread, NULL) == -1) { perror("pthread_join"); exit(1); @@ -315,6 +339,12 @@ int main(int argc, char **argv) CubemapStateProto state; state.set_server_sock(server_sock); state.set_port(port); + + for (size_t i = 0; i < inputs.size(); ++i) { + inputs[i]->stop(); + state.add_inputs()->MergeFrom(inputs[i]->serialize()); + } + for (int i = 0; i < num_servers; ++i) { servers->get_server(i)->stop();