]> git.sesse.net Git - nageru/commitdiff
Use SQLite to store metadata about finished frame files.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 25 Nov 2018 22:34:38 +0000 (23:34 +0100)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 25 Nov 2018 22:36:26 +0000 (23:36 +0100)
db.cpp
db.h
main.cpp

diff --git a/db.cpp b/db.cpp
index b0f529b4331d2b4d0019223d43b7b6b3edb40cce..4f7856b5be4958b058a9945faafedf188faca4e2 100644 (file)
--- a/db.cpp
+++ b/db.cpp
@@ -4,7 +4,7 @@
 
 using namespace std;
 
-DB::DB(const std::string &filename)
+DB::DB(const string &filename)
 {
        int ret = sqlite3_open(filename.c_str(), &db);
        if (ret != SQLITE_OK) {
@@ -16,6 +16,26 @@ DB::DB(const std::string &filename)
                CREATE TABLE IF NOT EXISTS state (state BLOB);
        )", nullptr, nullptr, nullptr);  // Ignore errors.
 
+       sqlite3_exec(db, R"(
+               CREATE TABLE IF NOT EXISTS file (
+                       file INTEGER NOT NULL PRIMARY KEY,
+                       filename VARCHAR NOT NULL UNIQUE,
+                       size BIGINT NOT NULL
+               );
+       )", nullptr, nullptr, nullptr);  // Ignore errors.
+
+       sqlite3_exec(db, R"(
+               CREATE TABLE IF NOT EXISTS frame (
+                       file INTEGER NOT NULL REFERENCES file ON DELETE CASCADE,
+                       stream_idx INTEGER NOT NULL,
+                       pts BIGINT NOT NULL,
+                       offset BIGINT NOT NULL,
+                       size INTEGER NOT NULL
+               );
+       )", nullptr, nullptr, nullptr);  // Ignore errors.
+
+       sqlite3_exec(db, "CREATE INDEX frame_file ON FRAME ( file );", nullptr, nullptr, nullptr);  // Ignore errors.
+
        sqlite3_exec(db, "PRAGMA journal_mode=WAL", nullptr, nullptr, nullptr);  // Ignore errors.
        sqlite3_exec(db, "PRAGMA synchronous=NORMAL", nullptr, nullptr, nullptr);  // Ignore errors.
 }
@@ -96,3 +116,138 @@ void DB::store_state(const StateProto &state)
                exit(1);
        }
 }
