5 #include <condition_variable>
14 #include <sys/types.h>
20 #include <libavformat/avformat.h>
23 #include "clip_list.h"
27 #include "frame_on_disk.h"
28 #include "mainwindow.h"
30 #include "shared/context.h"
31 #include "shared/disk_space_estimator.h"
32 #include "shared/ffmpeg_raii.h"
33 #include "shared/httpd.h"
34 #include "shared/metrics.h"
35 #include "shared/post_to_main_thread.h"
36 #include "shared/ref_counted_gl_sync.h"
37 #include "shared/timebase.h"
38 #include "ui_mainwindow.h"
39 #include "vaapi_jpeg_decoder.h"
41 #include <QApplication>
43 #include <QProgressDialog>
44 #include <QSurfaceFormat>
45 #include <movit/init.h>
46 #include <movit/util.h>
// File-scope state shared between startup code and the recording thread.
49 using namespace std::chrono;
// Marker written in front of every frame header by write_frame() and searched
// for byte-by-byte by load_frame_file() when a file has to be rescanned.
51 constexpr char frame_magic[] = "Ftbifrm0";
// Number of marker bytes written/matched (the string above without its NUL).
52 constexpr size_t frame_magic_len = 8;
// Out-of-class definition of RefCountedGLsync's fence_lock member.
54 mutex RefCountedGLsync::fence_lock;
// Polled by both loops in record_thread_func() to decide when to stop.
55 atomic<bool> should_quit{ false };
// load_frame_file() raises this to the largest pts found on disk.
// load_existing_frames() then adjusts it, and record_thread_func() uses it
// as the lower bound for the pts assigned to newly received frames.
57 int64_t start_pts = -1;
59 // TODO: Replace by some sort of GUI control, I guess.
// Set to start_pts at the end of load_existing_frames().
60 int64_t current_pts = 0;
// NOTE(review): the next two declarations look like fields of FrameFile
// (write_frame() builds `FrameFile{ fp, filename_idx, 0 }` and reads
// file.filename_idx / file.frames_written_so_far); the enclosing struct
// lines are not visible here -- confirm against the full file.
64 unsigned filename_idx;
65 size_t frames_written_so_far = 0;
// Per-stream file currently being appended to by write_frame(); an entry is
// erased again once FRAMES_PER_FILE frames have been written to it.
67 std::map<int, FrameFile> open_frame_files;
// Index of every known frame, per stream. write_frame() takes frame_mu around
// its accesses; the load_* functions touch these without the lock in the
// visible lines, but main() calls them before it starts the record thread.
70 vector<FrameOnDisk> frames[MAX_STREAMS]; // Under frame_mu.
// Full path of each frame file; FrameOnDisk::filename_idx indexes into this.
71 vector<string> frame_filenames; // Under frame_mu.
// Registered with global_metrics in record_thread_func() and updated there
// once per received video packet (count and packet size respectively).
73 atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
74 Summary metric_received_frame_size_bytes;
// Appends one received frame to the frame file currently open for stream_idx
// (opening a new file first if there is none), records it in frames[], and,
// once FRAMES_PER_FILE frames have gone into the file, closes it and stores
// the file's frame index in the database.
//
// On-disk layout per frame, as written below: frame_magic, a 32-bit
// network-byte-order header length, the serialized FrameHeaderProto, the
// payload bytes, then the audio bytes (if any).
//
//   stream_idx  stream the frame belongs to; asserted < MAX_STREAMS below.
//   pts         timestamp; used in the name of a newly opened file and
//               passed on to the disk space estimator.
//   data, size  payload bytes to write.
//   audio       audio samples stored directly after the payload; taken by
//               value (the visible caller moves its pending buffer in).
//   db          used only when a file is finished, via store_frame_file().
//
// Declared to return a FrameOnDisk; the `frame` record filled in below is
// presumably what is returned (the return statement is not in the visible lines).
78 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, vector<uint32_t> audio, DB *db)
// No file open for this stream yet: create one named after the stream and
// the pts of its first frame, and register its name in frame_filenames.
80 if (open_frame_files.count(stream_idx) == 0) {
82 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09" PRId64 ".frames",
83 global_flags.working_directory.c_str(), stream_idx, pts);
84 FILE *fp = fopen(filename, "wb");
90 lock_guard<mutex> lock(frame_mu);
91 unsigned filename_idx = frame_filenames.size();
92 frame_filenames.push_back(filename);
93 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
96 FrameFile &file = open_frame_files[stream_idx];
97 unsigned filename_idx = file.filename_idx;
// Look the file name up under the lock, since frame_filenames is shared.
100 lock_guard<mutex> lock(frame_mu);
101 filename = frame_filenames[filename_idx];
// Build the per-frame header describing what follows it in the file.
104 FrameHeaderProto hdr;
105 hdr.set_stream_idx(stream_idx);
107 hdr.set_file_size(size);
108 hdr.set_audio_size(audio.size() * sizeof(audio[0]));
111 if (!hdr.SerializeToString(&serialized)) {
112 fprintf(stderr, "Frame header serialization failed.\n");
// The header length is stored in network byte order.
115 uint32_t len = htonl(serialized.size());
117 if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
121 if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
125 if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
// Position of the payload itself; this is what FrameOnDisk::offset records.
// NOTE(review): no check of the ftell() result is visible here -- confirm.
129 off_t offset = ftell(file.fp);
130 if (fwrite(data, size, 1, file.fp) != 1) {
134 if (audio.size() > 0) {
135 if (fwrite(audio.data(), hdr.audio_size(), 1, file.fp) != 1) {
140 fflush(file.fp); // No fsync(), though. We can accept losing a few frames.
// NOTE(review): the byte count reported here covers magic + length + header
// + payload but not the audio bytes written above -- confirm this is intended.
141 global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
145 frame.filename_idx = filename_idx;
146 frame.offset = offset;
148 frame.audio_size = audio.size() * sizeof(audio[0]);
// Publish the new frame in the shared per-stream index.
151 lock_guard<mutex> lock(frame_mu);
152 assert(stream_idx < MAX_STREAMS);
153 frames[stream_idx].push_back(frame);
156 if (++file.frames_written_so_far >= FRAMES_PER_FILE) {
// Final length of the file (note: this `size` shadows the parameter).
157 size_t size = ftell(file.fp);
159 // Start a new file next time.
160 if (fclose(file.fp) != 0) {
164 open_frame_files.erase(stream_idx);
166 // Write information about all frames in the finished file to SQLite.
167 // (If we crash before getting to do this, we'll be scanning through
168 // the file on next startup, and adding it to the database then.)
169 // NOTE: Since we don't fsync(), we could in theory get broken data
170 // but with the right size, but it would seem unlikely.
171 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
// Collect every known frame that lives in the file just closed. This walks
// the complete frames[] index of every stream, under the lock.
173 lock_guard<mutex> lock(frame_mu);
174 for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
175 for (const FrameOnDisk &frame : frames[stream_idx]) {
176 if (frame.filename_idx == filename_idx) {
177 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
// Strip the directory part: the database is given the bare file name.
183 const char *basename = filename.c_str();
184 while (strchr(basename, '/') != nullptr) {
185 basename = strchr(basename, '/') + 1;
187 db->store_frame_file(basename, size, frames_this_file);
// Forward declarations; both functions are defined after main(), which
// calls load_existing_frames() and runs record_thread_func() on a thread.
197 void load_existing_frames();
198 void record_thread_func();
// Program entry point. In the order visible below: parses flags and picks
// the stream source, makes sure the frames directory exists, sets up
// networking/metrics/HTTP, configures the default OpenGL surface format,
// initializes Movit on a temporary context, loads the index of frames
// already on disk, creates the main window, starts the HTTP server and the
// recording thread, runs the Qt event loop, and finally joins the thread.
200 int main(int argc, char **argv)
202 parse_flags(argc, argv);
// No positional argument: fall back to a local test file and pace it
// (slow_down_input makes record_thread_func() sleep between frames).
203 if (optind == argc) {
204 global_flags.stream_source = "multicam.mp4";
205 global_flags.slow_down_input = true;
// Exactly one positional argument: use it as the stream source.
206 } else if (optind + 1 == argc) {
207 global_flags.stream_source = argv[optind];
213 string frame_dir = global_flags.working_directory + "/frames";
// Create the frames directory if needed; EEXIST is the normal, silent case.
215 if (mkdir(frame_dir.c_str(), 0777) == 0) {
216 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
217 } else if (errno != EEXIST) {
// NOTE(review): the error is reported against the working directory, not
// frame_dir, which is the path mkdir() was actually called on -- confirm
// whether that is deliberate.
218 perror(global_flags.working_directory.c_str());
222 avformat_network_init();
223 global_metrics.set_prefix("futatabi");
224 global_httpd = new HTTPD;
// Presumably this metric is registered by the HTTPD object just created and
// is not wanted here -- TODO confirm.
225 global_metrics.remove("num_connected_multicam_clients");
// Set before the QApplication object is constructed further down.
227 QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
// Default surface format: no depth or stencil buffer, core profile, and
// version 4.5 (matching the error message printed below on failure).
230 fmt.setDepthBufferSize(0);
231 fmt.setStencilBufferSize(0);
232 fmt.setProfile(QSurfaceFormat::CoreProfile);
233 fmt.setMajorVersion(4);
234 fmt.setMinorVersion(5);
236 // Turn off vsync, since Qt generally gives us at most frame rate
237 // (display frequency) / (number of QGLWidgets active).
238 fmt.setSwapInterval(0);
240 QSurfaceFormat::setDefaultFormat(fmt);
// Apply the same settings to the older QGLFormat defaults, which the
// QGLWidget created below presumably picks up -- TODO confirm.
242 QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
244 QApplication app(argc, argv);
245 global_share_widget = new QGLWidget();
246 if (!global_share_widget->isValid()) {
247 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
// Movit is initialized with a temporary context made current; the context
// is deleted again right afterwards.
253 QSurface *surface = create_surface();
254 QOpenGLContext *context = create_context(surface);
255 if (!make_current(context, surface)) {
259 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
260 delete_context(context);
261 // TODO: Delete the surface, too.
// Runs before the record thread exists, so nothing else touches frames[] yet.
264 load_existing_frames();
// Sanity check: new frames must start strictly after the last existing frame
// of every stream. back() is the highest pts because load_existing_frames()
// sorts each stream's frames by pts.
266 for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
267 if (!frames[stream_idx].empty()) {
268 assert(start_pts > frames[stream_idx].back().pts);
272 MainWindow main_window;
// Expose the main window's queue status over HTTP, then start serving.
275 global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
276 global_httpd->start(global_flags.http_port);
// Frame reception runs on its own thread while the Qt event loop runs here.
280 thread record_thread(record_thread_func);
282 int ret = app.exec();
285 record_thread.join();
// Adds the frames stored in one frame file to the in-memory index.
//
// The database is asked first, keyed by basename and the file's current
// size. If it returns entries, they are used as-is. Otherwise the file is
// scanned for frame_magic markers, each frame header is parsed, and the
// resulting list is stored in the database.
//
//   filename      path used for stat()/fopen() and in warning messages.
//   basename      name the file is known by in the database.
//   filename_idx  value recorded in FrameOnDisk::filename_idx for each frame.
//   db            database used for the cache lookup and the final store.
//
// Side effects: appends to frames[] and raises start_pts to the largest pts
// seen. frame_mu is not taken in the visible lines; the visible call chain
// (main -> load_existing_frames) runs before the record thread is started.
290 void load_frame_file(const char *filename, const string &basename, unsigned filename_idx, DB *db)
// The file size is part of the cache key passed to the database below.
293 if (stat(filename, &st) == -1) {
298 vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
299 if (!all_frames.empty()) {
300 // We already had this cached in the database, so no need to look in the file.
301 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
// Entries with an out-of-range stream index are not added to frames[].
302 if (frame.stream_idx < MAX_STREAMS) {
303 frames[frame.stream_idx].push_back(frame.frame);
304 start_pts = max(start_pts, frame.frame.pts);
// Not cached: fall back to scanning the file itself.
310 FILE *fp = fopen(filename, "rb");
316 // Find the actual length of the file, since fseek() past the end of the file
317 // will succeed without an error.
318 if (fseek(fp, 0, SEEK_END) == -1) {
319 perror("fseek(SEEK_END)");
322 off_t file_len = ftell(fp);
323 if (fseek(fp, 0, SEEK_SET) == -1) {
324 perror("fseek(SEEK_SET)");
// magic_offset: how many marker bytes have matched so far.
// skipped_bytes: bytes passed over that were not part of a marker.
328 size_t magic_offset = 0;
329 size_t skipped_bytes = 0;
330 while (!feof(fp) && !ferror(fp)) {
// `ch` is the byte just read (the read itself is on a line not visible
// here). On a mismatch, everything consumed for the partial match is
// added to skipped_bytes.
335 if (ch != frame_magic[magic_offset++]) {
336 skipped_bytes += magic_offset;
340 if (magic_offset < frame_magic_len) {
341 // Still reading the magic (hopefully).
345 // OK, found the magic. Try to parse the frame header.
348 if (skipped_bytes > 0) {
349 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
350 filename, skipped_bytes);
// The header length follows the marker, in network byte order.
355 if (fread(&len, sizeof(len), 1, fp) != 1) {
356 fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
// NOTE(review): the length comes straight from the file and no upper bound
// is visible before this resize; a corrupted value could request a very
// large allocation -- consider checking it against file_len.
361 serialized.resize(ntohl(len));
362 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
363 fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
367 FrameHeaderProto hdr;
368 if (!hdr.ParseFromString(serialized)) {
369 fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
// The payload starts right after the header, i.e. at the current position.
374 frame.pts = hdr.pts();
375 frame.offset = ftell(fp);
376 if (frame.offset == -1) {
377 fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
380 frame.filename_idx = filename_idx;
381 frame.size = hdr.file_size();
382 frame.audio_size = hdr.audio_size();
// Skip over payload + audio to the next frame; the explicit comparison with
// file_len is needed because the seek alone would not fail (see above).
384 if (frame.offset + frame.size + frame.audio_size > file_len ||
385 fseek(fp, frame.offset + frame.size + frame.audio_size, SEEK_SET) == -1) {
386 fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
// Only frames with an in-range stream index go into frames[].
390 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
391 frames[hdr.stream_idx()].push_back(frame);
392 start_pts = max(start_pts, hdr.pts());
394 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
397 if (skipped_bytes > 0) {
398 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
399 filename, skipped_bytes);
// The position reached by the scan is the size stored with the cache entry.
402 off_t size = ftell(fp);
406 fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
// Cache the result so the file need not be rescanned while its size matches.
410 db->store_frame_file(basename, size, all_frames);
// Builds the in-memory frame index from the frames directory at startup,
// showing a modal progress dialog while doing so.
//
// Steps visible below: list <working_directory>/frames, remember every
// regular file or symlink, open the database, call load_frame_file() for
// each file, derive start_pts/current_pts, sort each stream's frames by pts,
// and let the database drop entries for files that were not listed.
413 void load_existing_frames()
// The range starts as 0..1 and is widened once the number of files is known.
415 QProgressDialog progress("Scanning frame directory...", "Abort", 0, 1);
416 progress.setWindowTitle("Futatabi");
417 progress.setWindowModality(Qt::WindowModal);
// Qt's setMinimumDuration() takes milliseconds, so short scans show no dialog.
418 progress.setMinimumDuration(1000);
419 progress.setMaximum(1);
420 progress.setValue(0);
422 string frame_dir = global_flags.working_directory + "/frames";
423 DIR *dir = opendir(frame_dir.c_str());
424 if (dir == nullptr) {
// frame_basenames is kept parallel to frame_filenames (same index, same file).
430 vector<string> frame_basenames;
433 dirent *de = readdir(dir);
// NOTE(review): only entries reported as DT_REG or DT_LNK are picked up;
// readdir() may report DT_UNKNOWN on some filesystems, and such entries
// would be ignored here -- confirm whether that matters.
442 if (de->d_type == DT_REG || de->d_type == DT_LNK) {
443 string filename = frame_dir + "/" + de->d_name;
444 frame_filenames.push_back(filename);
445 frame_basenames.push_back(de->d_name);
448 if (progress.wasCanceled()) {
// Progress steps: directory scan (1), database open (2), then one per file.
454 progress.setMaximum(frame_filenames.size() + 2);
455 progress.setValue(1);
457 progress.setLabelText("Opening database...");
458 DB db(global_flags.working_directory + "/futatabi.db");
460 progress.setLabelText("Reading frame files...");
461 progress.setValue(2);
// The loop index doubles as the file's filename_idx.
463 for (size_t i = 0; i < frame_filenames.size(); ++i) {
464 load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
465 progress.setValue(i + 3);
466 if (progress.wasCanceled()) {
// start_pts still being -1 means no frame raised it (handled on lines not
// visible here).
471 if (start_pts == -1) {
474 // Add a gap of one second from the old frames to the new ones.
475 start_pts += TIMEBASE;
477 current_pts = start_pts;
// Files are loaded in directory order, so sort each stream by pts afterwards.
479 for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
480 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
481 [](const auto &a, const auto &b) { return a.pts < b.pts; });
// Hands the database the list of files seen, for cleanup of unused entries.
484 db.clean_unused_frame_files(frame_basenames);
// Body of the recording thread started from main(). Registers the reception
// metrics, then repeatedly opens global_flags.stream_source and reads packets
// from it until should_quit is set. Audio packets are held back per video
// stream; each video packet is written to disk through write_frame() together
// with the pending audio, and the resulting frame is posted to the main
// thread for display.
// (The end of this function lies beyond the visible lines.)
487 void record_thread_func()
489 for (unsigned i = 0; i < MAX_STREAMS; ++i) {
490 global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
492 global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
// An empty source or "/dev/null" is special-cased.
494 if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
495 // Save the user from some repetitive messages.
499 pthread_setname_np(pthread_self(), "ReceiveFrames");
501 int64_t pts_offset = 0; // Needs to be initialized due to a spurious GCC warning.
// This thread opens its own database object, which it passes to write_frame().
502 DB db(global_flags.working_directory + "/futatabi.db");
// Outer loop: (re)open the source.
504 while (!should_quit.load()) {
505 auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
506 if (format_ctx == nullptr) {
507 fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
512 // Match any audio streams to video streams, sequentially.
513 vector<int> video_stream_idx, audio_stream_idx;
514 for (unsigned i = 0; i < format_ctx->nb_streams; ++i) {
515 if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
516 video_stream_idx.push_back(i);
517 } else if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
518 audio_stream_idx.push_back(i);
// The n-th audio stream is paired with the n-th video stream; surplus
// streams of either kind get no pairing.
521 unordered_map<int, int> audio_stream_to_video_stream_idx;
522 for (size_t i = 0; i < min(video_stream_idx.size(), audio_stream_idx.size()); ++i) {
523 audio_stream_to_video_stream_idx[audio_stream_idx[i]] = video_stream_idx[i];
// Audio held back until the next video frame of the paired stream arrives.
526 vector<uint32_t> pending_audio[MAX_STREAMS];
// -1 marks "no frame seen yet since this open"; see the pts translation below.
527 int64_t last_pts = -1;
// Inner loop: one packet per iteration.
528 while (!should_quit.load()) {
// Unrefs the packet whenever this scope is left.
530 unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
531 &pkt, av_packet_unref);
532 av_init_packet(&pkt);
536 // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
537 // right now, should_quit will be ignored if it's hung on I/O.
538 if (av_read_frame(format_ctx.get(), &pkt) != 0) {
542 AVStream *stream = format_ctx->streams[pkt.stream_index];
// Audio packet from a stream that was paired with a video stream above.
543 if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO &&
544 audio_stream_to_video_stream_idx.count(pkt.stream_index)) {
// The size must be a whole number of 8-byte units (two uint32_t values;
// presumably one stereo sample pair -- TODO confirm).
545 if ((pkt.size % (sizeof(uint32_t) * 2)) != 0) {
546 fprintf(stderr, "Audio stream %u had a packet of strange length %d, ignoring.\n",
547 pkt.stream_index, pkt.size);
550 const uint32_t *begin = (const uint32_t *)pkt.data;
551 const uint32_t *end = (const uint32_t *)(pkt.data + pkt.size);
// assign() replaces whatever was pending before, so only the most recent
// audio packet is kept per video stream.
// NOTE(review): the index used here is the paired video stream's index and
// no check against MAX_STREAMS is visible before this line (the check
// below tests pkt.stream_index of the current packet) -- confirm this
// cannot index past pending_audio for inputs with many streams.
552 pending_audio[audio_stream_to_video_stream_idx[pkt.stream_index]].assign(begin, end);
// Filters on out-of-range stream indices and non-video packets.
556 if (pkt.stream_index >= MAX_STREAMS ||
557 stream->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
561 ++metric_received_frames[pkt.stream_index];
562 metric_received_frame_size_bytes.count_event(pkt.size);
564 // Convert pts to our own timebase.
565 AVRational stream_timebase = stream->time_base;
566 int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
568 // Translate offset into our stream.
// While last_pts is still -1, choose the offset that maps this frame's pts
// to start_pts.
569 if (last_pts == -1) {
570 pts_offset = start_pts - pts;
// Never assign a pts earlier than start_pts.
572 pts = std::max(pts + pts_offset, start_pts);
574 //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
575 // pkt.stream_index, pts, pkt.size);
// The pending audio for this stream is moved into the call.
576 FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, move(pending_audio[pkt.stream_index]), &db);
// Display is done on the main thread; the lambda captures copies of pkt
// and frame, and only reads pkt.stream_index.
578 post_to_main_thread([pkt, frame] {
579 global_mainwindow->display_frame(pkt.stream_index, frame);
// With slow_down_input set, sleep for the pts distance to the previous
// frame, converted from TIMEBASE units to microseconds.
582 if (last_pts != -1 && global_flags.slow_down_input) {
583 this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
589 if (!should_quit.load()) {
590 fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
// Move start_pts one TIMEBASE (one second, going by the comment in
// load_existing_frames()) past last_pts.
594 start_pts = last_pts + TIMEBASE;