]> git.sesse.net Git - cubemap/blob - cubemap.cpp
Split Server and MutexLock out into separate source files.
[cubemap] / cubemap.cpp
1 #include <stdio.h>
2 #include <string.h>
3 #include <stdint.h>
4 #include <assert.h>
5 #include <arpa/inet.h>
6 #include <curl/curl.h>
7 #include <sys/socket.h>
8 #include <pthread.h>
9 #include <sys/types.h>
10 #include <sys/ioctl.h>
11 #include <sys/epoll.h>
12 #include <errno.h>
13 #include <vector>
14 #include <string>
15 #include <map>
16
17 #include "metacube.h"
18 #include "server.h"
19
20 #define NUM_SERVERS 4
21 #define STREAM_ID "stream"
22 #define STREAM_URL "http://gruessi.zrh.sesse.net:4013/"
23 #define PORT 9094
24
25 using namespace std;
26
27 Server *servers = NULL;
28
29 class Input {
30 public:
31         Input(const string &stream_id);
32
33         // Connect to the given URL and start streaming.
34         // WARNING: Currently this blocks; it does not run in a separate thread!
35         void run(const string &url);
36
37 private:
38         // Recovers the this pointer and calls curl_callback().
39         static size_t curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata);
40
41         // Stores the given data, looks for Metacube blocks (skipping data if needed),
42         // and calls process_block() for each one.
43         void curl_callback(char *ptr, size_t bytes);
44         void process_block(const char *data, uint32_t size, uint32_t flags);
45
46         // Drops <num_bytes> bytes from the head of <pending_data>,
47         // and outputs a warning.
48         void drop_pending_data(size_t num_bytes);
49
50         string stream_id;
51
52         // Data we have received but not fully processed yet.
53         vector<char> pending_data;
54
55         // If <pending_data> starts with a Metacube header,
56         // this is true.
57         bool has_metacube_header;
58 };
59
60 Input::Input(const string &stream_id)
61         : stream_id(stream_id),
62           has_metacube_header(false)
63 {
64 }
65
66 void Input::run(const string &url)
67 {
68         CURL *curl = curl_easy_init();
69         curl_easy_setopt(curl, CURLOPT_URL, STREAM_URL);
70         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk);
71         curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
72         curl_easy_perform(curl);
73 }
74
75 size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata)
76 {
77         Input *input = static_cast<Input *>(userdata);
78         size_t bytes = size * nmemb;
79         input->curl_callback(ptr, bytes);       
80         return bytes;
81 }
82         
83 void Input::curl_callback(char *ptr, size_t bytes)
84 {
85         pending_data.insert(pending_data.end(), ptr, ptr + bytes);
86
87         for ( ;; ) {
88                 // If we don't have enough data (yet) for even the Metacube header, just return.
89                 if (pending_data.size() < sizeof(metacube_block_header)) {
90                         return;
91                 }
92
93                 // Make sure we have the Metacube sync header at the start.
94                 // We may need to skip over junk data (it _should_ not happen, though).
95                 if (!has_metacube_header) {
96                         char *ptr = static_cast<char *>(
97                                 memmem(pending_data.data(), pending_data.size(),
98                                        METACUBE_SYNC, strlen(METACUBE_SYNC)));
99                         if (ptr == NULL) {
100                                 // OK, so we didn't find the sync marker. We know then that
101                                 // we do not have the _full_ marker in the buffer, but we
102                                 // could have N-1 bytes. Drop everything before that,
103                                 // and then give up.
104                                 drop_pending_data(pending_data.size() - (strlen(METACUBE_SYNC) - 1));
105                                 return;
106                         } else {
107                                 // Yay, we found the header. Drop everything (if anything) before it.
108                                 drop_pending_data(ptr - pending_data.data());
109                                 has_metacube_header = true;
110
111                                 // Re-check that we have the entire header; we could have dropped data.
112                                 if (pending_data.size() < sizeof(metacube_block_header)) {
113                                         return;
114                                 }
115                         }
116                 }
117
118                 // Now it's safe to read the header.
119                 metacube_block_header *hdr = reinterpret_cast<metacube_block_header *>(pending_data.data());    
120                 assert(memcmp(hdr->sync, METACUBE_SYNC, sizeof(hdr->sync)) == 0);
121                 uint32_t size = ntohl(hdr->size);
122                 uint32_t flags = ntohl(hdr->flags);
123
124                 // See if we have the entire block. If not, wait for more data.
125                 if (pending_data.size() < sizeof(metacube_block_header) + size) {
126                         return;
127                 }
128
129                 process_block(pending_data.data(), size, flags);
130
131                 // Consume this block. This isn't the most efficient way of dealing with things
132                 // should we have many blocks, but these routines don't need to be too efficient
133                 // anyway.
134                 pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
135         }
136 }
137                 
138 void Input::process_block(const char *data, uint32_t size, uint32_t flags)
139 {       
140         if (flags & METACUBE_FLAGS_HEADER) {
141                 string header(data, data + size);
142                 for (int i = 0; i < NUM_SERVERS; ++i) {
143                         servers[i].set_header(stream_id, header);
144                 }
145         } else { 
146                 for (int i = 0; i < NUM_SERVERS; ++i) {
147                         servers[i].add_data(stream_id, data, size);
148                 }
149         }
150 }
151
152 void Input::drop_pending_data(size_t num_bytes)
153 {
154         if (num_bytes == 0) {
155                 return;
156         }
157         fprintf(stderr, "Warning: Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?\n",
158                 (long long)num_bytes);
159         pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes);
160 }
161
162 int create_server_socket(int port)
163 {
164         int server_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
165         if (server_sock == -1) {
166                 perror("socket");
167                 exit(1);
168         }
169
170         // We want dual-stack sockets. (Sorry, OpenBSD and Windows XP...)
171         int zero = 0;
172         if (setsockopt(server_sock, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero)) == -1) {
173                 perror("setsockopt(IPV6_V6ONLY)");
174                 exit(1);
175         }
176
177         sockaddr_in6 addr;
178         memset(&addr, 0, sizeof(addr));
179         addr.sin6_family = AF_INET6;
180         addr.sin6_port = htons(port);
181
182         if (bind(server_sock, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) == -1) {
183                 perror("bind");
184                 exit(1);
185         }
186
187         if (listen(server_sock, 128) == -1) {
188                 perror("listen");
189                 exit(1);
190         }
191
192         return server_sock;
193 }
194
195 void *acceptor_thread_run(void *arg)
196 {
197         int server_sock = int(intptr_t(arg));
198         int num_accepted = 0;
199         for ( ;; ) {
200                 sockaddr_in6 addr;
201                 socklen_t addrlen = sizeof(addr);
202
203                 // Get a new socket.
204                 int sock = accept(server_sock, reinterpret_cast<sockaddr *>(&addr), &addrlen);
205                 if (sock == -1 && errno == EINTR) {
206                         continue;
207                 }
208                 if (sock == -1) {
209                         perror("accept");
210                         exit(1);
211                 }
212
213                 // Set the socket as nonblocking.
214                 int one = 1;
215                 if (ioctl(sock, FIONBIO, &one) == -1) {
216                         perror("FIONBIO");
217                         exit(1);
218                 }
219
220                 // Pick a server, round-robin, and hand over the socket to it.
221                 servers[num_accepted % NUM_SERVERS].add_client(sock);
222                 ++num_accepted; 
223         }
224 }
225
226 int main(int argc, char **argv)
227 {
228         servers = new Server[NUM_SERVERS];
229         for (int i = 0; i < NUM_SERVERS; ++i) {
230                 servers[i].add_stream(STREAM_ID);
231                 servers[i].run();
232         }
233
234         int server_sock = create_server_socket(PORT);
235
236         pthread_t acceptor_thread;
237         pthread_create(&acceptor_thread, NULL, acceptor_thread_run, reinterpret_cast<void *>(server_sock));
238
239         Input input(STREAM_ID);
240         input.run(STREAM_URL);
241 }