Time out clients still in READING_REQUEST after 60 seconds.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Tue, 21 Jul 2015 23:57:04 +0000 (01:57 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 22 Jul 2015 00:04:06 +0000 (02:04 +0200)
Seemingly there are some of these around, and I've seen them eat up
fds in a long-running server. There's some pain in sorting the clients
on deserialization, but apart from that, this ended up being relatively
pain-free and should be efficient enough.

client.cpp
main.cpp
server.cpp
server.h

index e7a12f6..f191a40 100644 (file)
@@ -81,17 +81,8 @@ Client::Client(const ClientProto &serialized, Stream *stream)
                        }
                }
        }
-       if (serialized.has_connect_time_old()) {
-               // Do a rough conversion from time() to monotonic time.
-               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &connect_time) == -1) {
-                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
-                       return;
-               }
-               connect_time.tv_sec += serialized.connect_time_old() - time(NULL);
-       } else {
-               connect_time.tv_sec = serialized.connect_time_sec();
-               connect_time.tv_nsec = serialized.connect_time_nsec();
-       }
+       connect_time.tv_sec = serialized.connect_time_sec();
+       connect_time.tv_nsec = serialized.connect_time_nsec();
 }
 
 ClientProto Client::serialize() const
index 0218623..2080d73 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -10,6 +10,7 @@
 #include <sys/time.h>
 #include <sys/wait.h>
 #include <unistd.h>
+#include <algorithm>
 #include <map>
 #include <set>
 #include <string>
@@ -37,6 +38,18 @@ ServerPool *servers = NULL;
 volatile bool hupped = false;
 volatile bool stopped = false;
 
+namespace {
+
+struct OrderByConnectionTime {
+       bool operator() (const ClientProto &a, const ClientProto &b) const {
+               if (a.connect_time_sec() != b.connect_time_sec())
+                       return a.connect_time_sec() < b.connect_time_sec();
+               return a.connect_time_nsec() < b.connect_time_nsec();
+       }
+};
+
+}  // namespace
+
 struct InputWithRefcount {
        Input *input;
        int refcount;
@@ -444,9 +457,33 @@ start:
        // Find all streams in the configuration file, create them, and connect to the inputs.
        create_streams(config, deserialized_urls, &inputs);
        vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
+
+       // Convert old-style timestamps to new-style timestamps for all clients;
+       // this simplifies the sort below.
+       {
+               timespec now_monotonic;
+               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now_monotonic) == -1) {
+                       log(ERROR, "clock_gettime(CLOCK_MONOTONIC_COARSE) failed.");
+                       exit(1);
+               }
+               long delta_sec = now_monotonic.tv_sec - time(NULL);
+
+               for (int i = 0; i < loaded_state.clients_size(); ++i) {
+                       ClientProto* client = loaded_state.mutable_clients(i);
+                       if (client->has_connect_time_old()) {
+                               client->set_connect_time_sec(client->connect_time_old() + delta_sec);
+                               client->set_connect_time_nsec(now_monotonic.tv_nsec);
+                               client->clear_connect_time_old();
+                       }
+               }
+       }
        
        // Put back the existing clients. It doesn't matter which server we
