]> git.sesse.net Git - nageru/blob - futatabi/db.cpp
Fix an issue where scrubbing in Futatabi could lock up if there were no inputs going...
[nageru] / futatabi / db.cpp
1 #include "db.h"
2
3 #include "frame.pb.h"
4
5 #include <string>
6 #include <unordered_set>
7
8 using namespace std;
9
10 DB::DB(const string &filename)
11 {
12         int ret = sqlite3_open(filename.c_str(), &db);
13         if (ret != SQLITE_OK) {
14                 fprintf(stderr, "%s: %s\n", filename.c_str(), sqlite3_errmsg(db));
15                 abort();
16         }
17
18         // Set an effectively infinite timeout for waiting for write locks;
19         // if we get SQLITE_LOCKED, we just exit out, so this is much better.
20         ret = sqlite3_busy_timeout(db, 3600000);
21         if (ret != SQLITE_OK) {
22                 fprintf(stderr, "sqlite3_busy_timeout: %s\n", sqlite3_errmsg(db));
23                 abort();
24         }
25
26         sqlite3_exec(db, R"(
27                 CREATE TABLE IF NOT EXISTS state (state BLOB);
28         )",
29                      nullptr, nullptr, nullptr);  // Ignore errors.
30
31         sqlite3_exec(db, "CREATE UNIQUE INDEX only_one_state ON state (1);", nullptr, nullptr, nullptr);  // Ignore errors.
32
33         sqlite3_exec(db, R"(
34                 CREATE TABLE IF NOT EXISTS settings (settings BLOB);
35         )",
36                      nullptr, nullptr, nullptr);  // Ignore errors.
37
38         sqlite3_exec(db, "CREATE UNIQUE INDEX only_one_settings ON settings (1);", nullptr, nullptr, nullptr);  // Ignore errors.
39
40         sqlite3_exec(db, R"(
41                 DROP TABLE file;
42         )",
43                      nullptr, nullptr, nullptr);  // Ignore errors.
44
45         sqlite3_exec(db, R"(
46                 DROP TABLE frame;
47         )",
48                      nullptr, nullptr, nullptr);  // Ignore errors.
49
50         sqlite3_exec(db, R"(
51                 CREATE TABLE IF NOT EXISTS filev2 (
52                         file INTEGER NOT NULL PRIMARY KEY,
53                         filename VARCHAR NOT NULL UNIQUE,
54                         size BIGINT NOT NULL,
55                         frames BLOB NOT NULL
56                 );
57         )",
58                      nullptr, nullptr, nullptr);  // Ignore errors.
59
60         sqlite3_exec(db, "PRAGMA journal_mode=WAL", nullptr, nullptr, nullptr);  // Ignore errors.
61         sqlite3_exec(db, "PRAGMA synchronous=NORMAL", nullptr, nullptr, nullptr);  // Ignore errors.
62 }
63
64 StateProto DB::get_state()
65 {
66         StateProto state;
67
68         sqlite3_stmt *stmt;
69         int ret = sqlite3_prepare_v2(db, "SELECT state FROM state", -1, &stmt, 0);
70         if (ret != SQLITE_OK) {
71                 fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
72                 abort();
73         }
74
75         ret = sqlite3_step(stmt);
76         if (ret == SQLITE_ROW) {
77                 bool ok = state.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
78                 if (!ok) {
79                         fprintf(stderr, "State in database is corrupted!\n");
80                         abort();
81                 }
82         } else if (ret != SQLITE_DONE) {
83                 fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
84                 abort();
85         }
86
87         ret = sqlite3_finalize(stmt);
88         if (ret != SQLITE_OK) {
89                 fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
90                 abort();
91         }
92
93         return state;
94 }
95
96 void DB::store_state(const StateProto &state)
97 {
98         string serialized;
99         state.SerializeToString(&serialized);
100
101         int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
102         if (ret != SQLITE_OK) {
103                 fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
104                 abort();
105         }
106
107         sqlite3_stmt *stmt;
108         ret = sqlite3_prepare_v2(db, "REPLACE INTO state VALUES (?)", -1, &stmt, 0);
109         if (ret != SQLITE_OK) {
110                 fprintf(stderr, "REPLACE prepare: %s\n", sqlite3_errmsg(db));
111                 abort();
112         }
113
114         sqlite3_bind_blob(stmt, 1, serialized.data(), serialized.size(), SQLITE_STATIC);
115
116         ret = sqlite3_step(stmt);
117         if (ret == SQLITE_ROW) {
118                 fprintf(stderr, "REPLACE step: %s\n", sqlite3_errmsg(db));
119                 abort();
120         }
121
122         ret = sqlite3_finalize(stmt);
123         if (ret != SQLITE_OK) {
124                 fprintf(stderr, "REPLACE finalize: %s\n", sqlite3_errmsg(db));
125                 abort();
126         }
127
128         ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
129         if (ret != SQLITE_OK) {
130                 fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
131                 abort();
132         }
133 }
134
135 SettingsProto DB::get_settings()
136 {
137         SettingsProto settings;
138
139         sqlite3_stmt *stmt;
140         int ret = sqlite3_prepare_v2(db, "SELECT settings FROM settings", -1, &stmt, 0);
141         if (ret != SQLITE_OK) {
142                 fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
143                 abort();
144         }
145
146         ret = sqlite3_step(stmt);
147         if (ret == SQLITE_ROW) {
148                 bool ok = settings.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
149                 if (!ok) {
150                         fprintf(stderr, "State in database is corrupted!\n");
151                         abort();
152                 }
153         } else if (ret != SQLITE_DONE) {
154                 fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
155                 abort();
156         }
157
158         ret = sqlite3_finalize(stmt);
159         if (ret != SQLITE_OK) {
160                 fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
161                 abort();
162         }
163
164         return settings;
165 }
166
167 void DB::store_settings(const SettingsProto &settings)
168 {
169         string serialized;
170         settings.SerializeToString(&serialized);
171
172         int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
173         if (ret != SQLITE_OK) {
174                 fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
175                 abort();
176         }
177
178         sqlite3_stmt *stmt;
179         ret = sqlite3_prepare_v2(db, "REPLACE INTO settings VALUES (?)", -1, &stmt, 0);
180         if (ret != SQLITE_OK) {
181                 fprintf(stderr, "REPLACE prepare: %s\n", sqlite3_errmsg(db));
182                 abort();
183         }
184
185         sqlite3_bind_blob(stmt, 1, serialized.data(), serialized.size(), SQLITE_STATIC);
186
187         ret = sqlite3_step(stmt);
188         if (ret == SQLITE_ROW) {
189                 fprintf(stderr, "REPLACE step: %s\n", sqlite3_errmsg(db));
190                 abort();
191         }
192
193         ret = sqlite3_finalize(stmt);
194         if (ret != SQLITE_OK) {
195                 fprintf(stderr, "REPLACE finalize: %s\n", sqlite3_errmsg(db));
196                 abort();
197         }
198
199         ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
200         if (ret != SQLITE_OK) {
201                 fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
202                 abort();
203         }
204 }
205
206 vector<DB::FrameOnDiskAndStreamIdx> DB::load_frame_file(const string &filename, size_t size, unsigned filename_idx)
207 {
208         FileContentsProto file_contents;
209
210         sqlite3_stmt *stmt;
211         int ret = sqlite3_prepare_v2(db, "SELECT frames FROM filev2 WHERE filename=? AND size=?", -1, &stmt, 0);
212         if (ret != SQLITE_OK) {
213                 fprintf(stderr, "SELECT prepare: %s\n", sqlite3_errmsg(db));
214                 abort();
215         }
216
217         sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
218         sqlite3_bind_int64(stmt, 2, size);
219
220         ret = sqlite3_step(stmt);
221         if (ret == SQLITE_ROW) {
222                 bool ok = file_contents.ParseFromArray(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
223                 if (!ok) {
224                         fprintf(stderr, "Frame list in database is corrupted!\n");
225                         abort();
226                 }
227         } else if (ret != SQLITE_DONE) {
228                 fprintf(stderr, "SELECT step: %s\n", sqlite3_errmsg(db));
229                 abort();
230         }
231
232         ret = sqlite3_finalize(stmt);
233         if (ret != SQLITE_OK) {
234                 fprintf(stderr, "SELECT finalize: %s\n", sqlite3_errmsg(db));
235                 abort();
236         }
237
238         vector<FrameOnDiskAndStreamIdx> frames;
239         for (const StreamContentsProto &stream : file_contents.stream()) {
240                 FrameOnDiskAndStreamIdx frame;
241                 frame.stream_idx = stream.stream_idx();
242                 for (int i = 0; i < stream.pts_size(); ++i) {
243                         frame.frame.filename_idx = filename_idx;
244                         frame.frame.pts = stream.pts(i);
245                         frame.frame.offset = stream.offset(i);
246                         frame.frame.size = stream.file_size(i);
247                         if (i < stream.audio_size_size()) {
248                                 frame.frame.audio_size = stream.audio_size(i);
249                         } else {
250                                 frame.frame.audio_size = 0;
251                         }
252                         frames.push_back(frame);
253                 }
254         }
255
256         return frames;
257 }
258
259 void DB::store_frame_file(const string &filename, size_t size, const vector<FrameOnDiskAndStreamIdx> &frames)
260 {
261         int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
262         if (ret != SQLITE_OK) {
263                 fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
264                 abort();
265         }
266
267         // Delete any existing instances with this filename.
268         sqlite3_stmt *stmt;
269
270         // Create the protobuf blob for the new row.
271         FileContentsProto file_contents;
272         unordered_set<unsigned> seen_stream_idx;  // Usually only one.
273         for (const FrameOnDiskAndStreamIdx &frame : frames) {
274                 seen_stream_idx.insert(frame.stream_idx);
275         }
276         for (unsigned stream_idx : seen_stream_idx) {
277                 StreamContentsProto *stream = file_contents.add_stream();
278                 stream->set_stream_idx(stream_idx);
279                 stream->mutable_pts()->Reserve(frames.size());
280                 stream->mutable_offset()->Reserve(frames.size());
281                 stream->mutable_file_size()->Reserve(frames.size());
282                 stream->mutable_audio_size()->Reserve(frames.size());
283                 for (const FrameOnDiskAndStreamIdx &frame : frames) {
284                         if (frame.stream_idx != stream_idx) {
285                                 continue;
286                         }
287                         stream->add_pts(frame.frame.pts);
288                         stream->add_offset(frame.frame.offset);
289                         stream->add_file_size(frame.frame.size);
290                         stream->add_audio_size(frame.frame.audio_size);
291                 }
292         }
293         string serialized;
294         file_contents.SerializeToString(&serialized);
295
296         // Insert the new row.
297         ret = sqlite3_prepare_v2(db, "REPLACE INTO filev2 (filename, size, frames) VALUES (?, ?, ?)", -1, &stmt, 0);
298         if (ret != SQLITE_OK) {
299                 fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
300                 abort();
301         }
302
303         sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
304         sqlite3_bind_int64(stmt, 2, size);
305         sqlite3_bind_blob(stmt, 3, serialized.data(), serialized.size(), SQLITE_STATIC);
306
307         ret = sqlite3_step(stmt);
308         if (ret == SQLITE_ROW) {
309                 fprintf(stderr, "REPLACE step: %s\n", sqlite3_errmsg(db));
310                 abort();
311         }
312
313         ret = sqlite3_finalize(stmt);
314         if (ret != SQLITE_OK) {
315                 fprintf(stderr, "REPLACE finalize: %s\n", sqlite3_errmsg(db));
316                 abort();
317         }
318
319         // Commit.
320         ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
321         if (ret != SQLITE_OK) {
322                 fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
323                 abort();
324         }
325 }
326
327 void DB::clean_unused_frame_files(const vector<string> &used_filenames)
328 {
329         int ret = sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr);
330         if (ret != SQLITE_OK) {
331                 fprintf(stderr, "BEGIN: %s\n", sqlite3_errmsg(db));
332                 abort();
333         }
334
335         ret = sqlite3_exec(db, R"(
336                 CREATE TEMPORARY TABLE used_filenames ( filename VARCHAR NOT NULL PRIMARY KEY )
337         )",
338                            nullptr, nullptr, nullptr);
339
340         if (ret != SQLITE_OK) {
341                 fprintf(stderr, "CREATE TEMPORARY TABLE: %s\n", sqlite3_errmsg(db));
342                 abort();
343         }
344
345         // Insert the new rows.
346         sqlite3_stmt *stmt;
347         ret = sqlite3_prepare_v2(db, "INSERT INTO used_filenames (filename) VALUES (?)", -1, &stmt, 0);
348         if (ret != SQLITE_OK) {
349                 fprintf(stderr, "INSERT prepare: %s\n", sqlite3_errmsg(db));
350                 abort();
351         }
352
353         for (const string &filename : used_filenames) {
354                 sqlite3_bind_text(stmt, 1, filename.data(), filename.size(), SQLITE_STATIC);
355
356                 ret = sqlite3_step(stmt);
357                 if (ret == SQLITE_ROW) {
358                         fprintf(stderr, "INSERT step: %s\n", sqlite3_errmsg(db));
359                         abort();
360                 }
361
362                 ret = sqlite3_reset(stmt);
363                 if (ret == SQLITE_ROW) {
364                         fprintf(stderr, "INSERT reset: %s\n", sqlite3_errmsg(db));
365                         abort();
366                 }
367         }
368
369         ret = sqlite3_finalize(stmt);
370         if (ret != SQLITE_OK) {
371                 fprintf(stderr, "INSERT finalize: %s\n", sqlite3_errmsg(db));
372                 abort();
373         }
374
375         ret = sqlite3_exec(db, R"(
376                 DELETE FROM filev2 WHERE filename NOT IN ( SELECT filename FROM used_filenames )
377         )",
378                            nullptr, nullptr, nullptr);
379
380         if (ret != SQLITE_OK) {
381                 fprintf(stderr, "DELETE: %s\n", sqlite3_errmsg(db));
382                 abort();
383         }
384
385         ret = sqlite3_exec(db, R"(
386                 DROP TABLE used_filenames
387         )",
388                            nullptr, nullptr, nullptr);
389
390         if (ret != SQLITE_OK) {
391                 fprintf(stderr, "DROP TABLE: %s\n", sqlite3_errmsg(db));
392                 abort();
393         }
394
395         // Commit.
396         ret = sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr);
397         if (ret != SQLITE_OK) {
398                 fprintf(stderr, "COMMIT: %s\n", sqlite3_errmsg(db));
399                 abort();
400         }
401 }