Add preliminary support for input stream statistics.
[cubemap] / main.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <getopt.h>
4 #include <limits.h>
5 #include <signal.h>
6 #include <stddef.h>
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <sys/time.h>
11 #include <sys/wait.h>
12 #include <unistd.h>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "acceptor.h"
20 #include "accesslog.h"
21 #include "config.h"
22 #include "input.h"
23 #include "input_stats.h"
24 #include "log.h"
25 #include "markpool.h"
26 #include "serverpool.h"
27 #include "state.pb.h"
28 #include "stats.h"
29 #include "stream.h"
30 #include "util.h"
31 #include "version.h"
32
33 using namespace std;
34
35 AccessLogThread *access_log = NULL;
36 ServerPool *servers = NULL;
37 vector<MarkPool *> mark_pools;
38 volatile bool hupped = false;
39 volatile bool stopped = false;
40
41 struct InputWithRefcount {
42         Input *input;
43         int refcount;
44 };
45
46 void hup(int signum)
47 {
48         hupped = true;
49         if (signum == SIGINT) {
50                 stopped = true;
51         }
52 }
53
// Signal handler that deliberately ignores its signal; main() installs it
// for SIGUSR1, which is used in internal signalling.
void do_nothing(int signum)
{
        (void)signum;  // Intentionally unused.
}
57
58 CubemapStateProto collect_state(const timeval &serialize_start,
59                                 const vector<Acceptor *> acceptors,
60                                 const multimap<string, InputWithRefcount> inputs,
61                                 ServerPool *servers)
62 {
63         CubemapStateProto state = servers->serialize();  // Fills streams() and clients().
64         state.set_serialize_start_sec(serialize_start.tv_sec);
65         state.set_serialize_start_usec(serialize_start.tv_usec);
66         
67         for (size_t i = 0; i < acceptors.size(); ++i) {
68                 state.add_acceptors()->MergeFrom(acceptors[i]->serialize());
69         }
70
71         for (multimap<string, InputWithRefcount>::const_iterator input_it = inputs.begin();
72              input_it != inputs.end();
73              ++input_it) {
74                 state.add_inputs()->MergeFrom(input_it->second.input->serialize());
75         }
76
77         return state;
78 }
79
80 // Find all port statements in the configuration file, and create acceptors for htem.
81 vector<Acceptor *> create_acceptors(
82         const Config &config,
83         map<int, Acceptor *> *deserialized_acceptors)
84 {
85         vector<Acceptor *> acceptors;
86         for (unsigned i = 0; i < config.acceptors.size(); ++i) {
87                 const AcceptorConfig &acceptor_config = config.acceptors[i];
88                 Acceptor *acceptor = NULL;
89                 map<int, Acceptor *>::iterator deserialized_acceptor_it =
90                         deserialized_acceptors->find(acceptor_config.port);
91                 if (deserialized_acceptor_it != deserialized_acceptors->end()) {
92                         acceptor = deserialized_acceptor_it->second;
93                         deserialized_acceptors->erase(deserialized_acceptor_it);
94                 } else {
95                         int server_sock = create_server_socket(acceptor_config.port, TCP_SOCKET);
96                         acceptor = new Acceptor(server_sock, acceptor_config.port);
97                 }
98                 acceptor->run();
99                 acceptors.push_back(acceptor);
100         }
101
102         // Close all acceptors that are no longer in the configuration file.
103         for (map<int, Acceptor *>::iterator acceptor_it = deserialized_acceptors->begin();
104              acceptor_it != deserialized_acceptors->end();
105              ++acceptor_it) {
106                 acceptor_it->second->close_socket();
107                 delete acceptor_it->second;
108         }
109
110         return acceptors;
111 }
112
113 void create_config_input(const string &src, multimap<string, InputWithRefcount> *inputs)
114 {
115         if (src.empty()) {
116                 return;
117         }
118         if (inputs->count(src) != 0) {
119                 return;
120         }
121
122         InputWithRefcount iwr;
123         iwr.input = create_input(src);
124         if (iwr.input == NULL) {
125                 log(ERROR, "did not understand URL '%s', clients will not get any data.",
126                         src.c_str());
127                 return;
128         }
129         iwr.refcount = 0;
130         inputs->insert(make_pair(src, iwr));
131 }
132
133 // Find all streams in the configuration file, and create inputs for them.
134 void create_config_inputs(const Config &config, multimap<string, InputWithRefcount> *inputs)
135 {
136         for (unsigned i = 0; i < config.streams.size(); ++i) {
137                 const StreamConfig &stream_config = config.streams[i];
138                 create_config_input(stream_config.src, inputs);
139         }
140         for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
141                 const UDPStreamConfig &udpstream_config = config.udpstreams[i];
142                 create_config_input(udpstream_config.src, inputs);
143         }
144 }
145
146 void create_streams(const Config &config,
147                     const set<string> &deserialized_urls,
148                     multimap<string, InputWithRefcount> *inputs)
149 {
150         for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
151                 const MarkPoolConfig &mp_config = config.mark_pools[i];
152                 mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
153         }
154
155         // HTTP streams.
156         set<string> expecting_urls = deserialized_urls;
157         for (unsigned i = 0; i < config.streams.size(); ++i) {
158                 const StreamConfig &stream_config = config.streams[i];
159                 int stream_index;
160                 if (deserialized_urls.count(stream_config.url) == 0) {
161                         stream_index = servers->add_stream(stream_config.url,
162                                                            stream_config.backlog_size,
163                                                            Stream::Encoding(stream_config.encoding));
164                 } else {
165                         stream_index = servers->lookup_stream_by_url(stream_config.url);
166                         assert(stream_index != -1);
167                         servers->set_backlog_size(stream_index, stream_config.backlog_size);
168                         servers->set_encoding(stream_index,
169                                               Stream::Encoding(stream_config.encoding));
170                 }
171                 expecting_urls.erase(stream_config.url);
172
173                 if (stream_config.mark_pool != -1) {
174                         servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
175                 }
176
177                 string src = stream_config.src;
178                 if (!src.empty()) {
179                         multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
180                         assert(input_it != inputs->end());
181                         input_it->second.input->add_destination(stream_index);
182                         ++input_it->second.refcount;
183                 }
184         }
185
186         // Warn about any HTTP servers we've lost.
187         // TODO: Make an option (delete=yes?) to actually shut down streams.
188         for (set<string>::const_iterator stream_it = expecting_urls.begin();
189              stream_it != expecting_urls.end();
190              ++stream_it) {
191                 string url = *stream_it;
192                 log(WARNING, "stream '%s' disappeared from the configuration file. "
193                              "It will not be deleted, but clients will not get any new inputs.",
194                              url.c_str());
195         }
196
197         // UDP streams.
198         for (unsigned i = 0; i < config.udpstreams.size(); ++i) {
199                 const UDPStreamConfig &udpstream_config = config.udpstreams[i];
200                 MarkPool *mark_pool = NULL;
201                 if (udpstream_config.mark_pool != -1) {
202                         mark_pool = mark_pools[udpstream_config.mark_pool];
203                 }
204                 int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool);
205
206                 string src = udpstream_config.src;
207                 if (!src.empty()) {
208                         multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
209                         assert(input_it != inputs->end());
210                         input_it->second.input->add_destination(stream_index);
211                         ++input_it->second.refcount;
212                 }
213         }
214 }
215         
216 void open_logs(const vector<LogConfig> &log_destinations)
217 {
218         for (size_t i = 0; i < log_destinations.size(); ++i) {
219                 if (log_destinations[i].type == LogConfig::LOG_TYPE_FILE) {
220                         add_log_destination_file(log_destinations[i].filename);
221                 } else if (log_destinations[i].type == LogConfig::LOG_TYPE_CONSOLE) {
222                         add_log_destination_console();
223                 } else if (log_destinations[i].type == LogConfig::LOG_TYPE_SYSLOG) {
224                         add_log_destination_syslog();
225                 } else {
226                         assert(false);
227                 }
228         }
229         start_logging();
230 }
231         
232 bool dry_run_config(const std::string &argv0, const std::string &config_filename)
233 {
234         char *argv0_copy = strdup(argv0.c_str());
235         char *config_filename_copy = strdup(config_filename.c_str());
236
237         pid_t pid = fork();
238         switch (pid) {
239         case -1:
240                 log_perror("fork()");
241                 free(argv0_copy);
242                 free(config_filename_copy);
243                 return false;
244         case 0:
245                 // Child.
246                 execlp(argv0_copy, argv0_copy, "--test-config", config_filename_copy, NULL);
247                 log_perror(argv0_copy);
248                 _exit(1);
249         default:
250                 // Parent.
251                 break;
252         }
253                 
254         free(argv0_copy);
255         free(config_filename_copy);
256
257         int status;
258         pid_t err;
259         do {
260                 err = waitpid(pid, &status, 0);
261         } while (err == -1 && errno == EINTR);
262
263         if (err == -1) {
264                 log_perror("waitpid()");
265                 return false;
266         }       
267
268         return (WIFEXITED(status) && WEXITSTATUS(status) == 0);
269 }
270
271 int main(int argc, char **argv)
272 {
273         signal(SIGHUP, hup);
274         signal(SIGINT, hup);
275         signal(SIGUSR1, do_nothing);  // Used in internal signalling.
276         signal(SIGPIPE, SIG_IGN);
277         
278         // Parse options.
279         int state_fd = -1;
280         bool test_config = false;
281         for ( ;; ) {
282                 static const option long_options[] = {
283                         { "state", required_argument, 0, 's' },
284                         { "test-config", no_argument, 0, 't' },
285                         { 0, 0, 0, 0 }
286                 };
287                 int option_index = 0;
288                 int c = getopt_long(argc, argv, "s:t", long_options, &option_index);
289      
290                 if (c == -1) {
291                         break;
292                 }
293                 switch (c) {
294                 case 's':
295                         state_fd = atoi(optarg);
296                         break;
297                 case 't':
298                         test_config = true;
299                         break;
300                 default:
301                         fprintf(stderr, "Unknown option '%s'\n", argv[option_index]);
302                         exit(1);
303                 }
304         }
305
306         string config_filename = "cubemap.config";
307         if (optind < argc) {
308                 config_filename = argv[optind++];
309         }
310
311         // Canonicalize argv[0] and config_filename.
312         char argv0_canon[PATH_MAX];
313         char config_filename_canon[PATH_MAX];
314
315         if (realpath(argv[0], argv0_canon) == NULL) {
316                 log_perror(argv[0]);
317                 exit(1);
318         }
319         if (realpath(config_filename.c_str(), config_filename_canon) == NULL) {
320                 log_perror(config_filename.c_str());
321                 exit(1);
322         }
323
324         // Now parse the configuration file.
325         Config config;
326         if (!parse_config(config_filename_canon, &config)) {
327                 exit(1);
328         }
329         if (test_config) {
330                 exit(0);
331         }
332         
333         // Ideally we'd like to daemonize only when we've started up all threads etc.,
334         // but daemon() forks, which is not good in multithreaded software, so we'll
335         // have to do it here.
336         if (config.daemonize) {
337                 if (daemon(0, 0) == -1) {
338                         log_perror("daemon");
339                         exit(1);
340                 }
341         }
342
343 start:
344         // Open logs as soon as possible.
345         open_logs(config.log_destinations);
346
347         log(INFO, "Cubemap " SERVER_VERSION " starting.");
348         if (config.access_log_file.empty()) {
349                 // Create a dummy logger.
350                 access_log = new AccessLogThread();
351         } else {
352                 access_log = new AccessLogThread(config.access_log_file);
353         }
354         access_log->run();
355
356         servers = new ServerPool(config.num_servers);
357
358         CubemapStateProto loaded_state;
359         struct timeval serialize_start;
360         set<string> deserialized_urls;
361         map<int, Acceptor *> deserialized_acceptors;
362         multimap<string, InputWithRefcount> inputs;  // multimap due to older versions without deduplication.
363         if (state_fd != -1) {
364                 log(INFO, "Deserializing state from previous process...");
365                 string serialized;
366                 if (!read_tempfile(state_fd, &serialized)) {
367                         exit(1);
368                 }
369                 if (!loaded_state.ParseFromString(serialized)) {
370                         log(ERROR, "Failed deserialization of state.");
371                         exit(1);
372                 }
373
374                 serialize_start.tv_sec = loaded_state.serialize_start_sec();
375                 serialize_start.tv_usec = loaded_state.serialize_start_usec();
376
377                 // Deserialize the streams.
378                 for (int i = 0; i < loaded_state.streams_size(); ++i) {
379                         const StreamProto &stream = loaded_state.streams(i);
380
381                         vector<int> data_fds;
382                         for (int j = 0; j < stream.data_fds_size(); ++j) {
383                                 data_fds.push_back(stream.data_fds(j));
384                         }
385
386                         // Older versions stored the data once in the protobuf instead of
387                         // sending around file descriptors.
388                         if (data_fds.empty() && stream.has_data()) {
389                                 data_fds.push_back(make_tempfile(stream.data()));
390                         }
391
392                         servers->add_stream_from_serialized(stream, data_fds);
393                         deserialized_urls.insert(stream.url());
394                 }
395
396                 // Deserialize the inputs. Note that we don't actually add them to any stream yet.
397                 for (int i = 0; i < loaded_state.inputs_size(); ++i) {
398                         InputWithRefcount iwr;
399                         iwr.input = create_input(loaded_state.inputs(i));
400                         iwr.refcount = 0;
401                         inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr));
402                 } 
403
404                 // Deserialize the acceptors.
405                 for (int i = 0; i < loaded_state.acceptors_size(); ++i) {
406                         deserialized_acceptors.insert(make_pair(
407                                 loaded_state.acceptors(i).port(),
408                                 new Acceptor(loaded_state.acceptors(i))));
409                 }
410
411                 log(INFO, "Deserialization done.");
412         }
413
414         // Add any new inputs coming from the config.
415         create_config_inputs(config, &inputs);
416         
417         // Find all streams in the configuration file, create them, and connect to the inputs.
418         create_streams(config, deserialized_urls, &inputs);
419         vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
420         
421         // Put back the existing clients. It doesn't matter which server we
422         // allocate them to, so just do round-robin. However, we need to add
423         // them after the mark pools have been set up.
424         for (int i = 0; i < loaded_state.clients_size(); ++i) {
425                 servers->add_client_from_serialized(loaded_state.clients(i));
426         }
427         
428         servers->run();
429
430         // Now delete all inputs that are longer in use, and start the others.
431         for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
432              input_it != inputs.end(); ) {
433                 if (input_it->second.refcount == 0) {
434                         log(WARNING, "Input '%s' no longer in use, closing.",
435                             input_it->first.c_str());
436                         input_it->second.input->close_socket();
437                         delete input_it->second.input;
438                         inputs.erase(input_it++);
439                 } else {
440                         input_it->second.input->run();
441                         ++input_it;
442                 }
443         }
444
445         // Start writing statistics.
446         StatsThread *stats_thread = NULL;
447         if (!config.stats_file.empty()) {
448                 stats_thread = new StatsThread(config.stats_file, config.stats_interval);
449                 stats_thread->run();
450         }
451
452         InputStatsThread *input_stats_thread = NULL;
453         if (!config.input_stats_file.empty()) {
454                 vector<Input*> inputs_no_refcount;
455                 for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
456                      input_it != inputs.end(); ++input_it) {
457                         inputs_no_refcount.push_back(input_it->second.input);
458                 }
459
460                 input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount);
461                 input_stats_thread->run();
462         }
463
464         struct timeval server_start;
465         gettimeofday(&server_start, NULL);
466         if (state_fd != -1) {
467                 // Measure time from we started deserializing (below) to now, when basically everything
468                 // is up and running. This is, in other words, a conservative estimate of how long our
469                 // “glitch” period was, not counting of course reconnects if the configuration changed.
470                 double glitch_time = server_start.tv_sec - serialize_start.tv_sec +
471                         1e-6 * (server_start.tv_usec - serialize_start.tv_usec);
472                 log(INFO, "Re-exec happened in approx. %.0f ms.", glitch_time * 1000.0);
473         }
474
475         while (!hupped) {
476                 usleep(100000);
477         }
478
479         // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
480         gettimeofday(&serialize_start, NULL);
481
482         if (input_stats_thread != NULL) {
483                 input_stats_thread->stop();
484                 delete input_stats_thread;
485         }
486         if (stats_thread != NULL) {
487                 stats_thread->stop();
488                 delete stats_thread;
489         }
490         for (size_t i = 0; i < acceptors.size(); ++i) {
491                 acceptors[i]->stop();
492         }
493         for (multimap<string, InputWithRefcount>::iterator input_it = inputs.begin();
494              input_it != inputs.end();
495              ++input_it) {
496                 input_it->second.input->stop();
497         }
498         servers->stop();
499
500         CubemapStateProto state;
501         if (stopped) {
502                 log(INFO, "Shutting down.");
503         } else {
504                 log(INFO, "Serializing state and re-execing...");
505                 state = collect_state(
506                         serialize_start, acceptors, inputs, servers);
507                 string serialized;
508                 state.SerializeToString(&serialized);
509                 state_fd = make_tempfile(serialized);
510                 if (state_fd == -1) {
511                         exit(1);
512                 }
513         }
514         delete servers;
515
516         for (unsigned i = 0; i < mark_pools.size(); ++i) {
517                 delete mark_pools[i];
518         }
519         mark_pools.clear();
520
521         access_log->stop();
522         delete access_log;
523         shut_down_logging();
524
525         if (stopped) {
526                 exit(0);
527         }
528
529         // OK, so the signal was SIGHUP. Check that the new config is okay, then exec the new binary.
530         if (!dry_run_config(argv0_canon, config_filename_canon)) {
531                 open_logs(config.log_destinations);
532                 log(ERROR, "%s --test-config failed. Restarting old version instead of new.", argv[0]);
533                 hupped = false;
534                 shut_down_logging();
535                 goto start;
536         }
537          
538         char buf[16];
539         sprintf(buf, "%d", state_fd);
540
541         for ( ;; ) {
542                 execlp(argv0_canon, argv0_canon, config_filename_canon, "--state", buf, NULL);
543                 open_logs(config.log_destinations);
544                 log_perror("execlp");
545                 log(ERROR, "re-exec of %s failed. Waiting 0.2 seconds and trying again...", argv0_canon);
546                 shut_down_logging();
547                 usleep(200000);
548         }
549 }