Fix broken Client constructor.
[cubemap] / server.cpp
1 #include <stdio.h>
2 #include <string.h>
3 #include <stdint.h>
4 #include <assert.h>
5 #include <arpa/inet.h>
6 #include <curl/curl.h>
7 #include <sys/socket.h>
8 #include <pthread.h>
9 #include <sys/types.h>
10 #include <sys/ioctl.h>
11 #include <sys/epoll.h>
12 #include <errno.h>
13 #include <vector>
14 #include <string>
15 #include <map>
16 #include <algorithm>
17
18 #include "metacube.h"
19 #include "server.h"
20 #include "mutexlock.h"
21 #include "state.pb.h"
22
23 using namespace std;
24
25 Client::Client(int sock)
26         : sock(sock),
27           state(Client::READING_REQUEST),
28           header_bytes_sent(0),
29           bytes_sent(0)
30 {
31         request.reserve(1024);
32 }
33         
34 Client::Client(const ClientProto &serialized)
35         : sock(serialized.sock()),
36           state(State(serialized.state())),
37           request(serialized.request()),
38           stream_id(serialized.stream_id()),
39           header(serialized.header()),
40           header_bytes_sent(serialized.header_bytes_sent()),
41           bytes_sent(serialized.bytes_sent())
42 {
43 }
44
45 ClientProto Client::serialize() const
46 {
47         ClientProto serialized;
48         serialized.set_sock(sock);
49         serialized.set_state(state);
50         serialized.set_request(request);
51         serialized.set_stream_id(stream_id);
52         serialized.set_header(header);
53         serialized.set_header_bytes_sent(serialized.header_bytes_sent());
54         serialized.set_bytes_sent(bytes_sent);
55         return serialized;
56 }
57
58 Server::Server()
59 {
60         pthread_mutex_init(&mutex, NULL);
61
62         epoll_fd = epoll_create(1024);  // Size argument is ignored.
63         if (epoll_fd == -1) {
64                 perror("epoll_fd");
65                 exit(1);
66         }
67 }
68
69 void Server::run()
70 {
71         should_stop = false;
72         
73         // Joinable is already the default, but it's good to be certain.
74         pthread_attr_t attr;
75         pthread_attr_init(&attr);
76         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
77         pthread_create(&worker_thread, &attr, Server::do_work_thunk, this);
78 }
79
80 void Server::stop()
81 {
82         {
83                 MutexLock lock(&mutex);
84                 should_stop = true;
85         }
86
87         if (pthread_join(worker_thread, NULL) == -1) {
88                 perror("pthread_join");
89                 exit(1);
90         }
91 }
92
93 void *Server::do_work_thunk(void *arg)
94 {
95         Server *server = static_cast<Server *>(arg);
96         server->do_work();
97         return NULL;
98 }
99
100 void Server::do_work()
101 {
102         for ( ;; ) {
103                 int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
104                 if (nfds == -1) {
105                         perror("epoll_wait");
106                         exit(1);
107                 }
108
109                 MutexLock lock(&mutex);  // We release the mutex between iterations.
110         
111                 if (should_stop) {
112                         return;
113                 }
114         
115                 for (int i = 0; i < nfds; ++i) {
116                         int fd = events[i].data.fd;
117                         assert(clients.count(fd) != 0);
118                         Client *client = &clients[fd];
119
120                         if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
121                                 close_client(client);
122                                 continue;
123                         }
124
125                         process_client(client);
126                 }
127         }
128 }
129         
130 void Server::add_client(int sock)
131 {
132         MutexLock lock(&mutex);
133         clients.insert(make_pair(sock, Client(sock)));
134
135         // Start listening on data from this socket.
136         epoll_event ev;
137         ev.events = EPOLLIN | EPOLLRDHUP;
138         ev.data.fd = sock;
139         if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
140                 perror("epoll_ctl(EPOLL_CTL_ADD)");
141                 exit(1);
142         }
143 }
144         
145 void Server::add_stream(const string &stream_id)
146 {
147         MutexLock lock(&mutex);
148         streams.insert(make_pair(stream_id, Stream()));
149 }
150         
151 void Server::set_header(const string &stream_id, const string &header)
152 {
153         MutexLock lock(&mutex);
154         assert(streams.count(stream_id) != 0);
155         streams[stream_id].header = header;
156 }
157         
158 void Server::add_data(const string &stream_id, const char *data, size_t bytes)
159 {
160         if (bytes == 0) {
161                 return;
162         }
163
164         MutexLock lock(&mutex);
165         assert(streams.count(stream_id) != 0);
166         Stream *stream = &streams[stream_id];
167         size_t pos = stream->data_size % BACKLOG_SIZE;
168         stream->data_size += bytes;
169
170         if (pos + bytes > BACKLOG_SIZE) {
171                 size_t to_copy = BACKLOG_SIZE - pos;
172                 memcpy(stream->data + pos, data, to_copy);
173                 data += to_copy;
174                 bytes -= to_copy;
175                 pos = 0;
176         }
177
178         memcpy(stream->data + pos, data, bytes);
179         wake_up_all_clients();
180 }
181         
182 void Server::process_client(Client *client)
183 {
184         switch (client->state) {
185         case Client::READING_REQUEST: {
186                 // Try to read more of the request.
187                 char buf[1024];
188                 int ret = read(client->sock, buf, sizeof(buf));
189                 if (ret == -1) {
190                         perror("read");
191                         close_client(client);
192                         return;
193                 }
194                 if (ret == 0) {
195                         // No data? This really means that we were triggered for something else than
196                         // POLLIN (which suggests a logic error in epoll).
197                         fprintf(stderr, "WARNING: fd %d returned unexpectedly 0 bytes!\n", client->sock);
198                         close_client(client);
199                         return;
200                 }
201
202                 // Guard against overlong requests gobbling up all of our space.
203                 if (client->request.size() + ret > MAX_CLIENT_REQUEST) {
204                         fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock);
205                         close_client(client);
206                         return;
207                 }       
208
209                 // See if we have \r\n\r\n anywhere in the request. We start three bytes
210                 // before what we just appended, in case we just got the final character.
211                 size_t existing_req_bytes = client->request.size();
212                 client->request.append(string(buf, buf + ret));
213         
214                 size_t start_at = (existing_req_bytes >= 3 ? existing_req_bytes - 3 : 0);
215                 const char *ptr = reinterpret_cast<char *>(
216                         memmem(client->request.data() + start_at, client->request.size() - start_at,
217                                "\r\n\r\n", 4));
218                 if (ptr == NULL) {
219                         // OK, we don't have the entire header yet. Fine; we'll get it later.
220                         return;
221                 }
222
223                 if (ptr != client->request.data() + client->request.size() - 4) {
224                         fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock);
225                         close_client(client);
226                         return;
227                 }
228
229                 parse_request(client);
230                 break;
231         }
232         case Client::SENDING_HEADER: {
233                 int ret = write(client->sock,
234                                 client->header.data() + client->header_bytes_sent,
235                                 client->header.size() - client->header_bytes_sent);
236                 if (ret == -1) {
237                         perror("write");
238                         close_client(client);
239                         return;
240                 }
241                 
242                 client->header_bytes_sent += ret;
243                 assert(client->header_bytes_sent <= client->header.size());
244
245                 if (client->header_bytes_sent < client->header.size()) {
246                         // We haven't sent all yet. Fine; we'll do that later.
247                         return;
248                 }
249
250                 // We're done sending the header! Clear the entire header to release some memory.
251                 client->header.clear();
252
253                 // Start sending from the end. In other words, we won't send any of the backlog,
254                 // but we'll start sending immediately as we get data.
255                 client->state = Client::SENDING_DATA;
256                 client->bytes_sent = streams[client->stream_id].data_size;
257                 break;
258         }
259         case Client::SENDING_DATA: {
260                 // See if there's some data we've lost. Ideally, we should drop to a block boundary,
261                 // but resync will be the mux's problem.
262                 const Stream &stream = streams[client->stream_id];
263                 size_t bytes_to_send = stream.data_size - client->bytes_sent;
264                 if (bytes_to_send > BACKLOG_SIZE) {
265                         fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n",
266                                 client->sock,
267                                 (long long int)(bytes_to_send - BACKLOG_SIZE));
268                         client->bytes_sent = streams[client->stream_id].data_size - BACKLOG_SIZE;
269                         bytes_to_send = BACKLOG_SIZE;
270                 }
271
272                 // See if we need to split across the circular buffer.
273                 ssize_t ret;
274                 if ((client->bytes_sent % BACKLOG_SIZE) + bytes_to_send > BACKLOG_SIZE) {
275                         size_t bytes_first_part = BACKLOG_SIZE - (client->bytes_sent % BACKLOG_SIZE);
276
277                         iovec iov[2];
278                         iov[0].iov_base = const_cast<char *>(stream.data + (client->bytes_sent % BACKLOG_SIZE));
279                         iov[0].iov_len = bytes_first_part;
280
281                         iov[1].iov_base = const_cast<char *>(stream.data);
282                         iov[1].iov_len = bytes_to_send - bytes_first_part;
283
284                         ret = writev(client->sock, iov, 2);
285                 } else {
286                         ret = write(client->sock,
287                                     stream.data + (client->bytes_sent % BACKLOG_SIZE),
288                                     bytes_to_send);
289                 }
290                 if (ret == -1) {
291                         perror("write/writev");
292                         close_client(client);
293                         return;
294                 }
295                 client->bytes_sent += ret;
296
297                 if (client->bytes_sent == stream.data_size) {
298                         // We don't have any more data for this client, so put it to sleep.
299                         put_client_to_sleep(client);
300                 }
301                 break;
302         }
303         default:
304                 assert(false);
305         }
306 }
307
308 void Server::parse_request(Client *client)
309 {
310         // TODO: Actually parse the request. :-)
311         client->stream_id = "stream";
312         client->request.clear();
313
314         // Construct the header.
315         client->header = "HTTP/1.0 200 OK\r\n  Content-type: video/x-flv\r\nCache-Control: no-cache\r\nContent-type: todo/fixme\r\n\r\n" +
316                 streams[client->stream_id].header;
317
318         // Switch states.
319         client->state = Client::SENDING_HEADER;
320
321         epoll_event ev;
322         ev.events = EPOLLOUT | EPOLLRDHUP;
323         ev.data.fd = client->sock;
324
325         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
326                 perror("epoll_ctl(EPOLL_CTL_MOD)");
327                 exit(1);
328         }
329 }
330         
331 void Server::close_client(Client *client)
332 {
333         if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
334                 perror("epoll_ctl(EPOLL_CTL_DEL)");
335                 exit(1);
336         }
337
338         // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
339         vector<int>::iterator new_end =
340                 remove(sleeping_clients.begin(), sleeping_clients.end(), client->sock);
341         sleeping_clients.erase(new_end, sleeping_clients.end());
342         
343         // Bye-bye!
344         close(client->sock);
345         clients.erase(client->sock);
346 }
347         
348 void Server::put_client_to_sleep(Client *client)
349 {
350         epoll_event ev;
351         ev.events = EPOLLRDHUP;
352         ev.data.fd = client->sock;
353
354         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
355                 perror("epoll_ctl(EPOLL_CTL_MOD)");
356                 exit(1);
357         }
358
359         sleeping_clients.push_back(client->sock);
360 }
361
362 void Server::wake_up_all_clients()
363 {
364         for (unsigned i = 0; i < sleeping_clients.size(); ++i) {
365                 epoll_event ev;
366                 ev.events = EPOLLOUT | EPOLLRDHUP;
367                 ev.data.fd = sleeping_clients[i];
368                 if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sleeping_clients[i], &ev) == -1) {
369                         perror("epoll_ctl(EPOLL_CTL_MOD)");
370                         exit(1);
371                 }
372         }
373         sleeping_clients.clear();
374 }