Support UDP input. Also fix some issues with socket closing.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 11 Apr 2013 21:34:03 +0000 (23:34 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 11 Apr 2013 21:34:03 +0000 (23:34 +0200)
Makefile
acceptor.cpp
acceptor.h
cubemap.config.sample
httpinput.cpp
httpinput.h
input.cpp
input.h
main.cpp
udpinput.cpp [new file with mode: 0644]
udpinput.h [new file with mode: 0644]

index 5367daa..bc3511a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ PROTOC=protoc
 CXXFLAGS=-Wall -O2 -g
 LDLIBS=-lpthread -lprotobuf
 
 CXXFLAGS=-Wall -O2 -g
 LDLIBS=-lpthread -lprotobuf
 
-OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o
+OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o
 
 all: cubemap
 
 
 all: cubemap
 
index 87bf47b..fbc5b04 100644 (file)
@@ -17,9 +17,15 @@ using namespace std;
 extern ServerPool *servers;
 extern volatile bool hupped;
 
 extern ServerPool *servers;
 extern volatile bool hupped;
 
-int create_server_socket(int port)
+int create_server_socket(int port, SocketType socket_type)
 {
 {
-       int server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+       int server_sock;
+       if (socket_type == TCP_SOCKET) {
+               server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+       } else {
+               assert(socket_type == UDP_SOCKET);
+               server_sock = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP);
+       }
        if (server_sock == -1) {
                perror("socket");
                exit(1);
        if (server_sock == -1) {
                perror("socket");
                exit(1);
@@ -54,9 +60,11 @@ int create_server_socket(int port)
                exit(1);
        }
 
                exit(1);
        }
 
-       if (listen(server_sock, 128) == -1) {
-               perror("listen");
-               exit(1);
+       if (socket_type == TCP_SOCKET) {
+               if (listen(server_sock, 128) == -1) {
+                       perror("listen");
+                       exit(1);
+               }
        }
 
        return server_sock;
        }
 
        return server_sock;
index caa4647..e0a1bd5 100644 (file)
@@ -3,7 +3,11 @@
 
 #include "thread.h"
 
 
 #include "thread.h"
 
-int create_server_socket(int port);
+enum SocketType {
+       TCP_SOCKET,
+       UDP_SOCKET,
+};
+int create_server_socket(int port, SocketType socket_type);
 
 class AcceptorProto;
 
 
 class AcceptorProto;
 
index 5773927..852e0ca 100644 (file)
@@ -12,3 +12,4 @@ stats_interval 60
 # now the streams!
 #
 stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
 # now the streams!
 #
 stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
+stream /udp.ts src=udp://@:1234
index 46f1358..354e4b9 100644 (file)
@@ -57,6 +57,18 @@ HTTPInput::HTTPInput(const InputProto &serialized)
        parse_url(url, &protocol, &host, &port, &path);  // Don't care if it fails.
 }
 
        parse_url(url, &protocol, &host, &port, &path);  // Don't care if it fails.
 }
 
