24 #include "serverpool.h"
32 ServerPool *servers = NULL;
33 volatile bool hupped = false;
34 volatile bool stopped = false;
39 if (signum == SIGINT) {
44 CubemapStateProto collect_state(const timeval &serialize_start,
45 const vector<Acceptor *> acceptors,
46 const vector<Input *> inputs,
49 CubemapStateProto state = servers->serialize(); // Fills streams() and clients().
50 state.set_serialize_start_sec(serialize_start.tv_sec);
51 state.set_serialize_start_usec(serialize_start.tv_usec);
53 for (size_t i = 0; i < acceptors.size(); ++i) {
54 state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
57 for (size_t i = 0; i < inputs.size(); ++i) {
58 state.add_inputs()->MergeFrom(inputs[i]->serialize());
64 // Find all port statements in the configuration file, and create acceptors for them.
65 vector<Acceptor *> create_acceptors(
67 map<int, Acceptor *> *deserialized_acceptors)
69 vector<Acceptor *> acceptors;
70 for (unsigned i = 0; i < config.acceptors.size(); ++i) {
71 const AcceptorConfig &acceptor_config = config.acceptors[i];
72 Acceptor *acceptor = NULL;
73 map<int, Acceptor *>::iterator deserialized_acceptor_it =
74 deserialized_acceptors->find(acceptor_config.port);
75 if (deserialized_acceptor_it != deserialized_acceptors->end()) {
76 acceptor = deserialized_acceptor_it->second;
77 deserialized_acceptors->erase(deserialized_acceptor_it);
79 int server_sock = create_server_socket(acceptor_config.port, TCP_SOCKET);
80 acceptor = new Acceptor(server_sock, acceptor_config.port);
83 acceptors.push_back(acceptor);
86 // Close all acceptors that are no longer in the configuration file.
87 for (map<int, Acceptor *>::iterator acceptor_it = deserialized_acceptors->begin();
88 acceptor_it != deserialized_acceptors->end();
90 acceptor_it->second->close_socket();
91 delete acceptor_it->second;
97 // Find all streams in the configuration file, and create inputs for them.
98 vector<Input *> create_inputs(const Config &config,
99 map<string, Input *> *deserialized_inputs)
101 vector<Input *> inputs;
102 for (unsigned i = 0; i < config.streams.size(); ++i) {
103 const StreamConfig &stream_config = config.streams[i];
104 if (stream_config.src.empty()) {
108 string stream_id = stream_config.stream_id;
109 string src = stream_config.src;
112 map<string, Input *>::iterator deserialized_input_it =
113 deserialized_inputs->find(stream_id);
114 if (deserialized_input_it != deserialized_inputs->end()) {
115 input = deserialized_input_it->second;
116 if (input->get_url() != src) {
117 log(INFO, "Stream '%s' has changed URL from '%s' to '%s', restarting input.",
118 stream_id.c_str(), input->get_url().c_str(), src.c_str());
119 input->close_socket();
123 deserialized_inputs->erase(deserialized_input_it);
126 input = create_input(stream_id, src);
128 log(ERROR, "did not understand URL '%s', clients will not get any data.",
134 inputs.push_back(input);
139 void create_streams(const Config &config,
140 const set<string> &deserialized_stream_ids,
141 map<string, Input *> *deserialized_inputs)
143 vector<MarkPool *> mark_pools; // FIXME: leak
144 for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
145 const MarkPoolConfig &mp_config = config.mark_pools[i];
146 mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
149 set<string> expecting_stream_ids = deserialized_stream_ids;
150 for (unsigned i = 0; i < config.streams.size(); ++i) {
151 const StreamConfig &stream_config = config.streams[i];
152 if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
153 servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
155 servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
157 expecting_stream_ids.erase(stream_config.stream_id);
159 if (stream_config.mark_pool != -1) {
160 servers->set_mark_pool(stream_config.stream_id,
161 mark_pools[stream_config.mark_pool]);
165 // Warn about any streams we've lost.
166 // TODO: Make an option (delete=yes?) to actually shut down streams.
167 for (set<string>::const_iterator stream_it = expecting_stream_ids.begin();
168 stream_it != expecting_stream_ids.end();
170 string stream_id = *stream_it;
171 log(WARNING, "stream '%s' disappeared from the configuration file. "
172 "It will not be deleted, but clients will not get any new inputs.",
174 if (deserialized_inputs->count(stream_id) != 0) {
175 delete (*deserialized_inputs)[stream_id];
176 deserialized_inputs->erase(stream_id);
181 void open_logs(const vector<LogConfig> &log_destinations)
183 for (size_t i = 0; i < log_destinations.size(); ++i) {
184 if (log_destinations[i].type == LogConfig::LOG_TYPE_FILE) {
185 add_log_destination_file(log_destinations[i].filename);
186 } else if (log_destinations[i].type == LogConfig::LOG_TYPE_CONSOLE) {
187 add_log_destination_console();
188 } else if (log_destinations[i].type == LogConfig::LOG_TYPE_SYSLOG) {
189 add_log_destination_syslog();
197 bool dry_run_config(const std::string &argv0, const std::string &config_filename)
199 char *argv0_copy = strdup(argv0.c_str());
200 char *config_filename_copy = strdup(config_filename.c_str());
205 log_perror("fork()");
207 free(config_filename_copy);
211 execlp(argv0_copy, argv0_copy, "--test-config", config_filename_copy, NULL);
212 log_perror(argv0_copy);
220 free(config_filename_copy);
225 err = waitpid(pid, &status, 0);
226 } while (err == -1 && errno == EINTR);
229 log_perror("waitpid()");
233 return (WIFEXITED(status) && WEXITSTATUS(status) == 0);
236 int main(int argc, char **argv)
240 signal(SIGPIPE, SIG_IGN);
244 bool test_config = false;
246 static const option long_options[] = {
247 { "state", required_argument, 0, 's' },
248 { "test-config", no_argument, 0, 't' },
250 int option_index = 0;
251 int c = getopt_long (argc, argv, "s:t", long_options, &option_index);
258 state_fd = atoi(optarg);
268 string config_filename = "cubemap.config";
270 config_filename = argv[optind++];
273 // Canonicalize argv[0] and config_filename.
274 char argv0_canon[PATH_MAX];
275 char config_filename_canon[PATH_MAX];
277 if (realpath(argv[0], argv0_canon) == NULL) {
278 log_perror("realpath");
281 if (realpath(config_filename.c_str(), config_filename_canon) == NULL) {
282 log_perror("realpath");
286 // Now parse the configuration file.
288 if (!parse_config(config_filename_canon, &config)) {
295 // Ideally we'd like to daemonize only when we've started up all threads etc.,
296 // but daemon() forks, which is not good in multithreaded software, so we'll
297 // have to do it here.
298 if (config.daemonize) {
299 if (daemon(0, 0) == -1) {
300 log_perror("daemon");
306 // Open logs as soon as possible.
307 open_logs(config.log_destinations);
309 log(INFO, "Cubemap " SERVER_VERSION " starting.");
310 servers = new ServerPool(config.num_servers);
312 CubemapStateProto loaded_state;
313 struct timeval serialize_start;
314 set<string> deserialized_stream_ids;
315 map<string, Input *> deserialized_inputs;
316 map<int, Acceptor *> deserialized_acceptors;
317 if (state_fd != -1) {
318 log(INFO, "Deserializing state from previous process...");
320 if (!read_tempfile(state_fd, &serialized)) {
323 if (!loaded_state.ParseFromString(serialized)) {
324 log(ERROR, "Failed deserialization of state.");
328 serialize_start.tv_sec = loaded_state.serialize_start_sec();
329 serialize_start.tv_usec = loaded_state.serialize_start_usec();
331 // Deserialize the streams.
332 for (int i = 0; i < loaded_state.streams_size(); ++i) {
333 servers->add_stream_from_serialized(loaded_state.streams(i));
334 deserialized_stream_ids.insert(loaded_state.streams(i).stream_id());
337 // Deserialize the inputs. Note that we don't actually add them to any state yet.
338 for (int i = 0; i < loaded_state.inputs_size(); ++i) {
339 deserialized_inputs.insert(make_pair(
340 loaded_state.inputs(i).stream_id(),
341 create_input(loaded_state.inputs(i))));
344 // Deserialize the acceptors.
345 for (int i = 0; i < loaded_state.acceptors_size(); ++i) {
346 deserialized_acceptors.insert(make_pair(
347 loaded_state.acceptors(i).port(),
348 new Acceptor(loaded_state.acceptors(i))));
351 log(INFO, "Deserialization done.");
354 // Find all streams in the configuration file, and create them.
355 create_streams(config, deserialized_stream_ids, &deserialized_inputs);
357 vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
358 vector<Input *> inputs = create_inputs(config, &deserialized_inputs);
360 // All deserialized inputs should now have been taken care of, one way or the other.
361 assert(deserialized_inputs.empty());
363 // Put back the existing clients. It doesn't matter which server we
364 // allocate them to, so just do round-robin. However, we need to add
365 // them after the mark pools have been set up.
366 for (int i = 0; i < loaded_state.clients_size(); ++i) {
367 servers->add_client_from_serialized(loaded_state.clients(i));
372 // Start writing statistics.
373 StatsThread *stats_thread = NULL;
374 if (!config.stats_file.empty()) {
375 stats_thread = new StatsThread(config.stats_file, config.stats_interval);
379 struct timeval server_start;
380 gettimeofday(&server_start, NULL);
381 if (state_fd != -1) {
382 // Measure time from when we started serializing (below) to now, when basically everything
383 // is up and running. This is, in other words, a conservative estimate of how long our
384 // “glitch” period was, not counting of course reconnects if the configuration changed.
385 double glitch_time = server_start.tv_sec - serialize_start.tv_sec +
386 1e-6 * (server_start.tv_usec - serialize_start.tv_usec);
387 log(INFO, "Re-exec happened in approx. %.0f ms.", glitch_time * 1000.0);
394 // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
395 gettimeofday(&serialize_start, NULL);
397 if (stats_thread != NULL) {
398 stats_thread->stop();
400 for (size_t i = 0; i < acceptors.size(); ++i) {
401 acceptors[i]->stop();
403 for (size_t i = 0; i < inputs.size(); ++i) {
408 CubemapStateProto state;
410 log(INFO, "Shutting down.");
412 log(INFO, "Serializing state and re-execing...");
413 state = collect_state(
414 serialize_start, acceptors, inputs, servers);
416 state.SerializeToString(&serialized);
417 state_fd = make_tempfile(serialized);
418 if (state_fd == -1) {
429 // OK, so the signal was SIGHUP. Check that the new config is okay, then exec the new binary.
430 if (!dry_run_config(argv[0], config_filename)) {
431 open_logs(config.log_destinations);
432 log(ERROR, "%s --test-config failed. Restarting old version instead of new.", argv[0]);
439 sprintf(buf, "%d", state_fd);
442 execlp(argv0_canon, argv0_canon, config_filename_canon, "--state", buf, NULL);
443 open_logs(config.log_destinations);
444 log_perror("execlp");
445 log(ERROR, "re-exec of %s failed. Waiting 0.2 seconds and trying again...", argv0_canon);