X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=httpinput.cpp;h=4b34e85ddd146d5a2dd240aea551bd9498757f14;hp=5b74319d71f77645c99b45452b0a677298316901;hb=1135808bf9df44b879994e6dac07a31eb78c2fdb;hpb=a0fe013448d188b324c00383cfd91695d9d3d076 diff --git a/httpinput.cpp b/httpinput.cpp index 5b74319..4b34e85 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -20,7 +20,6 @@ #include "httpinput.h" #include "log.h" #include "metacube2.h" -#include "mutexlock.h" #include "parse.h" #include "serverpool.h" #include "state.pb.h" @@ -49,22 +48,14 @@ extern ServerPool *servers; HTTPInput::HTTPInput(const string &url, Input::Encoding encoding) : state(NOT_CONNECTED), url(url), - encoding(encoding), - has_metacube_header(false), - sock(-1), - num_connection_attempts(0), - suppress_logging(false) + encoding(encoding) { - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = 0; stats.data_bytes_received = 0; stats.metadata_bytes_received = 0; stats.connect_time = -1; stats.latency_sec = HUGE_VAL; - - last_verbose_connection.tv_sec = -3600; - last_verbose_connection.tv_nsec = 0; } HTTPInput::HTTPInput(const InputProto &serialized) @@ -79,9 +70,7 @@ HTTPInput::HTTPInput(const InputProto &serialized) http_header(serialized.http_header()), stream_header(serialized.stream_header()), has_metacube_header(serialized.has_metacube_header()), - sock(serialized.sock()), - num_connection_attempts(0), - suppress_logging(false) + sock(serialized.sock()) { pending_data.resize(serialized.pending_data().size()); memcpy(&pending_data[0], serialized.pending_data().data(), serialized.pending_data().size()); @@ -89,7 +78,6 @@ HTTPInput::HTTPInput(const InputProto &serialized) string protocol, user; parse_url(url, &protocol, &user, &host, &port, &path); // Don't care if it fails. - pthread_mutex_init(&stats_mutex, nullptr); stats.url = url; stats.bytes_received = serialized.bytes_received(); stats.data_bytes_received = serialized.data_bytes_received(); @@ -116,7 +104,7 @@ void HTTPInput::close_socket() sock = -1; } - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.connect_time = -1; } @@ -283,10 +271,9 @@ bool HTTPInput::parse_response(const string &request) } } - // Set “Connection: close”. + // Erase “Connection: close”; we'll set it on the sending side if needed. // TODO: Make case-insensitive. parameters.erase("Connection"); - parameters.insert(make_pair("Connection", "close")); // Construct the new HTTP header. http_header = "HTTP/1.0 200 OK\r\n"; @@ -405,7 +392,7 @@ void HTTPInput::do_work() request_bytes_sent = 0; } - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.connect_time = time(nullptr); clock_gettime(CLOCK_MONOTONIC_COARSE, &last_activity); } @@ -575,7 +562,7 @@ void HTTPInput::do_work() void HTTPInput::process_data(char *ptr, size_t bytes) { { - MutexLock mutex(&stats_mutex); + lock_guard lock(stats_mutex); stats.bytes_received += bytes; } @@ -654,14 +641,14 @@ void HTTPInput::process_data(char *ptr, size_t bytes) // TODO: Keep metadata when sending on to other Metacube users. if (flags & METACUBE_FLAGS_METADATA) { { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.metadata_bytes_received += size; } process_metacube_metadata_block(hdr, pending_data.data() + sizeof(hdr), size); } else { // Send this block on to the servers. { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.data_bytes_received += size; } char *inner_data = pending_data.data() + sizeof(metacube2_block_header); @@ -703,7 +690,7 @@ void HTTPInput::add_destination(int stream_index) InputStats HTTPInput::get_stats() const { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); return stats; } @@ -734,7 +721,7 @@ void HTTPInput::process_metacube_metadata_block(const metacube2_block_header &hd double elapsed = now.tv_sec - be64toh(pkt->tv_sec) + 1e-9 * (now.tv_nsec - long(be64toh(pkt->tv_nsec))); { - MutexLock lock(&stats_mutex); + lock_guard lock(stats_mutex); stats.latency_sec = elapsed; } }