]> git.sesse.net Git - nageru/blobdiff - futatabi/main.cpp
Log a warning when we kill a client that is not keeping up.
[nageru] / futatabi / main.cpp
index 52cdb80664379e5014fb0f5ea673234e087f14b8..19cd5fb3e4162439e22e7cac2cd8208b3a84686e 100644 (file)
@@ -1,5 +1,5 @@
-#include <assert.h>
 #include <arpa/inet.h>
+#include <assert.h>
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
@@ -21,27 +21,27 @@ extern "C" {
 }
 
 #include "clip_list.h"
-#include "shared/context.h"
 #include "defs.h"
-#include "shared/disk_space_estimator.h"
-#include "shared/ffmpeg_raii.h"
 #include "flags.h"
-#include "frame_on_disk.h"
 #include "frame.pb.h"
-#include "shared/httpd.h"
+#include "frame_on_disk.h"
 #include "mainwindow.h"
 #include "player.h"
+#include "shared/context.h"
+#include "shared/disk_space_estimator.h"
+#include "shared/ffmpeg_raii.h"
+#include "shared/httpd.h"
+#include "shared/metrics.h"
 #include "shared/post_to_main_thread.h"
 #include "shared/ref_counted_gl_sync.h"
 #include "shared/timebase.h"
-#include "shared/metrics.h"
 #include "ui_mainwindow.h"
 #include "vaapi_jpeg_decoder.h"
 
 #include <QApplication>
 #include <QGLFormat>
-#include <QSurfaceFormat>
 #include <QProgressDialog>
+#include <QSurfaceFormat>
 #include <movit/init.h>
 #include <movit/util.h>
 
@@ -52,7 +52,7 @@ constexpr char frame_magic[] = "Ftbifrm0";
 constexpr size_t frame_magic_len = 8;
 
 mutex RefCountedGLsync::fence_lock;
-atomic<bool> should_quit{false};
+atomic<bool> should_quit{ false };
 
 int64_t start_pts = -1;
 
@@ -70,20 +70,21 @@ mutex frame_mu;
 vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
 vector<string> frame_filenames;  // Under frame_mu.
 
-atomic<int64_t> metric_received_frames[MAX_STREAMS]{{0}};
+atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
+Summary metric_received_frame_size_bytes;
 
 namespace {
 
-FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
+FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, vector<uint32_t> audio, DB *db)
 {
        if (open_frame_files.count(stream_idx) == 0) {
                char filename[256];
-               snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
-                       global_flags.working_directory.c_str(), stream_idx, pts);
+               snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09" PRId64 ".frames",
+                        global_flags.working_directory.c_str(), stream_idx, pts);
                FILE *fp = fopen(filename, "wb");
                if (fp == nullptr) {
                        perror(filename);
-                       exit(1);
+                       abort();
                }
 
                lock_guard<mutex> lock(frame_mu);
@@ -104,30 +105,37 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
        hdr.set_stream_idx(stream_idx);
        hdr.set_pts(pts);
        hdr.set_file_size(size);
+       hdr.set_audio_size(audio.size() * sizeof(audio[0]));
 
        string serialized;
        if (!hdr.SerializeToString(&serialized)) {
                fprintf(stderr, "Frame header serialization failed.\n");
-               exit(1);
+               abort();
        }
        uint32_t len = htonl(serialized.size());
 
        if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
                perror("fwrite");
-               exit(1);
+               abort();
        }
        if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
                perror("fwrite");
-               exit(1);
+               abort();
        }
        if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
                perror("fwrite");
-               exit(1);
+               abort();
        }
        off_t offset = ftell(file.fp);
        if (fwrite(data, size, 1, file.fp) != 1) {
                perror("fwrite");
-               exit(1);
+               abort();
+       }
+       if (audio.size() > 0) {
+               if (fwrite(audio.data(), hdr.audio_size(), 1, file.fp) != 1) {
+                       perror("fwrite");
+                       exit(1);
+               }
        }
        fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
        global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
@@ -137,6 +145,7 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
        frame.filename_idx = filename_idx;
        frame.offset = offset;
        frame.size = size;
+       frame.audio_size = audio.size() * sizeof(audio[0]);
 
        {
                lock_guard<mutex> lock(frame_mu);
@@ -144,13 +153,13 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
                frames[stream_idx].push_back(frame);
        }
 
