]> git.sesse.net Git - cubemap/blobdiff - udpinput.cpp
Fix various close-on-exec bugs.
[cubemap] / udpinput.cpp
index dada7b0a2d6dd7e97e4a1e6385f00d4612fe9fbe..873b0b952911a226d8f08255f80e2d092e163801 100644 (file)
@@ -1,5 +1,6 @@
 #include <assert.h>
 #include <errno.h>
+#include <fcntl.h>
 #include <poll.h>
 #include <stddef.h>
 #include <stdlib.h>
@@ -11,7 +12,6 @@
 
 #include "acceptor.h"
 #include "log.h"
-#include "mutexlock.h"
 #include "serverpool.h"
 #include "state.pb.h"
 #include "stream.h"
@@ -113,19 +113,18 @@ UDPInput::UDPInput(const string &url)
 
        construct_header();
 
-       pthread_mutex_init(&stats_mutex, NULL);
        stats.url = url;
-       stats.bytes_received = 0;
-       stats.data_bytes_received = 0;
-       stats.metadata_bytes_received = 0;
-       stats.connect_time = time(NULL);
-       stats.latency_sec = HUGE_VAL;
+       stats.connect_time = time(nullptr);
 }
 
 UDPInput::UDPInput(const InputProto &serialized)
        : url(serialized.url()),
          sock(serialized.sock())
 {
+       // Set back the close-on-exec flag for the socket.
+       // (This can't leak into a child, since we haven't been started yet.)
+       fcntl(sock, F_SETFD, O_CLOEXEC);
+
        // Should be verified by the caller.
        string protocol;
        bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
@@ -133,19 +132,22 @@ UDPInput::UDPInput(const InputProto &serialized)
 
        construct_header();
 
-       pthread_mutex_init(&stats_mutex, NULL);
        stats.url = url;
        stats.bytes_received = serialized.bytes_received();
        stats.data_bytes_received = serialized.data_bytes_received();
        if (serialized.has_connect_time()) {
                stats.connect_time = serialized.connect_time();
        } else {
-               stats.connect_time = time(NULL);
+               stats.connect_time = time(nullptr);
        }
 }
 
 InputProto UDPInput::serialize() const
 {
+       // Unset the close-on-exec flag for the socket.
+       // (This can't leak into a child, since there's only one thread left.)
+       fcntl(sock, F_SETFD, 0);
+
        InputProto serialized;
        serialized.set_url(url);
        serialized.set_sock(sock);
@@ -206,7 +208,7 @@ void UDPInput::do_work()
                }
 
                // Wait for a packet, or a wakeup.
-               bool activity = wait_for_activity(sock, POLLIN, NULL);
+               bool activity = wait_for_activity(sock, POLLIN, nullptr);
                if (!activity) {
                        // Most likely, should_stop was set.
                        continue;
@@ -217,26 +219,26 @@ void UDPInput::do_work()
                        ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
                } while (ret == -1 && errno == EINTR);
 
-               if (ret <= 0) {
+               if (ret < 0) {  // Note that zero-byte packets are legal.
                        log_perror("recv");
                        close_socket();
                        continue;
                }
 
                {
-                       MutexLock lock(&stats_mutex);
+                       lock_guard<mutex> lock(stats_mutex);
                        stats.bytes_received += ret;
                        stats.data_bytes_received += ret;
                }
        
                for (size_t stream_index : stream_indices) {    
-                       servers->add_data(stream_index, packet_buf, ret, /*metacube_flags=*/0);
+                       servers->add_data(stream_index, packet_buf, ret, /*metacube_flags=*/0, /*pts=*/RationalPTS());
                }
        }
 }
 
 InputStats UDPInput::get_stats() const
 {
-       MutexLock lock(&stats_mutex);
+       lock_guard<mutex> lock(stats_mutex);
        return stats;
 }