From 70c0baf4bcec3a77f0626d5a7bfde87fc7339698 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 17 Aug 2013 15:09:03 +0200 Subject: [PATCH] Make the HTTP inputs time out after 30 seconds of no activity. This is useful for the case where the remote machine is gone, but TCP doesn't pick it up, and thus you can't easily restart the streaming. --- Makefile | 2 +- httpinput.cpp | 59 +++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index e90ebb3..828c9fd 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ CXX=g++ INSTALL=install PROTOC=protoc CXXFLAGS=-Wall -O2 -g -pthread -LDLIBS=-lprotobuf -pthread +LDLIBS=-lprotobuf -pthread -lrt OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o diff --git a/httpinput.cpp b/httpinput.cpp index 2e6e617..7f96970 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -28,7 +28,25 @@ using namespace std; extern ServerPool *servers; - + +namespace { + +// Compute b-a. +timespec clock_diff(const timespec &a, const timespec &b) +{ + timespec ret; + ret.tv_sec = b.tv_sec - a.tv_sec; + ret.tv_nsec = b.tv_nsec - a.tv_nsec; + if (ret.tv_nsec < 0) { + ret.tv_sec--; + ret.tv_nsec += 1000000000; + } + assert(ret.tv_nsec >= 0); + return ret; +} + +} // namespace + HTTPInput::HTTPInput(const string &url) : state(NOT_CONNECTED), url(url), @@ -273,11 +291,43 @@ bool HTTPInput::parse_response(const std::string &request) void HTTPInput::do_work() { + timespec last_activity; + + // TODO: Make the timeout persist across restarts. + if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { + int err = clock_gettime(CLOCK_MONOTONIC, &last_activity); + assert(err != -1); + } + while (!should_stop()) { if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) { - bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL); - if (!activity) { - // Most likely, should_stop was set. + // Give the socket 30 seconds since last activity before we time out. + static const int timeout_secs = 30; + + timespec now; + int err = clock_gettime(CLOCK_MONOTONIC, &now); + assert(err != -1); + + timespec elapsed = clock_diff(last_activity, now); + if (elapsed.tv_sec >= timeout_secs) { + // Timeout! + log(ERROR, "[%s] Timeout after %d seconds, closing.", url.c_str(), elapsed.tv_sec); + state = CLOSING_SOCKET; + continue; + } + + // Basically calculate (30 - (now - last_activity)) = (30 + (last_activity - now)). + // Add a second of slack to account for differences between clocks. + timespec timeout = clock_diff(now, last_activity); + timeout.tv_sec += timeout_secs + 1; + assert(timeout.tv_sec > 0 || (timeout.tv_sec >= 0 && timeout.tv_nsec > 0)); + + bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, &timeout); + if (activity) { + err = clock_gettime(CLOCK_MONOTONIC, &last_activity); + assert(err != -1); + } else { + // OK. Most likely, should_stop was set, or we have timed out. continue; } } @@ -316,6 +366,7 @@ void HTTPInput::do_work() MutexLock lock(&stats_mutex); stats.connect_time = time(NULL); + clock_gettime(CLOCK_MONOTONIC, &last_activity); } break; case SENDING_REQUEST: { -- 2.39.2