]> git.sesse.net Git - nageru/blobdiff - main.cpp
Change from file-per-frame to multiple files per frame.
[nageru] / main.cpp
index 1d29735f92a2180f8734e4793571a15befbe50c8..e04af2251240379c4288d4e72f7ab2ea670d42a0 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -1,4 +1,5 @@
 #include <assert.h>
+#include <arpa/inet.h>
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
@@ -24,6 +25,8 @@ extern "C" {
 #include "disk_space_estimator.h"
 #include "ffmpeg_raii.h"
 #include "flags.h"
+#include "frame_on_disk.h"
+#include "frame.pb.h"
 #include "httpd.h"
 #include "mainwindow.h"
 #include "player.h"
@@ -34,12 +37,17 @@ extern "C" {
 #include "vaapi_jpeg_decoder.h"
 
 #include <QApplication>
+#include <QGLFormat>
+#include <QSurfaceFormat>
 #include <movit/init.h>
 #include <movit/util.h>
 
 using namespace std;
 using namespace std::chrono;
 
+constexpr char frame_magic[] = "Ftbifrm0";
+constexpr size_t frame_magic_len = 8;
+
 mutex RefCountedGLsync::fence_lock;
 atomic<bool> should_quit{false};
 
@@ -48,16 +56,104 @@ int64_t start_pts = -1;
 // TODO: Replace by some sort of GUI control, I guess.
 int64_t current_pts = 0;
 
-string filename_for_frame(unsigned stream_idx, int64_t pts)
+struct FrameFile {
+       FILE *fp = nullptr;
+       int filename_idx;
+       size_t frames_written_so_far = 0;
+};
+std::map<int, FrameFile> open_frame_files;
+
+mutex frame_mu;
+vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
+vector<string> frame_filenames;  // Under frame_mu.
+
+namespace {
+
+FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size)
 {
-       char filename[256];
-       snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.jpeg",
-               global_flags.working_directory.c_str(), stream_idx, pts);
-       return filename;
+       if (open_frame_files.count(stream_idx) == 0) {
+               char filename[256];
+               snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
+                       global_flags.working_directory.c_str(), stream_idx, pts);
+               FILE *fp = fopen(filename, "wb");
+               if (fp == nullptr) {
+                       perror(filename);
+                       exit(1);
+               }
+
+               lock_guard<mutex> lock(frame_mu);
+               int filename_idx = frame_filenames.size();
+               frame_filenames.push_back(filename);
+               open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
+       }
+
+       FrameFile &file = open_frame_files[stream_idx];
+       string filename;
+       {
+               lock_guard<mutex> lock(frame_mu);
+               filename = frame_filenames[file.filename_idx];
+       }
+
+       FrameHeaderProto hdr;
+       hdr.set_stream_idx(stream_idx);
+       hdr.set_pts(pts);
+       hdr.set_file_size(size);
+
+       string serialized;
+       if (!hdr.SerializeToString(&serialized)) {
+               fprintf(stderr, "Frame header serialization failed.\n");
+               exit(1);
+       }
+       uint32_t len = htonl(serialized.size());
+
+       if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
+               perror("fwrite");
+               exit(1);
+       }
+       if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
+               perror("fwrite");
+               exit(1);
+       }
+       if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
+               perror("fwrite");
+               exit(1);
+       }
+       off_t offset = ftell(file.fp);
+       if (fwrite(data, size, 1, file.fp) != 1) {
+               perror("fwrite");
+               exit(1);
+       }
+       fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
+       global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
+
+       if (++file.frames_written_so_far >= 1000) {
+               // Start a new file next time.
+               if (fclose(file.fp) != 0) {
+                       perror("fclose");
+                       exit(1);
+               }
+               open_frame_files.erase(stream_idx);
+
+               // TODO: Write to SQLite.
+       }
+
+       FrameOnDisk frame;
+       frame.pts = pts;
+       frame.filename_idx = file.filename_idx;
+       frame.offset = offset;
+       frame.size = size;
+
+       {
+               lock_guard<mutex> lock(frame_mu);
+               assert(stream_idx < MAX_STREAMS);
+               frames[stream_idx].push_back(frame);
+       }
+
+       return frame;
 }
 
-mutex frame_mu;
-vector<int64_t> frames[MAX_STREAMS];
+} // namespace
+
 HTTPD *global_httpd;
 
 void load_existing_frames();
@@ -144,6 +240,84 @@ int main(int argc, char **argv)
        return ret;
 }
 
