X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=acceptor.cpp;h=7c28ca58038c27b107cad1b8dd82d6dd3b78bd3c;hp=3172c73b80a1821e0973e08c267f6f8ccae74c10;hb=ef4cf9957b668845eb1064f583a0021009cbeeac;hpb=5cd46e39b4063d94f6dc559ae350beeb8406a8f9 diff --git a/acceptor.cpp b/acceptor.cpp index 3172c73..7c28ca5 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -12,13 +13,13 @@ #include "log.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" using namespace std; extern ServerPool *servers; -extern volatile bool hupped; -int create_server_socket(int port, SocketType socket_type) +int create_server_socket(const sockaddr_in6 &addr, SocketType socket_type) { int server_sock; if (socket_type == TCP_SOCKET) { @@ -51,12 +52,7 @@ int create_server_socket(int port, SocketType socket_type) exit(1); } - sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(port); - - if (bind(server_sock, reinterpret_cast(&addr), sizeof(addr)) == -1) { + if (bind(server_sock, reinterpret_cast(&addr), sizeof(addr)) == -1) { log_perror("bind"); exit(1); } @@ -70,63 +66,72 @@ int create_server_socket(int port, SocketType socket_type) return server_sock; } + +sockaddr_in6 create_any_address(int port) +{ + sockaddr_in6 sin6; + memset(&sin6, 0, sizeof(sin6)); + sin6.sin6_family = AF_INET6; + sin6.sin6_port = htons(port); + return sin6; +} + +sockaddr_in6 extract_address_from_acceptor_proto(const AcceptorProto &proto) +{ + sockaddr_in6 sin6; + memset(&sin6, 0, sizeof(sin6)); + sin6.sin6_family = AF_INET6; + + if (!proto.addr().empty()) { + int ret = inet_pton(AF_INET6, proto.addr().c_str(), &sin6.sin6_addr); + assert(ret == 1); + } + + sin6.sin6_port = htons(proto.port()); + return sin6; +} -Acceptor::Acceptor(int server_sock, int port) +Acceptor::Acceptor(int server_sock, const sockaddr_in6 &addr) : server_sock(server_sock), - port(port) + addr(addr) { } Acceptor::Acceptor(const AcceptorProto &serialized) : server_sock(serialized.server_sock()), - port(serialized.port()) + addr(extract_address_from_acceptor_proto(serialized)) { } AcceptorProto Acceptor::serialize() const { + char buf[INET6_ADDRSTRLEN]; + inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)); + AcceptorProto serialized; serialized.set_server_sock(server_sock); - serialized.set_port(port); + serialized.set_addr(buf); + serialized.set_port(ntohs(addr.sin6_port)); return serialized; } void Acceptor::close_socket() { - int ret; - do { - ret = close(server_sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - } + safe_close(server_sock); } void Acceptor::do_work() { - while (!hupped) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check hupped. - pollfd pfd; - pfd.fd = server_sock; - pfd.events = POLLIN; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { - continue; - } - if (nfds == -1) { - log_perror("poll"); - usleep(100000); + while (!should_stop()) { + if (!wait_for_activity(server_sock, POLLIN, NULL)) { continue; } sockaddr_in6 addr; socklen_t addrlen = sizeof(addr); - // Get a new socket. - int sock = accept(server_sock, reinterpret_cast(&addr), &addrlen); + // Get a new socket, and set it as nonblocking. + int sock = accept4(server_sock, reinterpret_cast(&addr), &addrlen, SOCK_NONBLOCK); if (sock == -1 && errno == EINTR) { continue; } @@ -136,11 +141,13 @@ void Acceptor::do_work() continue; } - // Set the socket as nonblocking. + // Enable TCP_CORK for maximum throughput. In the rare case that the + // stream stops entirely, this will cause a small delay (~200 ms) + // before the last part is sent out, but that should be fine. int one = 1; - if (ioctl(sock, FIONBIO, &one) == -1) { - log_perror("FIONBIO"); - exit(1); + if (setsockopt(sock, SOL_TCP, TCP_CORK, &one, sizeof(one)) == -1) { + log_perror("setsockopt(TCP_CORK)"); + // Can still continue. } // Pick a server, round-robin, and hand over the socket to it.