]> git.sesse.net Git - nageru/blobdiff - futatabi/db.cpp
Log a warning when we kill a client that is not keeping up.
[nageru] / futatabi / db.cpp
index fb6694be3a2f2c729edefedcd3d923a9a4ca2209..9c6748fb257ffaa439c21e0e0f510565b0fa77cc 100644 (file)
@@ -12,24 +12,40 @@ DB::DB(const string &filename)
        int ret = sqlite3_open(filename.c_str(), &db);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "%s: %s\n", filename.c_str(), sqlite3_errmsg(db));
-               exit(1);
+               abort();
+       }
+
+       // Set an effectively infinite timeout for waiting for write locks;
+       // if we get SQLITE_LOCKED, we just exit out, so this is much better.
+       ret = sqlite3_busy_timeout(db, 3600000);
+       if (ret != SQLITE_OK) {
+               fprintf(stderr, "sqlite3_busy_timeout: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        sqlite3_exec(db, R"(
                CREATE TABLE IF NOT EXISTS state (state BLOB);
-       )", nullptr, nullptr, nullptr);  // Ignore errors.
+       )",
+                    nullptr, nullptr, nullptr);  // Ignore errors.
+
+       sqlite3_exec(db, "CREATE UNIQUE INDEX only_one_state ON state (1);", nullptr, nullptr, nullptr);  // Ignore errors.
 
        sqlite3_exec(db, R"(
                CREATE TABLE IF NOT EXISTS settings (settings BLOB);
-       )", nullptr, nullptr, nullptr);  // Ignore errors.
+       )",
+                    nullptr, nullptr, nullptr);  // Ignore errors.
+
+       sqlite3_exec(db, "CREATE UNIQUE INDEX only_one_settings ON settings (1);", nullptr, nullptr, nullptr);  // Ignore errors.
 
        sqlite3_exec(db, R"(
                DROP TABLE file;
-       )", nullptr, nullptr, nullptr);  // Ignore errors.
+       )",
+                    nullptr, nullptr, nullptr);  // Ignore errors.
 
        sqlite3_exec(db, R"(
                DROP TABLE frame;
-       )", nullptr, nullptr, nullptr);  // Ignore errors.
+       )",
+                    nullptr, nullptr, nullptr);  // Ignore errors.
 
        sqlite3_exec(db, R"(
                CREATE TABLE IF NOT EXISTS filev2 (
@@ -38,7 +54,8 @@ DB::DB(const string &filename)
                        size BIGINT NOT NULL,
                        frames BLOB NOT NULL
                );
-       )", nullptr, nullptr, nullptr);  // Ignore errors.
+       )",
+                    nullptr, nullptr, nullptr);  // Ignore errors.
 
        sqlite3_exec(db, "PRAGMA journal_mode=WAL", nullptr, nullptr, nullptr);  // Ignore errors.
        sqlite3_exec(db, "PRAGMA synchronous=NORMAL", nullptr, nullptr, nullptr);  // Ignore errors.
