using namespace std;
-DB::DB(const std::string &filename)
+DB::DB(const string &filename)
{
int ret = sqlite3_open(filename.c_str(), &db);
if (ret != SQLITE_OK) {
CREATE TABLE IF NOT EXISTS state (state BLOB);
)", nullptr, nullptr, nullptr); // Ignore errors.
+ sqlite3_exec(db, R"(
+ CREATE TABLE IF NOT EXISTS file (
+ file INTEGER NOT NULL PRIMARY KEY,
+ filename VARCHAR NOT NULL UNIQUE,
+ size BIGINT NOT NULL
+ );
+ )", nullptr, nullptr, nullptr); // Ignore errors.
+
+ sqlite3_exec(db, R"(
+ CREATE TABLE IF NOT EXISTS frame (
+ file INTEGER NOT NULL REFERENCES file ON DELETE CASCADE,
+ stream_idx INTEGER NOT NULL,
+ pts BIGINT NOT NULL,
+ offset BIGINT NOT NULL,
+ size INTEGER NOT NULL
+ );
+ )", nullptr, nullptr, nullptr); // Ignore errors.
+
+ sqlite3_exec(db, "CREATE INDEX frame_file ON FRAME ( file );", nullptr, nullptr, nullptr); // Ignore errors.
+
sqlite3_exec(db, "PRAGMA journal_mode=WAL", nullptr, nullptr, nullptr); // Ignore errors.
sqlite3_exec(db, "PRAGMA synchronous=NORMAL", nullptr, nullptr, nullptr); // Ignore errors.
}
exit(1);
}
}
+
+vector<DB::FrameOnDiskAndStreamIdx> DB::load_frame_file(const string &filename, size_t size, unsigned filename_idx)
+{
+ sqlite3_stmt *stmt;
+ int ret = sqlite3_prepare_v2(db, "SELECT pts, offset, frame.size, stream_idx FROM file JOIN frame USING (file) WHERE filename=? AND file.size=?", -1, &stmt, 0);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
+ sqlite3_bind_int64(stmt, 2, size);
+
+ vector<FrameOnDiskAndStreamIdx> frames;
+ do {
+ ret = sqlite3_step(stmt);
+ if (ret == SQLITE_ROW) {
+ FrameOnDiskAndStreamIdx frame;
+ frame.frame.filename_idx = filename_idx;
+ frame.frame.pts = sqlite3_column_int64(stmt, 0);
+ frame.frame.offset = sqlite3_column_int64(stmt, 1);
+ frame.frame.size = sqlite3_column_int(stmt, 2);
+ frame.stream_idx = sqlite3_column_int(stmt, 3);
+ frames.push_back(frame);
+ } else if (ret != SQLITE_DONE) {
+ fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+ } while (ret != SQLITE_DONE);
+
+ ret = sqlite3_finalize(stmt);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ return frames;
+}
+
+void DB::store_frame_file(const string &filename, size_t size, const vector<FrameOnDiskAndStreamIdx> &frames)
+{
+ int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ // Delete any existing instances with this filename. This also includes
+ // deleting any associated frames, due to the ON CASCADE DELETE constraint.
+ sqlite3_stmt *stmt;
+ ret = sqlite3_prepare_v2(db, "DELETE FROM file WHERE filename=?", -1, &stmt, 0);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "DELETE prepare: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
+
+ ret = sqlite3_step(stmt);
+ if (ret == SQLITE_ROW) {
+ fprintf(stderr, "DELETE step: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ ret = sqlite3_finalize(stmt);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "DELETE finalize: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ // Insert the new row.
+ ret = sqlite3_prepare_v2(db, "INSERT INTO file (filename, size) VALUES (?, ?)", -1, &stmt, 0);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
+ sqlite3_bind_int64(stmt, 2, size);
+
+ ret = sqlite3_step(stmt);
+ if (ret == SQLITE_ROW) {
+ fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ ret = sqlite3_finalize(stmt);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ // Insert the actual frames.
+ int64_t rowid = sqlite3_last_insert_rowid(db);
+
+ ret = sqlite3_prepare_v2(db, "INSERT INTO frame (file, stream_idx, pts, offset, size) VALUES (?, ?, ?, ?, ?)", -1, &stmt, 0);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ sqlite3_bind_int64(stmt, 1, rowid);
+
+ for (const FrameOnDiskAndStreamIdx &frame : frames) {
+ sqlite3_bind_int64(stmt, 2, frame.stream_idx);
+ sqlite3_bind_int64(stmt, 3, frame.frame.pts);
+ sqlite3_bind_int64(stmt, 4, frame.frame.offset);
+ sqlite3_bind_int(stmt, 5, frame.frame.size);
+
+ ret = sqlite3_step(stmt);
+ if (ret == SQLITE_ROW) {
+ fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ ret = sqlite3_reset(stmt);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "INSERT reset: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+ }
+
+ ret = sqlite3_finalize(stmt);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+
+ // Commit.
+ ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
+ if (ret != SQLITE_OK) {
+ fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
+ exit(1);
+ }
+}
struct FrameFile {
FILE *fp = nullptr;
- int filename_idx;
+ unsigned filename_idx;
size_t frames_written_so_far = 0;
};
std::map<int, FrameFile> open_frame_files;
namespace {
-FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size)
+FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
{
if (open_frame_files.count(stream_idx) == 0) {
char filename[256];
}
lock_guard<mutex> lock(frame_mu);
- int filename_idx = frame_filenames.size();
+ unsigned filename_idx = frame_filenames.size();
frame_filenames.push_back(filename);
open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
}
FrameFile &file = open_frame_files[stream_idx];
+ unsigned filename_idx = file.filename_idx;
string filename;
{
lock_guard<mutex> lock(frame_mu);
- filename = frame_filenames[file.filename_idx];
+ filename = frame_filenames[filename_idx];
}
FrameHeaderProto hdr;
fflush(file.fp); // No fsync(), though. We can accept losing a few frames.
global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
- if (++file.frames_written_so_far >= 1000) {
- // Start a new file next time.
- if (fclose(file.fp) != 0) {
- perror("fclose");
- exit(1);
- }
- open_frame_files.erase(stream_idx);
-
- // TODO: Write to SQLite.
- }
-
FrameOnDisk frame;
frame.pts = pts;
- frame.filename_idx = file.filename_idx;
+ frame.filename_idx = filename_idx;
frame.offset = offset;
frame.size = size;
frames[stream_idx].push_back(frame);
}
+ if (++file.frames_written_so_far >= 1000) {
+ size_t size = ftell(file.fp);
+
+ // Start a new file next time.
+ if (fclose(file.fp) != 0) {
+ perror("fclose");
+ exit(1);
+ }
+ open_frame_files.erase(stream_idx);
+
+ // Write information about all frames in the finished file to SQLite.
+ // (If we crash before getting to do this, we'll be scanning through
+ // the file on next startup, and adding it to the database then.)
+ // NOTE: Since we don't fsync(), we could in theory get broken data
+ // but with the right size, but it would seem unlikely.
+ vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
+ {
+ lock_guard<mutex> lock(frame_mu);
+ for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
+ for (const FrameOnDisk &frame : frames[stream_idx]) {
+ if (frame.filename_idx == filename_idx) {
+ frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
+ }
+ }
+ }
+ }
+ db->store_frame_file(filename, size, frames_this_file);
+ }
+
return frame;
}
return ret;
}
-void load_frame_file(const char *filename, unsigned filename_idx)
+void load_frame_file(const char *filename, unsigned filename_idx, DB *db)
{
- // TODO: Look up in the SQLite database.
+ struct stat st;
+ if (stat(filename, &st) == -1) {
+ perror(filename);
+ exit(1);
+ }
+
+ vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(filename, st.st_size, filename_idx);
+ if (!all_frames.empty()) {
+ // We already had this cached in the database, so no need to look in the file.
+ for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
+ if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
+ frames[frame.stream_idx].push_back(frame.frame);
+ start_pts = max(start_pts, frame.frame.pts);
+ }
+ }
+ return;
+ }
FILE *fp = fopen(filename, "rb");
if (fp == nullptr) {
frames[hdr.stream_idx()].push_back(frame);
start_pts = max(start_pts, hdr.pts());
}
+ all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
}
if (skipped_bytes > 0) {
fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
filename, skipped_bytes);
}
+
+ size_t size = ftell(fp);
+ fclose(fp);
+
+ db->store_frame_file(filename, size, all_frames);
}
void load_existing_frames()
{
+ DB db(global_flags.working_directory + "/futatabi.db");
+
string frame_dir = global_flags.working_directory + "/frames";
DIR *dir = opendir(frame_dir.c_str());
if (dir == nullptr) {
if (de->d_type == DT_REG) {
string filename = frame_dir + "/" + de->d_name;
- load_frame_file(filename.c_str(), frame_filenames.size());
+ load_frame_file(filename.c_str(), frame_filenames.size(), &db);
frame_filenames.push_back(filename);
}
}
int64_t last_pts = -1;
int64_t pts_offset;
+ DB db(global_flags.working_directory + "/futatabi.db");
while (!should_quit.load()) {
AVPacket pkt;
//fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
// pkt.stream_index, pts, pkt.size);
- FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size);
+ FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
post_to_main_thread([pkt, frame] {
if (pkt.stream_index == 0) {