]> git.sesse.net Git - nageru/blob - futatabi/main.cpp
Log a warning when we kill a client that is not keeping up.
[nageru] / futatabi / main.cpp
1 #include <arpa/inet.h>
2 #include <assert.h>
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <dirent.h>
7 #include <getopt.h>
8 #include <memory>
9 #include <mutex>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string>
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <thread>
16 #include <unistd.h>
17 #include <vector>
18
19 extern "C" {
20 #include <libavformat/avformat.h>
21 }
22
23 #include "clip_list.h"
24 #include "defs.h"
25 #include "flags.h"
26 #include "frame.pb.h"
27 #include "frame_on_disk.h"
28 #include "mainwindow.h"
29 #include "player.h"
30 #include "shared/context.h"
31 #include "shared/disk_space_estimator.h"
32 #include "shared/ffmpeg_raii.h"
33 #include "shared/httpd.h"
34 #include "shared/metrics.h"
35 #include "shared/post_to_main_thread.h"
36 #include "shared/ref_counted_gl_sync.h"
37 #include "shared/timebase.h"
38 #include "ui_mainwindow.h"
39 #include "vaapi_jpeg_decoder.h"
40
41 #include <QApplication>
42 #include <QGLFormat>
43 #include <QProgressDialog>
44 #include <QSurfaceFormat>
45 #include <movit/init.h>
46 #include <movit/util.h>
47
48 using namespace std;
49 using namespace std::chrono;
50
51 constexpr char frame_magic[] = "Ftbifrm0";
52 constexpr size_t frame_magic_len = 8;
53
54 mutex RefCountedGLsync::fence_lock;
55 atomic<bool> should_quit{ false };
56
57 int64_t start_pts = -1;
58
59 // TODO: Replace by some sort of GUI control, I guess.
60 int64_t current_pts = 0;
61
62 struct FrameFile {
63         FILE *fp = nullptr;
64         unsigned filename_idx;
65         size_t frames_written_so_far = 0;
66 };
67 std::map<int, FrameFile> open_frame_files;
68
69 mutex frame_mu;
70 vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
71 vector<string> frame_filenames;  // Under frame_mu.
72
73 atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
74 Summary metric_received_frame_size_bytes;
75
76 namespace {
77
78 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, vector<uint32_t> audio, DB *db)
79 {
80         if (open_frame_files.count(stream_idx) == 0) {
81                 char filename[256];
82                 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09" PRId64 ".frames",
83                          global_flags.working_directory.c_str(), stream_idx, pts);
84                 FILE *fp = fopen(filename, "wb");
85                 if (fp == nullptr) {
86                         perror(filename);
87                         abort();
88                 }
89
90                 lock_guard<mutex> lock(frame_mu);
91                 unsigned filename_idx = frame_filenames.size();
92                 frame_filenames.push_back(filename);
93                 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
94         }
95
96         FrameFile &file = open_frame_files[stream_idx];
97         unsigned filename_idx = file.filename_idx;
98         string filename;
99         {
100                 lock_guard<mutex> lock(frame_mu);
101                 filename = frame_filenames[filename_idx];
102         }
103
104         FrameHeaderProto hdr;
105         hdr.set_stream_idx(stream_idx);
106         hdr.set_pts(pts);
107         hdr.set_file_size(size);
108         hdr.set_audio_size(audio.size() * sizeof(audio[0]));
109
110         string serialized;
111         if (!hdr.SerializeToString(&serialized)) {
112                 fprintf(stderr, "Frame header serialization failed.\n");
113                 abort();
114         }
115         uint32_t len = htonl(serialized.size());
116
117         if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
118                 perror("fwrite");
119                 abort();
120         }
121         if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
122                 perror("fwrite");
123                 abort();
124         }
125         if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
126                 perror("fwrite");
127                 abort();
128         }
129         off_t offset = ftell(file.fp);
130         if (fwrite(data, size, 1, file.fp) != 1) {
131                 perror("fwrite");
132                 abort();
133         }
134         if (audio.size() > 0) {
135                 if (fwrite(audio.data(), hdr.audio_size(), 1, file.fp) != 1) {
136                         perror("fwrite");
137                         exit(1);
138                 }
139         }
140         fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
141         global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
142
143         FrameOnDisk frame;
144         frame.pts = pts;
145         frame.filename_idx = filename_idx;
146         frame.offset = offset;
147         frame.size = size;
148         frame.audio_size = audio.size() * sizeof(audio[0]);
149
150         {
151                 lock_guard<mutex> lock(frame_mu);
152                 assert(stream_idx < MAX_STREAMS);
153                 frames[stream_idx].push_back(frame);
154         }
155
156         if (++file.frames_written_so_far >= FRAMES_PER_FILE) {
157                 size_t size = ftell(file.fp);
158
159                 // Start a new file next time.
160                 if (fclose(file.fp) != 0) {
161                         perror("fclose");
162                         abort();
163                 }
164                 open_frame_files.erase(stream_idx);
165
166                 // Write information about all frames in the finished file to SQLite.
167                 // (If we crash before getting to do this, we'll be scanning through
168                 // the file on next startup, and adding it to the database then.)
169                 // NOTE: Since we don't fsync(), we could in theory get broken data
170                 // but with the right size, but it would seem unlikely.
171                 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
172                 {
173                         lock_guard<mutex> lock(frame_mu);
174                         for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
175                                 for (const FrameOnDisk &frame : frames[stream_idx]) {
176                                         if (frame.filename_idx == filename_idx) {
177                                                 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
178                                         }
179                                 }
180                         }
181                 }
182
183                 const char *basename = filename.c_str();
184                 while (strchr(basename, '/') != nullptr) {
185                         basename = strchr(basename, '/') + 1;
186                 }
187                 db->store_frame_file(basename, size, frames_this_file);
188         }
189
190         return frame;
191 }
192
193 }  // namespace
194
195 HTTPD *global_httpd;
196
197 void load_existing_frames();
198 void record_thread_func();
199
200 int main(int argc, char **argv)
201 {
202         parse_flags(argc, argv);
203         if (optind == argc) {
204                 global_flags.stream_source = "multiangle.mp4";
205                 global_flags.slow_down_input = true;
206         } else if (optind + 1 == argc) {
207                 global_flags.stream_source = argv[optind];
208         } else {
209                 usage();
210                 abort();
211         }
212
213         string frame_dir = global_flags.working_directory + "/frames";
214
215         if (mkdir(frame_dir.c_str(), 0777) == 0) {
216                 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
217         } else if (errno != EEXIST) {
218                 perror(global_flags.working_directory.c_str());
219                 abort();
220         }
221
222         avformat_network_init();
223         global_metrics.set_prefix("futatabi");
224         global_httpd = new HTTPD;
225         global_metrics.remove("num_connected_multicam_clients");
226
227         QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
228
229         QSurfaceFormat fmt;
230         fmt.setDepthBufferSize(0);
231         fmt.setStencilBufferSize(0);
232         fmt.setProfile(QSurfaceFormat::CoreProfile);
233         fmt.setMajorVersion(4);
234         fmt.setMinorVersion(5);
235
236         // Turn off vsync, since Qt generally gives us at most frame rate
237         // (display frequency) / (number of QGLWidgets active).
238         fmt.setSwapInterval(0);
239
240         QSurfaceFormat::setDefaultFormat(fmt);
241
242         QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
243
244         QApplication app(argc, argv);
245         global_share_widget = new QGLWidget();
246         if (!global_share_widget->isValid()) {
247                 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
248                 abort();
249         }
250
251         // Initialize Movit.
252         {
253                 QSurface *surface = create_surface();
254                 QOpenGLContext *context = create_context(surface);
255                 if (!make_current(context, surface)) {
256                         printf("oops\n");
257                         abort();
258                 }
259                 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
260                 delete_context(context);
261                 // TODO: Delete the surface, too.
262         }
263
264         load_existing_frames();
265
266         for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
267                 if (!frames[stream_idx].empty()) {
268                         assert(start_pts > frames[stream_idx].back().pts);
269                 }
270         }
271
272         MainWindow main_window;
273         main_window.show();
274
275         global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
276         global_httpd->start(global_flags.http_port);
277
278         init_jpeg_vaapi();
279
280         thread record_thread(record_thread_func);
281
282         int ret = app.exec();
283
284         should_quit = true;
285         record_thread.join();
286
287         return ret;
288 }
289
290 void load_frame_file(const char *filename, const string &basename, unsigned filename_idx, DB *db)
291 {
292         struct stat st;
293         if (stat(filename, &st) == -1) {
294                 perror(filename);
295                 abort();
296         }
297
298         vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
299         if (!all_frames.empty()) {
300                 // We already had this cached in the database, so no need to look in the file.
301                 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
302                         if (frame.stream_idx < MAX_STREAMS) {
303                                 frames[frame.stream_idx].push_back(frame.frame);
304                                 start_pts = max(start_pts, frame.frame.pts);
305                         }
306                 }
307                 return;
308         }
309
310         FILE *fp = fopen(filename, "rb");
311         if (fp == nullptr) {
312                 perror(filename);
313                 abort();
314         }
315
316         // Find the actual length of the file, since fseek() past the end of the file
317         // will succeed without an error.
318         if (fseek(fp, 0, SEEK_END) == -1) {
319                 perror("fseek(SEEK_END)");
320                 abort();
321         }
322         off_t file_len = ftell(fp);
323         if (fseek(fp, 0, SEEK_SET) == -1) {
324                 perror("fseek(SEEK_SET)");
325                 abort();
326         }
327
328         size_t magic_offset = 0;
329         size_t skipped_bytes = 0;
330         while (!feof(fp) && !ferror(fp)) {
331                 int ch = getc(fp);
332                 if (ch == -1) {
333                         break;
334                 }
335                 if (ch != frame_magic[magic_offset++]) {
336                         skipped_bytes += magic_offset;
337                         magic_offset = 0;
338                         continue;
339                 }
340                 if (magic_offset < frame_magic_len) {
341                         // Still reading the magic (hopefully).
342                         continue;
343                 }
344
345                 // OK, found the magic. Try to parse the frame header.
346                 magic_offset = 0;
347
348                 if (skipped_bytes > 0) {
349                         fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
350                                 filename, skipped_bytes);
351                         skipped_bytes = 0;
352                 }
353
354                 uint32_t len;
355                 if (fread(&len, sizeof(len), 1, fp) != 1) {
356                         fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
357                         break;
358                 }
359
360                 string serialized;
361                 serialized.resize(ntohl(len));
362                 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
363                         fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
364                         break;
365                 }
366
367                 FrameHeaderProto hdr;
368                 if (!hdr.ParseFromString(serialized)) {
369                         fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
370                         continue;
371                 }
372
373                 FrameOnDisk frame;
374                 frame.pts = hdr.pts();
375                 frame.offset = ftell(fp);
376                 if (frame.offset == -1) {
377                         fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
378                         break;
379                 }
380                 frame.filename_idx = filename_idx;
381                 frame.size = hdr.file_size();
382                 frame.audio_size = hdr.audio_size();
383
384                 if (frame.offset + frame.size + frame.audio_size > file_len ||
385                     fseek(fp, frame.offset + frame.size + frame.audio_size, SEEK_SET) == -1) {
386                         fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
387                         break;
388                 }
389
390                 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
391                         frames[hdr.stream_idx()].push_back(frame);
392                         start_pts = max(start_pts, hdr.pts());
393                 }
394                 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
395         }
396
397         if (skipped_bytes > 0) {
398                 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
399                         filename, skipped_bytes);
400         }
401
402         off_t size = ftell(fp);
403         fclose(fp);
404
405         if (size == -1) {
406                 fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
407                 return;
408         }
409
410         db->store_frame_file(basename, size, all_frames);
411 }
412
413 void load_existing_frames()
414 {
415         QProgressDialog progress("Scanning frame directory...", "Abort", 0, 1);
416         progress.setWindowTitle("Futatabi");
417         progress.setWindowModality(Qt::WindowModal);
418         progress.setMinimumDuration(1000);
419         progress.setMaximum(1);
420         progress.setValue(0);
421
422         string frame_dir = global_flags.working_directory + "/frames";
423         DIR *dir = opendir(frame_dir.c_str());
424         if (dir == nullptr) {
425                 perror("frames/");
426                 start_pts = 0;
427                 return;
428         }
429
430         vector<string> frame_basenames;
431         for (;;) {
432                 errno = 0;
433                 dirent *de = readdir(dir);
434                 if (de == nullptr) {
435                         if (errno != 0) {
436                                 perror("readdir");
437                                 abort();
438                         }
439                         break;
440                 }
441
442                 if (de->d_type == DT_REG || de->d_type == DT_LNK) {
443                         string filename = frame_dir + "/" + de->d_name;
444                         frame_filenames.push_back(filename);
445                         frame_basenames.push_back(de->d_name);
446                 }
447
448                 if (progress.wasCanceled()) {
449                         abort();
450                 }
451         }
452         closedir(dir);
453
454         progress.setMaximum(frame_filenames.size() + 2);
455         progress.setValue(1);
456
457         progress.setLabelText("Opening database...");
458         DB db(global_flags.working_directory + "/futatabi.db");
459
460         progress.setLabelText("Reading frame files...");
461         progress.setValue(2);
462
463         for (size_t i = 0; i < frame_filenames.size(); ++i) {
464                 load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
465                 progress.setValue(i + 3);
466                 if (progress.wasCanceled()) {
467                         abort();
468                 }
469         }
470
471         if (start_pts == -1) {
472                 start_pts = 0;
473         } else {
474                 // Add a gap of one second from the old frames to the new ones.
475                 start_pts += TIMEBASE;
476         }
477         current_pts = start_pts;
478
479         for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
480                 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
481                      [](const auto &a, const auto &b) { return a.pts < b.pts; });
482         }
483
484         db.clean_unused_frame_files(frame_basenames);
485 }
486
487 void record_thread_func()
488 {
489         for (unsigned i = 0; i < MAX_STREAMS; ++i) {
490                 global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
491         }
492         global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
493
494         if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
495                 // Save the user from some repetitive messages.
496                 return;
497         }
498
499         pthread_setname_np(pthread_self(), "ReceiveFrames");
500
501         int64_t pts_offset = 0;  // Needs to be initialized due to a spurious GCC warning.
502         DB db(global_flags.working_directory + "/futatabi.db");
503
504         while (!should_quit.load()) {
505                 auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
506                 if (format_ctx == nullptr) {
507                         fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
508                         sleep(1);
509                         continue;
510                 }
511
512                 // Match any audio streams to video streams, sequentially.
513                 vector<int> video_stream_idx, audio_stream_idx;
514                 for (unsigned i = 0; i < format_ctx->nb_streams; ++i) {
515                         if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
516                                 video_stream_idx.push_back(i);
517                         } else if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
518                                 audio_stream_idx.push_back(i);
519                         }
520                 }
521                 unordered_map<int, int> audio_stream_to_video_stream_idx;
522                 for (size_t i = 0; i < min(video_stream_idx.size(), audio_stream_idx.size()); ++i) {
523                         audio_stream_to_video_stream_idx[audio_stream_idx[i]] = video_stream_idx[i];
524                 }
525
526                 vector<uint32_t> pending_audio[MAX_STREAMS];
527                 int64_t last_pts = -1;
528                 while (!should_quit.load()) {
529                         AVPacket pkt;
530                         unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
531                                 &pkt, av_packet_unref);
532                         av_init_packet(&pkt);
533                         pkt.data = nullptr;
534                         pkt.size = 0;
535
536                         // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
537                         // right now, should_quit will be ignored if it's hung on I/O.
538                         if (av_read_frame(format_ctx.get(), &pkt) != 0) {
539                                 break;
540                         }
541
542                         AVStream *stream = format_ctx->streams[pkt.stream_index];
543                         if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO &&
544                             audio_stream_to_video_stream_idx.count(pkt.stream_index)) {
545                                 if ((pkt.size % (sizeof(uint32_t) * 2)) != 0) {
546                                         fprintf(stderr, "Audio stream %u had a packet of strange length %d, ignoring.\n",
547                                                 pkt.stream_index, pkt.size);
548                                 } else {
549                                         // TODO: Endianness?
550                                         const uint32_t *begin = (const uint32_t *)pkt.data;
551                                         const uint32_t *end = (const uint32_t *)(pkt.data + pkt.size);
552                                         pending_audio[audio_stream_to_video_stream_idx[pkt.stream_index]].assign(begin, end);
553                                 }
554                         }
555
556                         if (pkt.stream_index >= MAX_STREAMS ||
557                             stream->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
558                                 continue;
559                         }
560
561                         ++metric_received_frames[pkt.stream_index];
562                         metric_received_frame_size_bytes.count_event(pkt.size);
563
564                         // Convert pts to our own timebase.
565                         AVRational stream_timebase = stream->time_base;
566                         int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
567
568                         // Translate offset into our stream.
569                         if (last_pts == -1) {
570                                 pts_offset = start_pts - pts;
571                         }
572                         pts = std::max(pts + pts_offset, start_pts);
573
574                         //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
575                         //      pkt.stream_index, pts, pkt.size);
576                         FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, move(pending_audio[pkt.stream_index]), &db);
577
578                         post_to_main_thread([pkt, frame] {
579                                 global_mainwindow->display_frame(pkt.stream_index, frame);
580                         });
581
582                         if (last_pts != -1 && global_flags.slow_down_input) {
583                                 this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
584                         }
585                         last_pts = pts;
586                         current_pts = pts;
587                 }
588
589                 if (!should_quit.load()) {
590                         fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
591                         sleep(1);
592                 }
593
594                 start_pts = last_pts + TIMEBASE;
595         }
596 }