-#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <time.h>
#include <unistd.h>
#include <map>
#include <string>
void HTTPInput::close_socket()
{
- safe_close(sock);
+ if (sock != -1) {
+ safe_close(sock);
+ }
}
InputProto HTTPInput::serialize() const
{
addrinfo *ai;
int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai);
- if (err == -1) {
+ if (err != 0) {
log(WARNING, "[%s] Lookup of '%s' failed (%s).",
url.c_str(), host.c_str(), gai_strerror(err));
- freeaddrinfo(ai);
return -1;
}
addrinfo *base_ai = ai;
// Connect to everything in turn until we have a socket.
- while (ai && !should_stop()) {
+ for ( ; ai && !should_stop(); ai = ai->ai_next) {
int sock = socket(ai->ai_family, SOCK_STREAM, IPPROTO_TCP);
if (sock == -1) {
// Could be e.g. EPROTONOSUPPORT. The show must go on.
}
safe_close(sock);
- ai = ai->ai_next;
}
// Give the last one as error.
http_header.append(it->first + ": " + it->second + "\r\n");
}
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header, "");
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, "");
}
return true;
request_bytes_sent = 0;
response.clear();
pending_data.clear();
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], "", "");
+ has_metacube_header = false;
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], "", "");
}
{
char *ptr = static_cast<char *>(
memmem(response.data(), response.size(), "\r\n\r\n", 4));
assert(ptr != NULL);
- extra_data = string(ptr, &response[0] + response.size());
+ extra_data = string(ptr + 4, &response[0] + response.size());
response.resize(ptr - response.data());
}
if (ret == 0) {
// This really shouldn't happen...
- log(ERROR, "[%s] Socket unexpectedly closed while reading header",
+ log(ERROR, "[%s] Socket unexpectedly closed while reading data",
url.c_str());
state = CLOSING_SOCKET;
continue;
uint32_t size = ntohl(hdr->size);
uint32_t flags = ntohl(hdr->flags);
+ if (size > 65535) {
+ log(WARNING, "[%s] Metacube block of %d bytes (flags=%x); corrupted header?",
+ url.c_str(), size, flags);
+ }
+
// See if we have the entire block. If not, wait for more data.
if (pending_data.size() < sizeof(metacube_block_header) + size) {
return;
char *inner_data = pending_data.data() + sizeof(metacube_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
string header(inner_data, inner_data + size);
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header, header);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, header);
}
} else {
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->add_data(stream_ids[i], inner_data, size);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->add_data(stream_indices[i], inner_data, size);
}
}
}
log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?",
url.c_str(), (long long)num_bytes);
+ assert(pending_data.size() >= num_bytes);
pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
}