Add support for forced prebuffering.
[cubemap] / stream.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <limits.h>
4 #include <netinet/in.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <sys/types.h>
9 #include <algorithm>
10 #include <string>
11 #include <vector>
12
13 #include "log.h"
14 #include "metacube2.h"
15 #include "mutexlock.h"
16 #include "state.pb.h"
17 #include "stream.h"
18 #include "util.h"
19
20 using namespace std;
21
22 Stream::Stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Encoding encoding)
23         : url(url),
24           encoding(encoding),
25           data_fd(make_tempfile("")),
26           backlog_size(backlog_size),
27           prebuffering_bytes(prebuffering_bytes),
28           bytes_received(0),
29           last_suitable_starting_point(-1),
30           pacing_rate(~0U),
31           queued_data_last_starting_point(-1)
32 {
33         if (data_fd == -1) {
34                 exit(1);
35         }
36
37         pthread_mutex_init(&queued_data_mutex, NULL);
38 }
39
40 Stream::~Stream()
41 {
42         if (data_fd != -1) {
43                 safe_close(data_fd);
44         }
45 }
46
47 Stream::Stream(const StreamProto &serialized, int data_fd)
48         : url(serialized.url()),
49           http_header(serialized.http_header()),
50           stream_header(serialized.stream_header()),
51           encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
52           data_fd(data_fd),
53           backlog_size(serialized.backlog_size()),
54           prebuffering_bytes(serialized.prebuffering_bytes()),
55           bytes_received(serialized.bytes_received()),
56           pacing_rate(~0U),
57           queued_data_last_starting_point(-1)
58 {
59         if (data_fd == -1) {
60                 exit(1);
61         }
62
63         assert(serialized.has_last_suitable_starting_point());
64         last_suitable_starting_point = serialized.last_suitable_starting_point();
65
66         pthread_mutex_init(&queued_data_mutex, NULL);
67 }
68
69 StreamProto Stream::serialize()
70 {
71         StreamProto serialized;
72         serialized.set_http_header(http_header);
73         serialized.set_stream_header(stream_header);
74         serialized.add_data_fds(data_fd);
75         serialized.set_backlog_size(backlog_size);
76         serialized.set_prebuffering_bytes(prebuffering_bytes);
77         serialized.set_bytes_received(bytes_received);
78         serialized.set_last_suitable_starting_point(last_suitable_starting_point);
79         serialized.set_url(url);
80         data_fd = -1;
81         return serialized;
82 }
83         
84 void Stream::set_backlog_size(size_t new_size)
85 {
86         if (backlog_size == new_size) {
87                 return;
88         }
89
90         string existing_data;
91         if (!read_tempfile_and_close(data_fd, &existing_data)) {
92                 exit(1);
93         }
94
95         // Unwrap the data so it's no longer circular.
96         if (bytes_received <= backlog_size) {
97                 existing_data.resize(bytes_received);
98         } else {
99                 size_t pos = bytes_received % backlog_size;
100                 existing_data = existing_data.substr(pos, string::npos) +
101                         existing_data.substr(0, pos);
102         }
103
104         // See if we need to discard data.
105         if (new_size < existing_data.size()) {
106                 size_t to_discard = existing_data.size() - new_size;
107                 existing_data = existing_data.substr(to_discard, string::npos);
108         }
109
110         // Create a new, empty data file.
111         data_fd = make_tempfile("");
112         if (data_fd == -1) {
113                 exit(1);
114         }
115         backlog_size = new_size;
116
117         // Now cheat a bit by rewinding, and adding all the old data back.
118         bytes_received -= existing_data.size();
119         iovec iov;
120         iov.iov_base = const_cast<char *>(existing_data.data());
121         iov.iov_len = existing_data.size();
122
123         vector<iovec> iovs;
124         iovs.push_back(iov);
125         add_data_raw(iovs);
126 }
127
128 void Stream::put_client_to_sleep(Client *client)
129 {
130         sleeping_clients.push_back(client);
131 }
132
133 // Return a new set of iovecs that contains only the first <bytes_wanted> bytes of <data>.
134 vector<iovec> collect_iovecs(const vector<iovec> &data, size_t bytes_wanted)
135 {
136         vector<iovec> ret;
137         size_t max_iovecs = std::min<size_t>(data.size(), IOV_MAX);
138         for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
139                 if (data[i].iov_len <= bytes_wanted) {
140                         // Consume the entire iovec.
141                         ret.push_back(data[i]);
142                         bytes_wanted -= data[i].iov_len;
143                 } else {
144                         // Take only parts of this iovec.
145                         iovec iov;
146                         iov.iov_base = data[i].iov_base;
147                         iov.iov_len = bytes_wanted;     
148                         ret.push_back(iov);
149                         bytes_wanted = 0;
150                 }
151         }
152         return ret;
153 }
154
155 // Return a new set of iovecs that contains all of <data> except the first <bytes_wanted> bytes.
156 vector<iovec> remove_iovecs(const vector<iovec> &data, size_t bytes_wanted)
157 {
158         vector<iovec> ret;
159         size_t i;
160         for (i = 0; i < data.size() && bytes_wanted > 0; ++i) {
161                 if (data[i].iov_len <= bytes_wanted) {
162                         // Consume the entire iovec.
163                         bytes_wanted -= data[i].iov_len;
164                 } else {
165                         // Take only parts of this iovec.
166                         iovec iov;
167                         iov.iov_base = reinterpret_cast<char *>(data[i].iov_base) + bytes_wanted;
168                         iov.iov_len = data[i].iov_len - bytes_wanted;
169                         ret.push_back(iov);
170                         bytes_wanted = 0;
171                 }
172         }
173
174         // Add the rest of the iovecs unchanged.
175         ret.insert(ret.end(), data.begin() + i, data.end());
176         return ret;
177 }
178
179 void Stream::add_data_raw(const vector<iovec> &orig_data)
180 {
181         vector<iovec> data = orig_data;
182         while (!data.empty()) {
183                 size_t pos = bytes_received % backlog_size;
184
185                 // Collect as many iovecs as we can before we hit the point
186                 // where the circular buffer wraps around.
187                 vector<iovec> to_write = collect_iovecs(data, backlog_size - pos);
188                 ssize_t ret;
189                 do {
190                         ret = pwritev(data_fd, to_write.data(), to_write.size(), pos);
191                 } while (ret == -1 && errno == EINTR);
192
193                 if (ret == -1) {
194                         log_perror("pwritev");
195                         // Dazed and confused, but trying to continue...
196                         return;
197                 }
198                 bytes_received += ret;
199
200                 // Remove the data that was actually written from the set of iovecs.
201                 data = remove_iovecs(data, ret);
202         }
203 }
204
205 void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
206 {
207         MutexLock lock(&queued_data_mutex);
208         assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
209                suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
210         if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
211                 queued_data_last_starting_point = queued_data.size();
212         }
213
214         if (encoding == Stream::STREAM_ENCODING_METACUBE) {
215                 // Add a Metacube block header before the data.
216                 metacube2_block_header hdr;
217                 memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
218                 hdr.size = htonl(bytes);
219                 hdr.flags = htons(0);
220                 if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
221                         hdr.flags |= htons(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
222                 }
223                 hdr.csum = htons(metacube2_compute_crc(&hdr));
224
225                 iovec iov;
226                 iov.iov_base = new char[bytes + sizeof(hdr)];
227                 iov.iov_len = bytes + sizeof(hdr);
228
229                 memcpy(iov.iov_base, &hdr, sizeof(hdr));
230                 memcpy(reinterpret_cast<char *>(iov.iov_base) + sizeof(hdr), data, bytes);
231
232                 queued_data.push_back(iov);
233         } else if (encoding == Stream::STREAM_ENCODING_RAW) {
234                 // Just add the data itself.
235                 iovec iov;
236                 iov.iov_base = new char[bytes];
237                 memcpy(iov.iov_base, data, bytes);
238                 iov.iov_len = bytes;
239
240                 queued_data.push_back(iov);
241         } else {
242                 assert(false);
243         }
244 }
245
246 void Stream::process_queued_data()
247 {
248         std::vector<iovec> queued_data_copy;
249         int queued_data_last_starting_point_copy = -1;
250
251         // Hold the lock for as short as possible, since add_data_raw() can possibly
252         // write to disk, which might disturb the input thread.
253         {
254                 MutexLock lock(&queued_data_mutex);
255                 if (queued_data.empty()) {
256                         return;
257                 }
258
259                 swap(queued_data, queued_data_copy);
260                 swap(queued_data_last_starting_point, queued_data_last_starting_point_copy);
261         }
262
263         // Update the last suitable starting point for the stream,
264         // if the queued data contains such a starting point.
265         assert(queued_data_last_starting_point_copy < ssize_t(queued_data_copy.size()));
266         if (queued_data_last_starting_point_copy >= 0) {
267                 last_suitable_starting_point = bytes_received;
268                 for (int i = 0; i < queued_data_last_starting_point_copy; ++i) {
269                         last_suitable_starting_point += queued_data_copy[i].iov_len;
270                 }
271         }
272
273         add_data_raw(queued_data_copy);
274         for (size_t i = 0; i < queued_data_copy.size(); ++i) {
275                 char *data = reinterpret_cast<char *>(queued_data_copy[i].iov_base);
276                 delete[] data;
277         }
278
279         // We have more data, so wake up all clients.
280         if (to_process.empty()) {
281                 swap(sleeping_clients, to_process);
282         } else {
283                 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
284                 sleeping_clients.clear();
285         }
286 }