]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Replace an assert with a small hack.
[cubemap] / server.cpp
index a2f28e62a1d5ca370194fcd58b4f72bc97473e30..079b629dcaa9b601b31033c3a4c55626711f7662 100644 (file)
@@ -35,6 +35,23 @@ using namespace std;
 
 extern AccessLogThread *access_log;
 
+namespace {
+
+inline bool is_equal(timespec a, timespec b)
+{
+       return a.tv_sec == b.tv_sec &&
+              a.tv_nsec == b.tv_nsec;
+}
+
+inline bool is_earlier(timespec a, timespec b)
+{
+       if (a.tv_sec != b.tv_sec)
+               return a.tv_sec < b.tv_sec;
+       return a.tv_nsec < b.tv_nsec;
+}
+
+}  // namespace
+
 Server::Server()
 {
        pthread_mutex_init(&mutex, NULL);
@@ -90,6 +107,7 @@ void Server::do_work()
        
                process_queued_data();
 
+               // Process each client where we have socket activity.
                for (int i = 0; i < nfds; ++i) {
                        Client *client = reinterpret_cast<Client *>(events[i].data.u64);
 
@@ -101,6 +119,8 @@ void Server::do_work()
                        process_client(client);
                }
 
+               // Process each client where its stream has new data,
+               // even if there was no socket activity.
                for (size_t i = 0; i < streams.size(); ++i) {   
                        vector<Client *> to_process;
                        swap(streams[i]->to_process, to_process);
@@ -108,6 +128,49 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
+
+               // Finally, go through each client to see if it's timed out
+               // in the READING_REQUEST state. (Seemingly there are clients
+               // that can hold sockets up for days at a time without sending
+               // anything at all.)
+               timespec timeout_time;
+               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &timeout_time) == -1) {
+                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+                       continue;
+               }
+               timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC;
+               while (!clients_ordered_by_connect_time.empty()) {
+                       pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
+
+                       // See if we have reached the end of clients to process.
+                       if (is_earlier(timeout_time, connect_time_and_fd.first)) {
+                               break;
+                       }
+
+                       // If this client doesn't exist anymore, just ignore it
+                       // (it was deleted earlier).
+                       std::map<int, Client>::iterator client_it = clients.find(connect_time_and_fd.second);
+                       if (client_it == clients.end()) {
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+                       Client *client = &client_it->second;
+                       if (!is_equal(client->connect_time, connect_time_and_fd.first)) {
+                               // Another client has taken this fd in the meantime.
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+
+                       if (client->state != Client::READING_REQUEST) {
+                               // Only READING_REQUEST can time out.
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+
+                       // OK, it timed out.
+                       close_client(client);
+                       clients_ordered_by_connect_time.pop();
+               }
        }
 }
 
@@ -154,6 +217,17 @@ void Server::add_client(int sock)
        assert(ret.second == true);  // Should not already exist.
        Client *client_ptr = &ret.first->second;
 
+       // Connection timestamps must be nondecreasing. I can't find any guarantee
+       // that even the monotonic clock can't go backwards by a small amount
+       // (think switching between CPUs with non-synchronized TSCs), so if
+       // this actually should happen, we hack around it by fudging
+       // connect_time.
+       if (!clients_ordered_by_connect_time.empty() &&
+           is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first)) {
+               client_ptr->connect_time = clients_ordered_by_connect_time.back().first;
+       }
+       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, sock));
+
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
@@ -182,6 +256,11 @@ void Server::add_client_from_serialized(const ClientProto &client)
        assert(ret.second == true);  // Should not already exist.
        Client *client_ptr = &ret.first->second;
 
+       // Connection timestamps must be nondecreasing.
+       assert(clients_ordered_by_connect_time.empty() ||
+              !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, client.sock()));
+
        // Start listening on data from this socket.
        epoll_event ev;
        if (client.state() == Client::READING_REQUEST) {
@@ -198,6 +277,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 
        if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+           client_ptr->state == Client::PREBUFFERING ||
            (client_ptr->state == Client::SENDING_DATA &&
             client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
                client_ptr->stream->put_client_to_sleep(client_ptr);
@@ -215,11 +295,11 @@ int Server::lookup_stream_by_url(const std::string &url) const
        return url_it->second;
 }
 
-int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
        url_map.insert(make_pair(url, streams.size()));
-       streams.push_back(new Stream(url, backlog_size, encoding));
+       streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
        return streams.size() - 1;
 }
 
@@ -399,6 +479,18 @@ sending_header_or_error_again:
                        return;
                }
                client->stream_pos = stream->last_suitable_starting_point;
+               client->state = Client::PREBUFFERING;
+               // Fall through.
+       }
+       case Client::PREBUFFERING: {
+               Stream *stream = client->stream;
+               size_t bytes_to_send = stream->bytes_received - client->stream_pos;
+               assert(bytes_to_send <= stream->backlog_size);
+               if (bytes_to_send < stream->prebuffering_bytes) {
+                       // We don't have enough bytes buffered to start this client yet.
+                       stream->put_client_to_sleep(client);
+                       return;
+               }
                client->state = Client::SENDING_DATA;
                // Fall through.
        }