From: Steinar H. Gunderson Date: Sun, 25 Nov 2018 22:34:38 +0000 (+0100) Subject: Use SQLite to store metadata about finished frame files. X-Git-Tag: 1.8.0~76^2~7 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=3859580a69bf0242c70a2ff77da50deb4ac363d2;p=nageru Use SQLite to store metadata about finished frame files. --- diff --git a/db.cpp b/db.cpp index b0f529b..4f7856b 100644 --- a/db.cpp +++ b/db.cpp @@ -4,7 +4,7 @@ using namespace std; -DB::DB(const std::string &filename) +DB::DB(const string &filename) { int ret = sqlite3_open(filename.c_str(), &db); if (ret != SQLITE_OK) { @@ -16,6 +16,26 @@ DB::DB(const std::string &filename) CREATE TABLE IF NOT EXISTS state (state BLOB); )", nullptr, nullptr, nullptr); // Ignore errors. + sqlite3_exec(db, R"( + CREATE TABLE IF NOT EXISTS file ( + file INTEGER NOT NULL PRIMARY KEY, + filename VARCHAR NOT NULL UNIQUE, + size BIGINT NOT NULL + ); + )", nullptr, nullptr, nullptr); // Ignore errors. + + sqlite3_exec(db, R"( + CREATE TABLE IF NOT EXISTS frame ( + file INTEGER NOT NULL REFERENCES file ON DELETE CASCADE, + stream_idx INTEGER NOT NULL, + pts BIGINT NOT NULL, + offset BIGINT NOT NULL, + size INTEGER NOT NULL + ); + )", nullptr, nullptr, nullptr); // Ignore errors. + + sqlite3_exec(db, "CREATE INDEX frame_file ON FRAME ( file );", nullptr, nullptr, nullptr); // Ignore errors. + sqlite3_exec(db, "PRAGMA journal_mode=WAL", nullptr, nullptr, nullptr); // Ignore errors. sqlite3_exec(db, "PRAGMA synchronous=NORMAL", nullptr, nullptr, nullptr); // Ignore errors. } @@ -96,3 +116,138 @@ void DB::store_state(const StateProto &state) exit(1); } } + +vector DB::load_frame_file(const string &filename, size_t size, unsigned filename_idx) +{ + sqlite3_stmt *stmt; + int ret = sqlite3_prepare_v2(db, "SELECT pts, offset, frame.size, stream_idx FROM file JOIN frame USING (file) WHERE filename=? AND file.size=?", -1, &stmt, 0); + if (ret != SQLITE_OK) { + fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC); + sqlite3_bind_int64(stmt, 2, size); + + vector frames; + do { + ret = sqlite3_step(stmt); + if (ret == SQLITE_ROW) { + FrameOnDiskAndStreamIdx frame; + frame.frame.filename_idx = filename_idx; + frame.frame.pts = sqlite3_column_int64(stmt, 0); + frame.frame.offset = sqlite3_column_int64(stmt, 1); + frame.frame.size = sqlite3_column_int(stmt, 2); + frame.stream_idx = sqlite3_column_int(stmt, 3); + frames.push_back(frame); + } else if (ret != SQLITE_DONE) { + fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db)); + exit(1); + } + } while (ret != SQLITE_DONE); + + ret = sqlite3_finalize(stmt); + if (ret != SQLITE_OK) { + fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + return frames; +} + +void DB::store_frame_file(const string &filename, size_t size, const vector &frames) +{ + int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr); + if (ret != SQLITE_OK) { + fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + // Delete any existing instances with this filename. This also includes + // deleting any associated frames, due to the ON CASCADE DELETE constraint. + sqlite3_stmt *stmt; + ret = sqlite3_prepare_v2(db, "DELETE FROM file WHERE filename=?", -1, &stmt, 0); + if (ret != SQLITE_OK) { + fprintf(stderr, "DELETE prepare: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC); + + ret = sqlite3_step(stmt); + if (ret == SQLITE_ROW) { + fprintf(stderr, "DELETE step: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + ret = sqlite3_finalize(stmt); + if (ret != SQLITE_OK) { + fprintf(stderr, "DELETE finalize: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + // Insert the new row. + ret = sqlite3_prepare_v2(db, "INSERT INTO file (filename, size) VALUES (?, ?)", -1, &stmt, 0); + if (ret != SQLITE_OK) { + fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC); + sqlite3_bind_int64(stmt, 2, size); + + ret = sqlite3_step(stmt); + if (ret == SQLITE_ROW) { + fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + ret = sqlite3_finalize(stmt); + if (ret != SQLITE_OK) { + fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + // Insert the actual frames. + int64_t rowid = sqlite3_last_insert_rowid(db); + + ret = sqlite3_prepare_v2(db, "INSERT INTO frame (file, stream_idx, pts, offset, size) VALUES (?, ?, ?, ?, ?)", -1, &stmt, 0); + if (ret != SQLITE_OK) { + fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + sqlite3_bind_int64(stmt, 1, rowid); + + for (const FrameOnDiskAndStreamIdx &frame : frames) { + sqlite3_bind_int64(stmt, 2, frame.stream_idx); + sqlite3_bind_int64(stmt, 3, frame.frame.pts); + sqlite3_bind_int64(stmt, 4, frame.frame.offset); + sqlite3_bind_int(stmt, 5, frame.frame.size); + + ret = sqlite3_step(stmt); + if (ret == SQLITE_ROW) { + fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + ret = sqlite3_reset(stmt); + if (ret != SQLITE_OK) { + fprintf(stderr, "INSERT reset: %s\n", sqlite3_errmsg(db)); + exit(1); + } + } + + ret = sqlite3_finalize(stmt); + if (ret != SQLITE_OK) { + fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db)); + exit(1); + } + + // Commit. + ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr); + if (ret != SQLITE_OK) { + fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db)); + exit(1); + } +} diff --git a/db.h b/db.h index 40a6602..88196c7 100644 --- a/db.h +++ b/db.h @@ -5,6 +5,9 @@ #include #include +#include + +#include "frame_on_disk.h" class DB { public: @@ -14,6 +17,13 @@ public: StateProto get_state(); void store_state(const StateProto &state); + struct FrameOnDiskAndStreamIdx { + FrameOnDisk frame; + unsigned stream_idx; + }; + std::vector load_frame_file(const std::string &filename, size_t size, unsigned frame_idx); // Empty = none found, or there were no frames. + void store_frame_file(const std::string &filename, size_t size, const std::vector &frames); + private: StateProto state; sqlite3 *db; diff --git a/main.cpp b/main.cpp index 681724a..8f95304 100644 --- a/main.cpp +++ b/main.cpp @@ -58,7 +58,7 @@ int64_t current_pts = 0; struct FrameFile { FILE *fp = nullptr; - int filename_idx; + unsigned filename_idx; size_t frames_written_so_far = 0; }; std::map open_frame_files; @@ -69,7 +69,7 @@ vector frame_filenames; // Under frame_mu. namespace { -FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size) +FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db) { if (open_frame_files.count(stream_idx) == 0) { char filename[256]; @@ -82,16 +82,17 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t } lock_guard lock(frame_mu); - int filename_idx = frame_filenames.size(); + unsigned filename_idx = frame_filenames.size(); frame_filenames.push_back(filename); open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 }; } FrameFile &file = open_frame_files[stream_idx]; + unsigned filename_idx = file.filename_idx; string filename; { lock_guard lock(frame_mu); - filename = frame_filenames[file.filename_idx]; + filename = frame_filenames[filename_idx]; } FrameHeaderProto hdr; @@ -126,20 +127,9 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t fflush(file.fp); // No fsync(), though. We can accept losing a few frames. global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts); - if (++file.frames_written_so_far >= 1000) { - // Start a new file next time. - if (fclose(file.fp) != 0) { - perror("fclose"); - exit(1); - } - open_frame_files.erase(stream_idx); - - // TODO: Write to SQLite. - } - FrameOnDisk frame; frame.pts = pts; - frame.filename_idx = file.filename_idx; + frame.filename_idx = filename_idx; frame.offset = offset; frame.size = size; @@ -149,6 +139,35 @@ FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t frames[stream_idx].push_back(frame); } + if (++file.frames_written_so_far >= 1000) { + size_t size = ftell(file.fp); + + // Start a new file next time. + if (fclose(file.fp) != 0) { + perror("fclose"); + exit(1); + } + open_frame_files.erase(stream_idx); + + // Write information about all frames in the finished file to SQLite. + // (If we crash before getting to do this, we'll be scanning through + // the file on next startup, and adding it to the database then.) + // NOTE: Since we don't fsync(), we could in theory get broken data + // but with the right size, but it would seem unlikely. + vector frames_this_file; + { + lock_guard lock(frame_mu); + for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) { + for (const FrameOnDisk &frame : frames[stream_idx]) { + if (frame.filename_idx == filename_idx) { + frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) }); + } + } + } + } + db->store_frame_file(filename, size, frames_this_file); + } + return frame; } @@ -240,9 +259,25 @@ int main(int argc, char **argv) return ret; } -void load_frame_file(const char *filename, unsigned filename_idx) +void load_frame_file(const char *filename, unsigned filename_idx, DB *db) { - // TODO: Look up in the SQLite database. + struct stat st; + if (stat(filename, &st) == -1) { + perror(filename); + exit(1); + } + + vector all_frames = db->load_frame_file(filename, st.st_size, filename_idx); + if (!all_frames.empty()) { + // We already had this cached in the database, so no need to look in the file. + for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) { + if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) { + frames[frame.stream_idx].push_back(frame.frame); + start_pts = max(start_pts, frame.frame.pts); + } + } + return; + } FILE *fp = fopen(filename, "rb"); if (fp == nullptr) { @@ -310,16 +345,24 @@ void load_frame_file(const char *filename, unsigned filename_idx) frames[hdr.stream_idx()].push_back(frame); start_pts = max(start_pts, hdr.pts()); } + all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) }); } if (skipped_bytes > 0) { fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n", filename, skipped_bytes); } + + size_t size = ftell(fp); + fclose(fp); + + db->store_frame_file(filename, size, all_frames); } void load_existing_frames() { + DB db(global_flags.working_directory + "/futatabi.db"); + string frame_dir = global_flags.working_directory + "/frames"; DIR *dir = opendir(frame_dir.c_str()); if (dir == nullptr) { @@ -341,7 +384,7 @@ void load_existing_frames() if (de->d_type == DT_REG) { string filename = frame_dir + "/" + de->d_name; - load_frame_file(filename.c_str(), frame_filenames.size()); + load_frame_file(filename.c_str(), frame_filenames.size(), &db); frame_filenames.push_back(filename); } } @@ -371,6 +414,7 @@ int record_thread_func() int64_t last_pts = -1; int64_t pts_offset; + DB db(global_flags.working_directory + "/futatabi.db"); while (!should_quit.load()) { AVPacket pkt; @@ -398,7 +442,7 @@ int record_thread_func() //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n", // pkt.stream_index, pts, pkt.size); - FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size); + FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db); post_to_main_thread([pkt, frame] { if (pkt.stream_index == 0) {