]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Revert "Rewrite the entire internal signal handling/wakeup."
[cubemap] / server.cpp
index e4ea647c64d05555505f3f18358f4e9b642d5370..b5f58d4c56711968db64a8d6c63e7d9eb08d0e25 100644 (file)
@@ -1,9 +1,11 @@
-#include <arpa/inet.h>
 #include <assert.h>
 #include <errno.h>
+#include <netinet/in.h>
 #include <pthread.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 #include <sys/epoll.h>
 #include <sys/sendfile.h>
 #include <sys/socket.h>
@@ -145,18 +147,21 @@ void Server::add_client_deferred(int sock)
 
 void Server::add_client(int sock)
 {
-       clients.insert(make_pair(sock, Client(sock)));
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(sock, Client(sock)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-       ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
+       ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }
 
-       process_client(&clients[sock]);
+       process_client(client_ptr);
 }
 
 void Server::add_client_from_serialized(const ClientProto &client)
@@ -169,8 +174,10 @@ void Server::add_client_from_serialized(const ClientProto &client)
        } else {
                stream = stream_it->second;
        }
-       clients.insert(make_pair(client.sock(), Client(client, stream)));
-       Client *client_ptr = &clients[client.sock()];
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(client.sock(), Client(client, stream)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
@@ -202,10 +209,10 @@ void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::En
        streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
 }
 
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+       streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
 }
        
 void Server::set_backlog_size(const string &stream_id, size_t new_size)
@@ -376,13 +383,17 @@ sending_data_again:
                        return;
                }
                if (bytes_to_send > stream->backlog_size) {
-                       log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection",
-                               client->remote_addr.c_str(),
-                               (long long int)(bytes_to_send - stream->backlog_size));
+                       size_t bytes_lost = bytes_to_send - stream->backlog_size;
                        client->stream_pos = stream->bytes_received - stream->backlog_size;
-                       client->bytes_lost += bytes_to_send - stream->backlog_size;
+                       client->bytes_lost += bytes_lost;
                        ++client->num_loss_events;
                        bytes_to_send = stream->backlog_size;
+
+                       double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent);
+                       log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection",
+                               client->remote_addr.c_str(),
+                               (long long int)(bytes_lost),
+                               100.0 * loss_fraction);
                }
 
                // See if we need to split across the circular buffer.
@@ -470,16 +481,18 @@ void Server::construct_header(Client *client)
                        "\r\n" +
                        stream->stream_header;
        } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
-               metacube_block_header hdr;
-               memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
-               hdr.size = htonl(stream->stream_header.size());
-               hdr.flags = htonl(METACUBE_FLAGS_HEADER);
-
                client->header_or_error = stream->http_header +
                        "Content-encoding: metacube\r\n" +
-                       "\r\n" +
-                       string(reinterpret_cast<char *>(&hdr), sizeof(hdr)) +
-                       stream->stream_header;
+                       "\r\n";
+               if (!stream->stream_header.empty()) {
+                       metacube_block_header hdr;
+                       memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+                       hdr.size = htonl(stream->stream_header.size());
+                       hdr.flags = htonl(METACUBE_FLAGS_HEADER);
+                       client->header_or_error.append(
+                               string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+               }
+               client->header_or_error.append(stream->stream_header);
        } else {
                assert(false);
        }