+
+vector<DB::FrameOnDiskAndStreamIdx> DB::load_frame_file(const string &filename, size_t size, unsigned filename_idx)
+{
+       sqlite3_stmt *stmt;
+       int ret = sqlite3_prepare_v2(db, "SELECT pts, offset, frame.size, stream_idx FROM file JOIN frame USING (file) WHERE filename=? AND file.size=?", -1, &stmt, 0);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
+       sqlite3_bind_int64(stmt, 2, size);
+
+       vector<FrameOnDiskAndStreamIdx> frames;
+       do {
+               ret = sqlite3_step(stmt);
+               if (ret == SQLITE_ROW) {
+                       FrameOnDiskAndStreamIdx frame;
+                       frame.frame.filename_idx = filename_idx;
+                       frame.frame.pts = sqlite3_column_int64(stmt, 0);
+                       frame.frame.offset = sqlite3_column_int64(stmt, 1);
+                       frame.frame.size = sqlite3_column_int(stmt, 2);
+                       frame.stream_idx = sqlite3_column_int(stmt, 3);
+                       frames.push_back(frame);
+               } else if (ret != SQLITE_DONE) {
+                       fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
+                       exit(1);
+               }
+       } while (ret != SQLITE_DONE);
+
+       ret = sqlite3_finalize(stmt);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       return frames;
+}
+
+void DB::store_frame_file(const string &filename, size_t size, const vector<FrameOnDiskAndStreamIdx> &frames)
+{
+       int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       // Delete any existing instances with this filename. This also includes
+       // deleting any associated frames, due to the ON CASCADE DELETE constraint.
+       sqlite3_stmt *stmt;
+       ret = sqlite3_prepare_v2(db, "DELETE FROM file WHERE filename=?", -1, &stmt, 0);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "DELETE prepare: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
+
+       ret = sqlite3_step(stmt);
+       if (ret == SQLITE_ROW) {
+               fprintf(stderr, "DELETE step: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       ret = sqlite3_finalize(stmt);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "DELETE finalize: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       // Insert the new row.
+       ret = sqlite3_prepare_v2(db, "INSERT INTO file (filename, size) VALUES (?, ?)", -1, &stmt, 0);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
+       sqlite3_bind_int64(stmt, 2, size);
+
+       ret = sqlite3_step(stmt);
+       if (ret == SQLITE_ROW) {
+               fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       ret = sqlite3_finalize(stmt);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       // Insert the actual frames.
+       int64_t rowid = sqlite3_last_insert_rowid(db);
+
+       ret = sqlite3_prepare_v2(db, "INSERT INTO frame (file, stream_idx, pts, offset, size) VALUES (?, ?, ?, ?, ?)", -1, &stmt, 0);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       sqlite3_bind_int64(stmt, 1, rowid);
+
+       for (const FrameOnDiskAndStreamIdx &frame : frames) {
+               sqlite3_bind_int64(stmt, 2, frame.stream_idx);
+               sqlite3_bind_int64(stmt, 3, frame.frame.pts);
+               sqlite3_bind_int64(stmt, 4, frame.frame.offset);
+               sqlite3_bind_int(stmt, 5, frame.frame.size);
+
+               ret = sqlite3_step(stmt);
+               if (ret == SQLITE_ROW) {
+                       fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
+                       exit(1);
+               }
+
+               ret = sqlite3_reset(stmt);
+               if (ret != SQLITE_OK) {
+                       fprintf(stderr, "INSERT reset: %s\n", sqlite3_errmsg(db));
+                       exit(1);
+               }
+       }
+
+       ret = sqlite3_finalize(stmt);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+
+       // Commit.
+       ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
+               exit(1);
+       }
+}
diff --git a/db.h b/db.h
index 40a6602d937f1945995ad5d34c2b020f9de92b3e..88196c7526407e1df24e80d5f6b4c4bd4122d74a 100644 (file)
--- a/db.h
+++ b/db.h
@@ -5,6 +5,9 @@
 
 #include <sqlite3.h>
 #include <string>
+#include <vector>
+
+#include "frame_on_disk.h"
 
 class DB {
 public:
@@ -14,6 +17,13 @@ public:
        StateProto get_state();
        void store_state(const StateProto &state);
 
+       struct FrameOnDiskAndStreamIdx {
+               FrameOnDisk frame;
+               unsigned stream_idx;
+       };
+       std::vector<FrameOnDiskAndStreamIdx> load_frame_file(const std::string &filename, size_t size, unsigned frame_idx);  // Empty = none found, or there were no frames.
+       void store_frame_file(const std::string &filename, size_t size, const std::vector<FrameOnDiskAndStreamIdx> &frames);
+
 private:
        StateProto state;
        sqlite3 *db;
index 681724a4ec4540d26b771bbf8ae4ee1be0f82b1f..8f9530459149cc77dbf3389ca823b0267d98b948 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -58,7 +58,7 @@ int64_t current_pts = 0;
 
 struct FrameFile {
        FILE *fp = nullptr;
-       int filename_idx;
+       unsigned filename_idx;
        size_t frames_written_so_far = 0;
 };
 std::map<int, FrameFile> open_frame_files;
@@ -69,7 +69,7 @@ vector<string> frame_filenames;  // Under frame_mu.
 
 namespace {
 
-FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size)
+FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
 {
        if (open_frame_files.count(stream_idx) == 0) {
                char filename[256];
@@ -82,16 +82,17 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
                }
 
                lock_guard<mutex> lock(frame_mu);
-               int filename_idx = frame_filenames.size();
+               unsigned filename_idx = frame_filenames.size();
                frame_filenames.push_back(filename);
                open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
        }
 
        FrameFile &file = open_frame_files[stream_idx];
+       unsigned filename_idx = file.filename_idx;
        string filename;
        {
                lock_guard<mutex> lock(frame_mu);
-               filename = frame_filenames[file.filename_idx];
+               filename = frame_filenames[filename_idx];
        }
 
        FrameHeaderProto hdr;
@@ -126,20 +127,9 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
        fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
        global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
 
-       if (++file.frames_written_so_far >= 1000) {
-               // Start a new file next time.
-               if (fclose(file.fp) != 0) {
-                       perror("fclose");
-                       exit(1);
-               }
-               open_frame_files.erase(stream_idx);
-
-               // TODO: Write to SQLite.
-       }
-
        FrameOnDisk frame;
        frame.pts = pts;
-       frame.filename_idx = file.filename_idx;
+       frame.filename_idx = filename_idx;
        frame.offset = offset;
        frame.size = size;
 
@@ -149,6 +139,35 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t
                frames[stream_idx].push_back(frame);
        }
 
+       if (++file.frames_written_so_far >= 1000) {
+               size_t size = ftell(file.fp);
+
+               // Start a new file next time.
+               if (fclose(file.fp) != 0) {
+                       perror("fclose");
+                       exit(1);
+               }
+               open_frame_files.erase(stream_idx);
+
+               // Write information about all frames in the finished file to SQLite.
+               // (If we crash before getting to do this, we'll be scanning through
+               // the file on next startup, and adding it to the database then.)
+               // NOTE: Since we don't fsync(), we could in theory get broken data
+               // but with the right size, but it would seem unlikely.
+               vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
+               {
+                       lock_guard<mutex> lock(frame_mu);
+                       for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
+                               for (const FrameOnDisk &frame : frames[stream_idx]) {
+                                       if (frame.filename_idx == filename_idx) {
+                                               frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
+                                       }
+                               }
+                       }
+               }
+               db->store_frame_file(filename, size, frames_this_file);
+       }
+
        return frame;
 }
 