-       // allocate them to, so just do round-robin.
+       // allocate them to, so just do round-robin. However, we need to sort them
+       // by connection time first, since add_client_serialized() expects that.
+       sort(loaded_state.mutable_clients()->begin(),
+            loaded_state.mutable_clients()->end(),
+            OrderByConnectionTime());
        for (int i = 0; i < loaded_state.clients_size(); ++i) {
                if (deleted_urls.count(loaded_state.clients(i).url()) != 0) {
                        safe_close(loaded_state.clients(i).sock());
index 6fa4991..01ed0ff 100644 (file)
@@ -35,6 +35,23 @@ using namespace std;
 
 extern AccessLogThread *access_log;
 
+namespace {
+
+inline bool is_equal(timespec a, timespec b)
+{
+       return a.tv_sec == b.tv_sec &&
+              a.tv_nsec == b.tv_nsec;
+}
+
+inline bool is_earlier(timespec a, timespec b)
+{
+       if (a.tv_sec != b.tv_sec)
+               return a.tv_sec < b.tv_sec;
+       return a.tv_nsec < b.tv_nsec;
+}
+
+}  // namespace
+
 Server::Server()
 {
        pthread_mutex_init(&mutex, NULL);
@@ -90,6 +107,7 @@ void Server::do_work()
        
                process_queued_data();
 
+               // Process each client where we have socket activity.
                for (int i = 0; i < nfds; ++i) {
                        Client *client = reinterpret_cast<Client *>(events[i].data.u64);
 
@@ -101,6 +119,8 @@ void Server::do_work()
                        process_client(client);
                }
 
+               // Process each client where its stream has new data,
+               // even if there was no socket activity.
                for (size_t i = 0; i < streams.size(); ++i) {   
                        vector<Client *> to_process;
                        swap(streams[i]->to_process, to_process);
@@ -108,6 +128,49 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
+
+               // Finally, go through each client to see if it's timed out
+               // in the READING_REQUEST state. (Seemingly there are clients
+               // that can hold sockets up for days at a time without sending
+               // anything at all.)
+               timespec timeout_time;
+               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &timeout_time) == -1) {
+                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+                       continue;
+               }
+               timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC;
+               while (!clients_ordered_by_connect_time.empty()) {
+                       pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
+
+                       // See if we have reached the end of clients to process.
+                       if (is_earlier(timeout_time, connect_time_and_fd.first)) {
+                               break;
+                       }
+
+                       // If this client doesn't exist anymore, just ignore it
+                       // (it was deleted earlier).
+                       std::map<int, Client>::iterator client_it = clients.find(connect_time_and_fd.second);
+                       if (client_it == clients.end()) {
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+                       Client *client = &client_it->second;
+                       if (!is_equal(client->connect_time, connect_time_and_fd.first)) {
+                               // Another client has taken this fd in the meantime.
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+
+                       if (client->state != Client::READING_REQUEST) {
+                               // Only READING_REQUEST can time out.
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+
+                       // OK, it timed out.
+                       close_client(client);
+                       clients_ordered_by_connect_time.pop();
+               }
        }
 }
 
@@ -154,6 +217,11 @@ void Server::add_client(int sock)
        assert(ret.second == true);  // Should not already exist.
        Client *client_ptr = &ret.first->second;
 
+       // Connection timestamps must be nondecreasing.
+       assert(clients_ordered_by_connect_time.empty() ||
+              !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, sock));
+
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
@@ -182,6 +250,11 @@ void Server::add_client_from_serialized(const ClientProto &client)
        assert(ret.second == true);  // Should not already exist.
        Client *client_ptr = &ret.first->second;
 
+       // Connection timestamps must be nondecreasing.
+       assert(clients_ordered_by_connect_time.empty() ||
+              !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, client.sock()));
+
        // Start listening on data from this socket.
        epoll_event ev;
        if (client.state() == Client::READING_REQUEST) {
index 49f9d51..085da19 100644 (file)
--- a/server.h
+++ b/server.h
@@ -8,6 +8,7 @@
 #include <sys/types.h>
 #include <time.h>
 #include <map>
+#include <queue>
 #include <string>
 #include <vector>
 
@@ -21,6 +22,7 @@ struct Stream;
 #define EPOLL_MAX_EVENTS 8192
 #define EPOLL_TIMEOUT_MS 20
 #define MAX_CLIENT_REQUEST 16384
+#define REQUEST_READ_TIMEOUT_SEC 60
 
 class CubemapStateProto;
 class StreamProto;
@@ -89,6 +91,18 @@ private:
        // Map from file descriptor to client.
        std::map<int, Client> clients;
 
+       // A list of all clients, ordered by the time they connected (first element),
+       // and their file descriptor (second element). It is ordered by connection time
+       // (and thus also by read timeout time) so that we can read clients from the
+       // start and stop processing once we get to one that isn't ready to be
+       // timed out yet (which makes each processing run amortized O(1)).
+       //
+       // Note that when we delete a client, we don't update this queue.
+       // This means that when reading it, we need to check if the client it
+       // describes is still exists (ie., that the fd still exists, and that
+       // the timespec matches).
+       std::queue<std::pair<timespec, int> > clients_ordered_by_connect_time;
+
        // Used for epoll implementation (obviously).
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];