]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Replace an assert with a small hack.
[cubemap] / server.cpp
index ee91ed8449faa1c5c6b6b344dbc733ca000aa6c8..079b629dcaa9b601b31033c3a4c55626711f7662 100644 (file)
@@ -19,7 +19,6 @@
 
 #include "accesslog.h"
 #include "log.h"
-#include "markpool.h"
 #include "metacube2.h"
 #include "mutexlock.h"
 #include "parse.h"
@@ -36,6 +35,23 @@ using namespace std;
 
 extern AccessLogThread *access_log;
 
+namespace {
+
+inline bool is_equal(timespec a, timespec b)
+{
+       return a.tv_sec == b.tv_sec &&
+              a.tv_nsec == b.tv_nsec;
+}
+
+inline bool is_earlier(timespec a, timespec b)
+{
+       if (a.tv_sec != b.tv_sec)
+               return a.tv_sec < b.tv_sec;
+       return a.tv_nsec < b.tv_nsec;
+}
+
+}  // namespace
+
 Server::Server()
 {
        pthread_mutex_init(&mutex, NULL);
@@ -91,6 +107,7 @@ void Server::do_work()
        
                process_queued_data();
 
+               // Process each client where we have socket activity.
                for (int i = 0; i < nfds; ++i) {
                        Client *client = reinterpret_cast<Client *>(events[i].data.u64);
 
@@ -102,6 +119,8 @@ void Server::do_work()
                        process_client(client);
                }
 
+               // Process each client where its stream has new data,
+               // even if there was no socket activity.
                for (size_t i = 0; i < streams.size(); ++i) {   
                        vector<Client *> to_process;
                        swap(streams[i]->to_process, to_process);
@@ -109,6 +128,49 @@ void Server::do_work()
                                process_client(to_process[i]);
                        }
                }
+
+               // Finally, go through each client to see if it's timed out
+               // in the READING_REQUEST state. (Seemingly there are clients
+               // that can hold sockets up for days at a time without sending
+               // anything at all.)
+               timespec timeout_time;
+               if (clock_gettime(CLOCK_MONOTONIC_COARSE, &timeout_time) == -1) {
+                       log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+                       continue;
+               }
+               timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC;
+               while (!clients_ordered_by_connect_time.empty()) {
+                       pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
+
+                       // See if we have reached the end of clients to process.
+                       if (is_earlier(timeout_time, connect_time_and_fd.first)) {
+                               break;
+                       }
+
+                       // If this client doesn't exist anymore, just ignore it
+                       // (it was deleted earlier).
+                       std::map<int, Client>::iterator client_it = clients.find(connect_time_and_fd.second);
+                       if (client_it == clients.end()) {
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+                       Client *client = &client_it->second;
+                       if (!is_equal(client->connect_time, connect_time_and_fd.first)) {
+                               // Another client has taken this fd in the meantime.
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+
+                       if (client->state != Client::READING_REQUEST) {
+                               // Only READING_REQUEST can time out.
+                               clients_ordered_by_connect_time.pop();
+                               continue;
+                       }
+
+                       // OK, it timed out.
+                       close_client(client);
+                       clients_ordered_by_connect_time.pop();
+               }
        }
 }
 
@@ -155,6 +217,17 @@ void Server::add_client(int sock)
        assert(ret.second == true);  // Should not already exist.
        Client *client_ptr = &ret.first->second;
 
+       // Connection timestamps must be nondecreasing. I can't find any guarantee
+       // that even the monotonic clock can't go backwards by a small amount
+       // (think switching between CPUs with non-synchronized TSCs), so if
+       // this actually should happen, we hack around it by fudging
+       // connect_time.
+       if (!clients_ordered_by_connect_time.empty() &&
+           is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first)) {
+               client_ptr->connect_time = clients_ordered_by_connect_time.back().first;
+       }
+       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, sock));
+
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
@@ -183,6 +256,11 @@ void Server::add_client_from_serialized(const ClientProto &client)
        assert(ret.second == true);  // Should not already exist.
        Client *client_ptr = &ret.first->second;
 
