]> git.sesse.net Git - nageru/blob - main.cpp
Remove frame files that do not exist from the database.
[nageru] / main.cpp
1 #include <assert.h>
2 #include <arpa/inet.h>
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <dirent.h>
7 #include <getopt.h>
8 #include <memory>
9 #include <mutex>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string>
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <thread>
16 #include <vector>
17
18 extern "C" {
19 #include <libavformat/avformat.h>
20 }
21
22 #include "clip_list.h"
23 #include "context.h"
24 #include "defs.h"
25 #include "disk_space_estimator.h"
26 #include "ffmpeg_raii.h"
27 #include "flags.h"
28 #include "frame_on_disk.h"
29 #include "frame.pb.h"
30 #include "httpd.h"
31 #include "mainwindow.h"
32 #include "player.h"
33 #include "post_to_main_thread.h"
34 #include "ref_counted_gl_sync.h"
35 #include "timebase.h"
36 #include "ui_mainwindow.h"
37 #include "vaapi_jpeg_decoder.h"
38
39 #include <QApplication>
40 #include <QGLFormat>
41 #include <QSurfaceFormat>
42 #include <QProgressDialog>
43 #include <movit/init.h>
44 #include <movit/util.h>
45
46 using namespace std;
47 using namespace std::chrono;
48
// Marker written in front of every frame record in a frame file; the loader
// scans for it to find (and resynchronize on) record boundaries.
constexpr char frame_magic[] = "Ftbifrm0";
// Number of marker bytes on disk, i.e. without the terminating NUL.
constexpr size_t frame_magic_len = sizeof(frame_magic) - 1;
51
52 mutex RefCountedGLsync::fence_lock;
53 atomic<bool> should_quit{false};
54
55 int64_t start_pts = -1;
56
57 // TODO: Replace by some sort of GUI control, I guess.
58 int64_t current_pts = 0;
59
// Bookkeeping for a frame file that is currently open for writing.
// Every member has a default so that a default-constructed FrameFile
// (e.g. one created implicitly by std::map::operator[]) never carries an
// indeterminate value.
struct FrameFile {
        FILE *fp = nullptr;  // Stream the frame records are appended to.
        unsigned filename_idx = 0;  // Index of this file's name in frame_filenames.
        size_t frames_written_so_far = 0;  // write_frame() starts a new file once this reaches 1000.
};
// Frame files currently open for writing, keyed by stream index.
// Within this file it is only accessed from write_frame().
std::map<int, FrameFile> open_frame_files;