+void HTTPInput::close_socket()
+{
+       int ret;
+       do {
+               ret = close(sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               perror("close()");
+       }
+}
+
 InputProto HTTPInput::serialize() const
 {
        InputProto serialized;
 InputProto HTTPInput::serialize() const
 {
        InputProto serialized;
index 894d8e9..1714b71 100644 (file)
@@ -15,6 +15,8 @@ public:
        // Serialization/deserialization.
        HTTPInput(const InputProto &serialized);
        virtual InputProto serialize() const;
        // Serialization/deserialization.
        HTTPInput(const InputProto &serialized);
        virtual InputProto serialize() const;
+       
+       virtual void close_socket();
 
        virtual std::string get_url() const { return url; }
 
 
        virtual std::string get_url() const { return url; }
 
index 00b6547..3f6a9d7 100644 (file)
--- a/input.cpp
+++ b/input.cpp
@@ -2,6 +2,7 @@
 #include <string>
 
 #include "httpinput.h"
 #include <string>
 
 #include "httpinput.h"
+#include "udpinput.h"
 #include "input.h"
 #include "state.pb.h"
 
 #include "input.h"
 #include "state.pb.h"
 
@@ -10,18 +11,18 @@ using namespace std;
 // Extremely rudimentary URL parsing.
 bool parse_url(const string &url, string *protocol, string *host, string *port, string *path)
 {
 // Extremely rudimentary URL parsing.
 bool parse_url(const string &url, string *protocol, string *host, string *port, string *path)
 {
-       if (url.find("http://") != 0) {
+       size_t split = url.find("://");
+       if (split == string::npos) {
                return false;
        }
                return false;
        }
+       *protocol = string(url.begin(), url.begin() + split);
 
 
-       *protocol = "http";
-       
-       string rest = url.substr(strlen("http://"));
-       size_t split = rest.find_first_of(":/");
+       string rest = string(url.begin() + split + 3, url.end());
+       split = rest.find_first_of(":/");
        if (split == string::npos) {
                // http://foo
                *host = rest;
        if (split == string::npos) {
                // http://foo
                *host = rest;
-               *port = "http";
+               *port = *protocol;
                *path = "/";
                return true;
        }
                *path = "/";
                return true;
        }
@@ -47,7 +48,7 @@ bool parse_url(const string &url, string *protocol, string *host, string *port,
        }
 
        // http://foo/bar
        }
 
        // http://foo/bar
-       *port = "http";
+       *port = *protocol;
        *path = rest;
        return true;
 }
        *path = rest;
        return true;
 }
@@ -61,6 +62,9 @@ Input *create_input(const std::string &stream_id, const std::string &url)
        if (protocol == "http") {
                return new HTTPInput(stream_id, url);
        }
        if (protocol == "http") {
                return new HTTPInput(stream_id, url);
        }
+       if (protocol == "udp") {
+               return new UDPInput(stream_id, url);
+       }
        return NULL;
 }
 
        return NULL;
 }
 
@@ -73,6 +77,9 @@ Input *create_input(const InputProto &serialized)
        if (protocol == "http") {
                return new HTTPInput(serialized);
        }
        if (protocol == "http") {
                return new HTTPInput(serialized);
        }
+       if (protocol == "udp") {
+               return new UDPInput(serialized);
+       }
        return NULL;
 }
 
        return NULL;
 }
 
diff --git a/input.h b/input.h
index 50cfeaa..79cfc54 100644 (file)
--- a/input.h
+++ b/input.h
@@ -21,6 +21,7 @@ public:
        virtual ~Input();
        virtual InputProto serialize() const = 0;
        virtual std::string get_url() const = 0;
        virtual ~Input();
        virtual InputProto serialize() const = 0;
        virtual std::string get_url() const = 0;
+       virtual void close_socket() = 0;
 };
 
 #endif  // !defined(_INPUT_H)
 };
 
 #endif  // !defined(_INPUT_H)
index 25f82eb..d77fb2e 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -215,7 +215,7 @@ vector<Acceptor *> create_acceptors(
                        acceptor = deserialized_acceptor_it->second;
                        deserialized_acceptors->erase(deserialized_acceptor_it);
                } else {
                        acceptor = deserialized_acceptor_it->second;
                        deserialized_acceptors->erase(deserialized_acceptor_it);
                } else {
-                       int server_sock = create_server_socket(port);
+                       int server_sock = create_server_socket(port, TCP_SOCKET);
                        acceptor = new Acceptor(server_sock, port);
                }
                acceptor->run();
                        acceptor = new Acceptor(server_sock, port);
                }
                acceptor->run();
@@ -262,6 +262,7 @@ vector<Input *> create_inputs(const vector<ConfigLine> &config,
                        if (input->get_url() != src) {
                                fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n",
                                        stream_id.c_str(), input->get_url().c_str(), src.c_str());
                        if (input->get_url() != src) {
                                fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n",
                                        stream_id.c_str(), input->get_url().c_str(), src.c_str());
+                               input->close_socket();
                                delete input;
                                input = NULL;
                        }
                                delete input;
                                input = NULL;
                        }
