]> git.sesse.net Git - nageru/blob - main.cpp
8f9530459149cc77dbf3389ca823b0267d98b948
[nageru] / main.cpp
1 #include <assert.h>
2 #include <arpa/inet.h>
3 #include <atomic>
4 #include <chrono>
5 #include <condition_variable>
6 #include <dirent.h>
7 #include <getopt.h>
8 #include <memory>
9 #include <mutex>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string>
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <thread>
16 #include <vector>
17
18 extern "C" {
19 #include <libavformat/avformat.h>
20 }
21
22 #include "clip_list.h"
23 #include "context.h"
24 #include "defs.h"
25 #include "disk_space_estimator.h"
26 #include "ffmpeg_raii.h"
27 #include "flags.h"
28 #include "frame_on_disk.h"
29 #include "frame.pb.h"
30 #include "httpd.h"
31 #include "mainwindow.h"
32 #include "player.h"
33 #include "post_to_main_thread.h"
34 #include "ref_counted_gl_sync.h"
35 #include "timebase.h"
36 #include "ui_mainwindow.h"
37 #include "vaapi_jpeg_decoder.h"
38
39 #include <QApplication>
40 #include <QGLFormat>
41 #include <QSurfaceFormat>
42 #include <movit/init.h>
43 #include <movit/util.h>
44
45 using namespace std;
46 using namespace std::chrono;
47
// Marker that write_frame() emits in front of every frame header, and that
// load_frame_file() scans for when indexing a file.
constexpr char frame_magic[] = "Ftbifrm0";
// Number of marker bytes, not counting the terminating NUL of the literal.
constexpr size_t frame_magic_len = sizeof(frame_magic) - 1;
50
51 mutex RefCountedGLsync::fence_lock;
52 atomic<bool> should_quit{false};
53
54 int64_t start_pts = -1;
55
56 // TODO: Replace by some sort of GUI control, I guess.
57 int64_t current_pts = 0;
58
// Bookkeeping for one frame file that is currently open for writing.
// Every member has a default so that a default-constructed FrameFile
// (e.g. one created by std::map::operator[]) never holds indeterminate values.
struct FrameFile {
        FILE *fp = nullptr;  // Open stdio handle for the file.
        unsigned filename_idx = 0;  // Index of the file's name in frame_filenames.
        size_t frames_written_so_far = 0;  // Frames appended to this file so far.
};
// Currently open output files, keyed by stream index. Accessed by write_frame().
std::map<int, FrameFile> open_frame_files;
65
66 mutex frame_mu;
67 vector<FrameOnDisk> frames[MAX_STREAMS];  // Under frame_mu.
68 vector<string> frame_filenames;  // Under frame_mu.
69
70 namespace {
71
72 FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
73 {
74         if (open_frame_files.count(stream_idx) == 0) {
75                 char filename[256];
76                 snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
77                         global_flags.working_directory.c_str(), stream_idx, pts);
78                 FILE *fp = fopen(filename, "wb");
79                 if (fp == nullptr) {
80                         perror(filename);
81                         exit(1);
82                 }
83
84                 lock_guard<mutex> lock(frame_mu);
85                 unsigned filename_idx = frame_filenames.size();
86                 frame_filenames.push_back(filename);
87                 open_frame_files[stream_idx] = FrameFile{ fp, filename_idx, 0 };
88         }
89
90         FrameFile &file = open_frame_files[stream_idx];
91         unsigned filename_idx = file.filename_idx;
92         string filename;
93         {
94                 lock_guard<mutex> lock(frame_mu);
95                 filename = frame_filenames[filename_idx];
96         }
97
98         FrameHeaderProto hdr;
99         hdr.set_stream_idx(stream_idx);
100         hdr.set_pts(pts);
101         hdr.set_file_size(size);
102
103         string serialized;
104         if (!hdr.SerializeToString(&serialized)) {
105                 fprintf(stderr, "Frame header serialization failed.\n");
106                 exit(1);
107         }
108         uint32_t len = htonl(serialized.size());
109
110         if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
111                 perror("fwrite");
112                 exit(1);
113         }
114         if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
115                 perror("fwrite");
116                 exit(1);
117         }
118         if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
119                 perror("fwrite");
120                 exit(1);
121         }
122         off_t offset = ftell(file.fp);
123         if (fwrite(data, size, 1, file.fp) != 1) {
124                 perror("fwrite");
125                 exit(1);
126         }
127         fflush(file.fp);  // No fsync(), though. We can accept losing a few frames.
128         global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size, pts);
129
130         FrameOnDisk frame;
131         frame.pts = pts;
132         frame.filename_idx = filename_idx;
133         frame.offset = offset;
134         frame.size = size;
135
136         {
137                 lock_guard<mutex> lock(frame_mu);
138                 assert(stream_idx < MAX_STREAMS);
139                 frames[stream_idx].push_back(frame);
140         }
141
142         if (++file.frames_written_so_far >= 1000) {
143                 size_t size = ftell(file.fp);
144
145                 // Start a new file next time.
146                 if (fclose(file.fp) != 0) {
147                         perror("fclose");
148                         exit(1);
149                 }
150                 open_frame_files.erase(stream_idx);
151
152                 // Write information about all frames in the finished file to SQLite.
153                 // (If we crash before getting to do this, we'll be scanning through
154                 // the file on next startup, and adding it to the database then.)
155                 // NOTE: Since we don't fsync(), we could in theory get broken data
156                 // but with the right size, but it would seem unlikely.
157                 vector<DB::FrameOnDiskAndStreamIdx> frames_this_file;
158                 {
159                         lock_guard<mutex> lock(frame_mu);
160                         for (size_t stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
161                                 for (const FrameOnDisk &frame : frames[stream_idx]) {
162                                         if (frame.filename_idx == filename_idx) {
163                                                 frames_this_file.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(stream_idx) });
164                                         }
165                                 }
166                         }
167                 }
168                 db->store_frame_file(filename, size, frames_this_file);
169         }
170
171         return frame;
172 }
173
174 } // namespace
175
176 HTTPD *global_httpd;
177
178 void load_existing_frames();
179 int record_thread_func();
180
181 int main(int argc, char **argv)
182 {
183         parse_flags(argc, argv);
184         if (optind == argc) {
185                 global_flags.stream_source = "multiangle.mp4";
186                 global_flags.slow_down_input = true;
187         } else if (optind + 1 == argc) {
188                 global_flags.stream_source = argv[optind];
189         } else {
190                 usage();
191                 exit(1);
192         }
193
194         string frame_dir = global_flags.working_directory + "/frames";
195
196         struct stat st;
197         if (stat(frame_dir.c_str(), &st) == -1) {
198                 fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
199                 if (mkdir(frame_dir.c_str(), 0777) == -1) {
200                         perror(global_flags.working_directory.c_str());
201                         exit(1);
202                 }
203         }
204
205         avformat_network_init();
206         global_httpd = new HTTPD;
207
208         QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
209
210         QSurfaceFormat fmt;
211         fmt.setDepthBufferSize(0);
212         fmt.setStencilBufferSize(0);
213         fmt.setProfile(QSurfaceFormat::CoreProfile);
214         fmt.setMajorVersion(4);
215         fmt.setMinorVersion(5);
216
217         // Turn off vsync, since Qt generally gives us at most frame rate
218         // (display frequency) / (number of QGLWidgets active).
219         fmt.setSwapInterval(0);
220
221         QSurfaceFormat::setDefaultFormat(fmt);
222
223         QGLFormat::setDefaultFormat(QGLFormat::fromSurfaceFormat(fmt));
224
225         QApplication app(argc, argv);
226         global_share_widget = new QGLWidget();
227         if (!global_share_widget->isValid()) {
228                 fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
229                 exit(1);
230         }
231
232         // Initialize Movit.
233         {
234                 QSurface *surface = create_surface();
235                 QOpenGLContext *context = create_context(surface);
236                 make_current(context, surface);
237                 CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
238                 delete_context(context);
239                 // TODO: Delete the surface, too.
240         }
241
242         MainWindow main_window;
243         main_window.show();
244
245         global_httpd->add_endpoint("/queue_status", bind(&MainWindow::get_queue_status, &main_window), HTTPD::NO_CORS_POLICY);
246         global_httpd->start(global_flags.http_port);
247
248         init_jpeg_vaapi();
249
250         load_existing_frames();
251         thread record_thread(record_thread_func);
252
253         int ret = app.exec();
254
255         should_quit = true;
256         record_thread.join();
257         JPEGFrameView::shutdown();
258
259         return ret;
260 }
261
262 void load_frame_file(const char *filename, unsigned filename_idx, DB *db)
263 {
264         struct stat st;
265         if (stat(filename, &st) == -1) {
266                 perror(filename);
267                 exit(1);
268         }
269
270         vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(filename, st.st_size, filename_idx);
271         if (!all_frames.empty()) {
272                 // We already had this cached in the database, so no need to look in the file.
273                 for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
274                         if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
275                                 frames[frame.stream_idx].push_back(frame.frame);
276                                 start_pts = max(start_pts, frame.frame.pts);
277                         }
278                 }
279                 return;
280         }
281
282         FILE *fp = fopen(filename, "rb");
283         if (fp == nullptr) {
284                 perror(filename);
285                 exit(1);
286         }
287
288         size_t magic_offset = 0;
289         size_t skipped_bytes = 0;
290         while (!feof(fp) && !ferror(fp)) {
291                 int ch = getc(fp);
292                 if (ch == -1) {
293                         break;
294                 }
295                 if (ch != frame_magic[magic_offset++]) {
296                         skipped_bytes += magic_offset;
297                         magic_offset = 0;
298                         continue;
299                 }
300                 if (magic_offset < frame_magic_len) {
301                         // Still reading the magic (hopefully).
302                         continue;
303                 }
304
305                 // OK, found the magic. Try to parse the frame header.
306                 magic_offset = 0;
307
308                 if (skipped_bytes > 0)  {
309                         fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
310                                 filename, skipped_bytes);
311                         skipped_bytes = 0;
312                 }
313
314                 uint32_t len;
315                 if (fread(&len, sizeof(len), 1, fp) != 1) {
316                         fprintf(stderr, "WARNING: %s: Short read when getting length.\n", filename);
317                         break;
318                 }
319
320                 string serialized;
321                 serialized.resize(ntohl(len));
322                 if (fread(&serialized[0], serialized.size(), 1, fp) != 1) {
323                         fprintf(stderr, "WARNING: %s: Short read when reading frame header (%zu bytes).\n", filename, serialized.size());
324                         break;
325                 }
326
327                 FrameHeaderProto hdr;
328                 if (!hdr.ParseFromString(serialized)) {
329                         fprintf(stderr, "WARNING: %s: Corrupted frame header.\n", filename);
330                         continue;
331                 }
332
333                 FrameOnDisk frame;
334                 frame.pts = hdr.pts();
335                 frame.offset = ftell(fp);
336                 frame.filename_idx = filename_idx;
337                 frame.size = hdr.file_size();
338
339                 if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
340                         fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
341                         continue;
342                 }
343
344                 if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
345                         frames[hdr.stream_idx()].push_back(frame);
346                         start_pts = max(start_pts, hdr.pts());
347                 }
348                 all_frames.emplace_back(DB::FrameOnDiskAndStreamIdx{ frame, unsigned(hdr.stream_idx()) });
349         }
350
351         if (skipped_bytes > 0) {
352                 fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
353                         filename, skipped_bytes);
354         }
355
356         size_t size = ftell(fp);
357         fclose(fp);
358
359         db->store_frame_file(filename, size, all_frames);
360 }
361
362 void load_existing_frames()
363 {
364         DB db(global_flags.working_directory + "/futatabi.db");
365
366         string frame_dir = global_flags.working_directory + "/frames";
367         DIR *dir = opendir(frame_dir.c_str());
368         if (dir == nullptr) {
369                 perror("frames/");
370                 start_pts = 0;
371                 return;
372         }
373
374         for ( ;; ) {
375                 errno = 0;
376                 dirent *de = readdir(dir);
377                 if (de == nullptr) {
378                         if (errno != 0) {
379                                 perror("readdir");
380                                 exit(1);
381                         }
382                         break;
383                 }
384
385                 if (de->d_type == DT_REG) {
386                         string filename = frame_dir + "/" + de->d_name;
387                         load_frame_file(filename.c_str(), frame_filenames.size(), &db);
388                         frame_filenames.push_back(filename);
389                 }
390         }
391
392         closedir(dir);
393
394         if (start_pts == -1) {
395                 start_pts = 0;
396         } else {
397                 // Add a gap of one second from the old frames to the new ones.
398                 start_pts += TIMEBASE;
399         }
400
401         for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
402                 sort(frames[stream_idx].begin(), frames[stream_idx].end(),
403                         [](const auto &a, const auto &b) { return a.pts < b.pts; });
404         }
405 }
406
407 int record_thread_func()
408 {
409         auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
410         if (format_ctx == nullptr) {
411                 fprintf(stderr, "%s: Error opening file\n", global_flags.stream_source.c_str());
412                 return 1;
413         }
414
415         int64_t last_pts = -1;
416         int64_t pts_offset;
417         DB db(global_flags.working_directory + "/futatabi.db");
418
419         while (!should_quit.load()) {
420                 AVPacket pkt;
421                 unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
422                         &pkt, av_packet_unref);
423                 av_init_packet(&pkt);
424                 pkt.data = nullptr;
425                 pkt.size = 0;
426
427                 // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
428                 // right now, should_quit will be ignored if it's hung on I/O.
429                 if (av_read_frame(format_ctx.get(), &pkt) != 0) {
430                         break;
431                 }
432
433                 // Convert pts to our own timebase.
434                 AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
435                 int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
436
437                 // Translate offset into our stream.
438                 if (last_pts == -1) {
439                         pts_offset = start_pts - pts;
440                 }
441                 pts = std::max(pts + pts_offset, start_pts);
442
443                 //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
444                 //      pkt.stream_index, pts, pkt.size);
445                 FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
446
447                 post_to_main_thread([pkt, frame] {
448                         if (pkt.stream_index == 0) {
449                                 global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, frame);
450                         } else if (pkt.stream_index == 1) {
451                                 global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, frame);
452                         } else if (pkt.stream_index == 2) {
453                                 global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, frame);
454                         } else if (pkt.stream_index == 3) {
455                                 global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, frame);
456                         }
457                 });
458
459                 if (last_pts != -1 && global_flags.slow_down_input) {
460                         this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
461                 }
462                 last_pts = pts;
463                 current_pts = pts;
464         }
465
466         return 0;
467 }