+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <string>
+
+#include "acceptor.h"
+#include "udpinput.h"
+#include "serverpool.h"
+#include "version.h"
+#include "state.pb.h"
+
+using namespace std;
+
+extern ServerPool *servers;
+
+UDPInput::UDPInput(const string &stream_id, const string &url)
+ : stream_id(stream_id),
+ url(url),
+ sock(-1)
+{
+ // Should be verified by the caller.
+ string protocol;
+ bool ok = parse_url(url, &protocol, &host, &port, &path);
+ assert(ok);
+
+ construct_header();
+}
+
+UDPInput::UDPInput(const InputProto &serialized)
+ : stream_id(serialized.stream_id()),
+ url(serialized.url()),
+ sock(serialized.sock())
+{
+ // Should be verified by the caller.
+ string protocol;
+ bool ok = parse_url(url, &protocol, &host, &port, &path);
+ assert(ok);
+
+ construct_header();
+}
+
+InputProto UDPInput::serialize() const
+{
+ InputProto serialized;
+ serialized.set_url(url);
+ serialized.set_sock(sock);
+ return serialized;
+}
+
+void UDPInput::close_socket()
+{
+ int ret;
+ do {
+ ret = close(sock);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ perror("close()");
+ }
+
+ sock = -1;
+}
+
+void UDPInput::construct_header()
+{
+ string header =
+ "HTTP/1.0 200 OK\r\n"
+ "Content-type: application/octet-stream\r\n"
+ "Cache-control: no-cache\r\n"
+ "Server: " SERVER_IDENTIFICATION "\r\n"
+ "\r\n";
+ servers->set_header(stream_id, header);
+}
+
+void UDPInput::do_work()
+{
+ while (!should_stop) {
+ if (sock == -1) {
+ int port_num = atoi(port.c_str());
+ sock = create_server_socket(port_num, UDP_SOCKET);
+ if (sock == -1) {
+ fprintf(stderr, "WARNING: UDP socket creation failed. Waiting 0.2 seconds and trying again...\n");
+ usleep(200000);
+ continue;
+ }
+ }
+
+ // Since we are non-blocking, we need to wait for the right state first.
+ // Wait up to 50 ms, then check should_stop.
+ pollfd pfd;
+ pfd.fd = sock;
+ pfd.events = POLLIN;
+
+ int nfds = poll(&pfd, 1, 50);
+ if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
+ continue;
+ }
+ if (nfds == -1) {
+ perror("poll");
+ close_socket();
+ continue;
+ }
+
+ char buf[4096];
+ int ret;
+ do {
+ ret = recv(sock, buf, sizeof(buf), 0);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret <= 0) {
+ perror("recv");
+ close_socket();
+ continue;
+ }
+
+ servers->add_data(stream_id, buf, ret);
+ }
+}