-#include <assert.h>
#include <arpa/inet.h>
+#include <assert.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <sys/stat.h>
#include <sys/types.h>
#include <thread>
+#include <unistd.h>
#include <vector>
extern "C" {
}
#include "clip_list.h"
-#include "shared/context.h"
#include "defs.h"
-#include "disk_space_estimator.h"
-#include "shared/ffmpeg_raii.h"
#include "flags.h"
-#include "frame_on_disk.h"
#include "frame.pb.h"
-#include "httpd.h"
+#include "frame_on_disk.h"
#include "mainwindow.h"
#include "player.h"
+#include "shared/context.h"
+#include "shared/disk_space_estimator.h"
+#include "shared/ffmpeg_raii.h"
+#include "shared/httpd.h"
+#include "shared/metrics.h"
#include "shared/post_to_main_thread.h"
#include "shared/ref_counted_gl_sync.h"
#include "shared/timebase.h"
#include <QApplication>
#include <QGLFormat>
-#include <QSurfaceFormat>
#include <QProgressDialog>
+#include <QSurfaceFormat>
#include <movit/init.h>
#include <movit/util.h>
constexpr size_t frame_magic_len = 8;
mutex RefCountedGLsync::fence_lock;
-atomic<bool> should_quit{false};
+atomic<bool> should_quit{ false };
int64_t start_pts = -1;
vector<FrameOnDisk> frames[MAX_STREAMS]; // Under frame_mu.
vector<string> frame_filenames; // Under frame_mu.
+atomic<int64_t> metric_received_frames[MAX_STREAMS]{ { 0 } };
+Summary metric_received_frame_size_bytes;
+
namespace {
FrameOnDisk write_frame(int stream_idx, int64_t pts, const uint8_t *data, size_t size, DB *db)
if (open_frame_files.count(stream_idx) == 0) {
char filename[256];
snprintf(filename, sizeof(filename), "%s/frames/cam%d-pts%09ld.frames",
- global_flags.working_directory.c_str(), stream_idx, pts);
+ global_flags.working_directory.c_str(), stream_idx, pts);
FILE *fp = fopen(filename, "wb");
if (fp == nullptr) {
perror(filename);
const char *basename = filename.c_str();
while (strchr(basename, '/') != nullptr) {
- basename = strchr(basename, '/');
+ basename = strchr(basename, '/') + 1;
}
db->store_frame_file(basename, size, frames_this_file);
}
return frame;
}
-} // namespace
+} // namespace
HTTPD *global_httpd;
void load_existing_frames();
-int record_thread_func();
+void record_thread_func();
int main(int argc, char **argv)
{
string frame_dir = global_flags.working_directory + "/frames";
- struct stat st;
- if (stat(frame_dir.c_str(), &st) == -1) {
+ if (mkdir(frame_dir.c_str(), 0777) == 0) {
fprintf(stderr, "%s does not exist, creating it.\n", frame_dir.c_str());
- if (mkdir(frame_dir.c_str(), 0777) == -1) {
- perror(global_flags.working_directory.c_str());
- exit(1);
- }
+ } else if (errno != EEXIST) {
+ perror(global_flags.working_directory.c_str());
+ exit(1);
}
avformat_network_init();
+ global_metrics.set_prefix("futatabi");
global_httpd = new HTTPD;
+ global_metrics.remove("num_connected_multicam_clients");
QCoreApplication::setAttribute(Qt::AA_ShareOpenGLContexts, true);
{
QSurface *surface = create_surface();
QOpenGLContext *context = create_context(surface);
- make_current(context, surface);
+ if (!make_current(context, surface)) {
+ printf("oops\n");
+ exit(1);
+ }
CHECK(movit::init_movit(MOVIT_SHADER_DIR, movit::MOVIT_DEBUG_OFF));
delete_context(context);
// TODO: Delete the surface, too.
if (!all_frames.empty()) {
// We already had this cached in the database, so no need to look in the file.
for (const DB::FrameOnDiskAndStreamIdx &frame : all_frames) {
- if (frame.stream_idx >= 0 && frame.stream_idx < MAX_STREAMS) {
+ if (frame.stream_idx < MAX_STREAMS) {
frames[frame.stream_idx].push_back(frame.frame);
start_pts = max(start_pts, frame.frame.pts);
}
// OK, found the magic. Try to parse the frame header.
magic_offset = 0;
- if (skipped_bytes > 0) {
+ if (skipped_bytes > 0) {
fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes in the middle.\n",
- filename, skipped_bytes);
+ filename, skipped_bytes);
skipped_bytes = 0;
}
FrameOnDisk frame;
frame.pts = hdr.pts();
frame.offset = ftell(fp);
+ if (frame.offset == -1) {
+ fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
+ break;
+ }
frame.filename_idx = filename_idx;
frame.size = hdr.file_size();
if (skipped_bytes > 0) {
fprintf(stderr, "WARNING: %s: Skipped %zu garbage bytes at the end.\n",
- filename, skipped_bytes);
+ filename, skipped_bytes);
}
- size_t size = ftell(fp);
+ off_t size = ftell(fp);
fclose(fp);
+ if (size == -1) {
+ fprintf(stderr, "WARNING: %s: ftell() failed (%s).\n", filename, strerror(errno));
+ return;
+ }
+
db->store_frame_file(basename, size, all_frames);
}
}
vector<string> frame_basenames;
- for ( ;; ) {
+ for (;;) {
errno = 0;
dirent *de = readdir(dir);
if (de == nullptr) {
// Add a gap of one second from the old frames to the new ones.
start_pts += TIMEBASE;
}
+ current_pts = start_pts;
for (int stream_idx = 0; stream_idx < MAX_STREAMS; ++stream_idx) {
sort(frames[stream_idx].begin(), frames[stream_idx].end(),
- [](const auto &a, const auto &b) { return a.pts < b.pts; });
+ [](const auto &a, const auto &b) { return a.pts < b.pts; });
}
db.clean_unused_frame_files(frame_basenames);
}
-int record_thread_func()
+void record_thread_func()
{
- auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
- if (format_ctx == nullptr) {
- fprintf(stderr, "%s: Error opening file\n", global_flags.stream_source.c_str());
- return 1;
+ for (unsigned i = 0; i < MAX_STREAMS; ++i) {
+ global_metrics.add("received_frames", { { "stream", to_string(i) } }, &metric_received_frames[i]);
}
+ global_metrics.add("received_frame_size_bytes", &metric_received_frame_size_bytes);
+
+ if (global_flags.stream_source.empty() || global_flags.stream_source == "/dev/null") {
+ // Save the user from some repetitive messages.
+ return;
+ }
+
+ pthread_setname_np(pthread_self(), "ReceiveFrames");
- int64_t last_pts = -1;
- int64_t pts_offset;
+ int64_t pts_offset = 0; // Needs to be initialized due to a spurious GCC warning.
DB db(global_flags.working_directory + "/futatabi.db");
while (!should_quit.load()) {
- AVPacket pkt;
- unique_ptr<AVPacket, decltype(av_packet_unref)*> pkt_cleanup(
- &pkt, av_packet_unref);
- av_init_packet(&pkt);
- pkt.data = nullptr;
- pkt.size = 0;
-
- // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
- // right now, should_quit will be ignored if it's hung on I/O.
- if (av_read_frame(format_ctx.get(), &pkt) != 0) {
- break;
+ auto format_ctx = avformat_open_input_unique(global_flags.stream_source.c_str(), nullptr, nullptr);
+ if (format_ctx == nullptr) {
+ fprintf(stderr, "%s: Error opening file. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
+ sleep(1);
+ continue;
}
- // Convert pts to our own timebase.
- AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
- int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
+ int64_t last_pts = -1;
- // Translate offset into our stream.
- if (last_pts == -1) {
- pts_offset = start_pts - pts;
- }
- pts = std::max(pts + pts_offset, start_pts);
-
- //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
- // pkt.stream_index, pts, pkt.size);
- FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
-
- post_to_main_thread([pkt, frame] {
- if (pkt.stream_index == 0) {
- global_mainwindow->ui->input1_display->setFrame(pkt.stream_index, frame);
- } else if (pkt.stream_index == 1) {
- global_mainwindow->ui->input2_display->setFrame(pkt.stream_index, frame);
- } else if (pkt.stream_index == 2) {
- global_mainwindow->ui->input3_display->setFrame(pkt.stream_index, frame);
- } else if (pkt.stream_index == 3) {
- global_mainwindow->ui->input4_display->setFrame(pkt.stream_index, frame);
+ while (!should_quit.load()) {
+ AVPacket pkt;
+ unique_ptr<AVPacket, decltype(av_packet_unref) *> pkt_cleanup(
+ &pkt, av_packet_unref);
+ av_init_packet(&pkt);
+ pkt.data = nullptr;
+ pkt.size = 0;
+
+ // TODO: Make it possible to abort av_read_frame() (use an interrupt callback);
+ // right now, should_quit will be ignored if it's hung on I/O.
+ if (av_read_frame(format_ctx.get(), &pkt) != 0) {
+ break;
+ }
+ if (pkt.stream_index >= MAX_STREAMS) {
+ continue;
}
- });
- if (last_pts != -1 && global_flags.slow_down_input) {
- this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
+ ++metric_received_frames[pkt.stream_index];
+ metric_received_frame_size_bytes.count_event(pkt.size);
+
+ // Convert pts to our own timebase.
+ AVRational stream_timebase = format_ctx->streams[pkt.stream_index]->time_base;
+ int64_t pts = av_rescale_q(pkt.pts, stream_timebase, AVRational{ 1, TIMEBASE });
+
+ // Translate offset into our stream.
+ if (last_pts == -1) {
+ pts_offset = start_pts - pts;
+ }
+ pts = std::max(pts + pts_offset, start_pts);
+
+ //fprintf(stderr, "Got a frame from camera %d, pts = %ld, size = %d\n",
+ // pkt.stream_index, pts, pkt.size);
+ FrameOnDisk frame = write_frame(pkt.stream_index, pts, pkt.data, pkt.size, &db);
+
+ post_to_main_thread([pkt, frame] {
+ global_mainwindow->display_frame(pkt.stream_index, frame);
+ });
+
+ if (last_pts != -1 && global_flags.slow_down_input) {
+ this_thread::sleep_for(microseconds((pts - last_pts) * 1000000 / TIMEBASE));
+ }
+ last_pts = pts;
+ current_pts = pts;
}
- last_pts = pts;
- current_pts = pts;
- }
- return 0;
+ fprintf(stderr, "%s: Hit EOF. Waiting one second and trying again...\n", global_flags.stream_source.c_str());
+ sleep(1);
+
+ start_pts = last_pts + TIMEBASE;
+ }
}