6 #include <sys/socket.h>
12 #include "serverpool.h"
19 extern ServerPool *servers;
// Constructor taking the input URL as a string. The URL is handed to
// parse_url() together with the addresses of protocol, host, port and path.
21 UDPInput::UDPInput(const string &url)
25 // Should be verified by the caller.
// The parse result is captured in `ok`; how `ok` is consumed afterwards is
// not visible in this excerpt.
27 bool ok = parse_url(url, &protocol, &host, &port, &path);
// Constructor restoring a UDPInput from its serialized InputProto form.
// `url` and `sock` are initialized from serialized.url() and
// serialized.sock(); the URL is then passed to parse_url() again together
// with the addresses of protocol, host, port and path.
33 UDPInput::UDPInput(const InputProto &serialized)
34 : url(serialized.url()),
35 sock(serialized.sock())
37 // Should be verified by the caller.
// The parse result is captured in `ok`; how `ok` is consumed afterwards is
// not visible in this excerpt.
39 bool ok = parse_url(url, &protocol, &host, &port, &path);
// Captures this input's state in an InputProto: the URL and the socket value
// are stored via set_url() and set_sock(). These are the same two fields the
// InputProto constructor reads back through url() and sock().
45 InputProto UDPInput::serialize() const
47 InputProto serialized;
48 serialized.set_url(url);
49 serialized.set_sock(sock);
// Closes the input's socket. Only the tail of the routine is visible here:
// an operation whose result is stored in `ret` is repeated for as long as it
// fails with EINTR, and log_perror("close()") reports an error.
// NOTE(review): the call inside the loop and the condition guarding the
// log_perror() call are not visible in this excerpt -- presumably close(sock)
// and `ret == -1`; confirm against the full file.
53 void UDPInput::close_socket()
58 } while (ret == -1 && errno == EINTR);
61 log_perror("close()");
// Assembles HTTP response header text. The visible part consists of adjacent
// string literals (concatenated at compile time) giving the Content-type,
// Cache-control, Server (using the SERVER_IDENTIFICATION macro) and
// Connection lines, each terminated by CRLF.
// NOTE(review): the statement these literals belong to is not visible in
// this excerpt -- presumably an assignment to http_header, which
// add_destination() passes to servers->set_header(); confirm.
67 void UDPInput::construct_header()
71 "Content-type: application/octet-stream\r\n"
72 "Cache-control: no-cache\r\n"
73 "Server: " SERVER_IDENTIFICATION "\r\n"
74 "Connection: close\r\n";
// Registers a stream as a destination of this input: stream_id is appended
// to stream_ids (the list do_work() iterates when forwarding data), and
// http_header is passed to the global server pool as that stream's header.
77 void UDPInput::add_destination(const string &stream_id)
79 stream_ids.push_back(stream_id);
// The third argument to set_header() is an empty string; its meaning is
// defined by ServerPool and is not visible here.
80 servers->set_header(stream_id, http_header, "");
// Main loop of the input; runs until should_stop is set. From the visible
// lines, each iteration may create the UDP socket, waits for it with a
// bounded poll(), reads with recv() and forwards the received bytes to every
// stream in stream_ids.
// NOTE(review): several lines of this function (socket-state checks, error
// branches, the poll setup and the closing braces) are not visible in this
// excerpt; the comments below describe only what is shown.
83 void UDPInput::do_work()
85 while (!should_stop) {
// The port string is converted with atoi() and handed to
// create_server_socket() with UDP_SOCKET.
// NOTE(review): atoi() reports no conversion errors; the constructors'
// "Should be verified by the caller" comment suggests the URL is validated
// upstream -- confirm.
87 int port_num = atoi(port.c_str());
88 sock = create_server_socket(port_num, UDP_SOCKET);
// Warning logged on socket creation failure. The 0.2 second wait mentioned
// in the message is not visible in this excerpt.
90 log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
97 // Since we are non-blocking, we need to wait for the right state first.
98 // Wait up to 50 ms, then check should_stop.
103 int nfds = poll(&pfd, 1, 50);
// nfds == 0 is a poll() timeout; -1 with EINTR is an interrupted call.
// The action taken for these two cases is not visible here.
104 if (nfds == 0 || (nfds == -1 && errno == EINTR)) {
// recv() is retried for as long as it fails with EINTR.
116 ret = recv(sock, buf, sizeof(buf), 0);
117 } while (ret == -1 && errno == EINTR);
// The `ret` bytes in buf are passed to the server pool once per registered
// stream id.
125 for (size_t i = 0; i < stream_ids.size(); ++i) {
126 servers->add_data(stream_ids[i], buf, ret);