From 60e3c27663ffc301f21ca9ce0e76c223e880047c Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Tue, 27 Nov 2018 23:51:24 +0100 Subject: [PATCH] Move to a denormalized protobuf-based schema for frames; a bit faster (~2x), and much more compact (~2.5x). --- db.cpp | 134 ++++++++++++++++++++++++++-------------------------- frame.proto | 13 ++++- 2 files changed, 78 insertions(+), 69 deletions(-) diff --git a/db.cpp b/db.cpp index 4f7856b..e928e96 100644 --- a/db.cpp +++ b/db.cpp @@ -1,5 +1,7 @@ #include "db.h" +#include "frame.pb.h" + #include using namespace std; @@ -17,24 +19,21 @@ DB::DB(const string &filename) )", nullptr, nullptr, nullptr); // Ignore errors. sqlite3_exec(db, R"( - CREATE TABLE IF NOT EXISTS file ( - file INTEGER NOT NULL PRIMARY KEY, - filename VARCHAR NOT NULL UNIQUE, - size BIGINT NOT NULL - ); + DROP TABLE file; )", nullptr, nullptr, nullptr); // Ignore errors. sqlite3_exec(db, R"( - CREATE TABLE IF NOT EXISTS frame ( - file INTEGER NOT NULL REFERENCES file ON DELETE CASCADE, - stream_idx INTEGER NOT NULL, - pts BIGINT NOT NULL, - offset BIGINT NOT NULL, - size INTEGER NOT NULL - ); + DROP TABLE frame; )", nullptr, nullptr, nullptr); // Ignore errors. - sqlite3_exec(db, "CREATE INDEX frame_file ON FRAME ( file );", nullptr, nullptr, nullptr); // Ignore errors. + sqlite3_exec(db, R"( + CREATE TABLE IF NOT EXISTS filev2 ( + file INTEGER NOT NULL PRIMARY KEY, + filename VARCHAR NOT NULL UNIQUE, + size BIGINT NOT NULL, + frames BLOB NOT NULL + ); + )", nullptr, nullptr, nullptr); // Ignore errors. sqlite3_exec(db, "PRAGMA journal_mode=WAL", nullptr, nullptr, nullptr); // Ignore errors. sqlite3_exec(db, "PRAGMA synchronous=NORMAL", nullptr, nullptr, nullptr); // Ignore errors. @@ -119,8 +118,10 @@ void DB::store_state(const StateProto &state) vector DB::load_frame_file(const string &filename, size_t size, unsigned filename_idx) { + FileContentsProto file_contents; + sqlite3_stmt *stmt; - int ret = sqlite3_prepare_v2(db, "SELECT pts, offset, frame.size, stream_idx FROM file JOIN frame USING (file) WHERE filename=? AND file.size=?", -1, &stmt, 0); + int ret = sqlite3_prepare_v2(db, "SELECT frames FROM filev2 WHERE filename=? AND size=?", -1, &stmt, 0); if (ret != SQLITE_OK) { fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db)); exit(1); @@ -129,22 +130,17 @@ vector DB::load_frame_file(const string &filename, sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC); sqlite3_bind_int64(stmt, 2, size); - vector frames; - do { - ret = sqlite3_step(stmt); - if (ret == SQLITE_ROW) { - FrameOnDiskAndStreamIdx frame; - frame.frame.filename_idx = filename_idx; - frame.frame.pts = sqlite3_column_int64(stmt, 0); - frame.frame.offset = sqlite3_column_int64(stmt, 1); - frame.frame.size = sqlite3_column_int(stmt, 2); - frame.stream_idx = sqlite3_column_int(stmt, 3); - frames.push_back(frame); - } else if (ret != SQLITE_DONE) { - fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db)); + ret = sqlite3_step(stmt); + if (ret == SQLITE_ROW) { + bool ok = file_contents.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0)); + if (!ok) { + fprintf(stderr, "Frame list in database is corrupted!\n"); exit(1); } - } while (ret != SQLITE_DONE); + } else if (ret != SQLITE_DONE) { + fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db)); + exit(1); + } ret = sqlite3_finalize(stmt); if (ret != SQLITE_OK) { @@ -152,6 +148,19 @@ vector DB::load_frame_file(const string &filename, exit(1); } + vector frames; + for (const StreamContentsProto &stream : file_contents.stream()) { + FrameOnDiskAndStreamIdx frame; + frame.stream_idx = stream.stream_idx(); + for (int i = 0; i < stream.pts_size(); ++i) { + frame.frame.filename_idx = filename_idx; + frame.frame.pts = stream.pts(i); + frame.frame.offset = stream.offset(i); + frame.frame.size = stream.file_size(i); + frames.push_back(frame); + } + } + return frames; } @@ -163,10 +172,10 @@ void DB::store_frame_file(const string &filename, size_t size, const vector seen_stream_idx; // Usually only one. + for (const FrameOnDiskAndStreamIdx &frame : frames) { + seen_stream_idx.insert(frame.stream_idx); + } + for (unsigned stream_idx : seen_stream_idx) { + StreamContentsProto *stream = file_contents.add_stream(); + stream->set_stream_idx(stream_idx); + stream->mutable_pts()->Reserve(frames.size()); + stream->mutable_offset()->Reserve(frames.size()); + stream->mutable_file_size()->Reserve(frames.size()); + for (const FrameOnDiskAndStreamIdx &frame : frames) { + if (frame.stream_idx != stream_idx) { + continue; + } + stream->add_pts(frame.frame.pts); + stream->add_offset(frame.frame.offset); + stream->add_file_size(frame.frame.size); + } + } + string serialized; + file_contents.SerializeToString(&serialized); + // Insert the new row. - ret = sqlite3_prepare_v2(db, "INSERT INTO file (filename, size) VALUES (?, ?)", -1, &stmt, 0); + ret = sqlite3_prepare_v2(db, "INSERT INTO filev2 (filename, size, frames) VALUES (?, ?, ?)", -1, &stmt, 0); if (ret != SQLITE_OK) { fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db)); exit(1); @@ -195,6 +228,7 @@ void DB::store_frame_file(const string &filename, size_t size, const vector