]> git.sesse.net Git - cubemap/blobdiff - udpinput.cpp
Re-run include-what-you-use.
[cubemap] / udpinput.cpp
index db544857bf367e0cfb0923c4de7343496d01facd..f4c78fec4a7c8ddbf3f03e6304c5626b396c686e 100644 (file)
@@ -1,16 +1,19 @@
 #include <assert.h>
 #include <errno.h>
 #include <poll.h>
-#include <stdio.h>
+#include <stddef.h>
 #include <stdlib.h>
 #include <sys/socket.h>
+#include <time.h>
 #include <unistd.h>
 #include <string>
 
 #include "acceptor.h"
 #include "log.h"
+#include "mutexlock.h"
 #include "serverpool.h"
 #include "state.pb.h"
+#include "stream.h"
 #include "udpinput.h"
 #include "util.h"
 #include "version.h"
@@ -29,6 +32,12 @@ UDPInput::UDPInput(const string &url)
        assert(ok);
 
        construct_header();
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = 0;
+       stats.data_bytes_received = 0;
+       stats.connect_time = time(NULL);
 }
 
 UDPInput::UDPInput(const InputProto &serialized)
@@ -41,6 +50,16 @@ UDPInput::UDPInput(const InputProto &serialized)
        assert(ok);
 
        construct_header();
+
+       pthread_mutex_init(&stats_mutex, NULL);
+       stats.url = url;
+       stats.bytes_received = serialized.bytes_received();
+       stats.data_bytes_received = serialized.data_bytes_received();
+       if (serialized.has_connect_time()) {
+               stats.connect_time = serialized.connect_time();
+       } else {
+               stats.connect_time = time(NULL);
+       }
 }
 
 InputProto UDPInput::serialize() const
@@ -48,6 +67,9 @@ InputProto UDPInput::serialize() const
        InputProto serialized;
        serialized.set_url(url);
        serialized.set_sock(sock);
+       serialized.set_bytes_received(stats.bytes_received);
+       serialized.set_data_bytes_received(stats.data_bytes_received);
+       serialized.set_connect_time(stats.connect_time);
        return serialized;
 }
 
@@ -94,10 +116,9 @@ void UDPInput::do_work()
                        continue;
                }
 
-               char buf[4096];
                int ret;
                do {
-                       ret = recv(sock, buf, sizeof(buf), 0);
+                       ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
                } while (ret == -1 && errno == EINTR);
 
                if (ret <= 0) {
@@ -105,9 +126,21 @@ void UDPInput::do_work()
                        close_socket();
                        continue;
                }
+
+               {
+                       MutexLock lock(&stats_mutex);
+                       stats.bytes_received += ret;
+                       stats.data_bytes_received += ret;
+               }
                
                for (size_t i = 0; i < stream_indices.size(); ++i) {
-                       servers->add_data(stream_indices[i], buf, ret);
+                       servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
                }
        }
 }
+
+InputStats UDPInput::get_stats() const
+{
+       MutexLock lock(&stats_mutex);
+       return stats;
+}