]> git.sesse.net Git - nageru/blob - futatabi/main.cpp
Prefix all the Futatabi Prometheus metrics by futatabi_ instead of nageru_.
[nageru] / futatabi / main.cpp
1 #include <assert.h>
2 #include <arpa/inet.h>
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <dirent.h>
7 #include <getopt.h>
8 #include <memory>
9 #include <mutex>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string>
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <thread>
16 #include <unistd.h>
17 #include <vector>
18
19 extern "C" {
20 #include <libavformat/avformat.h>
21 }
22
23 #include "clip_list.h"
24 #include "shared/context.h"
25 #include "defs.h"
26 #include "shared/disk_space_estimator.h"
27 #include "shared/ffmpeg_raii.h"
28 #include "flags.h"
29 #include "frame_on_disk.h"
30 #include "frame.pb.h"
31 #include "shared/httpd.h"
32 #include "mainwindow.h"
33 #include "player.h"
34 #include "shared/post_to_main_thread.h"
35 #include "shared/ref_counted_gl_sync.h"
36 #include "shared/timebase.h"
37 #include "shared/metrics.h"
38 #include "ui_mainwindow.h"
39 #include "vaapi_jpeg_decoder.h"
40
41 #include <QApplication>
42 #include <QGLFormat>
43 #include <QSurfaceFormat>
44 #include <QProgressDialog>
45 #include <movit/init.h>
46 #include <movit/util.h>
47
48 using namespace std;
49 using namespace std::chrono;
50
51 constexpr char frame_magic[] = "Ftbifrm0";
52 constexpr size_t frame_magic_len = 8;
53
54 mutex RefCountedGLsync::fence_lock;
55 atomic<bool> should_quit{false};
56
57 int64_t start_pts = -1;
58
59 // TODO: Replace by some sort of GUI control, I guess.
60 int64_t current_pts = 0;
61
62 struct FrameFile {
63         FILE *fp = nullptr;
64         unsigned filename_idx;
65         size_t frames_written_so_far = 0;
66 };
67 std::map<int, FrameFile> open_frame_files;
68
69 mutex frame_mu;
70 vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
71 vector<string> frame_filenames;  // Under frame_mu.
72
73 atomic<int64_t> metric_received_frames[MAX_STREAMS]{{0}};
74
75 namespace {
76
77 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
78 {
79         if (open_frame_files.count(stream_idx) == 0) {
80                 char filename[256];
81                 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
82                         global_flags.working_directory.c_str(), stream_idx, pts);
83                 FILE *fp = fopen(filename, "wb");
84                 if (fp == nullptr) {
85                         perror(filename);
86                         exit(1);
87                 }
88
89                 lock_guard<mutex> lock(frame_mu);
90                 unsigned filename_idx = frame_filenames.size();
91                 frame_filenames.push_back(filename);
92                 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
93         }
94
95         FrameFile &file = open_frame_files[stream_idx];
96         unsigned filename_idx = file.filename_idx;
97         string filename;
98         {
99                 lock_guard<mutex> lock(frame_mu);
100                 filename = frame_filenames[filename_idx];
101         }
102
103         FrameHeaderProto hdr;
104         hdr.set_stream_idx(stream_idx);
105         hdr.set_pts(pts);
106         hdr.set_file_size(size);
107
108         string serialized;
109         if (!hdr.SerializeToString(&serialized)) {
110                 fprintf(stderr, "Frame header serialization failed.\n");
111                 exit(1);
112         }
113         uint32_t len = htonl(serialized.size());
114
115         if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
116                 perror("fwrite");
117                 exit(1);
118         }
119         if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
120                 perror("fwrite");
121                 exit(1);
122         }
123         if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
124                 perror("fwrite");
125                 exit(1);
126         }
127         off_t offset = ftell(file.fp);
128         if (fwrite(data, size, 1, file.fp) != 1) {
129                 perror("fwrite");
130                 exit(1);
131         }
132         fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
133         global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
134
135         FrameOnDisk frame;
136         frame.pts = pts;
137         frame.filename_idx = filename_idx;
138         frame.offset = offset;
139         frame.size = size;
140
141         {
142                 lock_guard<mutex> lock(frame_mu);
143                 assert(stream_idx < MAX_STREAMS);
144                 frames[stream_idx].push_back(frame);
145         }
146
147         if (++file.frames_written_so_far >= 1000) {
148                 size_t size = ftell(file.fp);
149
150                 // Start a new file next time.
151                 if (fclose(file.fp) != 0) {
152                         perror("fclose");
153                         exit(1);
154                 }
155                 open_frame_files.erase(stream_idx);
156
157                 // Write information about all frames in the finished file to SQLite.
158                 // (If we crash before getting to do this, we'll be scanning through
159                 // the file on next startup, and adding it to the database then.)
160                 // NOTE: Since we don't fsync(), we could in theory get broken data
161                 // but with the right size, but it would seem unlikely.
162                 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
163                 {
164                         lock_guard<mutex> lock(frame_mu);
165                         for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
166                                 for (const FrameOnDisk &frame : frames[stream_idx]) {
167                                         if (frame.filename_idx == filename_idx) {
168                                                 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
169                                         }
170                                 }
171                         }
172                 }
173
174                 const char *basename = filename.c_str();
175                 while (strchr(basename, '/') != nullptr) {
176                         basename = strchr(basename, '/') + 1;
177                 }
178                 db->store_frame_file(basename, size, frames_this_file);
179         }
180
181         return frame;
182 }
183
184 } // namespace
185
186 HTTPD *global_httpd;
187
188 void load_existing_frames();
189 void record_thread_func();
190
191 int main(int argc, char **argv)
192 {
193         parse_flags(argc, argv);
194         if (optind == argc) {
195                 global_flags.stream_source = "multiangle.mp4";
196                 global_flags.slow_down_input = true;
197         } else if (optind + 1 == argc) {
198                 global_flags.stream_source = argv[optind];
199         } else {
200                 usage();
201                 exit(1);
202         }
203
204         string frame_dir = global_flags.working_directory + "/frames";
205
206         if (mkdir(frame_dir.c_str(), 0777) == 0) {
207                 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
208         } else if (errno != EEXIST) {
209                 perror(global_flags.working_directory.c_str());
210                 exit(1);
211         }
212
213         avformat_network_init();
214         global_metrics.set_prefix("futatabi");
215         global_httpd = new HTTPD;
216
217         QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
218
219         QSurfaceFormat fmt;
220         fmt.setDepthBufferSize(0);
221         fmt.setStencilBufferSize(0);
222         fmt.setProfile(QSurfaceFormat::CoreProfile);
223         fmt.setMajorVersion(4);
224         fmt.setMinorVersion(5);
225
226         // Turn off vsync, since Qt generally gives us at most frame rate
227         // (display frequency) / (number of QGLWidgets active).
228         fmt.setSwapInterval(0);
229
230         QSurfaceFormat::setDefaultFormat(fmt);
231
232         QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
233
234         QApplication app(argc, argv);
235         global_share_widget = new QGLWidget();
236         if (!global_share_widget->isValid()) {
237                 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
238                 exit(1);
239         }
240
241         // Initialize Movit.
242         {
243                 QSurface *surface = create_surface();
244                 QOpenGLContext *context = create_context(surface);
245                 if (!make_current(context, surface)) {
246                         printf("oops\n");
247                         exit(1);
248                 }
249                 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
250                 delete_context(context);
251                 // TODO: Delete the surface, too.
252         }
253
254         load_existing_frames();
255
256         MainWindow main_window;
257         main_window.show();
258
259         global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
260         global_httpd->start(global_flags.http_port);
261
262         init_jpeg_vaapi();
263
264         thread record_thread(record_thread_func);
265
266         int ret = app.exec();
267
268         should_quit = true;
269         record_thread.join();
270         JPEGFrameView::shutdown();
271
272         return ret;
273 }
274
275 void load_frame_file(const char *filename, const string &basename, unsigned filename_idx, DB *db)
276 {
277         struct stat st;
278         if (stat(filename, &st) == -1) {
279                 perror(filename);
280                 exit(1);
281         }
282
283         vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
284         if (!all_frames.empty()) {
285                 // We already had this cached in the database, so no need to look in the file.
286                 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
287                         if (frame.stream_idx < MAX_STREAMS) {
288                                 frames[frame.stream_idx].push_back(frame.frame);
289                                 start_pts = max(start_pts, frame.frame.pts);
290                         }
291                 }
292                 return;
293         }
294
295         FILE *fp = fopen(filename, "rb");
296         if (fp == nullptr) {
297                 perror(filename);
298                 exit(1);
299         }
300
301         size_t magic_offset = 0;
302         size_t skipped_bytes = 0;
303         while (!feof(fp) && !ferror(fp)) {
304                 int ch = getc(fp);
305                 if (ch == -1) {
306                         break;
307                 }
308                 if (ch != frame_magic[magic_offset++]) {
309                         skipped_bytes += magic_offset;
310                         magic_offset = 0;
311                         continue;
312                 }
313                 if (magic_offset < frame_magic_len) {
314                         // Still reading the magic (hopefully).
315                         continue;
316                 }
317
318                 // OK, found the magic. Try to parse the frame header.
319                 magic_offset = 0;
320
321                 if (skipped_bytes > 0)  {
322                         fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
323                                 filename, skipped_bytes);
324                         skipped_bytes = 0;
325                 }
326
327                 uint32_t len;
328                 if (fread(&len, sizeof(len), 1, fp) != 1) {
329                         fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
330                         break;
331                 }
332
333                 string serialized;
334                 serialized.resize(ntohl(len));
335                 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
336                         fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
337                         break;
338                 }
339
340                 FrameHeaderProto hdr;
341                 if (!hdr.ParseFromString(serialized)) {
342                         fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
343                         continue;
344                 }
345
346                 FrameOnDisk frame;
347                 frame.pts = hdr.pts();
348                 frame.offset = ftell(fp);
349                 if (frame.offset == -1) {
350                         fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
351                         break;
352                 }
353                 frame.filename_idx = filename_idx;
354                 frame.size = hdr.file_size();
355
356                 if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
357                         fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
358                         continue;
359                 }
360
361                 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
362                         frames[hdr.stream_idx()].push_back(frame);
363                         start_pts = max(start_pts, hdr.pts());
364                 }
365                 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
366         }
367
368         if (skipped_bytes > 0) {
369                 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
370                         filename, skipped_bytes);
371         }
372
373         off_t size = ftell(fp);
374         fclose(fp);
375
376         if (size == -1) {
377                 fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
378                 return;
379         }
380
381         db->store_frame_file(basename, size, all_frames);
382 }
383
384 void load_existing_frames()
385 {
386         QProgressDialog progress("Scanning frame directory...", "Abort", 0, 1);
387         progress.setWindowTitle("Futatabi");
388         progress.setWindowModality(Qt::WindowModal);
389         progress.setMinimumDuration(1000);
390         progress.setMaximum(1);
391         progress.setValue(0);
392
393         string frame_dir = global_flags.working_directory + "/frames";
394         DIR *dir = opendir(frame_dir.c_str());
395         if (dir == nullptr) {
396                 perror("frames/");
397                 start_pts = 0;
398                 return;
399         }
400
401         vector<string> frame_basenames;
402         for ( ;; ) {
403                 errno = 0;
404                 dirent *de = readdir(dir);
405                 if (de == nullptr) {
406                         if (errno != 0) {
407                                 perror("readdir");
408                                 exit(1);
409                         }
410                         break;
411                 }
412
413                 if (de->d_type == DT_REG || de->d_type == DT_LNK) {
414                         string filename = frame_dir + "/" + de->d_name;
415                         frame_filenames.push_back(filename);
416                         frame_basenames.push_back(de->d_name);
417                 }
418
419                 if (progress.wasCanceled()) {
420                         exit(1);
421                 }
422         }
423         closedir(dir);
424
425         progress.setMaximum(frame_filenames.size() + 2);
426         progress.setValue(1);
427
428         progress.setLabelText("Opening database...");
429         DB db(global_flags.working_directory + "/futatabi.db");
430
431         progress.setLabelText("Reading frame files...");
432         progress.setValue(2);
433
434         for (size_t i = 0; i < frame_filenames.size(); ++i) {
435                 load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
436                 progress.setValue(i + 3);
437                 if (progress.wasCanceled()) {
438                         exit(1);
439                 }
440         }
441
442         if (start_pts == -1) {
443                 start_pts = 0;
444         } else {
445                 // Add a gap of one second from the old frames to the new ones.
446                 start_pts += TIMEBASE;
447         }
448         current_pts = start_pts;
449
450         for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
451                 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
452                         [](const auto &a, const auto &b) { return a.pts < b.pts; });
453         }
454
455         db.clean_unused_frame_files(frame_basenames);
456 }
457
458 void record_thread_func()
459 {
460         for (unsigned i = 0; i < MAX_STREAMS; ++i) {
461                 global_metrics.add("received_frames", {{ "stream", to_string(i) }}, &metric_received_frames[i]);
462         }
463
464         if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
465                 // Save the user from some repetitive messages.
466                 return;
467         }
468
469         pthread_setname_np(pthread_self(), "ReceiveFrames");
470
471         int64_t pts_offset = 0;  // Needs to be initialized due to a spurious GCC warning.
472         DB db(global_flags.working_directory + "/futatabi.db");
473
474         while (!should_quit.load()) {
475                 auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
476                 if (format_ctx == nullptr) {
477                         fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
478                         sleep(1);
479                         continue;
480                 }
481
482                 int64_t last_pts = -1;
483
484                 while (!should_quit.load()) {
485                         AVPacket pkt;
486                         unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
487                                 &pkt, av_packet_unref);
488                         av_init_packet(&pkt);
489                         pkt.data = nullptr;
490                         pkt.size = 0;
491
492                         // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
493                         // right now, should_quit will be ignored if it's hung on I/O.
494                         if (av_read_frame(format_ctx.get(), &pkt) != 0) {
495                                 break;
496                         }
497                         if (pkt.stream_index >= MAX_STREAMS) {
498                                 continue;
499                         }
500
501                         ++metric_received_frames[pkt.stream_index];
502
503                         // Convert pts to our own timebase.
504                         AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
505                         int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
506
507                         // Translate offset into our stream.
508                         if (last_pts == -1) {
509                                 pts_offset = start_pts - pts;
510                         }
511                         pts = std::max(pts + pts_offset, start_pts);
512
513                         //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
514                         //      pkt.stream_index, pts, pkt.size);
515                         FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
516
517                         post_to_main_thread([pkt, frame] {
518                                 global_mainwindow->display_frame(pkt.stream_index, frame);
519                         });
520
521                         if (last_pts != -1 && global_flags.slow_down_input) {
522                                 this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
523                         }
524                         last_pts = pts;
525                         current_pts = pts;
526                 }
527
528                 fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
529                 sleep(1);
530
531                 start_pts = last_pts + TIMEBASE;
532         }
533 }