]> git.sesse.net Git - nageru/blobdiff - futatabi/main.cpp
Get rid of an unneeded sleep and message on Futatabi exit.
[nageru] / futatabi / main.cpp
index 7c9f49db069c325fe05242dff0845b6e37b6e744..7c5f660ae5b97f928aed6b9c8daabe18a5744cc4 100644 (file)
@@ -1,5 +1,5 @@
-#include <assert.h>
 #include <arpa/inet.h>
+#include <assert.h>
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
@@ -13,6 +13,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <thread>
+#include <unistd.h>
 #include <vector>
 
 extern "C" {
@@ -20,16 +21,17 @@ extern "C" {
 }
 
 #include "clip_list.h"
-#include "shared/context.h"
 #include "defs.h"
-#include "shared/disk_space_estimator.h"
-#include "shared/ffmpeg_raii.h"
 #include "flags.h"
-#include "frame_on_disk.h"
 #include "frame.pb.h"
-#include "shared/httpd.h"
+#include "frame_on_disk.h"
 #include "mainwindow.h"
 #include "player.h"
+#include "shared/context.h"
+#include "shared/disk_space_estimator.h"
+#include "shared/ffmpeg_raii.h"
+#include "shared/httpd.h"
+#include "shared/metrics.h"
 #include "shared/post_to_main_thread.h"
 #include "shared/ref_counted_gl_sync.h"
 #include "shared/timebase.h"
@@ -38,8 +40,8 @@ extern "C" {
 
 #include <QApplication>
 #include <QGLFormat>
-#include <QSurfaceFormat>
 #include <QProgressDialog>
+#include <QSurfaceFormat>
 #include <movit/init.h>
 #include <movit/util.h>
 
@@ -50,7 +52,7 @@ constexpr char frame_magic[] = "Ftbifrm0";
 constexpr size_t frame_magic_len = 8;
 
 mutex RefCountedGLsync::fence_lock;
-atomic<bool> should_quit{false};
+atomic<bool> should_quit{ false };
 
 int64_t start_pts = -1;
 
@@ -68,6 +70,9 @@ mutex frame_mu;
 vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
 vector<string> frame_filenames;  // Under frame_mu.
 
+atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
+Summary metric_received_frame_size_bytes;
+
 namespace {
 
 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
@@ -75,7 +80,7 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
        if (open_frame_files.count(stream_idx) == 0) {
                char filename[256];
                snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
-                       global_flags.working_directory.c_str(), stream_idx, pts);
+                        global_flags.working_directory.c_str(), stream_idx, pts);
                FILE *fp = fopen(filename, "wb");
                if (fp == nullptr) {
                        perror(filename);
@@ -140,7 +145,7 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
                frames[stream_idx].push_back(frame);
        }
 
-       if (++file.frames_written_so_far >= 1000) {
+       if (++file.frames_written_so_far >= FRAMES_PER_FILE) {
                size_t size = ftell(file.fp);
 
                // Start a new file next time.
@@ -177,12 +182,12 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
        return frame;
 }
 
-} // namespace
+}  // namespace
 
 HTTPD *global_httpd;
 
 void load_existing_frames();
-int record_thread_func();
+void record_thread_func();
 
 int main(int argc, char **argv)
 {
@@ -199,17 +204,17 @@ int main(int argc, char **argv)
 
        string frame_dir = global_flags.working_directory + "/frames";
 
-       struct stat st;
-       if (stat(frame_dir.c_str(), &st) == -1) {
+       if (mkdir(frame_dir.c_str(), 0777) == 0) {
                fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
-               if (mkdir(frame_dir.c_str(), 0777) == -1) {
-                       perror(global_flags.working_directory.c_str());
-                       exit(1);
-               }
+       } else if (errno != EEXIST) {
+               perror(global_flags.working_directory.c_str());
+               exit(1);
        }
 
        avformat_network_init();
+       global_metrics.set_prefix("futatabi");
        global_httpd = new HTTPD;
+       global_metrics.remove("num_connected_multicam_clients");
 
        QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
 
@@ -239,7 +244,10 @@ int main(int argc, char **argv)
        {
                QSurface *surface = create_surface();
                QOpenGLContext *context = create_context(surface);
-               make_current(context, surface);
+               if (!make_current(context, surface)) {
+                       printf("oops\n");
+                       exit(1);
+               }
                CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
                delete_context(context);
                // TODO: Delete the surface, too.
@@ -247,6 +255,12 @@ int main(int argc, char **argv)
 
        load_existing_frames();
 
+       for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
+               if (!frames[stream_idx].empty()) {
+                       assert(start_pts > frames[stream_idx].back().pts);
+               }
+       }
+
        MainWindow main_window;
        main_window.show();
 
@@ -278,7 +292,7 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
        if (!all_frames.empty()) {
                // We already had this cached in the database, so no need to look in the file.
                for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
-                       if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
+                       if (frame.stream_idx < MAX_STREAMS) {
                                frames[frame.stream_idx].push_back(frame.frame);
                                start_pts = max(start_pts, frame.frame.pts);
                        }
@@ -312,9 +326,9 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
                // OK, found the magic. Try to parse the frame header.
                magic_offset = 0;
 
-               if (skipped_bytes > 0)  {
+               if (skipped_bytes > 0) {
                        fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
-                               filename, skipped_bytes);
+                               filename, skipped_bytes);
                        skipped_bytes = 0;
                }
 
@@ -340,6 +354,10 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
                FrameOnDisk frame;
                frame.pts = hdr.pts();
                frame.offset = ftell(fp);
+               if (frame.offset == -1) {
+                       fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
+                       break;
+               }
                frame.filename_idx = filename_idx;
                frame.size = hdr.file_size();
 
@@ -357,12 +375,17 @@ void load_frame_file(const char *filename, const string &basename, unsigned file
 
        if (skipped_bytes > 0) {
                fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
-                       filename, skipped_bytes);
+                       filename, skipped_bytes);
        }
 
-       size_t size = ftell(fp);
+       off_t size = ftell(fp);
        fclose(fp);
 
+       if (size == -1) {
+               fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
+               return;
+       }
+
        db->store_frame_file(basename, size, all_frames);
 }
 
@@ -384,7 +407,7 @@ void load_existing_frames()
        }
 
        vector<string> frame_basenames;
