-#include <assert.h>
#include <arpa/inet.h>
+#include <assert.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
}
#include "clip_list.h"
-#include "shared/context.h"
#include "defs.h"
-#include "shared/disk_space_estimator.h"
-#include "shared/ffmpeg_raii.h"
#include "flags.h"
-#include "frame_on_disk.h"
#include "frame.pb.h"
-#include "shared/httpd.h"
+#include "frame_on_disk.h"
#include "mainwindow.h"
#include "player.h"
+#include "shared/context.h"
+#include "shared/disk_space_estimator.h"
+#include "shared/ffmpeg_raii.h"
+#include "shared/httpd.h"
+#include "shared/metrics.h"
#include "shared/post_to_main_thread.h"
#include "shared/ref_counted_gl_sync.h"
#include "shared/timebase.h"
#include <QApplication>
#include <QGLFormat>
-#include <QSurfaceFormat>
#include <QProgressDialog>
+#include <QSurfaceFormat>
#include <movit/init.h>
#include <movit/util.h>
constexpr size_t frame_magic_len = 8;
mutex RefCountedGLsync::fence_lock;
-atomic<bool> should_quit{false};
+atomic<bool> should_quit{ false };
int64_t start_pts = -1;
vector<FrameOnDisk> frames[MAX_STREAMS]; // Under frame_mu.
vector<string> frame_filenames; // Under frame_mu.
+atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
+Summary metric_received_frame_size_bytes;
+
namespace {
-FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
+FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, vector<uint32_t> audio, DB *db)
{
if (open_frame_files.count(stream_idx) == 0) {
char filename[256];
- snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
- global_flags.working_directory.c_str(), stream_idx, pts);
+ snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09" PRId64 ".frames",
+ global_flags.working_directory.c_str(), stream_idx, pts);
FILE *fp = fopen(filename, "wb");
if (fp == nullptr) {
perror(filename);
- exit(1);
+ abort();
}
lock_guard<mutex> lock(frame_mu);
hdr.set_stream_idx(stream_idx);
hdr.set_pts(pts);
hdr.set_file_size(size);
+ hdr.set_audio_size(audio.size() * sizeof(audio[0]));
string serialized;
if (!hdr.SerializeToString(&serialized)) {
fprintf(stderr, "Frame header serialization failed.\n");
- exit(1);
+ abort();
}
uint32_t len = htonl(serialized.size());
if (fwrite(frame_magic, frame_magic_len, 1, file.fp) != 1) {
perror("fwrite");
- exit(1);
+ abort();
}
if (fwrite(&len, sizeof(len), 1, file.fp) != 1) {
perror("fwrite");
- exit(1);
+ abort();
}
if (fwrite(serialized.data(), serialized.size(), 1, file.fp) != 1) {
perror("fwrite");
- exit(1);
+ abort();
}
off_t offset = ftell(file.fp);
if (fwrite(data, size, 1, file.fp) != 1) {
perror("fwrite");
- exit(1);
+ abort();
+ }
+ if (audio.size() > 0) {
+ if (fwrite(audio.data(), hdr.audio_size(), 1, file.fp) != 1) {
+ perror("fwrite");
+ abort();
+ }
}
fflush(file.fp); // No fsync(), though. We can accept losing a few frames.
global_disk_space_estimator->report_write(filename, 8 + sizeof(len) + serialized.size() + size + audio.size() * sizeof(audio[0]), pts);
frame.filename_idx = filename_idx;
frame.offset = offset;
frame.size = size;
+ frame.audio_size = audio.size() * sizeof(audio[0]);
{
lock_guard<mutex> lock(frame_mu);
frames[stream_idx].push_back(frame);
}
- if (++file.frames_written_so_far >= 1000) {
+ if (++file.frames_written_so_far >= FRAMES_PER_FILE) {
size_t size = ftell(file.fp);
// Start a new file next time.
if (fclose(file.fp) != 0) {
perror("fclose");
- exit(1);
+ abort();
}
open_frame_files.erase(stream_idx);
return frame;
}
-} // namespace
+} // namespace
HTTPD *global_httpd;
global_flags.stream_source = argv[optind];
} else {
usage();
- exit(1);
+ abort();
}
string frame_dir = global_flags.working_directory + "/frames";
fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
} else if (errno != EEXIST) {
perror(global_flags.working_directory.c_str());
- exit(1);
+ abort();
}
avformat_network_init();
+ global_metrics.set_prefix("futatabi");
global_httpd = new HTTPD;
+ global_metrics.remove("num_connected_multicam_clients");
QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
global_share_widget = new QGLWidget();
if (!global_share_widget->isValid()) {
fprintf(stderr, "Failed to initialize OpenGL. Futatabi needs at least OpenGL 4.5 to function properly.\n");
- exit(1);
+ abort();
}
// Initialize Movit.
QOpenGLContext *context = create_context(surface);
if (!make_current(context, surface)) {
printf("oops\n");
- exit(1);
+ abort();
}
CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
delete_context(context);
load_existing_frames();
+ for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
+ if (!frames[stream_idx].empty()) {
+ assert(start_pts > frames[stream_idx].back().pts);
+ }
+ }
+
MainWindow main_window;
main_window.show();
should_quit = true;
record_thread.join();
- JPEGFrameView::shutdown();
return ret;
}
struct stat st;
if (stat(filename, &st) == -1) {
perror(filename);
- exit(1);
+ abort();
}
vector<DB::FrameOnDiskAndStreamIdx> all_frames = db->load_frame_file(basename, st.st_size, filename_idx);
FILE *fp = fopen(filename, "rb");
if (fp == nullptr) {
perror(filename);
- exit(1);
+ abort();
+ }
+
+ // Find the actual length of the file, since fseek() past the end of the file
+ // will succeed without an error.
+ if (fseek(fp, 0, SEEK_END) == -1) {
+ perror("fseek(SEEK_END)");
+ abort();
+ }
+ off_t file_len = ftell(fp);
+ if (fseek(fp, 0, SEEK_SET) == -1) {
+ perror("fseek(SEEK_SET)");
+ abort();
}
size_t magic_offset = 0;
// OK, found the magic. Try to parse the frame header.
magic_offset = 0;
- if (skipped_bytes > 0) {
+ if (skipped_bytes > 0) {
fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
- filename, skipped_bytes);
+ filename, skipped_bytes);
skipped_bytes = 0;
}
}
frame.filename_idx = filename_idx;
frame.size = hdr.file_size();
+ frame.audio_size = hdr.audio_size();
- if (fseek(fp, frame.offset + frame.size, SEEK_SET) == -1) {
+ if (frame.offset + frame.size + frame.audio_size > file_len ||
+ fseek(fp, frame.offset + frame.size + frame.audio_size, SEEK_SET) == -1) {
fprintf(stderr, "WARNING: %s: Could not seek past frame (probably truncated).\n", filename);
- continue;
+ break;
}
if (hdr.stream_idx() >= 0 && hdr.stream_idx() < MAX_STREAMS) {
if (skipped_bytes > 0) {
fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
- filename, skipped_bytes);
+ filename, skipped_bytes);
}
off_t size = ftell(fp);
}
vector<string> frame_basenames;
- for ( ;; ) {
+ for (;;) {
errno = 0;
dirent *de = readdir(dir);
if (de == nullptr) {
if (errno != 0) {
perror("readdir");
- exit(1);
+ abort();
}
break;
}
}
if (progress.wasCanceled()) {
- exit(1);
+ abort();
}
}
closedir(dir);
load_frame_file(frame_filenames[i].c_str(), frame_basenames[i], i, &db);
progress.setValue(i + 3);
if (progress.wasCanceled()) {
- exit(1);
+ abort();
}
}
for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
sort(frames[stream_idx].begin(), frames[stream_idx].end(),
- [](const auto &a, const auto &b) { return a.pts < b.pts; });
+ [](const auto &a, const auto &b) { return a.pts < b.pts; });
}
db.clean_unused_frame_files(frame_basenames);
void record_thread_func()
{
+ for (unsigned i = 0; i < MAX_STREAMS; ++i) {
+ global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
+ }
+ global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
+
if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
// Save the user from some repetitive messages.
return;
pthread_setname_np(pthread_self(), "ReceiveFrames");
- int64_t pts_offset;
+ int64_t pts_offset = 0; // Needs to be initialized due to a spurious GCC warning.
DB db(global_flags.working_directory + "/futatabi.db");
while (!should_quit.load()) {
continue;
}
- int64_t last_pts = -1;
+ // Match any audio streams to video streams, sequentially.
+ vector<int> video_stream_idx, audio_stream_idx;
+ for (unsigned i = 0; i < format_ctx->nb_streams; ++i) {
+ if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
+ video_stream_idx.push_back(i);
+ } else if (format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
+ audio_stream_idx.push_back(i);
+ }
+ }
+ unordered_map<int, int> audio_stream_to_video_stream_idx;
+ for (size_t i = 0; i < min(video_stream_idx.size(), audio_stream_idx.size()); ++i) {
+ audio_stream_to_video_stream_idx[audio_stream_idx[i]] = video_stream_idx[i];
+ }
+ vector<uint32_t> pending_audio[MAX_STREAMS];
+ int64_t last_pts = -1;
while (!should_quit.load()) {
AVPacket pkt;
- unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
+ unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
&pkt, av_packet_unref);
av_init_packet(&pkt);
pkt.data = nullptr;
break;
}
+ AVStream *stream = format_ctx->streams[pkt.stream_index];
+ if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO &&
+ audio_stream_to_video_stream_idx.count(pkt.stream_index)) {
+ if ((pkt.size % (sizeof(uint32_t) * 2)) != 0) {
+ fprintf(stderr, "Audio stream %u had a packet of strange length %d, ignoring.\n",
+ pkt.stream_index, pkt.size);
+ } else {
+ // TODO: Endianness?
+ const uint32_t *begin = (const uint32_t *)pkt.data;
+ const uint32_t *end = (const uint32_t *)(pkt.data + pkt.size);
+ pending_audio[audio_stream_to_video_stream_idx[pkt.stream_index]].assign(begin, end);
+ }
+ }
+
+ if (pkt.stream_index >= MAX_STREAMS ||
+ stream->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
+ continue;
+ }
+
+ ++metric_received_frames[pkt.stream_index];
+ metric_received_frame_size_bytes.count_event(pkt.size);
+
// Convert pts to our own timebase.
- AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
+ AVRational stream_timebase = stream->time_base;
int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
// Translate offset into our stream.
//fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
// pkt.stream_index, pts, pkt.size);
- FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
+ FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, move(pending_audio[pkt.stream_index]), &db);
post_to_main_thread([pkt, frame] {
global_mainwindow->display_frame(pkt.stream_index, frame);
current_pts = pts;
}
- fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
- sleep(1);
+ if (!should_quit.load()) {
+ fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
+ sleep(1);
+ }
start_pts = last_pts + TIMEBASE;
}