Make the HTTP inputs time out after 30 seconds of no activity.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 17 Aug 2013 13:09:03 +0000 (15:09 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 17 Aug 2013 13:09:03 +0000 (15:09 +0200)
This is useful for the case where the remote machine is gone,
but TCP doesn't pick it up, and thus you can't easily restart
the streaming.

Makefile
httpinput.cpp

index e90ebb3..828c9fd 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ CXX=g++
 INSTALL=install
 PROTOC=protoc
 CXXFLAGS=-Wall -O2 -g -pthread
-LDLIBS=-lprotobuf -pthread
+LDLIBS=-lprotobuf -pthread -lrt
 
 OBJS=main.o client.o server.o stream.o udpstream.o serverpool.o mutexlock.o input.o input_stats.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o accesslog.o thread.o util.o log.o state.pb.o
 
index 2e6e617..7f96970 100644 (file)
 using namespace std;
 
 extern ServerPool *servers;
-         
+
+namespace {
+
+// Compute b-a.
+timespec clock_diff(const timespec &a, const timespec &b)
+{
+       timespec ret;
+       ret.tv_sec = b.tv_sec - a.tv_sec;
+       ret.tv_nsec = b.tv_nsec - a.tv_nsec;
+       if (ret.tv_nsec < 0) {
+               ret.tv_sec--;
+               ret.tv_nsec += 1000000000;
+       }
+       assert(ret.tv_nsec >= 0);
+       return ret;
+}
+
+}  // namespace
+
 HTTPInput::HTTPInput(const string &url)
        : state(NOT_CONNECTED),
          url(url),
@@ -273,11 +291,43 @@ bool HTTPInput::parse_response(const std::string &request)
 
 void HTTPInput::do_work()
 {
+       timespec last_activity;
+
+       // TODO: Make the timeout persist across restarts.
+       if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
+               int err = clock_gettime(CLOCK_MONOTONIC, &last_activity);
+               assert(err != -1);
+       }
+
        while (!should_stop()) {
                if (state == SENDING_REQUEST || state == RECEIVING_HEADER || state == RECEIVING_DATA) {
-                       bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, NULL);
-                       if (!activity) {
-                               // Most likely, should_stop was set.
+                       // Give the socket 30 seconds since last activity before we time out.
+                       static const int timeout_secs = 30;
+
+                       timespec now;
+                       int err = clock_gettime(CLOCK_MONOTONIC, &now);
+                       assert(err != -1);
+
+                       timespec elapsed = clock_diff(last_activity, now);
+                       if (elapsed.tv_sec >= timeout_secs) {
+                               // Timeout!
+                               log(ERROR, "[%s] Timeout after %d seconds, closing.", url.c_str(), elapsed.tv_sec);
+                               state = CLOSING_SOCKET;
+                               continue;
+                       }
+
+                       // Basically calculate (30 - (now - last_activity)) = (30 + (last_activity - now)).
+                       // Add a second of slack to account for differences between clocks.
+                       timespec timeout = clock_diff(now, last_activity);
+                       timeout.tv_sec += timeout_secs + 1;
+                       assert(timeout.tv_sec > 0 || (timeout.tv_sec >= 0 && timeout.tv_nsec > 0));
+
+                       bool activity = wait_for_activity(sock, (state == SENDING_REQUEST) ? POLLOUT : POLLIN, &timeout);
+                       if (activity) {
+                               err = clock_gettime(CLOCK_MONOTONIC, &last_activity);
+                               assert(err != -1);
+                       } else {
+                               // OK. Most likely, should_stop was set, or we have timed out.
                                continue;
                        }
                }
@@ -316,6 +366,7 @@ void HTTPInput::do_work()
 
                                MutexLock lock(&stats_mutex);
                                stats.connect_time = time(NULL);
+                               clock_gettime(CLOCK_MONOTONIC, &last_activity);
                        }
                        break;
                case SENDING_REQUEST: {