X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=acceptor.cpp;h=593e4d4eb9cc207fce83730aee1c29e7a35519f2;hp=70cd30c23bdfdcd53c16efe025c2ce4e58b01bdd;hb=07569f8e011cd9b064c64bef1ce56f77bf7ddf53;hpb=4856c49f1b63753ce86ad759ee649a1117628a8e diff --git a/acceptor.cpp b/acceptor.cpp index 70cd30c..593e4d4 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -1,8 +1,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -13,13 +13,13 @@ #include "log.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" using namespace std; extern ServerPool *servers; -extern volatile bool hupped; -int create_server_socket(int port, SocketType socket_type) +int create_server_socket(const sockaddr_in6 &addr, SocketType socket_type) { int server_sock; if (socket_type == TCP_SOCKET) { @@ -52,12 +52,7 @@ int create_server_socket(int port, SocketType socket_type) exit(1); } - sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(port); - - if (bind(server_sock, reinterpret_cast(&addr), sizeof(addr)) == -1) { + if (bind(server_sock, reinterpret_cast(&addr), sizeof(addr)) == -1) { log_perror("bind"); exit(1); } @@ -71,55 +66,64 @@ int create_server_socket(int port, SocketType socket_type) return server_sock; } + +sockaddr_in6 create_any_address(int port) +{ + sockaddr_in6 sin6; + memset(&sin6, 0, sizeof(sin6)); + sin6.sin6_family = AF_INET6; + sin6.sin6_port = htons(port); + return sin6; +} + +sockaddr_in6 extract_address_from_acceptor_proto(const AcceptorProto &proto) +{ + sockaddr_in6 sin6; + memset(&sin6, 0, sizeof(sin6)); + sin6.sin6_family = AF_INET6; + + if (!proto.addr().empty()) { + int ret = inet_pton(AF_INET6, proto.addr().c_str(), &sin6.sin6_addr); + assert(ret == 1); + } + + sin6.sin6_port = htons(proto.port()); + return sin6; +} -Acceptor::Acceptor(int server_sock, int port) +Acceptor::Acceptor(int server_sock, const sockaddr_in6 &addr) : server_sock(server_sock), - port(port) + addr(addr) { } Acceptor::Acceptor(const AcceptorProto &serialized) : server_sock(serialized.server_sock()), - port(serialized.port()) + addr(extract_address_from_acceptor_proto(serialized)) { } AcceptorProto Acceptor::serialize() const { + char buf[INET6_ADDRSTRLEN]; + inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)); + AcceptorProto serialized; serialized.set_server_sock(server_sock); - serialized.set_port(port); + serialized.set_addr(buf); + serialized.set_port(ntohs(addr.sin6_port)); return serialized; } void Acceptor::close_socket() { - int ret; - do { - ret = close(server_sock); - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - } + safe_close(server_sock); } void Acceptor::do_work() { - while (!hupped) { - // Since we are non-blocking, we need to wait for the right state first. - // Wait up to 50 ms, then check hupped. - pollfd pfd; - pfd.fd = server_sock; - pfd.events = POLLIN; - - int nfds = poll(&pfd, 1, 50); - if (nfds == 0 || (nfds == -1 && errno == EINTR)) { - continue; - } - if (nfds == -1) { - log_perror("poll"); - usleep(100000); + while (!should_stop()) { + if (!wait_for_activity(server_sock, POLLIN, NULL)) { continue; } @@ -140,10 +144,18 @@ void Acceptor::do_work() // Set the socket as nonblocking. int one = 1; if (ioctl(sock, FIONBIO, &one) == -1) { - log_perror("FIONBIO"); + log_perror("ioctl(FIONBIO)"); exit(1); } + // Enable TCP_CORK for maximum throughput. In the rare case that the + // stream stops entirely, this will cause a small delay (~200 ms) + // before the last part is sent out, but that should be fine. + if (setsockopt(sock, SOL_TCP, TCP_CORK, &one, sizeof(one)) == -1) { + log_perror("setsockopt(TCP_CORK)"); + // Can still continue. + } + // Pick a server, round-robin, and hand over the socket to it. servers->add_client(sock); }