@@ -240,9 +259,25 @@ int main(int argc, char **argv)
        return ret;
 }
 
-void load_frame_file(const char *filename, unsigned filename_idx)
+void load_frame_file(const char *filename, unsigned filename_idx, DB *db)
 {
-       // TODO: Look up in the SQLite database.
+       struct stat st;
+       if (stat(filename, &st) == -1) {
+               perror(filename);
+               exit(1);
+       }
+
+       vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(filename, st.st_size, filename_idx);
+       if (!all_frames.empty()) {
+               // We already had this cached in the database, so no need to look in the file.
+               for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
+                       if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
+                               frames[frame.stream_idx].push_back(frame.frame);
+                               start_pts = max(start_pts, frame.frame.pts);
+                       }
+               }
+               return;
+       }
 
        FILE *fp = fopen(filename, "rb");
        if (fp == nullptr) {
@@ -310,16 +345,24 @@ void load_frame_file(const char *filename, unsigned filename_idx)
                        frames[hdr.stream_idx()].push_back(frame);
                        start_pts = max(start_pts, hdr.pts());
                }
+               all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
        }
 
        if (skipped_bytes > 0) {
                fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
                        filename, skipped_bytes);
        }
+
+       size_t size = ftell(fp);
+       fclose(fp);
+
+       db->store_frame_file(filename, size, all_frames);
 }
 
 void load_existing_frames()
 {
+       DB db(global_flags.working_directory + "/futatabi.db");
+
        string frame_dir = global_flags.working_directory + "/frames";
        DIR *dir = opendir(frame_dir.c_str());
        if (dir == nullptr) {
@@ -341,7 +384,7 @@ void load_existing_frames()
 
                if (de->d_type == DT_REG) {
                        string filename = frame_dir + "/" + de->d_name;
-                       load_frame_file(filename.c_str(), frame_filenames.size());
+                       load_frame_file(filename.c_str(), frame_filenames.size(), &db);
                        frame_filenames.push_back(filename);
                }
        }
@@ -371,6 +414,7 @@ int record_thread_func()
 
        int64_t last_pts = -1;
        int64_t pts_offset;
+       DB db(global_flags.working_directory + "/futatabi.db");
 
        while (!should_quit.load()) {
                AVPacket pkt;
@@ -398,7 +442,7 @@ int record_thread_func()
 
                //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
                //      pkt.stream_index, pts, pkt.size);
-               FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size);
+               FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
 
                post_to_main_thread([pkt, frame] {
                        if (pkt.stream_index == 0) {