+void load_frame_file(const char *filename, unsigned filename_idx)
+{
+       // TODO: Look up in the SQLite database.
+
+       FILE *fp = fopen(filename, "rb");
+       if (fp == nullptr) {
+               perror(filename);
+               exit(1);
+       }
+
+       size_t magic_offset = 0;
+       size_t skipped_bytes = 0;
+       while (!feof(fp) && !ferror(fp)) {
+               int ch = getc(fp);
+               if (ch == -1) {
+                       break;
+               }
+               if (ch != frame_magic[magic_offset++]) {
+                       skipped_bytes += magic_offset;
+                       magic_offset = 0;
+                       continue;
+               }
+               if (magic_offset < frame_magic_len) {
+                       // Still reading the magic (hopefully).
+                       continue;
+               }
+
+               // OK, found the magic. Try to parse the frame header.
+               magic_offset = 0;
+
+               if (skipped_bytes > 0)  {
+                       fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
+                               filename, skipped_bytes);
+                       skipped_bytes = 0;
+               }
+
+               uint32_t len;
+               if (fread(&len, sizeof(len), 1, fp) != 1) {
+                       fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
+                       break;
+               }
+
+               string serialized;
+               serialized.resize(ntohl(len));
+               if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
+                       fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
+                       break;
+               }
+
+               FrameHeaderProto hdr;
+               if (!hdr.ParseFromString(serialized)) {
+                       fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
+                       continue;
+               }
+
+               FrameOnDisk frame;
+               frame.pts = hdr.pts();
+               frame.offset = ftell(fp);
+               frame.filename_idx = filename_idx;
+               frame.size = hdr.file_size();
+
+               if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
+                       fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
+                       continue;
+               }
+
+               if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
+                       frames[hdr.stream_idx()].push_back(frame);
+                       start_pts = max(start_pts, hdr.pts());
+               }
+       }
+
+       if (skipped_bytes > 0) {
+               fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
+                       filename, skipped_bytes);
+       }
+}
+
 void load_existing_frames()
 {
        string frame_dir = global_flags.working_directory + "/frames";
@@ -165,12 +339,10 @@ void load_existing_frames()
                        break;
                }
 
-               int stream_idx;
-               int64_t pts;
-               if (sscanf(de->d_name, "cam%d-pts%ld.jpeg", &stream_idx, &pts) == 2 &&
-                   stream_idx >= 0 && stream_idx < MAX_STREAMS) {
-                       frames[stream_idx].push_back(pts);
-                       start_pts = max(start_pts, pts);
+               if (de->d_type == DT_REG) {
+                       string filename = frame_dir + "/" + de->d_name;
+                       load_frame_file(filename.c_str(), frame_filenames.size());
+                       frame_filenames.push_back(filename);
                }
        }
 
@@ -184,7 +356,8 @@ void load_existing_frames()
        }
 
        for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
-               sort(frames[stream_idx].begin(), frames[stream_idx].end());
+               sort(frames[stream_idx].begin(), frames[stream_idx].end(),
+                       [](const auto &a, const auto &b) { return a.pts < b.pts; });
        }
 }
 
@@ -225,32 +398,20 @@ int record_thread_func()
 
                //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
                //      pkt.stream_index, pts, pkt.size);
-               string filename = filename_for_frame(pkt.stream_index, pts);
-               FILE *fp = fopen(filename.c_str(), "wb");
-               if (fp == nullptr) {
-                       perror(filename.c_str());
-                       exit(1);
-               }
-               fwrite(pkt.data, pkt.size, 1, fp);
-               fclose(fp);
-
-               global_disk_space_estimator->report_write(filename, pts);
+               FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size);
 
-               post_to_main_thread([pkt, pts] {
+               post_to_main_thread([pkt, frame] {
                        if (pkt.stream_index == 0) {
-                               global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, pts);
+                               global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, frame);
                        } else if (pkt.stream_index == 1) {
-                               global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, pts);
+                               global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, frame);
                        } else if (pkt.stream_index == 2) {
-                               global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, pts);
+                               global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, frame);
                        } else if (pkt.stream_index == 3) {
-                               global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, pts);
+                               global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, frame);
                        }
                });
 
-               assert(pkt.stream_index < MAX_STREAMS);
-               frames[pkt.stream_index].push_back(pts);
-
                if (last_pts != -1 && global_flags.slow_down_input) {
                        this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
                }
@@ -260,3 +421,33 @@ int record_thread_func()
 
        return 0;
 }
+
+string read_frame(FrameOnDisk frame)
+{
+       string filename;
+       {
+               lock_guard<mutex> lock(frame_mu);
+               filename = frame_filenames[frame.filename_idx];
+       }
+
+       // TODO: cache the open file handles
+       FILE *fp = fopen(filename.c_str(), "rb");
+       if (fp == nullptr) {
+               perror(filename.c_str());
+               exit(1);
+       }
+       if (fseek(fp, frame.offset, SEEK_SET) == -1) {
+               perror("fseek");
+               exit(1);
+       }
+
+       string str;
+       str.resize(frame.size);
+       if (fread(&str[0], frame.size, 1, fp) != 1) {
+               perror("fread");
+               exit(1);
+       }
+
+       fclose(fp);
+       return str;
+}