-       if (++file.frames_written_so_far >= 1000) {
+       if (++file.frames_written_so_far >= FRAMES_PER_FILE) {
                size_t size = ftell(file.fp);
 
                // Start a new file next time.
                if (fclose(file.fp) != 0) {
                        perror("fclose");
-                       exit(1);
+                       abort();
                }
                open_frame_files.erase(stream_idx);
 
@@ -181,7 +190,7 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
        return frame;
 }
 
-} // namespace
+}  // namespace
 
 HTTPD *global_httpd;
 
@@ -198,7 +207,7 @@ int main(int argc, char **argv)
                global_flags.stream_source = argv[optind];
        } else {
                usage();
-               exit(1);
+               abort();
        }
 
        string frame_dir = global_flags.working_directory + "/frames";
@@ -207,12 +216,13 @@ int main(int argc, char **argv)
                fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
        } else if (errno != EEXIST) {
                perror(global_flags.working_directory.c_str());
-               exit(1);
+               abort();
        }
 
        avformat_network_init();
        global_metrics.set_prefix("futatabi");
        global_httpd = new HTTPD;
+       global_metrics.remove("num_connected_multicam_clients");
 
        QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
 
@@ -235,7 +245,7 @@ int main(int argc, char **argv)
        global_share_widget = new QGLWidget();
        if (!global_share_widget->isValid()) {
                fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
-               exit(1);
+               abort();
        }
 
        // Initialize Movit.
@@ -244,7 +254,7 @@ int main(int argc, char **argv)
                QOpenGLContext *context = create_context(surface);
                if (!make_current(context, surface)) {
                        printf("oops\n");
-                       exit(1);
+                       abort();
                }
                CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
                delete_context(context);
@@ -253,6 +263,12 @@ int main(int argc, char **argv)
 
        load_existing_frames();
 
+       for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
+               if (!frames[stream_idx].empty()) {
+                       assert(start_pts > frames[stream_idx].back().pts);
+               }
+       }
+
        MainWindow main_window;
        main_window.show();
 
@@ -267,7 +283,6 @@ int main(int argc, char **argv)
 
        should_quit = true;
        record_thread.join();
-       JPEGFrameView::shutdown();
 
        return ret;
 }
@@ -277,7 +292,7 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
        struct stat st;
        if (stat(filename, &st) == -1) {
                perror(filename);
-               exit(1);
+               abort();
        }
 
        vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
@@ -295,7 +310,19 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
        FILE *fp = fopen(filename, "rb");
        if (fp == nullptr) {
                perror(filename);
-               exit(1);
+               abort();
+       }
+
+       // Find the actual length of the file, since fseek() past the end of the file
+       // will succeed without an error.
+       if (fseek(fp, 0, SEEK_END) == -1) {
+               perror("fseek(SEEK_END)");
+               abort();
+       }
+       off_t file_len = ftell(fp);
+       if (fseek(fp, 0, SEEK_SET) == -1) {
+               perror("fseek(SEEK_SET)");
+               abort();
        }
 
        size_t magic_offset = 0;
@@ -318,9 +345,9 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
                // OK, found the magic. Try to parse the frame header.
                magic_offset = 0;
 
-               if (skipped_bytes > 0)  {
+               if (skipped_bytes > 0) {
                        fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
-                               filename, skipped_bytes);
+                               filename, skipped_bytes);
                        skipped_bytes = 0;
                }
 
@@ -352,10 +379,12 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
                }
                frame.filename_idx = filename_idx;
                frame.size = hdr.file_size();
+               frame.audio_size = hdr.audio_size();
 
-               if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
+               if (frame.offset + frame.size + frame.audio_size > file_len ||
+                   fseek(fp, frame.offset + frame.size + frame.audio_size, SEEK_SET) == -1) {
                        fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
-                       continue;
+                       break;
                }
 
                if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
@@ -367,7 +396,7 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
 
        if (skipped_bytes > 0) {
                fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
-                       filename, skipped_bytes);
+                       filename, skipped_bytes);
        }
 
        off_t size = ftell(fp);
@@ -399,13 +428,13 @@ void load_existing_frames()
        }
 
        vector<string> frame_basenames;
-       for ( ;; ) {
+       for (;;) {
                errno = 0;
                dirent *de = readdir(dir);
                if (de == nullptr) {
                        if (errno != 0) {
                                perror("readdir");
-                               exit(1);
+                               abort();
                        }
                        break;
                }
@@ -417,7 +446,7 @@ void load_existing_frames()
                }
 
                if (progress.wasCanceled()) {
-                       exit(1);
+                       abort();
                }
        }
        closedir(dir);