@@ -52,7 +69,7 @@ StateProto DB::get_state()
        int ret = sqlite3_prepare_v2(db, "SELECT state FROM state", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_step(stmt);
@@ -60,17 +77,17 @@ StateProto DB::get_state()
                bool ok = state.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
                if (!ok) {
                        fprintf(stderr, "State in database is corrupted!\n");
-                       exit(1);
+                       abort();
                }
        } else if (ret != SQLITE_DONE) {
                fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        return state;
@@ -84,40 +101,34 @@ void DB::store_state(const StateProto &state)
        int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
-               exit(1);
-       }
-
-       ret = sqlite3_exec(db, "DELETE FROM state", nullptr, nullptr, nullptr);
-       if (ret != SQLITE_OK) {
-               fprintf(stderr, "DELETE: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        sqlite3_stmt *stmt;
-       ret = sqlite3_prepare_v2(db, "INSERT INTO state VALUES (?)", -1, &stmt, 0);
+       ret = sqlite3_prepare_v2(db, "REPLACE INTO state VALUES (?)", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
-               fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE prepare: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        sqlite3_bind_blob(stmt, 1, serialized.data(), serialized.size(), SQLITE_STATIC);
 
        ret = sqlite3_step(stmt);
        if (ret == SQLITE_ROW) {
-               fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE step: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
-               fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE finalize: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 }
 
@@ -129,7 +140,7 @@ SettingsProto DB::get_settings()
        int ret = sqlite3_prepare_v2(db, "SELECT settings FROM settings", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_step(stmt);
@@ -137,17 +148,17 @@ SettingsProto DB::get_settings()
                bool ok = settings.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
                if (!ok) {
                        fprintf(stderr, "State in database is corrupted!\n");
-                       exit(1);
+                       abort();
                }
        } else if (ret != SQLITE_DONE) {
                fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        return settings;
@@ -161,40 +172,34 @@ void DB::store_settings(const SettingsProto &settings)
        int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
-               exit(1);
-       }
-
-       ret = sqlite3_exec(db, "DELETE FROM settings", nullptr, nullptr, nullptr);
-       if (ret != SQLITE_OK) {
-               fprintf(stderr, "DELETE: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        sqlite3_stmt *stmt;
-       ret = sqlite3_prepare_v2(db, "INSERT INTO settings VALUES (?)", -1, &stmt, 0);
+       ret = sqlite3_prepare_v2(db, "REPLACE INTO settings VALUES (?)", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
-               fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE prepare: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        sqlite3_bind_blob(stmt, 1, serialized.data(), serialized.size(), SQLITE_STATIC);
 
        ret = sqlite3_step(stmt);
        if (ret == SQLITE_ROW) {
-               fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE step: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
-               fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE finalize: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 }
 
@@ -206,7 +211,7 @@ vector<DB::FrameOnDiskAndStreamIdx> DB::load_frame_file(const string &filename,
        int ret = sqlite3_prepare_v2(db, "SELECT frames FROM filev2 WHERE filename=? AND size=?", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
@@ -217,17 +222,17 @@ vector<DB::FrameOnDiskAndStreamIdx> DB::load_frame_file(const string &filename,
                bool ok = file_contents.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
                if (!ok) {
                        fprintf(stderr, "Frame list in database is corrupted!\n");
-                       exit(1);
+                       abort();
                }
        } else if (ret != SQLITE_DONE) {
                fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        vector<FrameOnDiskAndStreamIdx> frames;
@@ -239,6 +244,11 @@ vector<DB::FrameOnDiskAndStreamIdx> DB::load_frame_file(const string &filename,
                        frame.frame.pts = stream.pts(i);
                        frame.frame.offset = stream.offset(i);
                        frame.frame.size = stream.file_size(i);
+                       if (i < stream.audio_size_size()) {
+                               frame.frame.audio_size = stream.audio_size(i);
+                       } else {
+                               frame.frame.audio_size = 0;
+                       }
                        frames.push_back(frame);
                }
        }
@@ -251,32 +261,12 @@ void DB::store_frame_file(const string &filename, size_t size, const vector<Fram
        int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        // Delete any existing instances with this filename.
        sqlite3_stmt *stmt;
 
-       ret = sqlite3_prepare_v2(db, "DELETE FROM filev2 WHERE filename=?", -1, &stmt, 0);
-       if (ret != SQLITE_OK) {
-               fprintf(stderr, "DELETE prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
-       }
-
-       sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
-
-       ret = sqlite3_step(stmt);
-       if (ret == SQLITE_ROW) {
-               fprintf(stderr, "DELETE step: %s\n", sqlite3_errmsg(db));
-               exit(1);
-       }
-
-       ret = sqlite3_finalize(stmt);
-       if (ret != SQLITE_OK) {
-               fprintf(stderr, "DELETE finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
-       }
-
        // Create the protobuf blob for the new row.
        FileContentsProto file_contents;
        unordered_set<unsigned> seen_stream_idx;  // Usually only one.
@@ -289,6 +279,7 @@ void DB::store_frame_file(const string &filename, size_t size, const vector<Fram
                stream->mutable_pts()->Reserve(frames.size());
                stream->mutable_offset()->Reserve(frames.size());
                stream->mutable_file_size()->Reserve(frames.size());
+               stream->mutable_audio_size()->Reserve(frames.size());
                for (const FrameOnDiskAndStreamIdx &frame : frames) {
                        if (frame.stream_idx != stream_idx) {
                                continue;
@@ -296,16 +287,17 @@ void DB::store_frame_file(const string &filename, size_t size, const vector<Fram
                        stream->add_pts(frame.frame.pts);
                        stream->add_offset(frame.frame.offset);
                        stream->add_file_size(frame.frame.size);
+                       stream->add_audio_size(frame.frame.audio_size);
                }
        }
        string serialized;
        file_contents.SerializeToString(&serialized);
 
        // Insert the new row.
-       ret = sqlite3_prepare_v2(db, "INSERT INTO filev2 (filename, size, frames) VALUES (?, ?, ?)", -1, &stmt, 0);
+       ret = sqlite3_prepare_v2(db, "REPLACE INTO filev2 (filename, size, frames) VALUES (?, ?, ?)", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
@@ -314,21 +306,21 @@ void DB::store_frame_file(const string &filename, size_t size, const vector<Fram
 
        ret = sqlite3_step(stmt);
        if (ret == SQLITE_ROW) {
-               fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE step: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
-               fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               fprintf(stderr, "REPLACE finalize: %s\n", sqlite3_errmsg(db));
+               abort();
        }
 
        // Commit.
        ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 }
 
@@ -337,16 +329,17 @@ void DB::clean_unused_frame_files(const vector<string> &used_filenames)
        int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_exec(db, R"(
                CREATE TEMPORARY TABLE used_filenames ( filename VARCHAR NOT NULL PRIMARY KEY )
-       )", nullptr, nullptr, nullptr);
+       )",
+                          nullptr, nullptr, nullptr);
 
        if (ret != SQLITE_OK) {
                fprintf(stderr, "CREATE TEMPORARY TABLE: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        // Insert the new rows.
@@ -354,7 +347,7 @@ void DB::clean_unused_frame_files(const vector<string> &used_filenames)
        ret = sqlite3_prepare_v2(db, "INSERT INTO used_filenames (filename) VALUES (?)", -1, &stmt, 0);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        for (const string &filename : used_filenames) {
@@ -363,44 +356,46 @@ void DB::clean_unused_frame_files(const vector<string> &used_filenames)
                ret = sqlite3_step(stmt);
                if (ret == SQLITE_ROW) {
                        fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
-                       exit(1);
+                       abort();
                }
 
                ret = sqlite3_reset(stmt);
                if (ret == SQLITE_ROW) {
                        fprintf(stderr, "INSERT reset: %s\n", sqlite3_errmsg(db));
-                       exit(1);
+                       abort();
                }
        }
 
        ret = sqlite3_finalize(stmt);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_exec(db, R"(
                DELETE FROM filev2 WHERE filename NOT IN ( SELECT filename FROM used_filenames )
-       )", nullptr, nullptr, nullptr);
+       )",
+                          nullptr, nullptr, nullptr);
 
        if (ret != SQLITE_OK) {
                fprintf(stderr, "DELETE: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        ret = sqlite3_exec(db, R"(
                DROP TABLE used_filenames
-       )", nullptr, nullptr, nullptr);
+       )",
+                          nullptr, nullptr, nullptr);
 
        if (ret != SQLITE_OK) {
                fprintf(stderr, "DROP TABLE: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 
        // Commit.
        ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
        if (ret != SQLITE_OK) {
                fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
-               exit(1);
+               abort();
        }
 }