From: Steinar H. Gunderson Date: Thu, 11 Apr 2013 21:34:03 +0000 (+0200) Subject: Support UDP input. Also fix some issues with socket closing. X-Git-Tag: 1.0.0~144 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=ca9624c43b968a0f29ea44e47851ff686bb64bb6 Support UDP input. Also fix some issues with socket closing. --- diff --git a/Makefile b/Makefile index 5367daa..bc3511a 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g LDLIBS=-lpthread -lprotobuf -OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o +OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o all: cubemap diff --git a/acceptor.cpp b/acceptor.cpp index 87bf47b..fbc5b04 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -17,9 +17,15 @@ using namespace std; extern ServerPool *servers; extern volatile bool hupped; -int create_server_socket(int port) +int create_server_socket(int port, SocketType socket_type) { - int server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + int server_sock; + if (socket_type == TCP_SOCKET) { + server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + } else { + assert(socket_type == UDP_SOCKET); + server_sock = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP); + } if (server_sock == -1) { perror("socket"); exit(1); @@ -54,9 +60,11 @@ int create_server_socket(int port) exit(1); } - if (listen(server_sock, 128) == -1) { - perror("listen"); - exit(1); + if (socket_type == TCP_SOCKET) { + if (listen(server_sock, 128) == -1) { + perror("listen"); + exit(1); + } } return server_sock; diff --git a/acceptor.h b/acceptor.h index caa4647..e0a1bd5 100644 --- a/acceptor.h +++ b/acceptor.h @@ -3,7 +3,11 @@ #include "thread.h" -int create_server_socket(int port); +enum SocketType { + TCP_SOCKET, + UDP_SOCKET, +}; +int create_server_socket(int port, SocketType socket_type); class AcceptorProto; diff --git a/cubemap.config.sample b/cubemap.config.sample index 5773927..852e0ca 100644 --- a/cubemap.config.sample +++ b/cubemap.config.sample @@ -12,3 +12,4 @@ stats_interval 60 # now the streams! # stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000 +stream /udp.ts src=udp://@:1234 diff --git a/httpinput.cpp b/httpinput.cpp index 46f1358..354e4b9 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -57,6 +57,18 @@ HTTPInput::HTTPInput(const InputProto &serialized) parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails. } +void HTTPInput::close_socket() +{ + int ret; + do { + ret = close(sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("close()"); + } +} + InputProto HTTPInput::serialize() const { InputProto serialized; diff --git a/httpinput.h b/httpinput.h index 894d8e9..1714b71 100644 --- a/httpinput.h +++ b/httpinput.h @@ -15,6 +15,8 @@ public: // Serialization/deserialization. HTTPInput(const InputProto &serialized); virtual InputProto serialize() const; + + virtual void close_socket(); virtual std::string get_url() const { return url; } diff --git a/input.cpp b/input.cpp index 00b6547..3f6a9d7 100644 --- a/input.cpp +++ b/input.cpp @@ -2,6 +2,7 @@ #include #include "httpinput.h" +#include "udpinput.h" #include "input.h" #include "state.pb.h" @@ -10,18 +11,18 @@ using namespace std; // Extremely rudimentary URL parsing. bool parse_url(const string &url, string *protocol, string *host, string *port, string *path) { - if (url.find("http://") != 0) { + size_t split = url.find("://"); + if (split == string::npos) { return false; } + *protocol = string(url.begin(), url.begin() + split); - *protocol = "http"; - - string rest = url.substr(strlen("http://")); - size_t split = rest.find_first_of(":/"); + string rest = string(url.begin() + split + 3, url.end()); + split = rest.find_first_of(":/"); if (split == string::npos) { // http://foo *host = rest; - *port = "http"; + *port = *protocol; *path = "/"; return true; } @@ -47,7 +48,7 @@ bool parse_url(const string &url, string *protocol, string *host, string *port, } // http://foo/bar - *port = "http"; + *port = *protocol; *path = rest; return true; } @@ -61,6 +62,9 @@ Input *create_input(const std::string &stream_id, const std::string &url) if (protocol == "http") { return new HTTPInput(stream_id, url); } + if (protocol == "udp") { + return new UDPInput(stream_id, url); + } return NULL; } @@ -73,6 +77,9 @@ Input *create_input(const InputProto &serialized) if (protocol == "http") { return new HTTPInput(serialized); } + if (protocol == "udp") { + return new UDPInput(serialized); + } return NULL; } diff --git a/input.h b/input.h index 50cfeaa..79cfc54 100644 --- a/input.h +++ b/input.h @@ -21,6 +21,7 @@ public: virtual ~Input(); virtual InputProto serialize() const = 0; virtual std::string get_url() const = 0; + virtual void close_socket() = 0; }; #endif // !defined(_INPUT_H) diff --git a/main.cpp b/main.cpp index 25f82eb..d77fb2e 100644 --- a/main.cpp +++ b/main.cpp @@ -215,7 +215,7 @@ vector create_acceptors( acceptor = deserialized_acceptor_it->second; deserialized_acceptors->erase(deserialized_acceptor_it); } else { - int server_sock = create_server_socket(port); + int server_sock = create_server_socket(port, TCP_SOCKET); acceptor = new Acceptor(server_sock, port); } acceptor->run(); @@ -262,6 +262,7 @@ vector create_inputs(const vector &config, if (input->get_url() != src) { fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n", stream_id.c_str(), input->get_url().c_str(), src.c_str()); + input->close_socket(); delete input; input = NULL; } diff --git a/udpinput.cpp b/udpinput.cpp new file mode 100644 index 0000000..cd66f16 --- /dev/null +++ b/udpinput.cpp @@ -0,0 +1,121 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "acceptor.h" +#include "udpinput.h" +#include "serverpool.h" +#include "version.h" +#include "state.pb.h" + +using namespace std; + +extern ServerPool *servers; + +UDPInput::UDPInput(const string &stream_id, const string &url) + : stream_id(stream_id), + url(url), + sock(-1) +{ + // Should be verified by the caller. + string protocol; + bool ok = parse_url(url, &protocol, &host, &port, &path); + assert(ok); + + construct_header(); +} + +UDPInput::UDPInput(const InputProto &serialized) + : stream_id(serialized.stream_id()), + url(serialized.url()), + sock(serialized.sock()) +{ + // Should be verified by the caller. + string protocol; + bool ok = parse_url(url, &protocol, &host, &port, &path); + assert(ok); + + construct_header(); +} + +InputProto UDPInput::serialize() const +{ + InputProto serialized; + serialized.set_url(url); + serialized.set_sock(sock); + return serialized; +} + +void UDPInput::close_socket() +{ + int ret; + do { + ret = close(sock); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + perror("close()"); + } + + sock = -1; +} + +void UDPInput::construct_header() +{ + string header = + "HTTP/1.0 200 OK\r\n" + "Content-type: application/octet-stream\r\n" + "Cache-control: no-cache\r\n" + "Server: " SERVER_IDENTIFICATION "\r\n" + "\r\n"; + servers->set_header(stream_id, header); +} + +void UDPInput::do_work() +{ + while (!should_stop) { + if (sock == -1) { + int port_num = atoi(port.c_str()); + sock = create_server_socket(port_num, UDP_SOCKET); + if (sock == -1) { + fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n"); + usleep(200000); + continue; + } + } + + // Since we are non-blocking, we need to wait for the right state first. + // Wait up to 50 ms, then check should_stop. + pollfd pfd; + pfd.fd = sock; + pfd.events = POLLIN; + + int nfds = poll(&pfd, 1, 50); + if (nfds == 0 || (nfds == -1 && errno == EINTR)) { + continue; + } + if (nfds == -1) { + perror("poll"); + close_socket(); + continue; + } + + char buf[4096]; + int ret; + do { + ret = recv(sock, buf, sizeof(buf), 0); + } while (ret == -1 && errno == EINTR); + + if (ret <= 0) { + perror("recv"); + close_socket(); + continue; + } + + servers->add_data(stream_id, buf, ret); + } +} diff --git a/udpinput.h b/udpinput.h new file mode 100644 index 0000000..0f89d68 --- /dev/null +++ b/udpinput.h @@ -0,0 +1,39 @@ +#ifndef _UDPINPUT_H +#define _UDPINPUT_H 1 + +#include +#include + +#include "input.h" + +class InputProto; + +class UDPInput : public Input { +public: + UDPInput(const std::string &stream_id, const std::string &url); + + // Serialization/deserialization. + UDPInput(const InputProto &serialized); + virtual InputProto serialize() const; + + virtual std::string get_url() const { return url; } + virtual void close_socket(); + +private: + // Actually gets the packets. + virtual void do_work(); + + // Create the HTTP header. + void construct_header(); + + std::string stream_id; + + // The URL and its parsed components. + std::string url; + std::string host, port, path; + + // The socket we are receiving on (or -1). + int sock; +}; + +#endif // !defined(_UDPINPUT_H)