]> git.sesse.net Git - cubemap/blobdiff - server.cpp
Revert "Rewrite the entire internal signal handling/wakeup."
[cubemap] / server.cpp
index a11b55f72e6f62d7e1401925e4c113a58df9ceec..b5f58d4c56711968db64a8d6c63e7d9eb08d0e25 100644 (file)
@@ -1,8 +1,11 @@
 #include <assert.h>
 #include <errno.h>
+#include <netinet/in.h>
 #include <pthread.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 #include <sys/epoll.h>
 #include <sys/sendfile.h>
 #include <sys/socket.h>
 #include <utility>
 #include <vector>
 
+#include "accesslog.h"
+#include "log.h"
 #include "markpool.h"
+#include "metacube.h"
 #include "mutexlock.h"
 #include "parse.h"
 #include "server.h"
@@ -23,6 +29,8 @@
 
 using namespace std;
 
+extern AccessLogThread *access_log;
+
 Server::Server()
 {
        pthread_mutex_init(&mutex, NULL);
@@ -30,20 +38,26 @@ Server::Server()
 
        epoll_fd = epoll_create(1024);  // Size argument is ignored.
        if (epoll_fd == -1) {
-               perror("epoll_fd");
+               log_perror("epoll_fd");
                exit(1);
        }
 }
 
 Server::~Server()
 {
+       for (map<string, Stream *>::iterator stream_it = streams.begin();
+            stream_it != streams.end();
+            ++stream_it) {
+               delete stream_it->second;
+       }
+
        int ret;
        do {
                ret = close(epoll_fd);
        } while (ret == -1 && errno == EINTR);
 
        if (ret == -1) {
-               perror("close(epoll_fd)");
+               log_perror("close(epoll_fd)");
        }
 }
 
@@ -71,7 +85,7 @@ void Server::do_work()
                        continue;
                }
                if (nfds == -1) {
-                       perror("epoll_wait");
+                       log_perror("epoll_wait");
                        exit(1);
                }
 
@@ -80,9 +94,7 @@ void Server::do_work()
                process_queued_data();
 
                for (int i = 0; i < nfds; ++i) {
-                       int fd = events[i].data.fd;
-                       assert(clients.count(fd) != 0);
-                       Client *client = &clients[fd];
+                       Client *client = reinterpret_cast<Client *>(events[i].data.u64);
 
                        if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
                                close_client(client);
@@ -135,19 +147,21 @@ void Server::add_client_deferred(int sock)
 
 void Server::add_client(int sock)
 {
-       clients.insert(make_pair(sock, Client(sock)));
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(sock, Client(sock)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-       ev.data.u64 = 0;  // Keep Valgrind happy.
-       ev.data.fd = sock;
+       ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
-               perror("epoll_ctl(EPOLL_CTL_ADD)");
+               log_perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }
 
-       process_client(&clients[sock]);
+       process_client(client_ptr);
 }
 
 void Server::add_client_from_serialized(const ClientProto &client)
@@ -160,8 +174,10 @@ void Server::add_client_from_serialized(const ClientProto &client)
        } else {
                stream = stream_it->second;
        }
-       clients.insert(make_pair(client.sock(), Client(client, stream)));
-       Client *client_ptr = &clients[client.sock()];
+       pair<map<int, Client>::iterator, bool> ret =
+               clients.insert(make_pair(client.sock(), Client(client, stream)));
+       assert(ret.second == true);  // Should not already exist.
+       Client *client_ptr = &ret.first->second;
 
        // Start listening on data from this socket.
        epoll_event ev;
@@ -173,9 +189,9 @@ void Server::add_client_from_serialized(const ClientProto &client)
                ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        }
        ev.data.u64 = 0;  // Keep Valgrind happy.
-       ev.data.fd = client.sock();
+       ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
-               perror("epoll_ctl(EPOLL_CTL_ADD)");
+               log_perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }
 
@@ -187,29 +203,37 @@ void Server::add_client_from_serialized(const ClientProto &client)
        }
 }
 
-void Server::add_stream(const string &stream_id, size_t backlog_size)
+void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size)));
+       streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
 }
 
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+       streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
 }
        
-void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+void Server::set_backlog_size(const string &stream_id, size_t new_size)
 {
        MutexLock lock(&mutex);
        assert(streams.count(stream_id) != 0);
        streams[stream_id]->set_backlog_size(new_size);
 }
        
-void Server::set_header(const string &stream_id, const string &header)
+void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
 {
        MutexLock lock(&mutex);
-       find_stream(stream_id)->header = header;
+       assert(streams.count(stream_id) != 0);
+       streams[stream_id]->encoding = encoding;
+}
+       
+void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+{
+       MutexLock lock(&mutex);
+       find_stream(stream_id)->http_header = http_header;
+       find_stream(stream_id)->stream_header = stream_header;
 
        // If there are clients we haven't sent anything to yet, we should give
        // them the header, so push back into the SENDING_HEADER state.
@@ -224,7 +248,7 @@ void Server::set_header(const string &stream_id, const string &header)
        }
 }
        
-void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
 {
        MutexLock lock(&mutex);
        assert(clients.empty());
@@ -256,7 +280,7 @@ read_request_again:
                        return;
                }
                if (ret == -1) {
-                       perror("read");
+                       log_perror("read");
                        close_client(client);
                        return;
                }
