X-Git-Url: https://git.sesse.net/?p=nageru;a=blobdiff_plain;f=futatabi%2Fmain.cpp;h=19cd5fb3e4162439e22e7cac2cd8208b3a84686e;hp=14bc12ac3a35edf87103449f67e3ab731cb92211;hb=2cb648106d32b9968f2026536fbead096308c7d1;hpb=b563b8903fa84bb7fd62d7d0b84b70cb26843dbf diff --git a/futatabi/main.cpp b/futatabi/main.cpp index 14bc12a..19cd5fb 100644 --- a/futatabi/main.cpp +++ b/futatabi/main.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include #include @@ -13,6 +13,7 @@ #include #include #include +#include #include extern "C" { @@ -20,26 +21,27 @@ extern "C" { } #include "clip_list.h" -#include "context.h" #include "defs.h" -#include "disk_space_estimator.h" -#include "shared/ffmpeg_raii.h" #include "flags.h" -#include "frame_on_disk.h" #include "frame.pb.h" -#include "httpd.h" +#include "frame_on_disk.h" #include "mainwindow.h" #include "player.h" +#include "shared/context.h" +#include "shared/disk_space_estimator.h" +#include "shared/ffmpeg_raii.h" +#include "shared/httpd.h" +#include "shared/metrics.h" #include "shared/post_to_main_thread.h" -#include "ref_counted_gl_sync.h" -#include "timebase.h" +#include "shared/ref_counted_gl_sync.h" +#include "shared/timebase.h" #include "ui_mainwindow.h" #include "vaapi_jpeg_decoder.h" #include #include -#include #include +#include #include #include @@ -50,7 +52,7 @@ constexpr char frame_magic[] = "Ftbifrm0"; constexpr size_t frame_magic_len = 8; mutex RefCountedGLsync::fence_lock; -atomic should_quit{false}; +atomic should_quit{ false }; int64_t start_pts = -1; @@ -68,18 +70,21 @@ mutex frame_mu; vector frames[MAX_STREAMS]; // Under frame_mu. vector frame_filenames; // Under frame_mu. +atomic metric_received_frames[MAX_STREAMS]{ { 0 } }; +Summary metric_received_frame_size_bytes; + namespace { -FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db) +FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, vector audio, DB *db) { if (open_frame_files.count(stream_idx) == 0) { char filename[256]; - snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames", - global_flags.working_directory.c_str(), stream_idx, pts); + snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09" PRId64 ".frames", + global_flags.working_directory.c_str(), stream_idx, pts); FILE *fp = fopen(filename, "wb"); if (fp == nullptr) { perror(filename); - exit(1); + abort(); } lock_guard lock(frame_mu); @@ -100,30 +105,37 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t hdr.set_stream_idx(stream_idx); hdr.set_pts(pts); hdr.set_file_size(size); + hdr.set_audio_size(audio.size() * sizeof(audio[0])); string serialized; if (!hdr.SerializeToString(&serialized)) { fprintf(stderr, "Frame header serialization failed.\n"); - exit(1); + abort(); } uint32_t len = htonl(serialized.size()); if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) { perror("fwrite"); - exit(1); + abort(); } if (fwrite(&len, sizeof(len), 1, file.fp) != 1) { perror("fwrite"); - exit(1); + abort(); } if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) { perror("fwrite"); - exit(1); + abort(); } off_t offset = ftell(file.fp); if (fwrite(data, size, 1, file.fp) != 1) { perror("fwrite"); - exit(1); + abort(); + } + if (audio.size() > 0) { + if (fwrite(audio.data(), hdr.audio_size(), 1, file.fp) != 1) { + perror("fwrite"); + exit(1); + } } fflush(file.fp); // No fsync(), though. We can accept losing a few frames. global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts); @@ -133,6 +145,7 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t frame.filename_idx = filename_idx; frame.offset = offset; frame.size = size; + frame.audio_size = audio.size() * sizeof(audio[0]); { lock_guard lock(frame_mu); @@ -140,13 +153,13 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t frames[stream_idx].push_back(frame); } - if (++file.frames_written_so_far >= 1000) { + if (++file.frames_written_so_far >= FRAMES_PER_FILE) { size_t size = ftell(file.fp); // Start a new file next time. if (fclose(file.fp) != 0) { perror("fclose"); - exit(1); + abort(); } open_frame_files.erase(stream_idx); @@ -169,7 +182,7 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t const char *basename = filename.c_str(); while (strchr(basename, '/') != nullptr) { - basename = strchr(basename, '/'); + basename = strchr(basename, '/') + 1; } db->store_frame_file(basename, size, frames_this_file); } @@ -177,12 +190,12 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t return frame; } -} // namespace +} // namespace HTTPD *global_httpd; void load_existing_frames(); -int record_thread_func(); +void record_thread_func(); int main(int argc, char **argv) { @@ -194,22 +207,22 @@ int main(int argc, char **argv) global_flags.stream_source = argv[optind]; } else { usage(); - exit(1); + abort(); } string frame_dir = global_flags.working_directory + "/frames"; - struct stat st; - if (stat(frame_dir.c_str(), &st) == -1) { + if (mkdir(frame_dir.c_str(), 0777) == 0) { fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str()); - if (mkdir(frame_dir.c_str(), 0777) == -1) { - perror(global_flags.working_directory.c_str()); - exit(1); - } + } else if (errno != EEXIST) { + perror(global_flags.working_directory.c_str()); + abort(); } avformat_network_init(); + global_metrics.set_prefix("futatabi"); global_httpd = new HTTPD; + global_metrics.remove("num_connected_multicam_clients"); QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true); @@ -232,14 +245,17 @@ int main(int argc, char **argv) global_share_widget = new QGLWidget(); if (!global_share_widget->isValid()) { fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n"); - exit(1); + abort(); } // Initialize Movit. { QSurface *surface = create_surface(); QOpenGLContext *context = create_context(surface); - make_current(context, surface); + if (!make_current(context, surface)) { + printf("oops\n"); + abort(); + } CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF)); delete_context(context); // TODO: Delete the surface, too. @@ -247,6 +263,12 @@ int main(int argc, char **argv) load_existing_frames(); + for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) { + if (!frames[stream_idx].empty()) { + assert(start_pts > frames[stream_idx].back().pts); + } + } + MainWindow main_window; main_window.show(); @@ -261,7 +283,6 @@ int main(int argc, char **argv) should_quit = true; record_thread.join(); - JPEGFrameView::shutdown(); return ret; } @@ -271,14 +292,14 @@ void load_frame_file(const char *filename, const string &basename, unsigned file struct stat st; if (stat(filename, &st) == -1) { perror(filename); - exit(1); + abort(); } vector all_frames = db->load_frame_file(basename, st.st_size, filename_idx); if (!all_frames.empty()) { // We already had this cached in the database, so no need to look in the file. for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) { - if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) { + if (frame.stream_idx < MAX_STREAMS) { frames[frame.stream_idx].push_back(frame.frame); start_pts = max(start_pts, frame.frame.pts); } @@ -289,7 +310,19 @@ void load_frame_file(const char *filename, const string &basename, unsigned file FILE *fp = fopen(filename, "rb"); if (fp == nullptr) { perror(filename); - exit(1); + abort(); + } + + // Find the actual length of the file, since fseek() past the end of the file + // will succeed without an error. + if (fseek(fp, 0, SEEK_END) == -1) { + perror("fseek(SEEK_END)"); + abort(); + } + off_t file_len = ftell(fp); + if (fseek(fp, 0, SEEK_SET) == -1) { + perror("fseek(SEEK_SET)"); + abort(); } size_t magic_offset = 0; @@ -312,9 +345,9 @@ void load_frame_file(const char *filename, const string &basename, unsigned file // OK, found the magic. Try to parse the frame header. magic_offset = 0; - if (skipped_bytes > 0) { + if (skipped_bytes > 0) { fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n", - filename, skipped_bytes); + filename, skipped_bytes); skipped_bytes = 0; } @@ -340,12 +373,18 @@ void load_frame_file(const char *filename, const string &basename, unsigned file FrameOnDisk frame; frame.pts = hdr.pts(); frame.offset = ftell(fp); + if (frame.offset == -1) { + fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno)); + break; + } frame.filename_idx = filename_idx; frame.size = hdr.file_size(); + frame.audio_size = hdr.audio_size(); - if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) { + if (frame.offset + frame.size + frame.audio_size > file_len || + fseek(fp, frame.offset + frame.size + frame.audio_size, SEEK_SET) == -1) { fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename); - continue; + break; } if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) { @@ -357,12 +396,17 @@ void load_frame_file(const char *filename, const string &basename, unsigned file if (skipped_bytes > 0) { fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n", - filename, skipped_bytes); + filename, skipped_bytes); } - size_t size = ftell(fp); + off_t size = ftell(fp); fclose(fp); + if (size == -1) { + fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno)); + return; + } + db->store_frame_file(basename, size, all_frames); } @@ -384,13 +428,13 @@ void load_existing_frames() } vector frame_basenames; - for ( ;; ) { + for (;;) { errno = 0; dirent *de = readdir(dir); if (de == nullptr) { if (errno != 0) { perror("readdir"); - exit(1); + abort(); } break; } @@ -402,7 +446,7 @@ void load_existing_frames() } if (progress.wasCanceled()) { - exit(1); + abort(); } } closedir(dir); @@ -420,7 +464,7 @@ void load_existing_frames() load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db); progress.setValue(i + 3); if (progress.wasCanceled()) { - exit(1); + abort(); } } @@ -430,73 +474,123 @@ void load_existing_frames() // Add a gap of one second from the old frames to the new ones. start_pts += TIMEBASE; } + current_pts = start_pts; for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) { sort(frames[stream_idx].begin(), frames[stream_idx].end(), - [](const auto &a, const auto &b) { return a.pts < b.pts; }); + [](const auto &a, const auto &b) { return a.pts < b.pts; }); } db.clean_unused_frame_files(frame_basenames); } -int record_thread_func() +void record_thread_func() { - auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr); - if (format_ctx == nullptr) { - fprintf(stderr, "%s: Error opening file\n", global_flags.stream_source.c_str()); - return 1; + for (unsigned i = 0; i < MAX_STREAMS; ++i) { + global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]); } + global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes); + + if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") { + // Save the user from some repetitive messages. + return; + } + + pthread_setname_np(pthread_self(), "ReceiveFrames"); - int64_t last_pts = -1; - int64_t pts_offset; + int64_t pts_offset = 0; // Needs to be initialized due to a spurious GCC warning. DB db(global_flags.working_directory + "/futatabi.db"); while (!should_quit.load()) { - AVPacket pkt; - unique_ptr pkt_cleanup( - &pkt, av_packet_unref); - av_init_packet(&pkt); - pkt.data = nullptr; - pkt.size = 0; - - // TODO: Make it possible to abort av_read_frame() (use an interrupt callback); - // right now, should_quit will be ignored if it's hung on I/O. - if (av_read_frame(format_ctx.get(), &pkt) != 0) { - break; + auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr); + if (format_ctx == nullptr) { + fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str()); + sleep(1); + continue; } - // Convert pts to our own timebase. - AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base; - int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE }); - - // Translate offset into our stream. - if (last_pts == -1) { - pts_offset = start_pts - pts; + // Match any audio streams to video streams, sequentially. + vector video_stream_idx, audio_stream_idx; + for (unsigned i = 0; i < format_ctx->nb_streams; ++i) { + if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { + video_stream_idx.push_back(i); + } else if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { + audio_stream_idx.push_back(i); + } } - pts = std::max(pts + pts_offset, start_pts); - - //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n", - // pkt.stream_index, pts, pkt.size); - FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db); - - post_to_main_thread([pkt, frame] { - if (pkt.stream_index == 0) { - global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, frame); - } else if (pkt.stream_index == 1) { - global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, frame); - } else if (pkt.stream_index == 2) { - global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, frame); - } else if (pkt.stream_index == 3) { - global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, frame); + unordered_map audio_stream_to_video_stream_idx; + for (size_t i = 0; i < min(video_stream_idx.size(), audio_stream_idx.size()); ++i) { + audio_stream_to_video_stream_idx[audio_stream_idx[i]] = video_stream_idx[i]; + } + + vector pending_audio[MAX_STREAMS]; + int64_t last_pts = -1; + while (!should_quit.load()) { + AVPacket pkt; + unique_ptr pkt_cleanup( + &pkt, av_packet_unref); + av_init_packet(&pkt); + pkt.data = nullptr; + pkt.size = 0; + + // TODO: Make it possible to abort av_read_frame() (use an interrupt callback); + // right now, should_quit will be ignored if it's hung on I/O. + if (av_read_frame(format_ctx.get(), &pkt) != 0) { + break; + } + + AVStream *stream = format_ctx->streams[pkt.stream_index]; + if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && + audio_stream_to_video_stream_idx.count(pkt.stream_index)) { + if ((pkt.size % (sizeof(uint32_t) * 2)) != 0) { + fprintf(stderr, "Audio stream %u had a packet of strange length %d, ignoring.\n", + pkt.stream_index, pkt.size); + } else { + // TODO: Endianness? + const uint32_t *begin = (const uint32_t *)pkt.data; + const uint32_t *end = (const uint32_t *)(pkt.data + pkt.size); + pending_audio[audio_stream_to_video_stream_idx[pkt.stream_index]].assign(begin, end); + } + } + + if (pkt.stream_index >= MAX_STREAMS || + stream->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) { + continue; + } + + ++metric_received_frames[pkt.stream_index]; + metric_received_frame_size_bytes.count_event(pkt.size); + + // Convert pts to our own timebase. + AVRational stream_timebase = stream->time_base; + int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE }); + + // Translate offset into our stream. + if (last_pts == -1) { + pts_offset = start_pts - pts; } - }); + pts = std::max(pts + pts_offset, start_pts); - if (last_pts != -1 && global_flags.slow_down_input) { - this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE)); + //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n", + // pkt.stream_index, pts, pkt.size); + FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, move(pending_audio[pkt.stream_index]), &db); + + post_to_main_thread([pkt, frame] { + global_mainwindow->display_frame(pkt.stream_index, frame); + }); + + if (last_pts != -1 && global_flags.slow_down_input) { + this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE)); + } + last_pts = pts; + current_pts = pts; } - last_pts = pts; - current_pts = pts; - } - return 0; + if (!should_quit.load()) { + fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str()); + sleep(1); + } + + start_pts = last_pts + TIMEBASE; + } }