]> git.sesse.net Git - nageru/blob - futatabi/frame_on_disk.cpp
Log a warning when we kill a client that is not keeping up.
[nageru] / futatabi / frame_on_disk.cpp
1 #include "frame_on_disk.h"
2
3 #include "shared/metrics.h"
4
5 #include <atomic>
6 #include <chrono>
7 #include <assert.h>
8 #include <fcntl.h>
9 #include <mutex>
10 #include <unistd.h>
11
12 using namespace std;
13 using namespace std::chrono;
14
15 namespace {
16
17 // There can be multiple FrameReader classes, so make all the metrics static.
18 once_flag frame_metrics_inited;
19
20 atomic<int64_t> metric_frame_opened_files{ 0 };
21 atomic<int64_t> metric_frame_closed_files{ 0 };
22 atomic<int64_t> metric_frame_read_bytes{ 0 };
23 atomic<int64_t> metric_frame_read_frames{ 0 };
24
25 Summary metric_frame_read_time_seconds;
26
27 }  // namespace
28
29 FrameReader::FrameReader()
30 {
31         call_once(frame_metrics_inited, [] {
32                 global_metrics.add("frame_opened_files", &metric_frame_opened_files);
33                 global_metrics.add("frame_closed_files", &metric_frame_closed_files);
34                 global_metrics.add("frame_read_bytes", &metric_frame_read_bytes);
35                 global_metrics.add("frame_read_frames", &metric_frame_read_frames);
36
37                 vector<double> quantiles{ 0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99 };
38                 metric_frame_read_time_seconds.init(quantiles, 60.0);
39                 global_metrics.add("frame_read_time_seconds", &metric_frame_read_time_seconds);
40         });
41 }
42
43 FrameReader::~FrameReader()
44 {
45         if (fd != -1) {
46                 close(fd);
47                 ++metric_frame_closed_files;
48         }
49 }
50
51 namespace {
52
53 string read_string(int fd, size_t size, off_t offset)
54 {
55         string str;
56         str.resize(size);
57         size_t str_offset = 0;
58         while (str_offset < size) {
59                 int ret = pread(fd, &str[str_offset], size - str_offset, offset + str_offset);
60                 if (ret <= 0) {
61                         perror("pread");
62                         abort();
63                 }
64
65                 str_offset += ret;
66         }
67         return str;
68 }
69
70 }  // namespace
71
72 FrameReader::Frame FrameReader::read_frame(FrameOnDisk frame, bool read_video, bool read_audio)
73 {
74         assert(read_video || read_audio);
75         steady_clock::time_point start = steady_clock::now();
76
77         if (int(frame.filename_idx) != last_filename_idx) {
78                 if (fd != -1) {
79                         close(fd);  // Ignore errors.
80                         ++metric_frame_closed_files;
81                 }
82
83                 string filename;
84                 {
85                         lock_guard<mutex> lock(frame_mu);
86                         filename = frame_filenames[frame.filename_idx];
87                 }
88
89                 fd = open(filename.c_str(), O_RDONLY);
90                 if (fd == -1) {
91                         perror(filename.c_str());
92                         abort();
93                 }
94
95                 // We want readahead. (Ignore errors.)
96                 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
97
98                 last_filename_idx = frame.filename_idx;
99                 ++metric_frame_opened_files;
100         }
101
102         Frame ret;
103         if (read_video) {
104                 ret.video = read_string(fd, frame.size, frame.offset);
105         }
106         if (read_audio) {
107                 ret.audio = read_string(fd, frame.audio_size, frame.offset + frame.size);
108         }
109
110         steady_clock::time_point stop = steady_clock::now();
111         metric_frame_read_time_seconds.count_event(duration<double>(stop - start).count());
112
113         metric_frame_read_bytes += frame.size;
114         ++metric_frame_read_frames;
115
116         return ret;
117 }