+       // Connection timestamps must be nondecreasing.
+       assert(clients_ordered_by_connect_time.empty() ||
+              !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+       clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, client.sock()));
+
        // Start listening on data from this socket.
        epoll_event ev;
        if (client.state() == Client::READING_REQUEST) {
@@ -199,6 +277,7 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 
        if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+           client_ptr->state == Client::PREBUFFERING ||
            (client_ptr->state == Client::SENDING_DATA &&
             client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
                client_ptr->stream->put_client_to_sleep(client_ptr);
@@ -216,11 +295,11 @@ int Server::lookup_stream_by_url(const std::string &url) const
        return url_it->second;
 }
 
-int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+int Server::add_stream(const string &url, size_t backlog_size, size_t prebuffering_bytes, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
        url_map.insert(make_pair(url, streams.size()));
-       streams.push_back(new Stream(url, backlog_size, encoding));
+       streams.push_back(new Stream(url, backlog_size, prebuffering_bytes, encoding));
        return streams.size() - 1;
 }
 
@@ -252,29 +331,8 @@ void Server::set_header(int stream_index, const string &http_header, const strin
        assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
        streams[stream_index]->http_header = http_header;
        streams[stream_index]->stream_header = stream_header;
-
-       // If there are clients we haven't sent anything to yet, we should give
-       // them the header, so push back into the SENDING_HEADER state.
-       for (map<int, Client>::iterator client_it = clients.begin();
-            client_it != clients.end();
-            ++client_it) {
-               Client *client = &client_it->second;
-               if (client->state == Client::WAITING_FOR_KEYFRAME ||
-                   (client->state == Client::SENDING_DATA &&
-                    client->stream_pos == 0)) {
-                       construct_header(client);
-               }
-       }
 }
        
-void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
-{
-       MutexLock lock(&mutex);
-       assert(clients.empty());
-       assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
-       streams[stream_index]->mark_pool = mark_pool;
-}
-
 void Server::set_pacing_rate(int stream_index, uint32_t pacing_rate)
 {
        MutexLock lock(&mutex);
@@ -421,6 +479,18 @@ sending_header_or_error_again:
                        return;
                }
                client->stream_pos = stream->last_suitable_starting_point;
+               client->state = Client::PREBUFFERING;
+               // Fall through.
+       }
+       case Client::PREBUFFERING: {
+               Stream *stream = client->stream;
+               size_t bytes_to_send = stream->bytes_received - client->stream_pos;
+               assert(bytes_to_send <= stream->backlog_size);
+               if (bytes_to_send < stream->prebuffering_bytes) {
+                       // We don't have enough bytes buffered to start this client yet.
+                       stream->put_client_to_sleep(client);
+                       return;
+               }
                client->state = Client::SENDING_DATA;
                // Fall through.
        }
@@ -491,12 +561,6 @@ void Server::skip_lost_data(Client *client)
                client->stream_pos = stream->bytes_received - stream->backlog_size;
                client->bytes_lost += bytes_lost;
                ++client->num_loss_events;
-
-               double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent);
-               log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection",
-                       client->remote_addr.c_str(),
-                       (long long int)(bytes_lost),
-                       100.0 * loss_fraction);
        }
 }
 
@@ -528,18 +592,13 @@ int Server::parse_request(Client *client)
                return 404;  // Not found.
        }
 
-       client->url = request_tokens[1];
-       client->stream = streams[url_map_it->second];
-       if (client->stream->mark_pool != NULL) {
-               client->fwmark = client->stream->mark_pool->get_mark();
-       } else {
-               client->fwmark = 0;  // No mark.
-       }
-       if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) {                          
-               if (client->fwmark != 0) {
-                       log_perror("setsockopt(SO_MARK)");
-               }
+       Stream *stream = streams[url_map_it->second];
+       if (stream->http_header.empty()) {
+               return 503;  // Service unavailable.
        }
+
+       client->url = request_tokens[1];
+       client->stream = stream;
        if (setsockopt(client->sock, SOL_SOCKET, SO_MAX_PACING_RATE, &client->stream->pacing_rate, sizeof(client->stream->pacing_rate)) == -1) {
                if (client->stream->pacing_rate != ~0U) {
                        log_perror("setsockopt(SO_MAX_PACING_RATE)");
@@ -626,10 +685,6 @@ void Server::close_client(Client *client)
        if (client->stream != NULL) {
                delete_from(&client->stream->sleeping_clients, client);
                delete_from(&client->stream->to_process, client);
-               if (client->stream->mark_pool != NULL) {
-                       int fwmark = client->fwmark;
-                       client->stream->mark_pool->release_mark(fwmark);
-               }
        }
 
        // Log to access_log.