-       for ( ;; ) {
+       for (;;) {
                errno = 0;
                dirent *de = readdir(dir);
                if (de == nullptr) {
@@ -430,73 +453,93 @@ void load_existing_frames()
                // Add a gap of one second from the old frames to the new ones.
                start_pts += TIMEBASE;
        }
+       current_pts = start_pts;
 
        for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
                sort(frames[stream_idx].begin(), frames[stream_idx].end(),
-                       [](const auto &a, const auto &b) { return a.pts < b.pts; });
+                    [](const auto &a, const auto &b) { return a.pts < b.pts; });
        }
 
        db.clean_unused_frame_files(frame_basenames);
 }
 
-int record_thread_func()
+void record_thread_func()
 {
-       auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
-       if (format_ctx == nullptr) {
-               fprintf(stderr, "%s: Error opening file\n", global_flags.stream_source.c_str());
-               return 1;
+       for (unsigned i = 0; i < MAX_STREAMS; ++i) {
+               global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
        }
+       global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
 
-       int64_t last_pts = -1;
-       int64_t pts_offset;
+       if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
+               // Save the user from some repetitive messages.
+               return;
+       }
+
+       pthread_setname_np(pthread_self(), "ReceiveFrames");
+
+       int64_t pts_offset = 0;  // Needs to be initialized due to a spurious GCC warning.
        DB db(global_flags.working_directory + "/futatabi.db");
 
        while (!should_quit.load()) {
-               AVPacket pkt;
-               unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
-                       &pkt, av_packet_unref);
-               av_init_packet(&pkt);
-               pkt.data = nullptr;
-               pkt.size = 0;
-
-               // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
-               // right now, should_quit will be ignored if it's hung on I/O.
-               if (av_read_frame(format_ctx.get(), &pkt) != 0) {
-                       break;
+               auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
+               if (format_ctx == nullptr) {
+                       fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
+                       sleep(1);
+                       continue;
                }
 
-               // Convert pts to our own timebase.
-               AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
-               int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
+               int64_t last_pts = -1;
 
-               // Translate offset into our stream.
-               if (last_pts == -1) {
-                       pts_offset = start_pts - pts;
-               }
-               pts = std::max(pts + pts_offset, start_pts);
-
-               //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
-               //      pkt.stream_index, pts, pkt.size);
-               FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
-
-               post_to_main_thread([pkt, frame] {
-                       if (pkt.stream_index == 0) {
-                               global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, frame);
-                       } else if (pkt.stream_index == 1) {
-                               global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, frame);
-                       } else if (pkt.stream_index == 2) {
-                               global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, frame);
-                       } else if (pkt.stream_index == 3) {
-                               global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, frame);
+               while (!should_quit.load()) {
+                       AVPacket pkt;
+                       unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
+                               &pkt, av_packet_unref);
+                       av_init_packet(&pkt);
+                       pkt.data = nullptr;
+                       pkt.size = 0;
+
+                       // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
+                       // right now, should_quit will be ignored if it's hung on I/O.
+                       if (av_read_frame(format_ctx.get(), &pkt) != 0) {
+                               break;
                        }
-               });
+                       if (pkt.stream_index >= MAX_STREAMS) {
+                               continue;
+                       }
+
+                       ++metric_received_frames[pkt.stream_index];
+                       metric_received_frame_size_bytes.count_event(pkt.size);
+
+                       // Convert pts to our own timebase.
+                       AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
+                       int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
+
+                       // Translate offset into our stream.
+                       if (last_pts == -1) {
+                               pts_offset = start_pts - pts;
+                       }
+                       pts = std::max(pts + pts_offset, start_pts);
+
+                       //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
+                       //      pkt.stream_index, pts, pkt.size);
+                       FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
 
-               if (last_pts != -1 && global_flags.slow_down_input) {
-                       this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
+                       post_to_main_thread([pkt, frame] {
+                               global_mainwindow->display_frame(pkt.stream_index, frame);
+                       });
+
+                       if (last_pts != -1 && global_flags.slow_down_input) {
+                               this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
+                       }
+                       last_pts = pts;
+                       current_pts = pts;
                }
-               last_pts = pts;
-               current_pts = pts;
-       }
 
-       return 0;
+               if (!should_quit.load()) {
+                       fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
+                       sleep(1);
+               }
+
+               start_pts = last_pts + TIMEBASE;
+       }
 }