#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <algorithm>
#include <string>
#include <vector>

#include "log.h"
#include "metacube.h"
#include "state.pb.h"
#include "stream.h"
#include "util.h"

using namespace std;

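// Create a new, empty stream. The backlog is kept in a temporary file
// (see make_tempfile()), which is used as a circular buffer of
// <backlog_size> bytes.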
Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
        : url(url),
          encoding(encoding),
          data_fd(make_tempfile("")),
          backlog_size(backlog_size),
          bytes_received(0),
          last_suitable_starting_point(-1),
          mark_pool(NULL),
          queued_data_last_starting_point(-1)
{
        if (data_fd == -1) {
                exit(1);
        }
}

Stream::~Stream()
{
        if (data_fd != -1) {
                safe_close(data_fd);
        }
}

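// Recreate a stream from serialized state (the inverse of serialize()),
// e.g. after a restart. The backlog file descriptor is passed in
// separately from the protobuf.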
Stream::Stream(const StreamProto &serialized, int data_fd)
        : url(serialized.url()),
          http_header(serialized.http_header()),
          stream_header(serialized.stream_header()),
          encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
          data_fd(data_fd),
          backlog_size(serialized.backlog_size()),
          bytes_received(serialized.bytes_received()),
          mark_pool(NULL),
          queued_data_last_starting_point(-1)
{
        if (data_fd == -1) {
                exit(1);
        }

        // Split old-style headers into HTTP and video headers.
        if (!serialized.header().empty()) {
                string header = serialized.header();
                size_t split = header.find("\r\n\r\n");
                if (split == string::npos) {
                        http_header = header;
                        stream_header = "";
                } else {
                        http_header = header.substr(0, split + 2);  // Split off the second \r\n.
                        stream_header = header.substr(split + 4, string::npos);
                }
        }

        // Older versions did not set last_suitable_starting_point.
        if (serialized.has_last_suitable_starting_point()) {
                last_suitable_starting_point = serialized.last_suitable_starting_point();
        } else {
                last_suitable_starting_point = bytes_received;
        }
}

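// Serialize the stream state, e.g. for handover across a restart.
// Note that this hands off ownership of the backlog file descriptor
// to the StreamProto; data_fd is set to -1 afterwards so that our
// destructor will not close it.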
StreamProto Stream::serialize()
{
        StreamProto serialized;
        serialized.set_http_header(http_header);
        serialized.set_stream_header(stream_header);
        serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
        serialized.set_bytes_received(bytes_received);
        serialized.set_last_suitable_starting_point(last_suitable_starting_point);
        serialized.set_url(url);
        data_fd = -1;
        return serialized;
}

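// Change the backlog size. Since the backlog is a circular buffer, this means
// linearizing the existing contents into memory, creating a fresh tempfile,
// and writing the contents back (truncated to the new size if needed).
// For instance, with backlog_size = 8 and bytes_received = 11, the oldest
// byte lives at position 11 % 8 = 3, so the linearized data is the bytes at
// positions [3, 8) followed by those at [0, 3).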
void Stream::set_backlog_size(size_t new_size)
{
        if (backlog_size == new_size) {
                return;
        }

        string existing_data;
        if (!read_tempfile_and_close(data_fd, &existing_data)) {
                exit(1);
        }

        // Unwrap the data so it's no longer circular.
        if (bytes_received <= backlog_size) {
                existing_data.resize(bytes_received);
        } else {
                size_t pos = bytes_received % backlog_size;
                existing_data = existing_data.substr(pos, string::npos) +
                        existing_data.substr(0, pos);
        }

        // See if we need to discard data.
        if (new_size < existing_data.size()) {
                size_t to_discard = existing_data.size() - new_size;
                existing_data = existing_data.substr(to_discard, string::npos);
        }

        // Create a new, empty data file.
        data_fd = make_tempfile("");
        if (data_fd == -1) {
                exit(1);
        }
        backlog_size = new_size;

        // Now cheat a bit by rewinding, and adding all the old data back.
        bytes_received -= existing_data.size();
        iovec iov;
        iov.iov_base = const_cast<char *>(existing_data.data());
        iov.iov_len = existing_data.size();

        vector<iovec> iovs;
        iovs.push_back(iov);
        add_data_raw(iovs);
}

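// Park a client that has no more data to read for now; it will be moved
// into <to_process> and woken up once new data arrives (see
// process_queued_data()).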
void Stream::put_client_to_sleep(Client *client)
{
        sleeping_clients.push_back(client);
}

// Return a new set of iovecs that contains only the first <bytes_wanted> bytes of <data>.
// Since the result is handed directly to pwritev(), it is also capped at IOV_MAX elements.
vector<iovec> collect_iovecs(const vector<iovec> &data, size_t bytes_wanted)
{
        vector<iovec> ret;
        size_t max_iovecs = std::min<size_t>(data.size(), IOV_MAX);
        for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
                if (data[i].iov_len <= bytes_wanted) {
                        // Consume the entire iovec.
                        ret.push_back(data[i]);
                        bytes_wanted -= data[i].iov_len;
                } else {
                        // Take only parts of this iovec.
                        iovec iov;
                        iov.iov_base = data[i].iov_base;
                        iov.iov_len = bytes_wanted;
                        ret.push_back(iov);
                        bytes_wanted = 0;
                }
        }
        return ret;
}

// Return a new set of iovecs that contains all of <data> except the first <bytes_wanted> bytes.
vector<iovec> remove_iovecs(const vector<iovec> &data, size_t bytes_wanted)
{
        vector<iovec> ret;
        size_t i;
        for (i = 0; i < data.size() && bytes_wanted > 0; ++i) {
                if (data[i].iov_len <= bytes_wanted) {
                        // Consume the entire iovec.
                        bytes_wanted -= data[i].iov_len;
                } else {
                        // Take only parts of this iovec.
                        iovec iov;
                        iov.iov_base = reinterpret_cast<char *>(data[i].iov_base) + bytes_wanted;
                        iov.iov_len = data[i].iov_len - bytes_wanted;
                        ret.push_back(iov);
                        bytes_wanted = 0;
                }
        }

        // Add the rest of the iovecs unchanged.
        ret.insert(ret.end(), data.begin() + i, data.end());
        return ret;
}

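// Write a set of iovecs to the backlog file, which is used as a circular
// buffer of <backlog_size> bytes; a write that would run past the end of
// the file is split and wraps around to the beginning. Short writes are
// retried with the remaining data.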
void Stream::add_data_raw(const vector<iovec> &orig_data)
{
        vector<iovec> data = orig_data;
        while (!data.empty()) {
                size_t pos = bytes_received % backlog_size;

                // Collect as many iovecs as we can before we hit the point
                // where the circular buffer wraps around.
                vector<iovec> to_write = collect_iovecs(data, backlog_size - pos);
                ssize_t ret;
                do {
                        ret = pwritev(data_fd, to_write.data(), to_write.size(), pos);
                } while (ret == -1 && errno == EINTR);

                if (ret == -1) {
                        log_perror("pwritev");
                        // Dazed and confused, but trying to continue...
                        return;
                }
                bytes_received += ret;

                // Remove the data that was actually written from the set of iovecs.
                data = remove_iovecs(data, ret);
        }
}

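// Add data to the queue of data to be written out to the backlog; the
// actual writing happens in process_queued_data(). The data is copied
// into heap-allocated buffers (freed again after writing), and if the
// stream is Metacube-encoded, a Metacube block header is prepended so
// that the receiver can find block boundaries and tell whether a block
// is a suitable stream starting point.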
void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
{
        assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
               suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
        if (suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
                queued_data_last_starting_point = queued_data.size();
        }

        if (encoding == Stream::STREAM_ENCODING_METACUBE) {
                // Add a Metacube block header before the data.
                metacube_block_header hdr;
                memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
                hdr.size = htonl(bytes);
                hdr.flags = htonl(0);
                if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
                        hdr.flags |= htonl(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
                }

                iovec iov;
                iov.iov_base = new char[bytes + sizeof(hdr)];
                iov.iov_len = bytes + sizeof(hdr);

                memcpy(iov.iov_base, &hdr, sizeof(hdr));
                memcpy(reinterpret_cast<char *>(iov.iov_base) + sizeof(hdr), data, bytes);

                queued_data.push_back(iov);
        } else if (encoding == Stream::STREAM_ENCODING_RAW) {
                // Just add the data itself.
                iovec iov;
                iov.iov_base = new char[bytes];
                memcpy(iov.iov_base, data, bytes);
                iov.iov_len = bytes;

                queued_data.push_back(iov);
        } else {
                assert(false);
        }
}

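// Flush all queued data to the backlog, recompute the last suitable
// starting point if the queue contained one, and then wake up all
// sleeping clients so that they can be sent the new data.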
void Stream::process_queued_data()
{
        if (queued_data.empty()) {
                return;
        }

        // Update the last suitable starting point for the stream,
        // if the queued data contains such a starting point.
        assert(queued_data_last_starting_point < ssize_t(queued_data.size()));
        if (queued_data_last_starting_point >= 0) {
                last_suitable_starting_point = bytes_received;
                for (int i = 0; i < queued_data_last_starting_point; ++i) {
                        last_suitable_starting_point += queued_data[i].iov_len;
                }
        }

        add_data_raw(queued_data);
        for (size_t i = 0; i < queued_data.size(); ++i) {
                char *data = reinterpret_cast<char *>(queued_data[i].iov_base);
                delete[] data;
        }
        queued_data.clear();
        queued_data_last_starting_point = -1;

        // We have more data, so wake up all clients.
        if (to_process.empty()) {
                swap(sleeping_clients, to_process);
        } else {
                to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
                sleeping_clients.clear();
        }
}