@@ -270,7 +294,7 @@ read_request_again:
        
                switch (status) {
                case RP_OUT_OF_SPACE:
-                       fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock);
+                       log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str());
                        close_client(client);
                        return;
                case RP_NOT_FINISHED_YET:
@@ -278,7 +302,7 @@ read_request_again:
                        // See if there's more data for us.
                        goto read_request_again;
                case RP_EXTRA_DATA:
-                       fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock);
+                       log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str());
                        close_client(client);
                        return;
                case RP_FINISHED:
@@ -318,7 +342,7 @@ sending_header_or_error_again:
 
                if (ret == -1) {
                        // Error! Postcondition #1.
-                       perror("write");
+                       log_perror("write");
                        close_client(client);
                        return;
                }
@@ -359,13 +383,17 @@ sending_data_again:
                        return;
                }
                if (bytes_to_send > stream->backlog_size) {
-                       fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n",
-                               client->sock,
-                               (long long int)(bytes_to_send - stream->backlog_size));
+                       size_t bytes_lost = bytes_to_send - stream->backlog_size;
                        client->stream_pos = stream->bytes_received - stream->backlog_size;
-                       client->bytes_lost += bytes_to_send - stream->backlog_size;
+                       client->bytes_lost += bytes_lost;
                        ++client->num_loss_events;
                        bytes_to_send = stream->backlog_size;
+
+                       double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent);
+                       log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection",
+                               client->remote_addr.c_str(),
+                               (long long int)(bytes_lost),
+                               100.0 * loss_fraction);
                }
 
                // See if we need to split across the circular buffer.
@@ -389,7 +417,7 @@ sending_data_again:
                }
                if (ret == -1) {
                        // Error, close; postcondition #1.
-                       perror("sendfile");
+                       log_perror("sendfile");
                        close_client(client);
                        return;
                }
@@ -400,7 +428,7 @@ sending_data_again:
                        // We don't have any more data for this client, so put it to sleep.
                        // This is postcondition #3.
                        stream->put_client_to_sleep(client);
-               } else if (more_data && ret == bytes_to_send) {
+               } else if (more_data && size_t(ret) == bytes_to_send) {
                        goto sending_data_again;
                }
                break;
@@ -437,7 +465,7 @@ int Server::parse_request(Client *client)
        }
        if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) {                          
                if (client->fwmark != 0) {
-                       perror("setsockopt(SO_MARK)");
+                       log_perror("setsockopt(SO_MARK)");
                }
        }
        client->request.clear();
@@ -447,18 +475,37 @@ int Server::parse_request(Client *client)
 
 void Server::construct_header(Client *client)
 {
-       client->header_or_error = find_stream(client->stream_id)->header;
+       Stream *stream = find_stream(client->stream_id);
+       if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
+               client->header_or_error = stream->http_header +
+                       "\r\n" +
+                       stream->stream_header;
+       } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+               client->header_or_error = stream->http_header +
+                       "Content-encoding: metacube\r\n" +
+                       "\r\n";
+               if (!stream->stream_header.empty()) {
+                       metacube_block_header hdr;
+                       memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+                       hdr.size = htonl(stream->stream_header.size());
+                       hdr.flags = htonl(METACUBE_FLAGS_HEADER);
+                       client->header_or_error.append(
+                               string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+               }
+               client->header_or_error.append(stream->stream_header);
+       } else {
+               assert(false);
+       }
 
        // Switch states.
        client->state = Client::SENDING_HEADER;
 
        epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
-       ev.data.u64 = 0;  // Keep Valgrind happy.
-       ev.data.fd = client->sock;
+       ev.data.u64 = reinterpret_cast<uint64_t>(client);
 
        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
-               perror("epoll_ctl(EPOLL_CTL_MOD)");
+               log_perror("epoll_ctl(EPOLL_CTL_MOD)");
                exit(1);
        }
 }
@@ -475,11 +522,10 @@ void Server::construct_error(Client *client, int error_code)
 
        epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
-       ev.data.u64 = 0;  // Keep Valgrind happy.
-       ev.data.fd = client->sock;
+       ev.data.u64 = reinterpret_cast<uint64_t>(client);
 
        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
-               perror("epoll_ctl(EPOLL_CTL_MOD)");
+               log_perror("epoll_ctl(EPOLL_CTL_MOD)");
                exit(1);
        }
 }
@@ -494,7 +540,7 @@ void delete_from(vector<T> *v, T elem)
 void Server::close_client(Client *client)
 {
        if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
-               perror("epoll_ctl(EPOLL_CTL_DEL)");
+               log_perror("epoll_ctl(EPOLL_CTL_DEL)");
                exit(1);
        }
 
@@ -508,6 +554,9 @@ void Server::close_client(Client *client)
                }
        }
 
+       // Log to access_log.
+       access_log->write(client->get_stats());
+
        // Bye-bye!
        int ret;
        do {
@@ -515,7 +564,7 @@ void Server::close_client(Client *client)
        } while (ret == -1 && errno == EINTR);
 
        if (ret == -1) {
-               perror("close");
+               log_perror("close");
        }
 
        clients.erase(client->sock);