]> git.sesse.net Git - cubemap/blob - stream.cpp
0b494fb8d5527d4ba51290d62b109ad510f4cc9a
[cubemap] / stream.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <limits.h>
4 #include <netinet/in.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <sys/types.h>
9 #include <algorithm>
10 #include <string>
11 #include <queue>
12 #include <vector>
13
14 #include "log.h"
15 #include "metacube2.h"
16 #include "state.pb.h"
17 #include "stream.h"
18 #include "util.h"
19
20 using namespace std;
21
22 Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding, Encoding src_encoding)
23         : url(url),
24           encoding(encoding),
25           src_encoding(src_encoding),
26           data_fd(make_tempfile("")),
27           backlog_size(backlog_size),
28           prebuffering_bytes(prebuffering_bytes)
29 {
30         if (data_fd == -1) {
31                 exit(1);
32         }
33 }
34
35 Stream::~Stream()
36 {
37         if (data_fd != -1) {
38                 safe_close(data_fd);
39         }
40 }
41
42 Stream::Stream(const StreamProto &serialized, int data_fd)
43         : url(serialized.url()),
44           http_header(serialized.http_header()),
45           stream_header(serialized.stream_header()),
46           encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
47           data_fd(data_fd),
48           backlog_size(serialized.backlog_size()),
49           bytes_received(serialized.bytes_received())
50 {
51         if (data_fd == -1) {
52                 exit(1);
53         }
54
55         for (ssize_t point : serialized.suitable_starting_point()) {
56                 if (point == -1) {
57                         // Can happen when upgrading from before 1.1.3,
58                         // where this was an optional field with -1 signifying
59                         // "no such point".
60                         continue;
61                 }
62                 suitable_starting_points.push_back(point);
63         }
64 }
65
66 StreamProto Stream::serialize()
67 {
68         StreamProto serialized;
69         serialized.set_http_header(http_header);
70         serialized.set_stream_header(stream_header);
71         serialized.add_data_fds(data_fd);
72         serialized.set_backlog_size(backlog_size);
73         serialized.set_bytes_received(bytes_received);
74         for (size_t point : suitable_starting_points) {
75                 serialized.add_suitable_starting_point(point);
76         }
77         serialized.set_url(url);
78         data_fd = -1;
79         return serialized;
80 }
81         
82 void Stream::set_backlog_size(size_t new_size)
83 {
84         if (backlog_size == new_size) {
85                 return;
86         }
87
88         string existing_data;
89         if (!read_tempfile_and_close(data_fd, &existing_data)) {
90                 exit(1);
91         }
92
93         // Unwrap the data so it's no longer circular.
94         if (bytes_received <= backlog_size) {
95                 existing_data.resize(bytes_received);
96         } else {
97                 size_t pos = bytes_received % backlog_size;
98                 existing_data = existing_data.substr(pos, string::npos) +
99                         existing_data.substr(0, pos);
100         }
101
102         // See if we need to discard data.
103         if (new_size < existing_data.size()) {
104                 size_t to_discard = existing_data.size() - new_size;
105                 existing_data = existing_data.substr(to_discard, string::npos);
106         }
107
108         // Create a new, empty data file.
109         data_fd = make_tempfile("");
110         if (data_fd == -1) {
111                 exit(1);
112         }
113         backlog_size = new_size;
114
115         // Now cheat a bit by rewinding, and adding all the old data back.
116         bytes_received -= existing_data.size();
117         DataElement data_element;
118         data_element.data.iov_base = const_cast<char *>(existing_data.data());
119         data_element.data.iov_len = existing_data.size();
120         data_element.metacube_flags = 0;  // Ignored by add_data_raw().
121
122         vector<DataElement> data_elements;
123         data_elements.push_back(data_element);
124         add_data_raw(data_elements);
125         remove_obsolete_starting_points();
126 }
127
128 void Stream::put_client_to_sleep(Client *client)
129 {
130         sleeping_clients.push_back(client);
131 }
132
133 // Return a new set of iovecs that contains only the first <bytes_wanted> bytes of <data>.
134 vector<iovec> collect_iovecs(const vector<Stream::DataElement> &data, size_t bytes_wanted)
135 {
136         vector<iovec> ret;
137         size_t max_iovecs = min<size_t>(data.size(), IOV_MAX);
138         for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
139                 if (data[i].data.iov_len <= bytes_wanted) {
140                         // Consume the entire iovec.
141                         ret.push_back(data[i].data);
142                         bytes_wanted -= data[i].data.iov_len;
143                 } else {
144                         // Take only parts of this iovec.
145                         iovec iov;
146                         iov.iov_base = data[i].data.iov_base;
147                         iov.iov_len = bytes_wanted;
148                         ret.push_back(iov);
149                         bytes_wanted = 0;
150                 }
151         }
152         return ret;
153 }
154
155 // Return a new set of iovecs that contains all of <data> except the first <bytes_wanted> bytes.
156 vector<Stream::DataElement> remove_iovecs(const vector<Stream::DataElement> &data, size_t bytes_wanted)
157 {
158         vector<Stream::DataElement> ret;
159         size_t i;
160         for (i = 0; i < data.size() && bytes_wanted > 0; ++i) {
161                 if (data[i].data.iov_len <= bytes_wanted) {
162                         // Consume the entire iovec.
163                         bytes_wanted -= data[i].data.iov_len;
164                 } else {
165                         // Take only parts of this iovec.
166                         Stream::DataElement data_element;
167                         data_element.data.iov_base = reinterpret_cast<char *>(data[i].data.iov_base) + bytes_wanted;
168                         data_element.data.iov_len = data[i].data.iov_len - bytes_wanted;
169                         data_element.metacube_flags = METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
170                         ret.push_back(data_element);
171                         bytes_wanted = 0;
172                 }
173         }
174
175         // Add the rest of the iovecs unchanged.
176         ret.insert(ret.end(), data.begin() + i, data.end());
177         return ret;
178 }
179
180 void Stream::add_data_raw(const vector<DataElement> &orig_data)
181 {
182         vector<DataElement> data = orig_data;
183         while (!data.empty()) {
184                 size_t pos = bytes_received % backlog_size;
185
186                 // Collect as many iovecs as we can before we hit the point
187                 // where the circular buffer wraps around.
188                 vector<iovec> to_write = collect_iovecs(data, backlog_size - pos);
189                 ssize_t ret;
190                 do {
191                         ret = pwritev(data_fd, to_write.data(), to_write.size(), pos);
192                 } while (ret == -1 && errno == EINTR);
193
194                 if (ret == -1) {
195                         log_perror("pwritev");
196                         // Dazed and confused, but trying to continue...
197                         return;
198                 }
199                 bytes_received += ret;
200
201                 // Remove the data that was actually written from the set of iovecs.
202                 data = remove_iovecs(data, ret);
203         }
204 }
205
206 void Stream::remove_obsolete_starting_points()
207 {
208         // We could do a binary search here (std::lower_bound), but it seems
209         // overkill for removing what's probably only a few points.
210         while (!suitable_starting_points.empty() &&
211                bytes_received - suitable_starting_points[0] > backlog_size) {
212                 suitable_starting_points.pop_front();
213         }
214 }
215
216 void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags)
217 {
218         // For regular output, we don't want to send the client twice
219         // (it's already sent out together with the HTTP header).
220         // However, for Metacube output, we need to send it so that
221         // the Cubemap instance in the other end has a chance to update it.
222         // It may come twice in its stream, but Cubemap doesn't care.
223         if (encoding == Stream::STREAM_ENCODING_RAW &&
224             (metacube_flags & METACUBE_FLAGS_HEADER) != 0) {
225                 return;
226         }
227
228         lock_guard<mutex> lock(queued_data_mutex);
229
230         DataElement data_element;
231         data_element.metacube_flags = metacube_flags;
232
233         if (encoding == Stream::STREAM_ENCODING_METACUBE) {
234                 // Add a Metacube block header before the data.
235                 metacube2_block_header hdr;
236                 memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
237                 hdr.size = htonl(bytes);
238                 hdr.flags = htons(metacube_flags);
239                 hdr.csum = htons(metacube2_compute_crc(&hdr));
240
241                 data_element.data.iov_base = new char[bytes + sizeof(hdr)];
242                 data_element.data.iov_len = bytes + sizeof(hdr);
243
244                 memcpy(data_element.data.iov_base, &hdr, sizeof(hdr));
245                 memcpy(reinterpret_cast<char *>(data_element.data.iov_base) + sizeof(hdr), data, bytes);
246
247                 queued_data.push_back(data_element);
248         } else if (encoding == Stream::STREAM_ENCODING_RAW) {
249                 // Just add the data itself.
250                 data_element.data.iov_base = new char[bytes];
251                 memcpy(data_element.data.iov_base, data, bytes);
252                 data_element.data.iov_len = bytes;
253
254                 queued_data.push_back(data_element);
255         } else {
256                 assert(false);
257         }
258 }
259
260 void Stream::process_queued_data()
261 {
262         vector<DataElement> queued_data_copy;
263
264         // Hold the lock for as short as possible, since add_data_raw() can possibly
265         // write to disk, which might disturb the input thread.
266         {
267                 lock_guard<mutex> lock(queued_data_mutex);
268                 if (queued_data.empty()) {
269                         return;
270                 }
271
272                 swap(queued_data, queued_data_copy);
273         }
274
275         // Add suitable starting points for the stream, if the queued data
276         // contains such starting points. Note that we drop starting points
277         // if they're less than 10 kB apart, so that we don't get a huge
278         // amount of them for e.g. each and every MPEG-TS 188-byte cell.
279         // The 10 kB value is somewhat arbitrary, but at least it should make
280         // the RAM cost of saving the position ~0.1% (or less) of the actual
281         // data, and 10 kB is a very fine granularity in most streams.
282         static const int minimum_start_point_distance = 10240;
283         size_t byte_position = bytes_received;
284         for (const DataElement &elem : queued_data_copy) {
285                 if ((elem.metacube_flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0) {
286                         size_t num_points = suitable_starting_points.size();
287                         if (num_points >= 2 &&
288                             suitable_starting_points[num_points - 1] - suitable_starting_points[num_points - 2] < minimum_start_point_distance) {
289                                 // p[n-1] - p[n-2] < 10 kB, so drop p[n-1].
290                                 suitable_starting_points.pop_back();
291                         }
292                         suitable_starting_points.push_back(byte_position);
293                 }
294                 byte_position += elem.data.iov_len;
295         }
296
297         add_data_raw(queued_data_copy);
298         remove_obsolete_starting_points();
299         for (const DataElement &elem : queued_data_copy) {
300                 char *data = reinterpret_cast<char *>(elem.data.iov_base);
301                 delete[] data;
302         }
303
304         // We have more data, so wake up all clients.
305         if (to_process.empty()) {
306                 swap(sleeping_clients, to_process);
307         } else {
308                 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
309                 sleeping_clients.clear();
310         }
311 }