CXXFLAGS=-Wall -O2 -g
LDLIBS=-lpthread -lprotobuf
-OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o
+OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o
all: cubemap
extern ServerPool *servers;
extern volatile bool hupped;
-int create_server_socket(int port)
+int create_server_socket(int port, SocketType socket_type)
{
- int server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+ int server_sock;
+ if (socket_type == TCP_SOCKET) {
+ server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+ } else {
+ assert(socket_type == UDP_SOCKET);
+ server_sock = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP);
+ }
if (server_sock == -1) {
perror("socket");
exit(1);
exit(1);
}
- if (listen(server_sock, 128) == -1) {
- perror("listen");
- exit(1);
+ if (socket_type == TCP_SOCKET) {
+ if (listen(server_sock, 128) == -1) {
+ perror("listen");
+ exit(1);
+ }
}
return server_sock;
#include "thread.h"
-int create_server_socket(int port);
+enum SocketType {
+ TCP_SOCKET,
+ UDP_SOCKET,
+};
+int create_server_socket(int port, SocketType socket_type);
class AcceptorProto;
# now the streams!
#
stream /test.flv src=http://gruessi.zrh.sesse.net:4013/test.flv mark=1000-5000
+stream /udp.ts src=udp://@:1234
parse_url(url, &protocol, &host, &port, &path); // Don't care if it fails.
}
+void HTTPInput::close_socket()
+{
+ int ret;
+ do {
+ ret = close(sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ perror("close()");
+ }
+}
+
InputProto HTTPInput::serialize() const
{
InputProto serialized;
// Serialization/deserialization.
HTTPInput(const InputProto &serialized);
virtual InputProto serialize() const;
+
+ virtual void close_socket();
virtual std::string get_url() const { return url; }
#include <string>
#include "httpinput.h"
+#include "udpinput.h"
#include "input.h"
#include "state.pb.h"
// Extremely rudimentary URL parsing.
bool parse_url(const string &url, string *protocol, string *host, string *port, string *path)
{
- if (url.find("http://") != 0) {
+ size_t split = url.find("://");
+ if (split == string::npos) {
return false;
}
+ *protocol = string(url.begin(), url.begin() + split);
- *protocol = "http";
-
- string rest = url.substr(strlen("http://"));
- size_t split = rest.find_first_of(":/");
+ string rest = string(url.begin() + split + 3, url.end());
+ split = rest.find_first_of(":/");
if (split == string::npos) {
// http://foo
*host = rest;
- *port = "http";
+ *port = *protocol;
*path = "/";
return true;
}
}
// http://foo/bar
- *port = "http";
+ *port = *protocol;
*path = rest;
return true;
}
if (protocol == "http") {
return new HTTPInput(stream_id, url);
}
+ if (protocol == "udp") {
+ return new UDPInput(stream_id, url);
+ }
return NULL;
}
if (protocol == "http") {
return new HTTPInput(serialized);
}
+ if (protocol == "udp") {
+ return new UDPInput(serialized);
+ }
return NULL;
}
virtual ~Input();
virtual InputProto serialize() const = 0;
virtual std::string get_url() const = 0;
+ virtual void close_socket() = 0;
};
#endif // !defined(_INPUT_H)
acceptor = deserialized_acceptor_it->second;
deserialized_acceptors->erase(deserialized_acceptor_it);
} else {
- int server_sock = create_server_socket(port);
+ int server_sock = create_server_socket(port, TCP_SOCKET);
acceptor = new Acceptor(server_sock, port);
}
acceptor->run();
if (input->get_url() != src) {
fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n",
stream_id.c_str(), input->get_url().c_str(), src.c_str());
+ input->close_socket();
delete input;
input = NULL;
}
--- /dev/null
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <string>
+
+#include "acceptor.h"
+#include "udpinput.h"
+#include "serverpool.h"
+#include "version.h"
+#include "state.pb.h"
+
+using namespace std;
+
+extern ServerPool *servers;
+
+UDPInput::UDPInput(const string &stream_id, const string &url)
+ : stream_id(stream_id),
+ url(url),
+ sock(-1)
+{
+ // Should be verified by the caller.
+ string protocol;
+ bool ok = parse_url(url, &protocol, &host, &port, &path);
+ assert(ok);
+
+ construct_header();
+}
+
+UDPInput::UDPInput(const InputProto &serialized)
+ : stream_id(serialized.stream_id()),
+ url(serialized.url()),
+ sock(serialized.sock())
+{
+ // Should be verified by the caller.
+ string protocol;
+ bool ok = parse_url(url, &protocol, &host, &port, &path);
+ assert(ok);
+
+ construct_header();
+}
+
+InputProto UDPInput::serialize() const
+{
+ InputProto serialized;
+ serialized.set_url(url);
+ serialized.set_sock(sock);
+ return serialized;
+}
+
+void UDPInput::close_socket()
+{
+ int ret;
+ do {
+ ret = close(sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ perror("close()");
+ }
+
+ sock = -1;
+}
+
+void UDPInput::construct_header()
+{
+ string header =
+ "HTTP/1.0 200 OK\r\n"
+ "Content-type: application/octet-stream\r\n"
+ "Cache-control: no-cache\r\n"
+ "Server: " SERVER_IDENTIFICATION "\r\n"
+ "\r\n";
+ servers->set_header(stream_id, header);
+}
+
+void UDPInput::do_work()
+{
+ while (!should_stop) {
+ if (sock == -1) {
+ int port_num = atoi(port.c_str());
+ sock = create_server_socket(port_num, UDP_SOCKET);
+ if (sock == -1) {
+ fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n");
+ usleep(200000);
+ continue;
+ }
+ }
+
+ // Since we are non-blocking, we need to wait for the right state first.
+ // Wait up to 50 ms, then check should_stop.
+ pollfd pfd;
+ pfd.fd = sock;
+ pfd.events = POLLIN;
+
+ int nfds = poll(&pfd, 1, 50);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ continue;
+ }
+ if (nfds == -1) {
+ perror("poll");
+ close_socket();
+ continue;
+ }
+
+ char buf[4096];
+ int ret;
+ do {
+ ret = recv(sock, buf, sizeof(buf), 0);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret <= 0) {
+ perror("recv");
+ close_socket();
+ continue;
+ }
+
+ servers->add_data(stream_id, buf, ret);
+ }
+}
--- /dev/null
+#ifndef _UDPINPUT_H
+#define _UDPINPUT_H 1
+
+#include <vector>
+#include <string>
+
+#include "input.h"
+
+class InputProto;
+
+class UDPInput : public Input {
+public:
+ UDPInput(const std::string &stream_id, const std::string &url);
+
+ // Serialization/deserialization.
+ UDPInput(const InputProto &serialized);
+ virtual InputProto serialize() const;
+
+ virtual std::string get_url() const { return url; }
+ virtual void close_socket();
+
+private:
+ // Actually gets the packets.
+ virtual void do_work();
+
+ // Create the HTTP header.
+ void construct_header();
+
+ std::string stream_id;
+
+ // The URL and its parsed components.
+ std::string url;
+ std::string host, port, path;
+
+ // The socket we are receiving on (or -1).
+ int sock;
+};
+
+#endif // !defined(_UDPINPUT_H)