86ba28a81cf41aef2927659375a169f16f03f434
[cubemap] / main.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <getopt.h>
4 #include <limits.h>
5 #include <signal.h>
6 #include <stddef.h>
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <sys/time.h>
11 #include <sys/wait.h>
12 #include <unistd.h>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "acceptor.h"
20 #include "accesslog.h"
21 #include "config.h"
22 #include "input.h"
23 #include "input_stats.h"
24 #include "log.h"
25 #include "markpool.h"
26 #include "serverpool.h"
27 #include "state.pb.h"
28 #include "stats.h"
29 #include "stream.h"
30 #include "util.h"
31 #include "version.h"
32
33 using namespace std;
34
// Access log writer. Created in main(): a dummy logger when no access log
// file is configured, otherwise one writing to config.access_log_file.
AccessLogThread *access_log = NULL;
// The server pool; created in main() with config.num_servers and deleted
// again during shutdown.
ServerPool *servers = NULL;
// Mark pools built from the configuration by create_streams(); main() deletes
// the elements and clears the vector during shutdown.
vector<MarkPool *> mark_pools;
// Set by hup() on SIGHUP or SIGINT; main() polls it to know when to shut down.
volatile bool hupped = false;
// Set by hup() on SIGINT only; makes main() exit after shutting down instead
// of serializing state and re-execing.
volatile bool stopped = false;
40
41 struct InputWithRefcount {
42         Input *input;
43         int refcount;
44 };
45
46 void hup(int signum)
47 {
48         hupped = true;
49         if (signum == SIGINT) {
50                 stopped = true;
51         }
52 }
53
// Deliberately empty signal handler. main() installs it for SIGUSR1, which
// is used for internal signalling.
void do_nothing(int signum)
{
        (void)signum;  // Intentionally unused.
}
57
58 CubemapStateProto collect_state(const timeval &serialize_start,
59                                 const vector<Acceptor *> acceptors,
60                                 const multimap<string, InputWithRefcount> inputs,
61                                 ServerPool *servers)
62 {
63         CubemapStateProto state = servers->serialize();  // Fills streams() and clients().
64         state.set_serialize_start_sec(serialize_start.tv_sec);
65         state.set_serialize_start_usec(serialize_start.tv_usec);
66         
67         for (size_t i = 0; i < acceptors.size(); ++i) {
68                 state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
69         }
70
71         for (multimap<string, InputWithRefcount>::const_iterator input_it = inputs.begin();
72              input_it != inputs.end();
73              ++input_it) {
74                 state.add_inputs()->MergeFrom(input_it->second.input->serialize());
75         }
76
77         return state;
78 }
79
80 // Find all port statements in the configuration file, and create acceptors for htem.
81 vector<Acceptor *> create_acceptors(
82         const Config &config,
83         map<int, Acceptor *> *deserialized_acceptors)
84 {
85         vector<Acceptor *> acceptors;
86         for (unsigned i = 0; i < config.acceptors.size(); ++i) {
87                 const AcceptorConfig &acceptor_config = config.acceptors[i];
88                 Acceptor *acceptor = NULL;
89                 map<int, Acceptor *>::iterator deserialized_acceptor_it =
90                         deserialized_acceptors->find(acceptor_config.port);
91                 if (deserialized_acceptor_it != deserialized_acceptors->end()) {
92                         acceptor = deserialized_acceptor_it->second;
93                         deserialized_acceptors->erase(deserialized_acceptor_it);
94                 } else {
95                         int server_sock = create_server_socket(acceptor_config.port, TCP_SOCKET);
96                         acceptor = new Acceptor(server_sock, acceptor_config.port);
97                 }
98                 acceptor->run();
99                 acceptors.push_back(acceptor);
100         }
101
102         // Close all acceptors that are no longer in the configuration file.
103         for (map<int, Acceptor *>::iterator acceptor_it = deserialized_acceptors->begin();
104              acceptor_it != deserialized_acceptors->end();
105              ++acceptor_it) {
106                 acceptor_it->second->close_socket();
107                 delete acceptor_it->second;
108         }
109
110         return acceptors;
111 }
112
113 void create_config_input(const string &src, multimap<string, InputWithRefcount> *inputs)
114 {
115         if (src.empty()) {
116                 return;
117         }
118         if (inputs->count(src) != 0) {
119                 return;
120         }
121
122         InputWithRefcount iwr;
123         iwr.input = create_input(src);
124         if (iwr.input == NULL) {
125                 log(ERROR, "did not understand URL '%s', clients will not get any data.",
126                         src.c_str());
127                 return;
128         }
129         iwr.refcount = 0;
130         inputs->insert(make_pair(src, iwr));
131 }
132
133 // Find all streams in the configuration file, and create inputs for them.
134 void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
135 {
136         for (unsigned i = 0; i < config.streams.size(); ++i) {
137                 const StreamConfig &stream_config = config.streams[i];
138                 if (stream_config.src != "delete") {
139                         create_config_input(stream_config.src, inputs);
140                 }
141         }
142         for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
143                 const UDPStreamConfig &udpstream_config = config.udpstreams[i];
144                 create_config_input(udpstream_config.src, inputs);
145         }
146 }
147
148 void create_streams(const Config &config,
149                     const set<string> &deserialized_urls,
150                     multimap<string, InputWithRefcount> *inputs)
151 {
152         for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
153                 const MarkPoolConfig &mp_config = config.mark_pools[i];
154                 mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
155         }
156
157         // HTTP streams.
158         set<string> expecting_urls = deserialized_urls;
159         for (unsigned i = 0; i < config.streams.size(); ++i) {
160                 const StreamConfig &stream_config = config.streams[i];
161                 int stream_index;
162
163                 expecting_urls.erase(stream_config.url);
164
165                 // Special-case deleted streams; they were never deserialized in the first place,
166                 // so just ignore them.
167                 if (stream_config.src == "delete") {
168                         continue;
169                 }
170
171                 if (deserialized_urls.count(stream_config.url) == 0) {
172                         stream_index = servers->add_stream(stream_config.url,
173                                                            stream_config.backlog_size,
174                                                            Stream::Encoding(stream_config.encoding));
175                 } else {
176                         stream_index = servers->lookup_stream_by_url(stream_config.url);
177                         assert(stream_index != -1);
178                         servers->set_backlog_size(stream_index, stream_config.backlog_size);
179                         servers->set_encoding(stream_index,
180                                               Stream::Encoding(stream_config.encoding));
181                 }
182
183                 if (stream_config.mark_pool != -1) {
184                         servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
185                 }
186
187                 servers->set_pacing_rate(stream_index, stream_config.pacing_rate);
188
189                 string src = stream_config.src;
190                 if (!src.empty()) {
191                         multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
192                         if (input_it != inputs->end()) {
193                                 input_it->second.input->add_destination(stream_index);
194                                 ++input_it->second.refcount;
195                         }
196                 }
197         }
198
199         // Warn about any streams servers we've lost.
200         for (set<string>::const_iterator stream_it = expecting_urls.begin();
201              stream_it != expecting_urls.end();
202              ++stream_it) {
203                 string url = *stream_it;
204                 log(WARNING, "stream '%s' disappeared from the configuration file. "
205                              "It will not be deleted, but clients will not get any new inputs. "
206                              "If you really meant to delete it, set src=delete and reload.",
207                              url.c_str());
208         }
209
210         // UDP streams.
211         for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
212                 const UDPStreamConfig &udpstream_config = config.udpstreams[i];
213                 MarkPool *mark_pool = NULL;
214                 if (udpstream_config.mark_pool != -1) {
215                         mark_pool = mark_pools[udpstream_config.mark_pool];
216                 }
217                 int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate);
218
219                 string src = udpstream_config.src;
220                 if (!src.empty()) {
221                         multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
222                         assert(input_it != inputs->end());
223                         input_it->second.input->add_destination(stream_index);
224                         ++input_it->second.refcount;
225                 }
226         }
227 }
228         
229 void open_logs(const vector<LogConfig> &log_destinations)
230 {
231         for (size_t i = 0; i < log_destinations.size(); ++i) {
232                 if (log_destinations[i].type == LogConfig::LOG_TYPE_FILE) {
233                         add_log_destination_file(log_destinations[i].filename);
234                 } else if (log_destinations[i].type == LogConfig::LOG_TYPE_CONSOLE) {
235                         add_log_destination_console();
236                 } else if (log_destinations[i].type == LogConfig::LOG_TYPE_SYSLOG) {
237                         add_log_destination_syslog();
238                 } else {
239                         assert(false);
240                 }
241         }
242         start_logging();
243 }
244         
245 bool dry_run_config(const std::string &argv0, const std::string &config_filename)
246 {
247         char *argv0_copy = strdup(argv0.c_str());
248         char *config_filename_copy = strdup(config_filename.c_str());
249
250         pid_t pid = fork();
251         switch (pid) {
252         case -1:
253                 log_perror("fork()");
254                 free(argv0_copy);
255                 free(config_filename_copy);
256                 return false;
257         case 0:
258                 // Child.
259                 execlp(argv0_copy, argv0_copy, "--test-config", config_filename_copy, NULL);
260                 log_perror(argv0_copy);
261                 _exit(1);
262         default:
263                 // Parent.
264                 break;
265         }
266                 
267         free(argv0_copy);
268         free(config_filename_copy);
269
270         int status;
271         pid_t err;
272         do {
273                 err = waitpid(pid, &status, 0);
274         } while (err == -1 && errno == EINTR);
275
276         if (err == -1) {
277                 log_perror("waitpid()");
278                 return false;
279         }       
280
281         return (WIFEXITED(status) && WEXITSTATUS(status) == 0);
282 }
283
284 void find_deleted_streams(const Config &config, set<string> *deleted_urls)
285 {
286         for (unsigned i = 0; i < config.streams.size(); ++i) {
287                 const StreamConfig &stream_config = config.streams[i];
288                 if (stream_config.src == "delete") {
289                         log(INFO, "Deleting stream '%s'.", stream_config.url.c_str());
290                         deleted_urls->insert(stream_config.url);
291                 }
292         }
293 }
294
295 int main(int argc, char **argv)
296 {
297         signal(SIGHUP, hup);
298         signal(SIGINT, hup);
299         signal(SIGUSR1, do_nothing);  // Used in internal signalling.
300         signal(SIGPIPE, SIG_IGN);
301         
302         // Parse options.
303         int state_fd = -1;
304         bool test_config = false;
305         for ( ;; ) {
306                 static const option long_options[] = {
307                         { "state", required_argument, 0, 's' },
308                         { "test-config", no_argument, 0, 't' },
309                         { 0, 0, 0, 0 }
310                 };
311                 int option_index = 0;
312                 int c = getopt_long(argc, argv, "s:t", long_options, &option_index);
313      
314                 if (c == -1) {
315                         break;
316                 }
317                 switch (c) {
318                 case 's':
319                         state_fd = atoi(optarg);
320                         break;
321                 case 't':
322                         test_config = true;
323                         break;
324                 default:
325                         fprintf(stderr, "Unknown option '%s'\n", argv[option_index]);
326                         exit(1);
327                 }
328         }
329
330         string config_filename = "cubemap.config";
331         if (optind < argc) {
332                 config_filename = argv[optind++];
333         }
334
335         // Canonicalize argv[0] and config_filename.
336         char argv0_canon[PATH_MAX];
337         char config_filename_canon[PATH_MAX];
338
339         if (realpath(argv[0], argv0_canon) == NULL) {
340                 log_perror(argv[0]);
341                 exit(1);
342         }
343         if (realpath(config_filename.c_str(), config_filename_canon) == NULL) {
344                 log_perror(config_filename.c_str());
345                 exit(1);
346         }
347
348         // Now parse the configuration file.
349         Config config;
350         if (!parse_config(config_filename_canon, &config)) {
351                 exit(1);
352         }
353         if (test_config) {
354                 exit(0);
355         }
356         
357         // Ideally we'd like to daemonize only when we've started up all threads etc.,
358         // but daemon() forks, which is not good in multithreaded software, so we'll
359         // have to do it here.
360         if (config.daemonize) {
361                 if (daemon(0, 0) == -1) {
362                         log_perror("daemon");
363                         exit(1);
364                 }
365         }
366
367 start:
368         // Open logs as soon as possible.
369         open_logs(config.log_destinations);
370
371         log(INFO, "Cubemap " SERVER_VERSION " starting.");
372         if (config.access_log_file.empty()) {
373                 // Create a dummy logger.
374                 access_log = new AccessLogThread();
375         } else {
376                 access_log = new AccessLogThread(config.access_log_file);
377         }
378         access_log->run();
379
380         servers = new ServerPool(config.num_servers);
381
382         // Find all the streams that are to be deleted.
383         set<string> deleted_urls;
384         find_deleted_streams(config, &deleted_urls);
385
386         CubemapStateProto loaded_state;
387         struct timeval serialize_start;
388         set<string> deserialized_urls;
389         map<int, Acceptor *> deserialized_acceptors;
390         multimap<string, InputWithRefcount> inputs;  // multimap due to older versions without deduplication.
391         if (state_fd != -1) {
392                 log(INFO, "Deserializing state from previous process...");
393                 string serialized;
394                 if (!read_tempfile(state_fd, &serialized)) {
395                         exit(1);
396                 }
397                 if (!loaded_state.ParseFromString(serialized)) {
398                         log(ERROR, "Failed deserialization of state.");
399                         exit(1);
400                 }
401
402                 serialize_start.tv_sec = loaded_state.serialize_start_sec();
403                 serialize_start.tv_usec = loaded_state.serialize_start_usec();
404
405                 // Deserialize the streams.
406                 map<string, string> stream_headers_for_url;  // See below.
407                 for (int i = 0; i < loaded_state.streams_size(); ++i) {
408                         const StreamProto &stream = loaded_state.streams(i);
409
410                         if (deleted_urls.count(stream.url()) != 0) {
411                                 // Delete the stream backlogs.
412                                 for (int j = 0; j < stream.data_fds_size(); ++j) {
413                                         safe_close(stream.data_fds(j));
414                                 }
415                         } else {
416                                 vector<int> data_fds;
417                                 for (int j = 0; j < stream.data_fds_size(); ++j) {
418                                         data_fds.push_back(stream.data_fds(j));
419                                 }
420
421                                 // Older versions stored the data once in the protobuf instead of
422                                 // sending around file descriptors.
423                                 if (data_fds.empty() && stream.has_data()) {
424                                         data_fds.push_back(make_tempfile(stream.data()));
425                                 }
426
427                                 servers->add_stream_from_serialized(stream, data_fds);
428                                 deserialized_urls.insert(stream.url());
429
430                                 stream_headers_for_url.insert(make_pair(stream.url(), stream.stream_header()));
431                         }
432                 }
433
434                 // Deserialize the inputs. Note that we don't actually add them to any stream yet.
435                 for (int i = 0; i < loaded_state.inputs_size(); ++i) {
436                         InputProto serialized_input = loaded_state.inputs(i);
437
438                         // Older versions did not store the stream header in the input,
439                         // only in each stream. We need to have the stream header in the
440                         // input as well, in case we create a new stream reusing the same input.
441                         // Thus, we put it into place here if it's missing.
442                         if (!serialized_input.has_stream_header() &&
443                             stream_headers_for_url.count(serialized_input.url()) != 0) {
444                                 serialized_input.set_stream_header(stream_headers_for_url[serialized_input.url()]);
445                         }
446
447                         InputWithRefcount iwr;
448                         iwr.input = create_input(serialized_input);
449                         iwr.refcount = 0;
450                         inputs.insert(make_pair(serialized_input.url(), iwr));
451                 } 
452
453                 // Deserialize the acceptors.
454                 for (int i = 0; i < loaded_state.acceptors_size(); ++i) {
455                         deserialized_acceptors.insert(make_pair(
456                                 loaded_state.acceptors(i).port(),
457                                 new Acceptor(loaded_state.acceptors(i))));
458                 }
459
460                 log(INFO, "Deserialization done.");
461         }
462
463         // Add any new inputs coming from the config.
464         create_config_inputs(config, &inputs);
465         
466         // Find all streams in the configuration file, create them, and connect to the inputs.
467         create_streams(config, deserialized_urls, &inputs);
468         vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
469         
470         // Put back the existing clients. It doesn't matter which server we
471         // allocate them to, so just do round-robin. However, we need to add
472         // them after the mark pools have been set up.
473         for (int i = 0; i < loaded_state.clients_size(); ++i) {
474                 if (deleted_urls.count(loaded_state.clients(i).url()) != 0) {
475                         safe_close(loaded_state.clients(i).sock());
476                 } else {
477                         servers->add_client_from_serialized(loaded_state.clients(i));
478                 }
479         }
480         
481         servers->run();
482
483         // Now delete all inputs that are longer in use, and start the others.
484         for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
485              input_it != inputs.end(); ) {
486                 if (input_it->second.refcount == 0) {
487                         log(WARNING, "Input '%s' no longer in use, closing.",
488                             input_it->first.c_str());
489                         input_it->second.input->close_socket();
490                         delete input_it->second.input;
491                         inputs.erase(input_it++);
492                 } else {
493                         input_it->second.input->run();
494                         ++input_it;
495                 }
496         }
497
498         // Start writing statistics.
499         StatsThread *stats_thread = NULL;
500         if (!config.stats_file.empty()) {
501                 stats_thread = new StatsThread(config.stats_file, config.stats_interval);
502                 stats_thread->run();
503         }
504
505         InputStatsThread *input_stats_thread = NULL;
506         if (!config.input_stats_file.empty()) {
507                 vector<Input*> inputs_no_refcount;
508                 for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
509                      input_it != inputs.end(); ++input_it) {
510                         inputs_no_refcount.push_back(input_it->second.input);
511                 }
512
513                 input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount);
514                 input_stats_thread->run();
515         }
516
517         struct timeval server_start;
518         gettimeofday(&server_start, NULL);
519         if (state_fd != -1) {
520                 // Measure time from we started deserializing (below) to now, when basically everything
521                 // is up and running. This is, in other words, a conservative estimate of how long our
522                 // “glitch” period was, not counting of course reconnects if the configuration changed.
523                 double glitch_time = server_start.tv_sec - serialize_start.tv_sec +
524                         1e-6 * (server_start.tv_usec - serialize_start.tv_usec);
525                 log(INFO, "Re-exec happened in approx. %.0f ms.", glitch_time * 1000.0);
526         }
527
528         while (!hupped) {
529                 usleep(100000);
530         }
531
532         // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
533         gettimeofday(&serialize_start, NULL);
534
535         if (input_stats_thread != NULL) {
536                 input_stats_thread->stop();
537                 delete input_stats_thread;
538         }
539         if (stats_thread != NULL) {
540                 stats_thread->stop();
541                 delete stats_thread;
542         }
543         for (size_t i = 0; i < acceptors.size(); ++i) {
544                 acceptors[i]->stop();
545         }
546         for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
547              input_it != inputs.end();
548              ++input_it) {
549                 input_it->second.input->stop();
550         }
551         servers->stop();
552
553         CubemapStateProto state;
554         if (stopped) {
555                 log(INFO, "Shutting down.");
556         } else {
557                 log(INFO, "Serializing state and re-execing...");
558                 state = collect_state(
559                         serialize_start, acceptors, inputs, servers);
560                 string serialized;
561                 state.SerializeToString(&serialized);
562                 state_fd = make_tempfile(serialized);
563                 if (state_fd == -1) {
564                         exit(1);
565                 }
566         }
567         delete servers;
568
569         for (unsigned i = 0; i < mark_pools.size(); ++i) {
570                 delete mark_pools[i];
571         }
572         mark_pools.clear();
573
574         access_log->stop();
575         delete access_log;
576         shut_down_logging();
577
578         if (stopped) {
579                 exit(0);
580         }
581
582         // OK, so the signal was SIGHUP. Check that the new config is okay, then exec the new binary.
583         if (!dry_run_config(argv0_canon, config_filename_canon)) {
584                 open_logs(config.log_destinations);
585                 log(ERROR, "%s --test-config failed. Restarting old version instead of new.", argv[0]);
586                 hupped = false;
587                 shut_down_logging();
588                 goto start;
589         }
590          
591         char buf[16];
592         sprintf(buf, "%d", state_fd);
593
594         for ( ;; ) {
595                 execlp(argv0_canon, argv0_canon, config_filename_canon, "--state", buf, NULL);
596                 open_logs(config.log_destinations);
597                 log_perror("execlp");
598                 log(ERROR, "re-exec of %s failed. Waiting 0.2 seconds and trying again...", argv0_canon);
599                 shut_down_logging();
600                 usleep(200000);
601         }
602 }