2e6e617bad9905c78e1b3b7427cd53f2e5a6bb3f
[cubemap] / httpinput.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <netdb.h>
4 #include <netinet/in.h>
5 #include <poll.h>
6 #include <stdint.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <sys/ioctl.h>
10 #include <sys/socket.h>
11 #include <time.h>
12 #include <unistd.h>
13 #include <map>
14 #include <string>
15 #include <utility>
16 #include <vector>
17
18 #include "httpinput.h"
19 #include "log.h"
20 #include "metacube.h"
21 #include "mutexlock.h"
22 #include "parse.h"
23 #include "serverpool.h"
24 #include "state.pb.h"
25 #include "util.h"
26 #include "version.h"
27
28 using namespace std;
29
30 extern ServerPool *servers;
31           
32 HTTPInput::HTTPInput(const string &url)
33         : state(NOT_CONNECTED),
34           url(url),
35           has_metacube_header(false),
36           sock(-1)
37 {
38         pthread_mutex_init(&stats_mutex, NULL);
39         stats.url = url;
40         stats.bytes_received = 0;
41         stats.data_bytes_received = 0;
42         stats.connect_time = -1;
43 }
44
45 HTTPInput::HTTPInput(const InputProto &serialized)
46         : state(State(serialized.state())),
47           url(serialized.url()),
48           request(serialized.request()),
49           request_bytes_sent(serialized.request_bytes_sent()),
50           response(serialized.response()),
51           http_header(serialized.http_header()),
52           stream_header(serialized.stream_header()),
53           has_metacube_header(serialized.has_metacube_header()),
54           sock(serialized.sock())
55 {
56         pending_data.resize(serialized.pending_data().size());
57         memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size());
58
59         string protocol;
60         parse_url(url, &protocol, &host, &port, &path);  // Don't care if it fails.
61
62         // Older versions stored the extra \r\n in the HTTP header.
63         // Strip it if we find it.
64         if (http_header.size() >= 4 &&
65             memcmp(http_header.data() + http_header.size() - 4, "\r\n\r\n", 4) == 0) {
66                 http_header.resize(http_header.size() - 2);
67         }
68
69         pthread_mutex_init(&stats_mutex, NULL);
70         stats.url = url;
71         stats.bytes_received = serialized.bytes_received();
72         stats.data_bytes_received = serialized.data_bytes_received();
73         if (serialized.has_connect_time()) {
74                 stats.connect_time = serialized.connect_time();
75         } else {
76                 stats.connect_time = time(NULL);
77         }
78 }
79
80 void HTTPInput::close_socket()
81 {
82         if (sock != -1) {
83                 safe_close(sock);
84         }
85
86         MutexLock lock(&stats_mutex);
87         stats.connect_time = -1;
88 }
89
90 InputProto HTTPInput::serialize() const
91 {
92         InputProto serialized;
93         serialized.set_state(state);
94         serialized.set_url(url);
95         serialized.set_request(request);
96         serialized.set_request_bytes_sent(request_bytes_sent);
97         serialized.set_response(response);
98         serialized.set_http_header(http_header);
99         serialized.set_stream_header(stream_header);
100         serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
101         serialized.set_has_metacube_header(has_metacube_header);
102         serialized.set_sock(sock);
103         serialized.set_bytes_received(stats.bytes_received);
104         serialized.set_data_bytes_received(stats.data_bytes_received);
105         serialized.set_connect_time(stats.connect_time);
106         return serialized;
107 }
108
// Resolves host:port and tries each returned address in turn with a
// non-blocking TCP connect, so that a wakeup / should_stop() can interrupt
// the attempt. Returns a connected, non-blocking socket, or -1 on failure
// or when asked to stop. Failures are logged.
int HTTPInput::lookup_and_connect(const string &host, const string &port)
{
	// Ask only for TCP stream addresses. With NULL hints, getaddrinfo() also
	// returns datagram/raw entries for every address, and since we always
	// create a SOCK_STREAM socket, each address was tried several times.
	// The flags match glibc's defaults for NULL hints.
	addrinfo hints;
	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = IPPROTO_TCP;
	hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;

	addrinfo *base_ai;
	int err = getaddrinfo(host.c_str(), port.c_str(), &hints, &base_ai);
	if (err != 0) {
		log(WARNING, "[%s] Lookup of '%s' failed (%s).",
			url.c_str(), host.c_str(), gai_strerror(err));
		return -1;
	}

	// Error from the most recent failed attempt; reported if all of them fail.
	int last_errno = 0;

	// Connect to everything in turn until we have a socket.
	for (addrinfo *ai = base_ai; ai && !should_stop(); ai = ai->ai_next) {
		int fd = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
		if (fd == -1) {
			// Could be e.g. EPROTONOSUPPORT. The show must go on.
			last_errno = errno;
			continue;
		}

		// Now do a non-blocking connect. This is important because we want to be able to be
		// woken up, even though it's rather cumbersome.

		// Set the socket as nonblocking.
		int one = 1;
		if (ioctl(fd, FIONBIO, &one) == -1) {
			log_perror("ioctl(FIONBIO)");
			safe_close(fd);
			freeaddrinfo(base_ai);  // Previously leaked on this path.
			return -1;
		}

		// Do a non-blocking connect.
		do {
			err = connect(fd, ai->ai_addr, ai->ai_addrlen);
		} while (err == -1 && errno == EINTR);

		if (err == -1 && errno != EINPROGRESS) {
			last_errno = errno;
			log_perror("connect");
			safe_close(fd);
			continue;
		}

		// Wait for the connect to complete, or an error to happen.
		for ( ;; ) {
			bool complete = wait_for_activity(fd, POLLIN | POLLOUT, NULL);
			if (should_stop()) {
				safe_close(fd);
				freeaddrinfo(base_ai);  // Previously leaked on this path.
				return -1;
			}
			if (complete) {
				break;
			}
		}

		// Check whether it ended in an error or not.
		int so_error = 0;
		socklen_t so_error_size = sizeof(so_error);
		if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size) == -1) {
			last_errno = errno;
			log_perror("getsockopt");
			safe_close(fd);
			continue;
		}

		if (so_error == 0) {
			// Successful connect.
			freeaddrinfo(base_ai);
			return fd;
		}

		last_errno = so_error;
		safe_close(fd);
	}

	// Give the last one as error.
	log(WARNING, "[%s] Connect to '%s' failed (%s)",
		url.c_str(), host.c_str(), strerror(last_errno));
	freeaddrinfo(base_ai);
	return -1;
}
188         
189 bool HTTPInput::parse_response(const std::string &request)
190 {
191         vector<string> lines = split_lines(response);
192         if (lines.empty()) {
193                 log(WARNING, "[%s] Empty HTTP response from input.", url.c_str());
194                 return false;
195         }
196
197         vector<string> first_line_tokens = split_tokens(lines[0]);
198         if (first_line_tokens.size() < 2) {
199                 log(WARNING, "[%s] Malformed response line '%s' from input.",
200                         url.c_str(), lines[0].c_str());
201                 return false;
202         }
203
204         int response = atoi(first_line_tokens[1].c_str());
205         if (response != 200) {
206                 log(WARNING, "[%s] Non-200 response '%s' from input.",
207                         url.c_str(), lines[0].c_str());
208                 return false;
209         }
210
211         multimap<string, string> parameters;
212         for (size_t i = 1; i < lines.size(); ++i) {
213                 size_t split = lines[i].find(":");
214                 if (split == string::npos) {
215                         log(WARNING, "[%s] Ignoring malformed HTTP response line '%s'",
216                                 url.c_str(), lines[i].c_str());
217                         continue;
218                 }
219
220                 string key(lines[i].begin(), lines[i].begin() + split);
221
222                 // Skip any spaces after the colon.
223                 do {
224                         ++split;
225                 } while (split < lines[i].size() && lines[i][split] == ' ');
226
227                 string value(lines[i].begin() + split, lines[i].end());
228
229                 // Remove “Content-encoding: metacube”.
230                 // TODO: Make case-insensitive.
231                 if (key == "Content-encoding" && value == "metacube") {
232                         continue;
233                 }
234
235                 parameters.insert(make_pair(key, value));
236         }
237
238         // Change “Server: foo” to “Server: metacube/0.1 (reflecting: foo)”
239         // TODO: Make case-insensitive.
240         // XXX: Use a Via: instead?
241         if (parameters.count("Server") == 0) {
242                 parameters.insert(make_pair("Server", SERVER_IDENTIFICATION));
243         } else {
244                 for (multimap<string, string>::iterator it = parameters.begin();
245                      it != parameters.end();
246                      ++it) {
247                         if (it->first != "Server") {
248                                 continue;
249                         }
250                         it->second = SERVER_IDENTIFICATION " (reflecting: " + it->second + ")";
251                 }
252         }
253
254         // Set “Connection: close”.
255         // TODO: Make case-insensitive.
256         parameters.erase("Connection");
257         parameters.insert(make_pair("Connection", "close"));
258
259         // Construct the new HTTP header.
260         http_header = "HTTP/1.0 200 OK\r\n";
261         for (multimap<string, string>::iterator it = parameters.begin();
262              it != parameters.end();
263              ++it) {
264                 http_header.append(it->first + ": " + it->second + "\r\n");
265         }
266
267         for (size_t i = 0; i < stream_indices.size(); ++i) {
268                 servers->set_header(stream_indices[i], http_header, stream_header);
269         }
270
271         return true;
272 }
273
274 void HTTPInput::do_work()
275 {
276         while (!should_stop()) {
277                 if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
278                         bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
279                         if (!activity) {
280                                 // Most likely, should_stop was set.
281                                 continue;
282                         }
283                 }
284
285                 switch (state) {
286                 case NOT_CONNECTED:
287                         request.clear();
288                         request_bytes_sent = 0;
289                         response.clear();
290                         pending_data.clear();
291                         has_metacube_header = false;
292                         for (size_t i = 0; i < stream_indices.size(); ++i) {
293                                 servers->set_header(stream_indices[i], "", "");
294                         }
295
296                         {
297                                 string protocol;  // Thrown away.
298                                 if (!parse_url(url, &protocol, &host, &port, &path)) {
299                                         log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str());
300                                         break;
301                                 }
302                         }
303
304                         sock = lookup_and_connect(host, port);
305                         if (sock != -1) {
306                                 // Yay, successful connect. Try to set it as nonblocking.
307                                 int one = 1;
308                                 if (ioctl(sock, FIONBIO, &one) == -1) {
309                                         log_perror("ioctl(FIONBIO)");
310                                         state = CLOSING_SOCKET;
311                                 } else {
312                                         state = SENDING_REQUEST;
313                                         request = "GET " + path + " HTTP/1.0\r\nUser-Agent: cubemap\r\n\r\n";
314                                         request_bytes_sent = 0;
315                                 }
316
317                                 MutexLock lock(&stats_mutex);
318                                 stats.connect_time = time(NULL);
319                         }
320                         break;
321                 case SENDING_REQUEST: {
322                         size_t to_send = request.size() - request_bytes_sent;
323                         int ret;
324
325                         do {
326                                 ret = write(sock, request.data() + request_bytes_sent, to_send);
327                         } while (ret == -1 && errno == EINTR);
328
329                         if (ret == -1) {
330                                 log_perror("write");
331                                 state = CLOSING_SOCKET;
332                                 continue;
333                         }
334
335                         assert(ret >= 0);
336                         request_bytes_sent += ret;
337
338                         if (request_bytes_sent == request.size()) {
339                                 state = RECEIVING_HEADER;
340                         }
341                         break;
342                 }
343                 case RECEIVING_HEADER: {
344                         char buf[4096];
345                         int ret;
346
347                         do {
348                                 ret = read(sock, buf, sizeof(buf));
349                         } while (ret == -1 && errno == EINTR);
350
351                         if (ret == -1) {
352                                 log_perror("read");
353                                 state = CLOSING_SOCKET;
354                                 continue;
355                         }
356
357                         if (ret == 0) {
358                                 // This really shouldn't happen...
359                                 log(ERROR, "[%s] Socket unexpectedly closed while reading header",
360                                            url.c_str());
361                                 state = CLOSING_SOCKET;
362                                 continue;
363                         }
364                         
365                         RequestParseStatus status = wait_for_double_newline(&response, buf, ret);
366                         
367                         if (status == RP_OUT_OF_SPACE) {
368                                 log(WARNING, "[%s] Sever sent overlong HTTP response!", url.c_str());
369                                 state = CLOSING_SOCKET;
370                                 continue;
371                         } else if (status == RP_NOT_FINISHED_YET) {
372                                 continue;
373                         }
374         
375                         // OK, so we're fine, but there might be some of the actual data after the response.
376                         // We'll need to deal with that separately.
377                         string extra_data;
378                         if (status == RP_EXTRA_DATA) {
379                                 char *ptr = static_cast<char *>(
380                                         memmem(response.data(), response.size(), "\r\n\r\n", 4));
381                                 assert(ptr != NULL);
382                                 extra_data = string(ptr + 4, &response[0] + response.size());
383                                 response.resize(ptr - response.data());
384                         }
385
386                         if (!parse_response(response)) {
387                                 state = CLOSING_SOCKET;
388                                 continue;
389                         }
390
391                         if (!extra_data.empty()) {
392                                 process_data(&extra_data[0], extra_data.size());
393                         }
394
395                         log(INFO, "[%s] Connected to '%s', receiving data.",
396                                    url.c_str(), url.c_str());
397                         state = RECEIVING_DATA;
398                         break;
399                 }
400                 case RECEIVING_DATA: {
401                         char buf[4096];
402                         int ret;
403
404                         do {
405                                 ret = read(sock, buf, sizeof(buf));
406                         } while (ret == -1 && errno == EINTR);
407
408                         if (ret == -1) {
409                                 log_perror("read");
410                                 state = CLOSING_SOCKET;
411                                 continue;
412                         }
413
414                         if (ret == 0) {
415                                 // This really shouldn't happen...
416                                 log(ERROR, "[%s] Socket unexpectedly closed while reading data",
417                                            url.c_str());
418                                 state = CLOSING_SOCKET;
419                                 continue;
420                         }
421
422                         process_data(buf, ret);
423                         break;
424                 }
425                 case CLOSING_SOCKET: {
426                         close_socket();
427                         state = NOT_CONNECTED;
428                         break;
429                 }
430                 default:
431                         assert(false);
432                 }
433
434                 // If we are still in NOT_CONNECTED, either something went wrong,
435                 // or the connection just got closed.
436                 // The earlier steps have already given the error message, if any.
437                 if (state == NOT_CONNECTED && !should_stop()) {
438                         log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str());
439                         timespec timeout_ts;
440                         timeout_ts.tv_sec = 0;
441                         timeout_ts.tv_nsec = 200000000;
442                         wait_for_wakeup(&timeout_ts);
443                 }
444         }
445 }
446
447 void HTTPInput::process_data(char *ptr, size_t bytes)
448 {
449         pending_data.insert(pending_data.end(), ptr, ptr + bytes);
450         {
451                 MutexLock mutex(&stats_mutex);
452                 stats.bytes_received += bytes;
453         }
454
455         for ( ;; ) {
456                 // If we don't have enough data (yet) for even the Metacube header, just return.
457                 if (pending_data.size() < sizeof(metacube_block_header)) {
458                         return;
459                 }
460
461                 // Make sure we have the Metacube sync header at the start.
462                 // We may need to skip over junk data (it _should_ not happen, though).
463                 if (!has_metacube_header) {
464                         char *ptr = static_cast<char *>(
465                                 memmem(pending_data.data(), pending_data.size(),
466                                        METACUBE_SYNC, strlen(METACUBE_SYNC)));
467                         if (ptr == NULL) {
468                                 // OK, so we didn't find the sync marker. We know then that
469                                 // we do not have the _full_ marker in the buffer, but we
470                                 // could have N-1 bytes. Drop everything before that,
471                                 // and then give up.
472                                 drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1));
473                                 return;
474                         } else {
475                                 // Yay, we found the header. Drop everything (if anything) before it.
476                                 drop_pending_data(ptr - pending_data.data());
477                                 has_metacube_header = true;
478
479                                 // Re-check that we have the entire header; we could have dropped data.
480                                 if (pending_data.size() < sizeof(metacube_block_header)) {
481                                         return;
482                                 }
483                         }
484                 }
485
486                 // Now it's safe to read the header.
487                 metacube_block_header *hdr = reinterpret_cast<metacube_block_header *>(pending_data.data());    
488                 assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0);
489                 uint32_t size = ntohl(hdr->size);
490                 uint32_t flags = ntohl(hdr->flags);
491
492                 if (size > 262144) {
493                         log(WARNING, "[%s] Metacube block of %d bytes (flags=%x); corrupted header?",
494                                 url.c_str(), size, flags);
495                 }
496
497                 // See if we have the entire block. If not, wait for more data.
498                 if (pending_data.size() < sizeof(metacube_block_header) + size) {
499                         return;
500                 }
501
502                 // Send this block on to the servers.
503                 {
504                         MutexLock lock(&stats_mutex);
505                         stats.data_bytes_received += size;
506                 }
507                 char *inner_data = pending_data.data() + sizeof(metacube_block_header);
508                 if (flags & METACUBE_FLAGS_HEADER) {
509                         stream_header = string(inner_data, inner_data + size);
510                         for (size_t i = 0; i < stream_indices.size(); ++i) {
511                                 servers->set_header(stream_indices[i], http_header, stream_header);
512                         }
513                 } else {
514                         StreamStartSuitability suitable_for_stream_start;
515                         if (flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) {
516                                 suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
517                         } else {
518                                 suitable_for_stream_start = SUITABLE_FOR_STREAM_START;
519                         }
520                         for (size_t i = 0; i < stream_indices.size(); ++i) {
521                                 servers->add_data(stream_indices[i], inner_data, size, suitable_for_stream_start);
522                         }
523                 }
524
525                 // Consume the block. This isn't the most efficient way of dealing with things
526                 // should we have many blocks, but these routines don't need to be too efficient
527                 // anyway.
528                 pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
529                 has_metacube_header = false;
530         }
531 }
532
533 void HTTPInput::drop_pending_data(size_t num_bytes)
534 {
535         if (num_bytes == 0) {
536                 return;
537         }
538         log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?",
539                 url.c_str(), (long long)num_bytes);
540         assert(pending_data.size() >= num_bytes);
541         pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
542 }
543
544 void HTTPInput::add_destination(int stream_index)
545 {
546         stream_indices.push_back(stream_index);
547         servers->set_header(stream_index, http_header, stream_header);
548 }
549
550 InputStats HTTPInput::get_stats() const
551 {
552         MutexLock lock(&stats_mutex);
553         return stats;
554 }