]> git.sesse.net Git - cubemap/blobdiff - udpinput.cpp
Add a listen statement to listen on only specific IP addresses, in addition to the...
[cubemap] / udpinput.cpp
index 887850c91355d17b50d1f2f07a50be5bdd91c72c..6b1779989f26d4a55abe928d6af7ae9553525f3b 100644 (file)
@@ -2,16 +2,18 @@
 #include <errno.h>
 #include <poll.h>
 #include <stddef.h>
-#include <stdio.h>
 #include <stdlib.h>
 #include <sys/socket.h>
+#include <time.h>
 #include <unistd.h>
 #include <string>
 
 #include "acceptor.h"
 #include "log.h"
+#include "mutexlock.h"
 #include "serverpool.h"
 #include "state.pb.h"
+#include "stream.h"
 #include "udpinput.h"
 #include "util.h"
 #include "version.h"
@@ -30,6 +32,12 @@ UDPInput::UDPInput(const string &url)
        assert(ok);
 
        construct_header();
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = 0;
+       stats.data_bytes_received = 0;
+       stats.connect_time = time(NULL);
 }
 
 UDPInput::UDPInput(const InputProto &serialized)
@@ -42,6 +50,16 @@ UDPInput::UDPInput(const InputProto &serialized)
        assert(ok);
 
        construct_header();
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = serialized.bytes_received();
+       stats.data_bytes_received = serialized.data_bytes_received();
+       if (serialized.has_connect_time()) {
+               stats.connect_time = serialized.connect_time();
+       } else {
+               stats.connect_time = time(NULL);
+       }
 }
 
 InputProto UDPInput::serialize() const
@@ -49,6 +67,9 @@ InputProto UDPInput::serialize() const
        InputProto serialized;
        serialized.set_url(url);
        serialized.set_sock(sock);
+       serialized.set_bytes_received(stats.bytes_received);
+       serialized.set_data_bytes_received(stats.data_bytes_received);
+       serialized.set_connect_time(stats.connect_time);
        return serialized;
 }
 
@@ -79,7 +100,8 @@ void UDPInput::do_work()
        while (!should_stop()) {
                if (sock == -1) {
                        int port_num = atoi(port.c_str());
-                       sock = create_server_socket(port_num, UDP_SOCKET);
+                       sockaddr_in6 addr = CreateAnyAddress(port_num);
+                       sock = create_server_socket(addr, UDP_SOCKET);
                        if (sock == -1) {
                                log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
                                             url.c_str());
@@ -95,10 +117,9 @@ void UDPInput::do_work()
                        continue;
                }
 
-               char buf[4096];
                int ret;
                do {
-                       ret = recv(sock, buf, sizeof(buf), 0);
+                       ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
                } while (ret == -1 && errno == EINTR);
 
                if (ret <= 0) {
@@ -106,9 +127,21 @@ void UDPInput::do_work()
                        close_socket();
                        continue;
                }
+
+               {
+                       MutexLock lock(&stats_mutex);
+                       stats.bytes_received += ret;
+                       stats.data_bytes_received += ret;
+               }
                
                for (size_t i = 0; i < stream_indices.size(); ++i) {
-                       servers->add_data(stream_indices[i], buf, ret);
+                       servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
                }
        }
 }
+
+InputStats UDPInput::get_stats() const
+{
+       MutexLock lock(&stats_mutex);
+       return stats;
+}