]> git.sesse.net Git - cubemap/blobdiff - acceptor.cpp
Simplify setting the non-blocking flag when creating HTTP sockets.
[cubemap] / acceptor.cpp
index 776f39b6c3f54e7e2ad0377aeb7d28c27bfb0d74..b3cd3c1bd92824d27840ef52a04b95bed22e5c62 100644 (file)
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <signal.h>
+#include <assert.h>
 #include <errno.h>
-#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
 #include <sys/ioctl.h>
-#include <sys/poll.h>
 #include <sys/socket.h>
+#include <unistd.h>
 
 #include "acceptor.h"
+#include "log.h"
 #include "serverpool.h"
+#include "state.pb.h"
+#include "util.h"
 
 using namespace std;
 
 extern ServerPool *servers;
-extern volatile bool hupped;
 
-int create_server_socket(int port)
+int create_server_socket(const sockaddr_in6 &addr, SocketType socket_type)
 {
-       int server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+       // NOTE: We set as non-blocking, so the acceptor thread can notice that we want to shut it down.
+       int server_sock;
+       if (socket_type == TCP_SOCKET) {
+               server_sock = socket(PF_INET6, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
+       } else {
+               assert(socket_type == UDP_SOCKET);
+               server_sock = socket(PF_INET6, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP);
+       }
        if (server_sock == -1) {
-               perror("socket");
+               log_perror("socket");
                exit(1);
        }
 
        int one = 1;
        if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) == -1) {
-               perror("setsockopt(SO_REUSEADDR)");
+               log_perror("setsockopt(SO_REUSEADDR)");
                exit(1);
        }
 
        // We want dual-stack sockets. (Sorry, OpenBSD and Windows XP...)
        int zero = 0;
        if (setsockopt(server_sock, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero)) == -1) {
-               perror("setsockopt(IPV6_V6ONLY)");
+               log_perror("setsockopt(IPV6_V6ONLY)");
                exit(1);
        }
 
-       // Set as non-blocking, so the acceptor thread can notice that we want to shut it down.
-       if (ioctl(server_sock, FIONBIO, &one) == -1) {
-               perror("ioctl(FIONBIO)");
+       if (bind(server_sock, reinterpret_cast<const sockaddr *>(&addr), sizeof(addr)) == -1) {
+               log_perror("bind");
                exit(1);
        }
 
-       sockaddr_in6 addr;
-       memset(&addr, 0, sizeof(addr));
-       addr.sin6_family = AF_INET6;
-       addr.sin6_port = htons(port);
-
-       if (bind(server_sock, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) == -1) {
-               perror("bind");
-               exit(1);
+       if (socket_type == TCP_SOCKET) {
+               if (listen(server_sock, 128) == -1) {
+                       log_perror("listen");
+                       exit(1);
+               }
        }
 
-       if (listen(server_sock, 128) == -1) {
-               perror("listen");
-               exit(1);
+       return server_sock;
+}
+
+sockaddr_in6 create_any_address(int port)
+{
+       sockaddr_in6 sin6;
+       memset(&sin6, 0, sizeof(sin6));
+       sin6.sin6_family = AF_INET6;
+       sin6.sin6_port = htons(port);
+       return sin6;
+}
+
+sockaddr_in6 extract_address_from_acceptor_proto(const AcceptorProto &proto)
+{
+       sockaddr_in6 sin6;
+       memset(&sin6, 0, sizeof(sin6));
+       sin6.sin6_family = AF_INET6;
+
+       if (!proto.addr().empty()) {
+               int ret = inet_pton(AF_INET6, proto.addr().c_str(), &sin6.sin6_addr);
+               assert(ret == 1);
        }
 
-       return server_sock;
+       sin6.sin6_port = htons(proto.port());
+       return sin6;
 }
        
-AcceptorThread::AcceptorThread(int server_sock)
-       : server_sock(server_sock)
+Acceptor::Acceptor(int server_sock, const sockaddr_in6 &addr,
+                   const string &certificate_chain, const string &private_key)
+       : server_sock(server_sock),
+         addr(addr),
+         certificate_chain(certificate_chain),
+         private_key(private_key)
 {
 }
 
-void AcceptorThread::run()
+Acceptor::Acceptor(const AcceptorProto &serialized)
+       : server_sock(serialized.server_sock()),
+         addr(extract_address_from_acceptor_proto(serialized)),
+         certificate_chain(serialized.certificate_chain()),
+         private_key(serialized.private_key())
 {
-       should_stop = false;
-       pthread_create(&worker_thread, NULL, &AcceptorThread::do_work_thunk, this);
 }
 
-void AcceptorThread::stop()
+AcceptorProto Acceptor::serialize() const
 {
-       should_stop = true;
-       pthread_kill(worker_thread, SIGHUP);
-       if (pthread_join(worker_thread, NULL) == -1) {
-               perror("pthread_join");
-               exit(1);
-       }
+       char buf[INET6_ADDRSTRLEN];
+       inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf));
+
+       AcceptorProto serialized;
+       serialized.set_server_sock(server_sock);
+       serialized.set_addr(buf);
+       serialized.set_port(ntohs(addr.sin6_port));
+       serialized.set_certificate_chain(certificate_chain);
+       serialized.set_private_key(private_key);
+       return serialized;
 }
 
-void *AcceptorThread::do_work_thunk(void *arg)
+void Acceptor::close_socket()
 {
-       AcceptorThread *acceptor_thread = reinterpret_cast<AcceptorThread *>(arg);
-       acceptor_thread->do_work();
-       return NULL;
+       safe_close(server_sock);
 }
 
-void AcceptorThread::do_work()
+void Acceptor::do_work()
 {
-       while (!hupped) {
-               // Since we are non-blocking, we need to wait for the right state first.
-               // Wait up to 50 ms, then check hupped.
-               pollfd pfd;
-               pfd.fd = server_sock;
-               pfd.events = POLLIN;
-
-               int nfds = poll(&pfd, 1, 50);
-               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
-                       continue;
-               }
-               if (nfds == -1) {
-                       perror("poll");
-                       usleep(100000);
+       while (!should_stop()) {
+               if (!wait_for_activity(server_sock, POLLIN, nullptr)) {
                        continue;
                }
 
                sockaddr_in6 addr;
                socklen_t addrlen = sizeof(addr);
 
-               // Get a new socket.
-               int sock = accept(server_sock, reinterpret_cast<sockaddr *>(&addr), &addrlen);
+               // Get a new socket, and set it as nonblocking.
+               int sock = accept4(server_sock, reinterpret_cast<sockaddr *>(&addr), &addrlen, SOCK_NONBLOCK);
                if (sock == -1 && errno == EINTR) {
                        continue;
                }
                if (sock == -1) {
-                       perror("accept");
+                       log_perror("accept");
                        usleep(100000);
                        continue;
                }
 
-               // Set the socket as nonblocking.
+               // Enable TCP_CORK for maximum throughput. In the rare case that the
+               // stream stops entirely, this will cause a small delay (~200 ms)
+               // before the last part is sent out, but that should be fine.
                int one = 1;
-               if (ioctl(sock, FIONBIO, &one) == -1) {
-                       perror("FIONBIO");
-                       exit(1);
+               if (setsockopt(sock, SOL_TCP, TCP_CORK, &one, sizeof(one)) == -1) {
+                       log_perror("setsockopt(TCP_CORK)");
+                       // Can still continue.
                }
 
                // Pick a server, round-robin, and hand over the socket to it.
-               servers->add_client(sock);
+               servers->add_client(sock, this);
        }
 }