Even more missing unistd.h includes.
[cubemap] / httpinput.cpp
1 #include <stdio.h>
2 #include <string.h>
3 #include <stdint.h>
4 #include <unistd.h>
5 #include <assert.h>
6 #include <arpa/inet.h>
7 #include <sys/socket.h>
8 #include <pthread.h>
9 #include <sys/types.h>
10 #include <sys/ioctl.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <netdb.h>
14 #include <poll.h>
15 #include <signal.h>
16 #include <errno.h>
17 #include <vector>
18 #include <string>
19 #include <map>
20
21 #include "metacube.h"
22 #include "mutexlock.h"
23 #include "httpinput.h"
24 #include "server.h"
25 #include "serverpool.h"
26 #include "parse.h"
27 #include "version.h"
28 #include "state.pb.h"
29
30 using namespace std;
31
32 extern ServerPool *servers;
33           
34 HTTPInput::HTTPInput(const string &stream_id, const string &url)
35         : state(NOT_CONNECTED),
36           stream_id(stream_id),
37           url(url),
38           has_metacube_header(false),
39           sock(-1)
40 {
41 }
42
43 HTTPInput::HTTPInput(const InputProto &serialized)
44         : state(State(serialized.state())),
45           stream_id(serialized.stream_id()),
46           url(serialized.url()),
47           request(serialized.request()),
48           request_bytes_sent(serialized.request_bytes_sent()),
49           response(serialized.response()),
50           http_header(serialized.http_header()),
51           has_metacube_header(serialized.has_metacube_header()),
52           sock(serialized.sock())
53 {
54         pending_data.resize(serialized.pending_data().size());
55         memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size());
56
57         string protocol;
58         parse_url(url, &protocol, &host, &port, &path);  // Don't care if it fails.
59 }
60
61 void HTTPInput::close_socket()
62 {
63         int ret;
64         do {
65                 ret = close(sock);
66         } while (ret == -1 && errno == EINTR);
67
68         if (ret == -1) {
69                 perror("close()");
70         }
71 }
72
73 InputProto HTTPInput::serialize() const
74 {
75         InputProto serialized;
76         serialized.set_state(state);
77         serialized.set_stream_id(stream_id);
78         serialized.set_url(url);
79         serialized.set_request(request);
80         serialized.set_request_bytes_sent(request_bytes_sent);
81         serialized.set_response(response);
82         serialized.set_http_header(http_header);
83         serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
84         serialized.set_has_metacube_header(has_metacube_header);
85         serialized.set_sock(sock);
86         return serialized;
87 }
88
// Resolves <host>/<port> and tries to connect to each resulting address in turn.
//
// Returns a connected (blocking) TCP socket on success, or -1 if the lookup
// failed, no address could be connected to, or should_stop was set while trying.
// A warning is printed to stderr on failure.
int HTTPInput::lookup_and_connect(const string &host, const string &port)
{
        addrinfo *base_ai = NULL;
        int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &base_ai);
        if (err != 0) {
                // getaddrinfo() signals failure with any non-zero EAI_* code,
                // and does not hand back a list that needs freeing in that case.
                fprintf(stderr, "WARNING: Lookup of '%s' failed (%s).\n",
                        host.c_str(), gai_strerror(err));
                return -1;
        }

        // Connect to everything in turn until we have a socket.
        // Remember the errno of the most recent failure, since later calls
        // (close(), freeaddrinfo()) are allowed to overwrite errno.
        int last_errno = 0;
        for (addrinfo *ai = base_ai; ai != NULL && !should_stop; ai = ai->ai_next) {
                int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
                if (sock == -1) {
                        // Could be e.g. EPROTONOSUPPORT. The show must go on.
                        last_errno = errno;
                        continue;
                }

                do {
                        err = connect(sock, ai->ai_addr, ai->ai_addrlen);
                } while (err == -1 && errno == EINTR);

                if (err != -1) {
                        freeaddrinfo(base_ai);
                        return sock;
                }
                last_errno = errno;

                // This address did not work out; do not leak its descriptor.
                do {
                        err = close(sock);
                } while (err == -1 && errno == EINTR);
                if (err == -1) {
                        perror("close");
                }
        }

        // Give the last one as error.
        fprintf(stderr, "WARNING: Connect to '%s' failed (%s)\n",
                host.c_str(), strerror(last_errno));
        freeaddrinfo(base_ai);
        return -1;
}
126         
127 bool HTTPInput::parse_response(const std::string &request)
128 {
129         vector<string> lines = split_lines(response);
130         if (lines.empty()) {
131                 fprintf(stderr, "WARNING: Empty HTTP response from input.\n");
132                 return false;
133         }
134
135         vector<string> first_line_tokens = split_tokens(lines[0]);
136         if (first_line_tokens.size() < 2) {
137                 fprintf(stderr, "WARNING: Malformed response line '%s' from input.\n",
138                         lines[0].c_str());
139                 return false;
140         }
141
142         int response = atoi(first_line_tokens[1].c_str());
143         if (response != 200) {
144                 fprintf(stderr, "WARNING: Non-200 response '%s' from input.\n",
145                         lines[0].c_str());
146                 return false;
147         }
148
149         multimap<string, string> parameters;
150         for (size_t i = 1; i < lines.size(); ++i) {
151                 size_t split = lines[i].find(":");
152                 if (split == string::npos) {
153                         fprintf(stderr, "WARNING: Ignoring malformed HTTP response line '%s'\n",
154                                 lines[i].c_str());
155                         continue;
156                 }
157
158                 string key(lines[i].begin(), lines[i].begin() + split);
159
160                 // Skip any spaces after the colon.
161                 do {
162                         ++split;
163                 } while (split < lines[i].size() && lines[i][split] == ' ');
164
165                 string value(lines[i].begin() + split, lines[i].end());
166
167                 // Remove “Content-encoding: metacube”.
168                 // TODO: Make case-insensitive.
169                 if (key == "Content-encoding" && value == "metacube") {
170                         continue;
171                 }
172
173                 parameters.insert(make_pair(key, value));
174         }
175
176         // Change “Server: foo” to “Server: metacube/0.1 (reflecting: foo)”
177         // TODO: Make case-insensitive.
178         // XXX: Use a Via: instead?
179         if (parameters.count("Server") == 0) {
180                 parameters.insert(make_pair("Server", SERVER_IDENTIFICATION));
181         } else {
182                 for (multimap<string, string>::iterator it = parameters.begin();
183                      it != parameters.end();
184                      ++it) {
185                         if (it->first != "Server") {
186                                 continue;
187                         }
188                         it->second = SERVER_IDENTIFICATION " (reflecting: " + it->second + ")";
189                 }
190         }
191
192         // Construct the new HTTP header.
193         http_header = "HTTP/1.0 200 OK\r\n";
194         for (multimap<string, string>::iterator it = parameters.begin();
195              it != parameters.end();
196              ++it) {
197                 http_header.append(it->first + ": " + it->second + "\r\n");
198         }
199         http_header.append("\r\n");     
200         servers->set_header(stream_id, http_header);
201
202         return true;
203 }
204
205 void HTTPInput::do_work()
206 {
207         while (!should_stop) {
208                 if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
209                         // Since we are non-blocking, we need to wait for the right state first.
210                         // Wait up to 50 ms, then check should_stop.
211                         pollfd pfd;
212                         pfd.fd = sock;
213                         pfd.events = (state == SENDING_REQUEST) ? POLLOUT : POLLIN;
214                         pfd.events |= POLLRDHUP;
215
216                         int nfds = poll(&pfd, 1, 50);
217                         if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
218                                 continue;
219                         }
220                         if (nfds == -1) {
221                                 perror("poll");
222                                 state = CLOSING_SOCKET;
223                         }
224                 }
225
226                 switch (state) {
227                 case NOT_CONNECTED:
228                         request.clear();
229                         request_bytes_sent = 0;
230                         response.clear();
231                         pending_data.clear();
232
233                         {
234                                 string protocol;  // Thrown away.
235                                 if (!parse_url(url, &protocol, &host, &port, &path)) {
236                                         fprintf(stderr, "Failed to parse URL '%s'\n", url.c_str());
237                                         break;
238                                 }
239                         }
240
241                         sock = lookup_and_connect(host, port);
242                         if (sock != -1) {
243                                 // Yay, successful connect. Try to set it as nonblocking.
244                                 int one = 1;
245                                 if (ioctl(sock, FIONBIO, &one) == -1) {
246                                         perror("ioctl(FIONBIO)");
247                                         state = CLOSING_SOCKET;
248                                 } else {
249                                         state = SENDING_REQUEST;
250                                         request = "GET " + path + " HTTP/1.0\r\nUser-Agent: cubemap\r\n\r\n";
251                                         request_bytes_sent = 0;
252                                 }
253                         }
254                         break;
255                 case SENDING_REQUEST: {
256                         size_t to_send = request.size() - request_bytes_sent;
257                         int ret;
258
259                         do {
260                                 ret = write(sock, request.data() + request_bytes_sent, to_send);
261                         } while (ret == -1 && errno == EINTR);
262
263                         if (ret == -1) {
264                                 perror("write");
265                                 state = CLOSING_SOCKET;
266                                 continue;
267                         }
268
269                         assert(ret >= 0);
270                         request_bytes_sent += ret;
271
272                         if (request_bytes_sent == request.size()) {
273                                 state = RECEIVING_HEADER;
274                         }
275                         break;
276                 }
277                 case RECEIVING_HEADER: {
278                         char buf[4096];
279                         int ret;
280
281                         do {
282                                 ret = read(sock, buf, sizeof(buf));
283                         } while (ret == -1 && errno == EINTR);
284
285                         if (ret == -1) {
286                                 perror("read");
287                                 state = CLOSING_SOCKET;
288                                 continue;
289                         }
290
291                         if (ret == 0) {
292                                 // This really shouldn't happen...
293                                 fprintf(stderr, "Socket unexpectedly closed while reading header\n");
294                                 state = CLOSING_SOCKET;
295                                 continue;
296                         }
297                         
298                         RequestParseStatus status = wait_for_double_newline(&response, buf, ret);
299                         
300                         if (status == RP_OUT_OF_SPACE) {
301                                 fprintf(stderr, "WARNING: fd %d sent overlong response!\n", sock);
302                                 state = CLOSING_SOCKET;
303                                 continue;
304                         } else if (status == RP_NOT_FINISHED_YET) {
305                                 continue;
306                         }
307         
308                         // OK, so we're fine, but there might be some of the actual data after the response.
309                         // We'll need to deal with that separately.
310                         string extra_data;
311                         if (status == RP_EXTRA_DATA) {
312                                 char *ptr = static_cast<char *>(
313                                         memmem(response.data(), response.size(), "\r\n\r\n", 4));
314                                 assert(ptr != NULL);
315                                 extra_data = string(ptr, &response[0] + response.size());
316                                 response.resize(ptr - response.data());
317                         }
318
319                         if (!parse_response(response)) {
320                                 state = CLOSING_SOCKET;
321                                 continue;
322                         }
323
324                         if (!extra_data.empty()) {
325                                 process_data(&extra_data[0], extra_data.size());
326                         }
327
328                         state = RECEIVING_DATA;
329                         break;
330                 }
331                 case RECEIVING_DATA: {
332                         char buf[4096];
333                         int ret;
334
335                         do {
336                                 ret = read(sock, buf, sizeof(buf));
337                         } while (ret == -1 && errno == EINTR);
338
339                         if (ret == -1) {
340                                 perror("read");
341                                 state = CLOSING_SOCKET;
342                                 continue;
343                         }
344
345                         if (ret == 0) {
346                                 // This really shouldn't happen...
347                                 fprintf(stderr, "Socket unexpectedly closed while reading header\n");
348                                 state = CLOSING_SOCKET;
349                                 continue;
350                         }
351
352                         process_data(buf, ret);
353                         break;
354                 }
355                 case CLOSING_SOCKET: {
356                         int err;
357                         do {
358                                 err = close(sock);
359                         } while (err == -1 && errno == EINTR);
360
361                         if (err == -1) {
362                                 perror("close");
363                         }
364
365                         state = NOT_CONNECTED;
366                         break;
367                 }
368                 default:
369                         assert(false);
370                 }
371
372                 // If we are still in NOT_CONNECTED, either something went wrong,
373                 // or the connection just got closed.
374                 // The earlier steps have already given the error message, if any.
375                 if (state == NOT_CONNECTED && !should_stop) {
376                         fprintf(stderr, "Waiting 0.2 second and restarting...\n");
377                         usleep(200000);
378                 }
379         }
380 }
381
382 void HTTPInput::process_data(char *ptr, size_t bytes)
383 {
384         pending_data.insert(pending_data.end(), ptr, ptr + bytes);
385
386         for ( ;; ) {
387                 // If we don't have enough data (yet) for even the Metacube header, just return.
388                 if (pending_data.size() < sizeof(metacube_block_header)) {
389                         return;
390                 }
391
392                 // Make sure we have the Metacube sync header at the start.
393                 // We may need to skip over junk data (it _should_ not happen, though).
394                 if (!has_metacube_header) {
395                         char *ptr = static_cast<char *>(
396                                 memmem(pending_data.data(), pending_data.size(),
397                                        METACUBE_SYNC, strlen(METACUBE_SYNC)));
398                         if (ptr == NULL) {
399                                 // OK, so we didn't find the sync marker. We know then that
400                                 // we do not have the _full_ marker in the buffer, but we
401                                 // could have N-1 bytes. Drop everything before that,
402                                 // and then give up.
403                                 drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1));
404                                 return;
405                         } else {
406                                 // Yay, we found the header. Drop everything (if anything) before it.
407                                 drop_pending_data(ptr - pending_data.data());
408                                 has_metacube_header = true;
409
410                                 // Re-check that we have the entire header; we could have dropped data.
411                                 if (pending_data.size() < sizeof(metacube_block_header)) {
412                                         return;
413                                 }
414                         }
415                 }
416
417                 // Now it's safe to read the header.
418                 metacube_block_header *hdr = reinterpret_cast<metacube_block_header *>(pending_data.data());    
419                 assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0);
420                 uint32_t size = ntohl(hdr->size);
421                 uint32_t flags = ntohl(hdr->flags);
422
423                 // See if we have the entire block. If not, wait for more data.
424                 if (pending_data.size() < sizeof(metacube_block_header) + size) {
425                         return;
426                 }
427
428                 // Send this block on to the data.
429                 char *inner_data = pending_data.data() + sizeof(metacube_block_header);
430                 if (flags & METACUBE_FLAGS_HEADER) {
431                         string header(inner_data, inner_data + size);
432                         servers->set_header(stream_id, http_header + header);
433                 } else { 
434                         servers->add_data(stream_id, inner_data, size);
435                 }
436
437                 // Consume the block. This isn't the most efficient way of dealing with things
438                 // should we have many blocks, but these routines don't need to be too efficient
439                 // anyway.
440                 pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
441                 has_metacube_header = false;
442         }
443 }
444
445 void HTTPInput::drop_pending_data(size_t num_bytes)
446 {
447         if (num_bytes == 0) {
448                 return;
449         }
450         fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n",
451                 (long long)num_bytes);
452         pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
453 }
454