5 #include <condition_variable>
14 #include <sys/types.h>
20 #include <libavformat/avformat.h>
23 #include "clip_list.h"
27 #include "frame_on_disk.h"
28 #include "mainwindow.h"
30 #include "shared/context.h"
31 #include "shared/disk_space_estimator.h"
32 #include "shared/ffmpeg_raii.h"
33 #include "shared/httpd.h"
34 #include "shared/metrics.h"
35 #include "shared/post_to_main_thread.h"
36 #include "shared/ref_counted_gl_sync.h"
37 #include "shared/timebase.h"
38 #include "ui_mainwindow.h"
39 #include "vaapi_jpeg_decoder.h"
41 #include <QApplication>
43 #include <QProgressDialog>
44 #include <QSurfaceFormat>
45 #include <movit/init.h>
46 #include <movit/util.h>
// File-scope constants and state used by the functions below.
49 using namespace std::chrono;
// Marker written in front of every frame header by write_frame() and searched
// for byte by byte in load_frame_file().
51 constexpr char frame_magic[] = "Ftbifrm0";
// Length of frame_magic without the terminating NUL ("Ftbifrm0" is 8 characters).
52 constexpr size_t frame_magic_len = 8;
54 mutex RefCountedGLsync::fence_lock;
// Polled by the loops in record_thread_func() to decide when to stop.
55 atomic<bool> should_quit{ false };
// Starts at -1. load_frame_file() raises it to the largest pts among the frames
// it loads, and record_thread_func() uses it as the base for incoming pts values.
57 int64_t start_pts = -1;
59 // TODO: Replace by some sort of GUI control, I guess.
60 int64_t current_pts = 0;
// NOTE(review): the next two declarations look like fields of FrameFile
// (write_frame() reads file.filename_idx and file.frames_written_so_far), but
// the enclosing struct header is not visible here -- confirm.
64 unsigned filename_idx;
65 size_t frames_written_so_far = 0;
// The frame file currently being written for each stream, keyed by stream index.
67 std::map<int, FrameFile> open_frame_files;
70 vector<FrameOnDisk> frames[MAX_STREAMS]; // Under frame_mu.
71 vector<string> frame_filenames; // Under frame_mu.
// Metrics; both are registered with global_metrics in record_thread_func().
73 atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
74 Summary metric_received_frame_size_bytes;
// Appends one frame to the frame file that is currently open for stream_idx,
// opening a new file first if the stream has none, and records the frame in
// frames[stream_idx].
//
// Each record is laid out as: frame_magic (8 bytes), a 32-bit length in
// network byte order, a serialized FrameHeaderProto of that length, and then
// the `size` bytes of frame data.
//
// Once FRAMES_PER_FILE frames have been written to a file, the file is closed
// and an index of every frame in it is stored through `db`.
//
// Parameters:
//   stream_idx  stream the frame belongs to; asserted to be < MAX_STREAMS.
//   pts         timestamp; used in the name of a newly created file and passed
//               to the disk space estimator.
//   data, size  the frame payload.
//   db          receives the per-file index when a file is finished.
//
// Returns a FrameOnDisk; the visible code fills in its filename_idx and offset.
78 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
80 if (open_frame_files.count(stream_idx) == 0) {
// No file is open for this stream yet; create one named after the stream and
// the pts of its first frame.
// NOTE(review): "%09ld" is given an int64_t; this only matches where int64_t
// is `long`. PRId64 from <cinttypes> would be portable -- confirm the targets.
82 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
83 global_flags.working_directory.c_str(), stream_idx, pts);
84 FILE *fp = fopen(filename, "wb");
// Register the new file name; its index in frame_filenames is what frames use
// to refer to the file.
90 lock_guard<mutex> lock(frame_mu);
91 unsigned filename_idx = frame_filenames.size();
92 frame_filenames.push_back(filename);
93 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
96 FrameFile &file = open_frame_files[stream_idx];
97 unsigned filename_idx = file.filename_idx;
// Look up the file name under the lock that guards frame_filenames.
100 lock_guard<mutex> lock(frame_mu);
101 filename = frame_filenames[filename_idx];
// Build and serialize the per-frame header.
104 FrameHeaderProto hdr;
105 hdr.set_stream_idx(stream_idx);
107 hdr.set_file_size(size);
110 if (!hdr.SerializeToString(&serialized)) {
111 fprintf(stderr, "Frame header serialization failed.\n");
// The header length is stored in network byte order; load_frame_file()
// converts it back with ntohl().
114 uint32_t len = htonl(serialized.size());
116 if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
120 if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
124 if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
// The position right after the header is where the frame data starts; it is
// stored in frame.offset below.
128 off_t offset = ftell(file.fp);
129 if (fwrite(data, size, 1, file.fp) != 1) {
133 fflush(file.fp); // No fsync(), though. We can accept losing a few frames.
// Report the total bytes written for this record.
// NOTE(review): the literal 8 duplicates frame_magic_len.
134 global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
138 frame.filename_idx = filename_idx;
139 frame.offset = offset;
// Record the frame in the in-memory list for its stream.
143 lock_guard<mutex> lock(frame_mu);
144 assert(stream_idx < MAX_STREAMS);
145 frames[stream_idx].push_back(frame);
148 if (++file.frames_written_so_far >= FRAMES_PER_FILE) {
// NOTE(review): this `size` shadows the `size` parameter, and ftell() reports
// failure as -1, which would wrap when stored in a size_t -- confirm whether
// that needs handling.
149 size_t size = ftell(file.fp);
151 // Start a new file next time.
152 if (fclose(file.fp) != 0) {
156 open_frame_files.erase(stream_idx);
158 // Write information about all frames in the finished file to SQLite.
159 // (If we crash before getting to do this, we'll be scanning through
160 // the file on next startup, and adding it to the database then.)
161 // NOTE: Since we don't fsync(), we could in theory get broken data
162 // but with the right size, but it would seem unlikely.
163 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
// Collect, across all streams, every frame that refers to the finished file.
165 lock_guard<mutex> lock(frame_mu);
166 for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
167 for (const FrameOnDisk &frame : frames[stream_idx]) {
168 if (frame.filename_idx == filename_idx) {
169 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
// Strip everything up to and including the last '/' so that only the file's
// base name is handed to the database.
175 const char *basename = filename.c_str();
176 while (strchr(basename, '/') != nullptr) {
177 basename = strchr(basename, '/') + 1;
179 db->store_frame_file(basename, size, frames_this_file);
// Forward declarations; both functions are defined after main(), which uses them.
189 void load_existing_frames();
190 void record_thread_func();
// Program entry point. In the order visible here, it:
//   - parses flags and picks the stream source,
//   - makes sure the frame directory exists,
//   - sets up FFmpeg networking, metrics and the HTTP server,
//   - configures the default OpenGL surface format and initializes movit,
//   - loads the frames already on disk,
//   - creates the main window, starts the HTTP server and the recording thread,
//   - runs the Qt event loop, then joins the recording thread and shuts down.
192 int main(int argc, char **argv)
194 parse_flags(argc, argv);
// With no positional argument, fall back to a bundled test file and play it
// back slowed down; with exactly one, use it as the stream source.
195 if (optind == argc) {
196 global_flags.stream_source = "multiangle.mp4";
197 global_flags.slow_down_input = true;
198 } else if (optind + 1 == argc) {
199 global_flags.stream_source = argv[optind];
205 string frame_dir = global_flags.working_directory + "/frames";
// mkdir() succeeding means the directory was missing; EEXIST is the normal
// case and is not treated as an error.
207 if (mkdir(frame_dir.c_str(), 0777) == 0) {
208 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
209 } else if (errno != EEXIST) {
// NOTE(review): the failed mkdir() was on frame_dir, but the message is
// prefixed with working_directory -- confirm whether that is intended.
210 perror(global_flags.working_directory.c_str());
214 avformat_network_init();
215 global_metrics.set_prefix("futatabi");
216 global_httpd = new HTTPD;
// Presumably this metric is registered by the HTTPD object created just above
// and is not wanted here -- TODO confirm.
217 global_metrics.remove("num_connected_multicam_clients");
219 QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
// Default surface format: no depth or stencil buffer, OpenGL 4.5 core profile.
222 fmt.setDepthBufferSize(0);
223 fmt.setStencilBufferSize(0);
224 fmt.setProfile(QSurfaceFormat::CoreProfile);
225 fmt.setMajorVersion(4);
226 fmt.setMinorVersion(5);
228 // Turn off vsync, since Qt generally gives us at most frame rate
229 // (display frequency) / (number of QGLWidgets active).
230 fmt.setSwapInterval(0);
232 QSurfaceFormat::setDefaultFormat(fmt);
234 QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
236 QApplication app(argc, argv);
237 global_share_widget = new QGLWidget();
238 if (!global_share_widget->isValid()) {
239 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
// movit is initialized with a temporary context made current; the context is
// deleted again right afterwards.
245 QSurface *surface = create_surface();
246 QOpenGLContext *context = create_context(surface);
247 if (!make_current(context, surface)) {
251 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
252 delete_context(context);
253 // TODO: Delete the surface, too.
256 load_existing_frames();
// Sanity check: new frames must start strictly after the last frame loaded
// from disk on every stream.
258 for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
259 if (!frames[stream_idx].empty()) {
260 assert(start_pts > frames[stream_idx].back().pts);
264 MainWindow main_window;
267 global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
268 global_httpd->start(global_flags.http_port);
// Frames are received on a separate thread while the Qt event loop runs here.
272 thread record_thread(record_thread_func);
274 int ret = app.exec();
// NOTE(review): record_thread_func() only exits once should_quit is set; the
// line that sets it is not visible here -- confirm it precedes this join.
277 record_thread.join();
278 JPEGFrameView::shutdown();
// Loads the index of all frames in one frame file into frames[] and raises
// start_pts to the largest pts seen.
//
// If the database already has an index for this file (matched by base name
// and file size), that index is used and the file is not scanned. Otherwise
// the file is scanned for frame_magic markers, each followed by a header, and
// the resulting index is stored in the database.
//
// Parameters:
//   filename      path used for stat()/fopen() and in warnings.
//   basename      name under which the file is known to the database.
//   filename_idx  index stored in each FrameOnDisk to identify the file.
//   db            database used to load and store the per-file index.
283 void load_frame_file(const char *filename, const string &basename, unsigned filename_idx, DB *db)
286 if (stat(filename, &st) == -1) {
291 vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
292 if (!all_frames.empty()) {
293 // We already had this cached in the database, so no need to look in the file.
294 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
// Entries with an out-of-range stream index are ignored.
295 if (frame.stream_idx < MAX_STREAMS) {
296 frames[frame.stream_idx].push_back(frame.frame);
297 start_pts = max(start_pts, frame.frame.pts);
// Not cached: scan the file itself.
303 FILE *fp = fopen(filename, "rb");
// magic_offset counts how many bytes of frame_magic have matched so far;
// skipped_bytes counts bytes that did not belong to a recognized record.
309 size_t magic_offset = 0;
310 size_t skipped_bytes = 0;
311 while (!feof(fp) && !ferror(fp)) {
// On a mismatch, the bytes consumed for this attempt are counted as skipped.
316 if (ch != frame_magic[magic_offset++]) {
317 skipped_bytes += magic_offset;
321 if (magic_offset < frame_magic_len) {
322 // Still reading the magic (hopefully).
326 // OK, found the magic. Try to parse the frame header.
329 if (skipped_bytes > 0) {
330 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
331 filename, skipped_bytes);
// The header length follows the magic, in network byte order.
336 if (fread(&len, sizeof(len), 1, fp) != 1) {
337 fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
// NOTE(review): the length comes straight from the file and no upper bound is
// visible here; a corrupted value could request a very large buffer -- confirm.
342 serialized.resize(ntohl(len));
343 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
344 fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
348 FrameHeaderProto hdr;
349 if (!hdr.ParseFromString(serialized)) {
350 fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
// The frame data starts right after the header, i.e. at the current position.
355 frame.pts = hdr.pts();
356 frame.offset = ftell(fp);
357 if (frame.offset == -1) {
358 fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
361 frame.filename_idx = filename_idx;
362 frame.size = hdr.file_size();
// Skip over the frame data without reading it.
364 if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
365 fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
369 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
370 frames[hdr.stream_idx()].push_back(frame);
371 start_pts = max(start_pts, hdr.pts());
373 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
376 if (skipped_bytes > 0) {
377 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
378 filename, skipped_bytes);
// The position after the scan is stored as the file size, together with the
// index, so that a later run can use the database instead of scanning again.
381 off_t size = ftell(fp);
385 fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
389 db->store_frame_file(basename, size, all_frames);
// Scans <working_directory>/frames, loads every frame file found there via
// load_frame_file(), and then finalizes start_pts and current_pts. A progress
// dialog is shown while this runs and can be used to abort.
//
// main() calls this before it starts the recording thread; no frame_mu lock is
// visible around the accesses to frame_filenames and frames[] here.
392 void load_existing_frames()
// The real maximum is not known until the directory has been listed, so the
// dialog starts with a placeholder range of 0..1.
394 QProgressDialog progress("Scanning frame directory...", "Abort", 0, 1);
395 progress.setWindowTitle("Futatabi");
396 progress.setWindowModality(Qt::WindowModal);
397 progress.setMinimumDuration(1000);
398 progress.setMaximum(1);
399 progress.setValue(0);
401 string frame_dir = global_flags.working_directory + "/frames";
402 DIR *dir = opendir(frame_dir.c_str());
403 if (dir == nullptr) {
// frame_basenames is kept parallel to frame_filenames: same index, same file.
409 vector<string> frame_basenames;
412 dirent *de = readdir(dir);
// Only regular files and symlinks are considered frame files.
421 if (de->d_type == DT_REG || de->d_type == DT_LNK) {
422 string filename = frame_dir + "/" + de->d_name;
423 frame_filenames.push_back(filename);
424 frame_basenames.push_back(de->d_name);
427 if (progress.wasCanceled()) {
// Progress steps: directory listing (1), opening the database (2), then one
// step per frame file -- hence the "+ 2" here and the "i + 3" below.
433 progress.setMaximum(frame_filenames.size() + 2);
434 progress.setValue(1);
436 progress.setLabelText("Opening database...");
437 DB db(global_flags.working_directory + "/futatabi.db");
439 progress.setLabelText("Reading frame files...");
440 progress.setValue(2);
// The loop index doubles as the filename_idx stored in each loaded frame.
442 for (size_t i = 0; i < frame_filenames.size(); ++i) {
443 load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
444 progress.setValue(i + 3);
445 if (progress.wasCanceled()) {
// start_pts is still -1 if no frame was loaded.
// NOTE(review): the branch taken in that case is not visible here.
450 if (start_pts == -1) {
453 // Add a gap of one second from the old frames to the new ones.
454 start_pts += TIMEBASE;
456 current_pts = start_pts;
// Files are loaded in directory order, so sort each stream's frames by pts.
458 for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
459 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
460 [](const auto &a, const auto &b) { return a.pts < b.pts; });
// Hand the database the list of files that exist on disk so it can clean up;
// presumably this drops entries for files no longer present -- TODO confirm.
463 db.clean_unused_frame_files(frame_basenames);
// Body of the recording thread started from main(). Registers the receive
// metrics, then repeatedly opens global_flags.stream_source and reads packets
// from it until should_quit is set. Every packet is written to disk with
// write_frame() and handed to the main window for display.
// NOTE(review): the end of this function is not visible here.
466 void record_thread_func()
468 for (unsigned i = 0; i < MAX_STREAMS; ++i) {
469 global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
471 global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
473 if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
474 // Save the user from some repetitive messages.
478 pthread_setname_np(pthread_self(), "ReceiveFrames");
480 int64_t pts_offset = 0; // Needs to be initialized due to a spurious GCC warning.
// This thread opens its own DB object on the same database file that
// load_existing_frames() uses.
481 DB db(global_flags.working_directory + "/futatabi.db");
// Outer loop: (re)open the input. Inner loop: read packets until an error,
// EOF or shutdown.
483 while (!should_quit.load()) {
484 auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
485 if (format_ctx == nullptr) {
486 fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
// -1 means no frame has been processed yet on this connection.
491 int64_t last_pts = -1;
493 while (!should_quit.load()) {
// Have the packet unreferenced automatically when this iteration's scope ends.
495 unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
496 &pkt, av_packet_unref);
497 av_init_packet(&pkt);
501 // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
502 // right now, should_quit will be ignored if it's hung on I/O.
503 if (av_read_frame(format_ctx.get(), &pkt) != 0) {
// Packets from streams beyond MAX_STREAMS cannot be stored in the fixed-size
// per-stream arrays.
506 if (pkt.stream_index >= MAX_STREAMS) {
510 ++metric_received_frames[pkt.stream_index];
511 metric_received_frame_size_bytes.count_event(pkt.size);
513 // Convert pts to our own timebase.
// NOTE(review): no handling of AV_NOPTS_VALUE in pkt.pts is visible here --
// confirm whether the inputs can produce it.
514 AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
515 int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
517 // Translate offset into our stream.
// The offset is chosen at the first packet so that it lands on start_pts.
518 if (last_pts == -1) {
519 pts_offset = start_pts - pts;
// Clamp so that no frame is ever timestamped before start_pts.
521 pts = std::max(pts + pts_offset, start_pts);
523 //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
524 // pkt.stream_index, pts, pkt.size);
525 FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
// The lambda copies the whole AVPacket by value but only reads stream_index.
527 post_to_main_thread([pkt, frame] {
528 global_mainwindow->display_frame(pkt.stream_index, frame);
// With slow_down_input set, sleep for the pts distance to the previous frame,
// converted from TIMEBASE units to microseconds.
531 if (last_pts != -1 && global_flags.slow_down_input) {
532 this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
538 if (!should_quit.load()) {
539 fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
// The next connection starts one TIMEBASE (one second) after the last frame.
543 start_pts = last_pts + TIMEBASE;