]> git.sesse.net Git - nageru/blob - futatabi/main.cpp
7343df189a3d86aa41ee357c9841aa60d723dab2
[nageru] / futatabi / main.cpp
1 #include <assert.h>
2 #include <arpa/inet.h>
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <dirent.h>
7 #include <getopt.h>
8 #include <memory>
9 #include <mutex>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string>
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <thread>
16 #include <unistd.h>
17 #include <vector>
18
19 extern "C" {
20 #include <libavformat/avformat.h>
21 }
22
23 #include "clip_list.h"
24 #include "shared/context.h"
25 #include "defs.h"
26 #include "shared/disk_space_estimator.h"
27 #include "shared/ffmpeg_raii.h"
28 #include "flags.h"
29 #include "frame_on_disk.h"
30 #include "frame.pb.h"
31 #include "shared/httpd.h"
32 #include "mainwindow.h"
33 #include "player.h"
34 #include "shared/post_to_main_thread.h"
35 #include "shared/ref_counted_gl_sync.h"
36 #include "shared/timebase.h"
37 #include "ui_mainwindow.h"
38 #include "vaapi_jpeg_decoder.h"
39
40 #include <QApplication>
41 #include <QGLFormat>
42 #include <QSurfaceFormat>
43 #include <QProgressDialog>
44 #include <movit/init.h>
45 #include <movit/util.h>
46
47 using namespace std;
48 using namespace std::chrono;
49
50 constexpr char frame_magic[] = "Ftbifrm0";
51 constexpr size_t frame_magic_len = 8;
52
53 mutex RefCountedGLsync::fence_lock;
54 atomic<bool> should_quit{false};
55
56 int64_t start_pts = -1;
57
58 // TODO: Replace by some sort of GUI control, I guess.
59 int64_t current_pts = 0;
60
61 struct FrameFile {
62         FILE *fp = nullptr;
63         unsigned filename_idx;
64         size_t frames_written_so_far = 0;
65 };
66 std::map<int, FrameFile> open_frame_files;
67
68 mutex frame_mu;
69 vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
70 vector<string> frame_filenames;  // Under frame_mu.
71
72 namespace {
73
74 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
75 {
76         if (open_frame_files.count(stream_idx) == 0) {
77                 char filename[256];
78                 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
79                         global_flags.working_directory.c_str(), stream_idx, pts);
80                 FILE *fp = fopen(filename, "wb");
81                 if (fp == nullptr) {
82                         perror(filename);
83                         exit(1);
84                 }
85
86                 lock_guard<mutex> lock(frame_mu);
87                 unsigned filename_idx = frame_filenames.size();
88                 frame_filenames.push_back(filename);
89                 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
90         }
91
92         FrameFile &file = open_frame_files[stream_idx];
93         unsigned filename_idx = file.filename_idx;
94         string filename;
95         {
96                 lock_guard<mutex> lock(frame_mu);
97                 filename = frame_filenames[filename_idx];
98         }
99
100         FrameHeaderProto hdr;
101         hdr.set_stream_idx(stream_idx);
102         hdr.set_pts(pts);
103         hdr.set_file_size(size);
104
105         string serialized;
106         if (!hdr.SerializeToString(&serialized)) {
107                 fprintf(stderr, "Frame header serialization failed.\n");
108                 exit(1);
109         }
110         uint32_t len = htonl(serialized.size());
111
112         if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
113                 perror("fwrite");
114                 exit(1);
115         }
116         if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
117                 perror("fwrite");
118                 exit(1);
119         }
120         if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
121                 perror("fwrite");
122                 exit(1);
123         }
124         off_t offset = ftell(file.fp);
125         if (fwrite(data, size, 1, file.fp) != 1) {
126                 perror("fwrite");
127                 exit(1);
128         }
129         fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
130         global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
131
132         FrameOnDisk frame;
133         frame.pts = pts;
134         frame.filename_idx = filename_idx;
135         frame.offset = offset;
136         frame.size = size;
137
138         {
139                 lock_guard<mutex> lock(frame_mu);
140                 assert(stream_idx < MAX_STREAMS);
141                 frames[stream_idx].push_back(frame);
142         }
143
144         if (++file.frames_written_so_far >= 1000) {
145                 size_t size = ftell(file.fp);
146
147                 // Start a new file next time.
148                 if (fclose(file.fp) != 0) {
149                         perror("fclose");
150                         exit(1);
151                 }
152                 open_frame_files.erase(stream_idx);
153
154                 // Write information about all frames in the finished file to SQLite.
155                 // (If we crash before getting to do this, we'll be scanning through
156                 // the file on next startup, and adding it to the database then.)
157                 // NOTE: Since we don't fsync(), we could in theory get broken data
158                 // but with the right size, but it would seem unlikely.
159                 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
160                 {
161                         lock_guard<mutex> lock(frame_mu);
162                         for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
163                                 for (const FrameOnDisk &frame : frames[stream_idx]) {
164                                         if (frame.filename_idx == filename_idx) {
165                                                 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
166                                         }
167                                 }
168                         }
169                 }
170
171                 const char *basename = filename.c_str();
172                 while (strchr(basename, '/') != nullptr) {
173                         basename = strchr(basename, '/') + 1;
174                 }
175                 db->store_frame_file(basename, size, frames_this_file);
176         }
177
178         return frame;
179 }
180
181 } // namespace
182
183 HTTPD *global_httpd;
184
185 void load_existing_frames();
186 int record_thread_func();
187
188 int main(int argc, char **argv)
189 {
190         parse_flags(argc, argv);
191         if (optind == argc) {
192                 global_flags.stream_source = "multiangle.mp4";
193                 global_flags.slow_down_input = true;
194         } else if (optind + 1 == argc) {
195                 global_flags.stream_source = argv[optind];
196         } else {
197                 usage();
198                 exit(1);
199         }
200
201         string frame_dir = global_flags.working_directory + "/frames";
202
203         struct stat st;
204         if (stat(frame_dir.c_str(), &st) == -1) {
205                 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
206                 if (mkdir(frame_dir.c_str(), 0777) == -1) {
207                         perror(global_flags.working_directory.c_str());
208                         exit(1);
209                 }
210         }
211
212         avformat_network_init();
213         global_httpd = new HTTPD;
214
215         QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
216
217         QSurfaceFormat fmt;
218         fmt.setDepthBufferSize(0);
219         fmt.setStencilBufferSize(0);
220         fmt.setProfile(QSurfaceFormat::CoreProfile);
221         fmt.setMajorVersion(4);
222         fmt.setMinorVersion(5);
223
224         // Turn off vsync, since Qt generally gives us at most frame rate
225         // (display frequency) / (number of QGLWidgets active).
226         fmt.setSwapInterval(0);
227
228         QSurfaceFormat::setDefaultFormat(fmt);
229
230         QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
231
232         QApplication app(argc, argv);
233         global_share_widget = new QGLWidget();
234         if (!global_share_widget->isValid()) {
235                 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
236                 exit(1);
237         }
238
239         // Initialize Movit.
240         {
241                 QSurface *surface = create_surface();
242                 QOpenGLContext *context = create_context(surface);
243                 make_current(context, surface);
244                 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
245                 delete_context(context);
246                 // TODO: Delete the surface, too.
247         }
248
249         load_existing_frames();
250
251         MainWindow main_window;
252         main_window.show();
253
254         global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
255         global_httpd->start(global_flags.http_port);
256
257         init_jpeg_vaapi();
258
259         thread record_thread(record_thread_func);
260
261         int ret = app.exec();
262
263         should_quit = true;
264         record_thread.join();
265         JPEGFrameView::shutdown();
266
267         return ret;
268 }
269
270 void load_frame_file(const char *filename, const string &basename, unsigned filename_idx, DB *db)
271 {
272         struct stat st;
273         if (stat(filename, &st) == -1) {
274                 perror(filename);
275                 exit(1);
276         }
277
278         vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
279         if (!all_frames.empty()) {
280                 // We already had this cached in the database, so no need to look in the file.
281                 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
282                         if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
283                                 frames[frame.stream_idx].push_back(frame.frame);
284                                 start_pts = max(start_pts, frame.frame.pts);
285                         }
286                 }
287                 return;
288         }
289
290         FILE *fp = fopen(filename, "rb");
291         if (fp == nullptr) {
292                 perror(filename);
293                 exit(1);
294         }
295
296         size_t magic_offset = 0;
297         size_t skipped_bytes = 0;
298         while (!feof(fp) && !ferror(fp)) {
299                 int ch = getc(fp);
300                 if (ch == -1) {
301                         break;
302                 }
303                 if (ch != frame_magic[magic_offset++]) {
304                         skipped_bytes += magic_offset;
305                         magic_offset = 0;
306                         continue;
307                 }
308                 if (magic_offset < frame_magic_len) {
309                         // Still reading the magic (hopefully).
310                         continue;
311                 }
312
313                 // OK, found the magic. Try to parse the frame header.
314                 magic_offset = 0;
315
316                 if (skipped_bytes > 0)  {
317                         fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
318                                 filename, skipped_bytes);
319                         skipped_bytes = 0;
320                 }
321
322                 uint32_t len;
323                 if (fread(&len, sizeof(len), 1, fp) != 1) {
324                         fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
325                         break;
326                 }
327
328                 string serialized;
329                 serialized.resize(ntohl(len));
330                 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
331                         fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
332                         break;
333                 }
334
335                 FrameHeaderProto hdr;
336                 if (!hdr.ParseFromString(serialized)) {
337                         fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
338                         continue;
339                 }
340
341                 FrameOnDisk frame;
342                 frame.pts = hdr.pts();
343                 frame.offset = ftell(fp);
344                 frame.filename_idx = filename_idx;
345                 frame.size = hdr.file_size();
346
347                 if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
348                         fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
349                         continue;
350                 }
351
352                 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
353                         frames[hdr.stream_idx()].push_back(frame);
354                         start_pts = max(start_pts, hdr.pts());
355                 }
356                 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
357         }
358
359         if (skipped_bytes > 0) {
360                 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
361                         filename, skipped_bytes);
362         }
363
364         size_t size = ftell(fp);
365         fclose(fp);
366
367         db->store_frame_file(basename, size, all_frames);
368 }
369
370 void load_existing_frames()
371 {
372         QProgressDialog progress("Scanning frame directory...", "Abort", 0, 1);
373         progress.setWindowTitle("Futatabi");
374         progress.setWindowModality(Qt::WindowModal);
375         progress.setMinimumDuration(1000);
376         progress.setMaximum(1);
377         progress.setValue(0);
378
379         string frame_dir = global_flags.working_directory + "/frames";
380         DIR *dir = opendir(frame_dir.c_str());
381         if (dir == nullptr) {
382                 perror("frames/");
383                 start_pts = 0;
384                 return;
385         }
386
387         vector<string> frame_basenames;
388         for ( ;; ) {
389                 errno = 0;
390                 dirent *de = readdir(dir);
391                 if (de == nullptr) {
392                         if (errno != 0) {
393                                 perror("readdir");
394                                 exit(1);
395                         }
396                         break;
397                 }
398
399                 if (de->d_type == DT_REG || de->d_type == DT_LNK) {
400                         string filename = frame_dir + "/" + de->d_name;
401                         frame_filenames.push_back(filename);
402                         frame_basenames.push_back(de->d_name);
403                 }
404
405                 if (progress.wasCanceled()) {
406                         exit(1);
407                 }
408         }
409         closedir(dir);
410
411         progress.setMaximum(frame_filenames.size() + 2);
412         progress.setValue(1);
413
414         progress.setLabelText("Opening database...");
415         DB db(global_flags.working_directory + "/futatabi.db");
416
417         progress.setLabelText("Reading frame files...");
418         progress.setValue(2);
419
420         for (size_t i = 0; i < frame_filenames.size(); ++i) {
421                 load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
422                 progress.setValue(i + 3);
423                 if (progress.wasCanceled()) {
424                         exit(1);
425                 }
426         }
427
428         if (start_pts == -1) {
429                 start_pts = 0;
430         } else {
431                 // Add a gap of one second from the old frames to the new ones.
432                 start_pts += TIMEBASE;
433         }
434         current_pts = start_pts;
435
436         for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
437                 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
438                         [](const auto &a, const auto &b) { return a.pts < b.pts; });
439         }
440
441         db.clean_unused_frame_files(frame_basenames);
442 }
443
444 int record_thread_func()
445 {
446         pthread_setname_np(pthread_self(), "ReceiveFrames");
447
448         int64_t pts_offset;
449         DB db(global_flags.working_directory + "/futatabi.db");
450
451         while (!should_quit.load()) {
452                 auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
453                 if (format_ctx == nullptr) {
454                         fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
455                         sleep(1);
456                         continue;
457                 }
458
459                 int64_t last_pts = -1;
460
461                 while (!should_quit.load()) {
462                         AVPacket pkt;
463                         unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
464                                 &pkt, av_packet_unref);
465                         av_init_packet(&pkt);
466                         pkt.data = nullptr;
467                         pkt.size = 0;
468
469                         // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
470                         // right now, should_quit will be ignored if it's hung on I/O.
471                         if (av_read_frame(format_ctx.get(), &pkt) != 0) {
472                                 break;
473                         }
474
475                         // Convert pts to our own timebase.
476                         AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
477                         int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
478
479                         // Translate offset into our stream.
480                         if (last_pts == -1) {
481                                 pts_offset = start_pts - pts;
482                         }
483                         pts = std::max(pts + pts_offset, start_pts);
484
485                         //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
486                         //      pkt.stream_index, pts, pkt.size);
487                         FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
488
489                         post_to_main_thread([pkt, frame] {
490                                 global_mainwindow->display_frame(pkt.stream_index, frame);
491                         });
492
493                         if (last_pts != -1 && global_flags.slow_down_input) {
494                                 this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
495                         }
496                         last_pts = pts;
497                         current_pts = pts;
498                 }
499
500                 fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
501                 sleep(1);
502
503                 start_pts = last_pts + TIMEBASE;
504         }
505         return 0;
506 }