diff --git a/udpinput.cpp b/udpinput.cpp
new file mode 100644 (file)
index 0000000..cd66f16
--- /dev/null
@@ -0,0 +1,121 @@
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <string>
+
+#include "acceptor.h"
+#include "udpinput.h"
+#include "serverpool.h"
+#include "version.h"
+#include "state.pb.h"
+
+using namespace std;
+
+extern ServerPool *servers;
+
+UDPInput::UDPInput(const string &stream_id, const string &url)
+       : stream_id(stream_id),
+         url(url),
+         sock(-1)
+{
+       // Should be verified by the caller.
+       string protocol;
+       bool ok = parse_url(url, &protocol, &host, &port, &path);
+       assert(ok);
+
+       construct_header();
+}
+
+UDPInput::UDPInput(const InputProto &serialized)
+       : stream_id(serialized.stream_id()),
+         url(serialized.url()),
+         sock(serialized.sock())
+{
+       // Should be verified by the caller.
+       string protocol;
+       bool ok = parse_url(url, &protocol, &host, &port, &path);
+       assert(ok);
+
+       construct_header();
+}
+
+InputProto UDPInput::serialize() const
+{
+       InputProto serialized;
+       serialized.set_url(url);
+       serialized.set_sock(sock);
+       return serialized;
+}
+
+void UDPInput::close_socket()
+{
+       int ret;
+       do {
+               ret = close(sock);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               perror("close()");
+       }
+
+       sock = -1;
+}
+       
+void UDPInput::construct_header()
+{
+       string header =
+               "HTTP/1.0 200 OK\r\n"
+               "Content-type: application/octet-stream\r\n"
+               "Cache-control: no-cache\r\n"
+               "Server: " SERVER_IDENTIFICATION "\r\n"
+               "\r\n";
+       servers->set_header(stream_id, header);
+}
+
+void UDPInput::do_work()
+{
+       while (!should_stop) {
+               if (sock == -1) {
+                       int port_num = atoi(port.c_str());
+                       sock = create_server_socket(port_num, UDP_SOCKET);
+                       if (sock == -1) {
+                               fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n");
+                               usleep(200000);
+                               continue;
+                       }
+               }
+
+               // Since we are non-blocking, we need to wait for the right state first.
+               // Wait up to 50 ms, then check should_stop.
+               pollfd pfd;
+               pfd.fd = sock;
+               pfd.events = POLLIN;
+
+               int nfds = poll(&pfd, 1, 50);
+               if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+                       continue;
+               }
+               if (nfds == -1) {
+                       perror("poll");
+                       close_socket();
+                       continue;       
+               }
+
+               char buf[4096];
+               int ret;
+               do {
+                       ret = recv(sock, buf, sizeof(buf), 0);
+               } while (ret == -1 && errno == EINTR);
+
+               if (ret <= 0) {
+                       perror("recv");
+                       close_socket();
+                       continue;
+               }
+               
+               servers->add_data(stream_id, buf, ret);
+       }
+}
diff --git a/udpinput.h b/udpinput.h
new file mode 100644 (file)
index 0000000..0f89d68
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef _UDPINPUT_H
+#define _UDPINPUT_H 1
+
+#include <vector>
+#include <string>
+
+#include "input.h"
+
+class InputProto;
+
+class UDPInput : public Input {
+public:
+       UDPInput(const std::string &stream_id, const std::string &url);
+
+       // Serialization/deserialization.
+       UDPInput(const InputProto &serialized);
+       virtual InputProto serialize() const;
+
+       virtual std::string get_url() const { return url; }
+       virtual void close_socket();
+
+private:
+       // Actually gets the packets.
+       virtual void do_work();
+
+       // Create the HTTP header.
+       void construct_header();
+
+       std::string stream_id;
+
+       // The URL and its parsed components.
+       std::string url;
+       std::string host, port, path;
+
+       // The socket we are receiving on (or -1).
+       int sock;
+};
+
+#endif  // !defined(_UDPINPUT_H)