@@ -435,7 +464,7 @@ void load_existing_frames()
                load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
                progress.setValue(i + 3);
                if (progress.wasCanceled()) {
-                       exit(1);
+                       abort();
                }
        }
 
@@ -449,7 +478,7 @@ void load_existing_frames()
 
        for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
                sort(frames[stream_idx].begin(), frames[stream_idx].end(),
-                       [](const auto &a, const auto &b) { return a.pts < b.pts; });
+                    [](const auto &a, const auto &b) { return a.pts < b.pts; });
        }
 
        db.clean_unused_frame_files(frame_basenames);
@@ -458,8 +487,9 @@ void load_existing_frames()
 void record_thread_func()
 {
        for (unsigned i = 0; i < MAX_STREAMS; ++i) {
-               global_metrics.add("received_frames", {{ "stream", to_string(i) }}, &metric_received_frames[i]);
+               global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
        }
+       global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
 
        if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
                // Save the user from some repetitive messages.
@@ -479,11 +509,25 @@ void record_thread_func()
                        continue;
                }
 
-               int64_t last_pts = -1;
+               // Match any audio streams to video streams, sequentially.
+               vector<int> video_stream_idx, audio_stream_idx;
+               for (unsigned i = 0; i < format_ctx->nb_streams; ++i) {
+                       if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
+                               video_stream_idx.push_back(i);
+                       } else if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
+                               audio_stream_idx.push_back(i);
+                       }
+               }
+               unordered_map<int, int> audio_stream_to_video_stream_idx;
+               for (size_t i = 0; i < min(video_stream_idx.size(), audio_stream_idx.size()); ++i) {
+                       audio_stream_to_video_stream_idx[audio_stream_idx[i]] = video_stream_idx[i];
+               }
 
+               vector<uint32_t> pending_audio[MAX_STREAMS];
+               int64_t last_pts = -1;
                while (!should_quit.load()) {
                        AVPacket pkt;
-                       unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
+                       unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
                                &pkt, av_packet_unref);
                        av_init_packet(&pkt);
                        pkt.data = nullptr;
@@ -494,14 +538,31 @@ void record_thread_func()
                        if (av_read_frame(format_ctx.get(), &pkt) != 0) {
                                break;
                        }
-                       if (pkt.stream_index >= MAX_STREAMS) {
+
+                       AVStream *stream = format_ctx->streams[pkt.stream_index];
+                       if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO &&
+                           audio_stream_to_video_stream_idx.count(pkt.stream_index)) {
+                               if ((pkt.size % (sizeof(uint32_t) * 2)) != 0) {
+                                       fprintf(stderr, "Audio stream %u had a packet of strange length %d, ignoring.\n",
+                                               pkt.stream_index, pkt.size);
+                               } else {
+                                       // TODO: Endianness?
+                                       const uint32_t *begin = (const uint32_t *)pkt.data;
+                                       const uint32_t *end = (const uint32_t *)(pkt.data + pkt.size);
+                                       pending_audio[audio_stream_to_video_stream_idx[pkt.stream_index]].assign(begin, end);
+                               }
+                       }
+
+                       if (pkt.stream_index >= MAX_STREAMS ||
+                           stream->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
                                continue;
                        }
 
                        ++metric_received_frames[pkt.stream_index];
+                       metric_received_frame_size_bytes.count_event(pkt.size);
 
                        // Convert pts to our own timebase.
-                       AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
+                       AVRational stream_timebase = stream->time_base;
                        int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
 
                        // Translate offset into our stream.
@@ -512,7 +573,7 @@ void record_thread_func()
 
                        //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
                        //      pkt.stream_index, pts, pkt.size);
-                       FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
+                       FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, move(pending_audio[pkt.stream_index]), &db);
 
                        post_to_main_thread([pkt, frame] {
                                global_mainwindow->display_frame(pkt.stream_index, frame);
@@ -525,8 +586,10 @@ void record_thread_func()
                        current_pts = pts;
                }
 
-               fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
-               sleep(1);
+               if (!should_quit.load()) {
+                       fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
+                       sleep(1);
+               }
 
                start_pts = last_pts + TIMEBASE;
        }