// Protects the two frame tables below.
mutex frame_mu;
// Per-stream list of all known frames; load_existing_frames() sorts each list by pts.
vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
// Full paths of all known frame files; FrameOnDisk::filename_idx indexes into this.
vector<string> frame_filenames;  // Under frame_mu.
70
71 namespace {
72
73 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
74 {
75         if (open_frame_files.count(stream_idx) == 0) {
76                 char filename[256];
77                 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
78                         global_flags.working_directory.c_str(), stream_idx, pts);
79                 FILE *fp = fopen(filename, "wb");
80                 if (fp == nullptr) {
81                         perror(filename);
82                         exit(1);
83                 }
84
85                 lock_guard<mutex> lock(frame_mu);
86                 unsigned filename_idx = frame_filenames.size();
87                 frame_filenames.push_back(filename);
88                 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
89         }
90
91         FrameFile &file = open_frame_files[stream_idx];
92         unsigned filename_idx = file.filename_idx;
93         string filename;
94         {
95                 lock_guard<mutex> lock(frame_mu);
96                 filename = frame_filenames[filename_idx];
97         }
98
99         FrameHeaderProto hdr;
100         hdr.set_stream_idx(stream_idx);
101         hdr.set_pts(pts);
102         hdr.set_file_size(size);
103
104         string serialized;
105         if (!hdr.SerializeToString(&serialized)) {
106                 fprintf(stderr, "Frame header serialization failed.\n");
107                 exit(1);
108         }
109         uint32_t len = htonl(serialized.size());
110
111         if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
112                 perror("fwrite");
113                 exit(1);
114         }
115         if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
116                 perror("fwrite");
117                 exit(1);
118         }
119         if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
120                 perror("fwrite");
121                 exit(1);
122         }
123         off_t offset = ftell(file.fp);
124         if (fwrite(data, size, 1, file.fp) != 1) {
125                 perror("fwrite");
126                 exit(1);
127         }
128         fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
129         global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
130
131         FrameOnDisk frame;
132         frame.pts = pts;
133         frame.filename_idx = filename_idx;
134         frame.offset = offset;
135         frame.size = size;
136
137         {
138                 lock_guard<mutex> lock(frame_mu);
139                 assert(stream_idx < MAX_STREAMS);
140                 frames[stream_idx].push_back(frame);
141         }
142
143         if (++file.frames_written_so_far >= 1000) {
144                 size_t size = ftell(file.fp);
145
146                 // Start a new file next time.
147                 if (fclose(file.fp) != 0) {
148                         perror("fclose");
149                         exit(1);
150                 }
151                 open_frame_files.erase(stream_idx);
152
153                 // Write information about all frames in the finished file to SQLite.
154                 // (If we crash before getting to do this, we'll be scanning through
155                 // the file on next startup, and adding it to the database then.)
156                 // NOTE: Since we don't fsync(), we could in theory get broken data
157                 // but with the right size, but it would seem unlikely.
158                 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
159                 {
160                         lock_guard<mutex> lock(frame_mu);
161                         for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
162                                 for (const FrameOnDisk &frame : frames[stream_idx]) {
163                                         if (frame.filename_idx == filename_idx) {
164                                                 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
165                                         }
166                                 }
167                         }
168                 }
169
170                 const char *basename = filename.c_str();
171                 while (strchr(basename, '/') != nullptr) {
172                         basename = strchr(basename, '/');
173                 }
174                 db->store_frame_file(basename, size, frames_this_file);
175         }
176
177         return frame;
178 }
179
180 } // namespace
181
182 HTTPD *global_httpd;
183
184 void load_existing_frames();
185 int record_thread_func();
186
187 int main(int argc, char **argv)
188 {
189         parse_flags(argc, argv);
190         if (optind == argc) {
191                 global_flags.stream_source = "multiangle.mp4";
192                 global_flags.slow_down_input = true;
193         } else if (optind + 1 == argc) {
194                 global_flags.stream_source = argv[optind];
195         } else {
196                 usage();
197                 exit(1);
198         }
199
200         string frame_dir = global_flags.working_directory + "/frames";
201
202         struct stat st;
203         if (stat(frame_dir.c_str(), &st) == -1) {
204                 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
205                 if (mkdir(frame_dir.c_str(), 0777) == -1) {
206                         perror(global_flags.working_directory.c_str());
207                         exit(1);
208                 }
209         }
210
211         avformat_network_init();
212         global_httpd = new HTTPD;
213
214         QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
215
216         QSurfaceFormat fmt;
217         fmt.setDepthBufferSize(0);
218         fmt.setStencilBufferSize(0);
219         fmt.setProfile(QSurfaceFormat::CoreProfile);
220         fmt.setMajorVersion(4);
221         fmt.setMinorVersion(5);
222
223         // Turn off vsync, since Qt generally gives us at most frame rate
224         // (display frequency) / (number of QGLWidgets active).
225         fmt.setSwapInterval(0);
226
227         QSurfaceFormat::setDefaultFormat(fmt);
228
229         QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
230
231         QApplication app(argc, argv);
232         global_share_widget = new QGLWidget();
233         if (!global_share_widget->isValid()) {
234                 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
235                 exit(1);
236         }
237
238         // Initialize Movit.
239         {
240                 QSurface *surface = create_surface();
241                 QOpenGLContext *context = create_context(surface);
242                 make_current(context, surface);
243                 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
244                 delete_context(context);
245                 // TODO: Delete the surface, too.
246         }
247
248         load_existing_frames();
249
250         MainWindow main_window;
251         main_window.show();
252
253         global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
254         global_httpd->start(global_flags.http_port);
255
256         init_jpeg_vaapi();
257
258         thread record_thread(record_thread_func);
259
260         int ret = app.exec();
261
262         should_quit = true;
263         record_thread.join();
264         JPEGFrameView::shutdown();
265
266         return ret;
267 }
268
269 void load_frame_file(const char *filename, const string &basename, unsigned filename_idx, DB *db)
270 {
271         struct stat st;
272         if (stat(filename, &st) == -1) {
273                 perror(filename);
274                 exit(1);
275         }
276
277         vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
278         if (!all_frames.empty()) {
279                 // We already had this cached in the database, so no need to look in the file.
280                 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
281                         if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
282                                 frames[frame.stream_idx].push_back(frame.frame);
283                                 start_pts = max(start_pts, frame.frame.pts);
284                         }
285                 }
286                 return;
287         }
288
289         FILE *fp = fopen(filename, "rb");
290         if (fp == nullptr) {
291                 perror(filename);
292                 exit(1);
293         }
294
295         size_t magic_offset = 0;
296         size_t skipped_bytes = 0;
297         while (!feof(fp) && !ferror(fp)) {
298                 int ch = getc(fp);
299                 if (ch == -1) {
300                         break;
301                 }
302                 if (ch != frame_magic[magic_offset++]) {
303                         skipped_bytes += magic_offset;
304                         magic_offset = 0;
305                         continue;
306                 }
307                 if (magic_offset < frame_magic_len) {
308                         // Still reading the magic (hopefully).
309                         continue;
310                 }
311
312                 // OK, found the magic. Try to parse the frame header.
313                 magic_offset = 0;
314
315                 if (skipped_bytes > 0)  {
316                         fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
317                                 filename, skipped_bytes);
318                         skipped_bytes = 0;
319                 }
320
321                 uint32_t len;
322                 if (fread(&len, sizeof(len), 1, fp) != 1) {
323                         fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
324                         break;
325                 }
326
327                 string serialized;
328                 serialized.resize(ntohl(len));
329                 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
330                         fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
331                         break;
332                 }
333
334                 FrameHeaderProto hdr;
335                 if (!hdr.ParseFromString(serialized)) {
336                         fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
337                         continue;
338                 }
339
340                 FrameOnDisk frame;
341                 frame.pts = hdr.pts();
342                 frame.offset = ftell(fp);
343                 frame.filename_idx = filename_idx;
344                 frame.size = hdr.file_size();
345
346                 if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
347                         fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
348                         continue;
349                 }
350
351                 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
352                         frames[hdr.stream_idx()].push_back(frame);
353                         start_pts = max(start_pts, hdr.pts());
354                 }
355                 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
356         }
357
358         if (skipped_bytes > 0) {
359                 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
360                         filename, skipped_bytes);
361         }
362
363         size_t size = ftell(fp);
364         fclose(fp);
365
366         db->store_frame_file(basename, size, all_frames);
367 }
368
369 void load_existing_frames()
370 {
371         QProgressDialog progress("Scanning frame directory...", "Abort", 0, 1);
372         progress.setWindowTitle("Futatabi");
373         progress.setWindowModality(Qt::WindowModal);
374         progress.setMinimumDuration(1000);
375         progress.setMaximum(1);
376         progress.setValue(0);
377
378         string frame_dir = global_flags.working_directory + "/frames";
379         DIR *dir = opendir(frame_dir.c_str());
380         if (dir == nullptr) {
381                 perror("frames/");
382                 start_pts = 0;
383                 return;
384         }
385
386         vector<string> frame_basenames;
387         for ( ;; ) {
388                 errno = 0;
389                 dirent *de = readdir(dir);
390                 if (de == nullptr) {
391                         if (errno != 0) {
392                                 perror("readdir");
393                                 exit(1);
394                         }
395                         break;
396                 }
397
398                 if (de->d_type == DT_REG || de->d_type == DT_LNK) {
399                         string filename = frame_dir + "/" + de->d_name;
400                         frame_filenames.push_back(filename);
401                         frame_basenames.push_back(de->d_name);
402                 }
403
404                 if (progress.wasCanceled()) {
405                         exit(1);
406                 }
407         }
408         closedir(dir);
409
410         progress.setMaximum(frame_filenames.size() + 2);
411         progress.setValue(1);
412
413         progress.setLabelText("Opening database...");
414         DB db(global_flags.working_directory + "/futatabi.db");
415
416         progress.setLabelText("Reading frame files...");
417         progress.setValue(2);
418
419         for (size_t i = 0; i < frame_filenames.size(); ++i) {
420                 load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
421                 progress.setValue(i + 3);
422                 if (progress.wasCanceled()) {
423                         exit(1);
424                 }
425         }
426
427         if (start_pts == -1) {
428                 start_pts = 0;
429         } else {
430                 // Add a gap of one second from the old frames to the new ones.
431                 start_pts += TIMEBASE;
432         }
433
434         for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
435                 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
436                         [](const auto &a, const auto &b) { return a.pts < b.pts; });
437         }
438
439         db.clean_unused_frame_files(frame_basenames);
440 }
441
442 int record_thread_func()
443 {
444         auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
445         if (format_ctx == nullptr) {
446                 fprintf(stderr, "%s: Error opening file\n", global_flags.stream_source.c_str());
447                 return 1;
448         }
449
450         int64_t last_pts = -1;
451         int64_t pts_offset;
452         DB db(global_flags.working_directory + "/futatabi.db");
453
454         while (!should_quit.load()) {
455                 AVPacket pkt;
456                 unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
457                         &pkt, av_packet_unref);
458                 av_init_packet(&pkt);
459                 pkt.data = nullptr;
460                 pkt.size = 0;
461
462                 // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
463                 // right now, should_quit will be ignored if it's hung on I/O.
464                 if (av_read_frame(format_ctx.get(), &pkt) != 0) {
465                         break;
466                 }
467
468                 // Convert pts to our own timebase.
469                 AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
470                 int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
471
472                 // Translate offset into our stream.
473                 if (last_pts == -1) {
474                         pts_offset = start_pts - pts;
475                 }
476                 pts = std::max(pts + pts_offset, start_pts);
477
478                 //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
479                 //      pkt.stream_index, pts, pkt.size);
480                 FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
481
482                 post_to_main_thread([pkt, frame] {
483                         if (pkt.stream_index == 0) {
484                                 global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, frame);
485                         } else if (pkt.stream_index == 1) {
486                                 global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, frame);
487                         } else if (pkt.stream_index == 2) {
488                                 global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, frame);
489                         } else if (pkt.stream_index == 3) {
490                                 global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, frame);
491                         }
492                 });
493
494                 if (last_pts != -1 && global_flags.slow_down_input) {
495                         this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
496                 }
497                 last_pts = pts;
498                 current_pts = pts;
499         }
500
501         return 0;
502 }