Do not bother making a METACUBE_FLAGS_HEADER block if it is empty.
[cubemap] / server.cpp
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>

#include "accesslog.h"
#include "log.h"
#include "markpool.h"
#include "metacube.h"
#include "mutexlock.h"
#include "parse.h"
#include "server.h"
#include "state.pb.h"
#include "stream.h"

28 using namespace std;
29
30 extern AccessLogThread *access_log;
31
32 Server::Server()
33 {
34         pthread_mutex_init(&mutex, NULL);
35         pthread_mutex_init(&queued_data_mutex, NULL);
36
37         epoll_fd = epoll_create(1024);  // Size argument is ignored.
38         if (epoll_fd == -1) {
39                 log_perror("epoll_fd");
40                 exit(1);
41         }
42 }
43
44 Server::~Server()
45 {
46         for (map<string, Stream *>::iterator stream_it = streams.begin();
47              stream_it != streams.end();
48              ++stream_it) {
49                 delete stream_it->second;
50         }
51
52         int ret;
53         do {
54                 ret = close(epoll_fd);
55         } while (ret == -1 && errno == EINTR);
56
57         if (ret == -1) {
58                 log_perror("close(epoll_fd)");
59         }
60 }
61
62 vector<ClientStats> Server::get_client_stats() const
63 {
64         vector<ClientStats> ret;
65
66         MutexLock lock(&mutex);
67         for (map<int, Client>::const_iterator client_it = clients.begin();
68              client_it != clients.end();
69              ++client_it) {
70                 ret.push_back(client_it->second.get_stats());
71         }
72         return ret;
73 }
74
// Main worker loop: wait for epoll events, apply work queued by other
// threads, then service the affected clients. Returns only when
// should_stop is observed (after a signal, or after a full iteration).
void Server::do_work()
{
        for ( ;; ) {
                // Block until a socket is ready or the timeout fires; the
                // timeout guarantees we re-check should_stop periodically.
                int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
                if (nfds == -1 && errno == EINTR) {
                        // Interrupted by a signal; this is how shutdown is requested.
                        if (should_stop) {
                                return;
                        }
                        continue;
                }
                if (nfds == -1) {
                        log_perror("epoll_wait");
                        exit(1);
                }

                MutexLock lock(&mutex);  // We release the mutex between iterations.

                // Pull in clients and stream data queued by other threads
                // (see add_client_deferred() / add_data_deferred()).
                process_queued_data();

                for (int i = 0; i < nfds; ++i) {
                        // We stored the Client pointer in the epoll user data when
                        // registering the socket.
                        Client *client = reinterpret_cast<Client *>(events[i].data.u64);

                        if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
                                // The connection errored out or the peer hung up.
                                close_client(client);
                                continue;
                        }

                        process_client(client);
                }

                // Service clients the streams have flagged for processing.
                // Swap the list out first so we iterate over a stable copy
                // while process_client() runs.
                for (map<string, Stream *>::iterator stream_it = streams.begin();
                     stream_it != streams.end();
                     ++stream_it) {
                        vector<Client *> to_process;
                        swap(stream_it->second->to_process, to_process);
                        for (size_t i = 0; i < to_process.size(); ++i) {
                                process_client(to_process[i]);
                        }
                }

                if (should_stop) {
                        return;
                }
        }
}
120
121 CubemapStateProto Server::serialize()
122 {
123         // We don't serialize anything queued, so empty the queues.
124         process_queued_data();
125
126         CubemapStateProto serialized;
127         for (map<int, Client>::const_iterator client_it = clients.begin();
128              client_it != clients.end();
129              ++client_it) {
130                 serialized.add_clients()->MergeFrom(client_it->second.serialize());
131         }
132         for (map<string, Stream *>::const_iterator stream_it = streams.begin();
133              stream_it != streams.end();
134              ++stream_it) {
135                 serialized.add_streams()->MergeFrom(stream_it->second->serialize());
136         }
137         return serialized;
138 }
139
140 void Server::add_client_deferred(int sock)
141 {
142         MutexLock lock(&queued_data_mutex);
143         queued_add_clients.push_back(sock);
144 }
145
146 void Server::add_client(int sock)
147 {
148         clients.insert(make_pair(sock, Client(sock)));
149
150         // Start listening on data from this socket.
151         epoll_event ev;
152         ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
153         ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
154         if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
155                 log_perror("epoll_ctl(EPOLL_CTL_ADD)");
156                 exit(1);
157         }
158
159         process_client(&clients[sock]);
160 }
161
// Recreate a client from serialized state (handed over from a previous
// run). The client's stream may no longer exist in the new configuration,
// in which case it is attached with a NULL stream.
void Server::add_client_from_serialized(const ClientProto &client)
{
        MutexLock lock(&mutex);

        // Re-attach the client to its stream, if the stream still exists.
        Stream *stream;
        map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
        if (stream_it == streams.end()) {
                stream = NULL;
        } else {
                stream = stream_it->second;
        }
        clients.insert(make_pair(client.sock(), Client(client, stream)));
        Client *client_ptr = &clients[client.sock()];

        // Start listening on data from this socket.
        epoll_event ev;
        if (client.state() == Client::READING_REQUEST) {
                // Still reading the request, so we care about incoming data.
                ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
        } else {
                // If we don't have more data for this client, we'll be putting it into
                // the sleeping array again soon.
                ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        }
        ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }

        // A sending client that is fully caught up with its stream goes
        // straight to sleep; everyone else gets processed immediately.
        if (client_ptr->state == Client::SENDING_DATA && 
            client_ptr->stream_pos == client_ptr->stream->bytes_received) {
                client_ptr->stream->put_client_to_sleep(client_ptr);
        } else {
                process_client(client_ptr);
        }
}
198
199 void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
200 {
201         MutexLock lock(&mutex);
202         streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
203 }
204
205 void Server::add_stream_from_serialized(const StreamProto &stream)
206 {
207         MutexLock lock(&mutex);
208         streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
209 }
210         
211 void Server::set_backlog_size(const string &stream_id, size_t new_size)
212 {
213         MutexLock lock(&mutex);
214         assert(streams.count(stream_id) != 0);
215         streams[stream_id]->set_backlog_size(new_size);
216 }
217         
218 void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
219 {
220         MutexLock lock(&mutex);
221         assert(streams.count(stream_id) != 0);
222         streams[stream_id]->encoding = encoding;
223 }
224         
225 void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
226 {
227         MutexLock lock(&mutex);
228         find_stream(stream_id)->http_header = http_header;
229         find_stream(stream_id)->stream_header = stream_header;
230
231         // If there are clients we haven't sent anything to yet, we should give
232         // them the header, so push back into the SENDING_HEADER state.
233         for (map<int, Client>::iterator client_it = clients.begin();
234              client_it != clients.end();
235              ++client_it) {
236                 Client *client = &client_it->second;
237                 if (client->state == Client::SENDING_DATA &&
238                     client->stream_pos == 0) {
239                         construct_header(client);
240                 }
241         }
242 }
243         
244 void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
245 {
246         MutexLock lock(&mutex);
247         assert(clients.empty());
248         find_stream(stream_id)->mark_pool = mark_pool;
249 }
250
251 void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
252 {
253         MutexLock lock(&queued_data_mutex);
254         queued_data[stream_id].append(string(data, data + bytes));
255 }
256
// See the .h file for postconditions after this function.
//
// Drives a client through its state machine: read and parse the HTTP
// request, send the header or an error response, then stream data with
// sendfile() from the stream's backlog. Note the intentional fallthrough
// from READING_REQUEST into SENDING_ERROR/SENDING_HEADER once the request
// has been answered.
void Server::process_client(Client *client)
{
        switch (client->state) {
        case Client::READING_REQUEST: {
read_request_again:
                // Try to read more of the request.
                char buf[1024];
                int ret;
                do {
                        ret = read(client->sock, buf, sizeof(buf));
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // No more data right now. Nothing to do.
                        // This is postcondition #2.
                        return;
                }
                if (ret == -1) {
                        log_perror("read");
                        close_client(client);
                        return;
                }
                if (ret == 0) {
                        // OK, the socket is closed.
                        close_client(client);
                        return;
                }

                // Feed the bytes to the request parser; it tells us whether we
                // have seen the terminating double newline yet.
                RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret);

                switch (status) {
                case RP_OUT_OF_SPACE:
                        log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str());
                        close_client(client);
                        return;
                case RP_NOT_FINISHED_YET:
                        // OK, we don't have the entire header yet. Fine; we'll get it later.
                        // See if there's more data for us.
                        goto read_request_again;
                case RP_EXTRA_DATA:
                        log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str());
                        close_client(client);
                        return;
                case RP_FINISHED:
                        break;
                }

                assert(status == RP_FINISHED);

                // The full request is in; parse it and queue up either the
                // stream header (200) or an error response.
                int error_code = parse_request(client);
                if (error_code == 200) {
                        construct_header(client);
                } else {
                        construct_error(client, error_code);
                }

                // We've changed states, so fall through.
                assert(client->state == Client::SENDING_ERROR ||
                       client->state == Client::SENDING_HEADER);
        }
        case Client::SENDING_ERROR:
        case Client::SENDING_HEADER: {
sending_header_or_error_again:
                // Push out as much of the pending header/error bytes as the
                // socket will take.
                int ret;
                do {
                        ret = write(client->sock,
                                    client->header_or_error.data() + client->header_or_error_bytes_sent,
                                    client->header_or_error.size() - client->header_or_error_bytes_sent);
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // We're out of socket space, so now we're at the “low edge” of epoll's
                        // edge triggering. epoll will tell us when there is more room, so for now,
                        // just return.
                        // This is postcondition #4.
                        return;
                }

                if (ret == -1) {
                        // Error! Postcondition #1.
                        log_perror("write");
                        close_client(client);
                        return;
                }

                client->header_or_error_bytes_sent += ret;
                assert(client->header_or_error_bytes_sent <= client->header_or_error.size());

                if (client->header_or_error_bytes_sent < client->header_or_error.size()) {
                        // We haven't sent all yet. Fine; go another round.
                        goto sending_header_or_error_again;
                }

                // We're done sending the header or error! Clear it to release some memory.
                client->header_or_error.clear();

                if (client->state == Client::SENDING_ERROR) {
                        // We're done sending the error, so now close.
                        // This is postcondition #1.
                        close_client(client);
                        return;
                }

                // Start sending from the end. In other words, we won't send any of the backlog,
                // but we'll start sending immediately as we get data.
                // This is postcondition #3.
                client->state = Client::SENDING_DATA;
                client->stream_pos = client->stream->bytes_received;
                client->stream->put_client_to_sleep(client);
                return;
        }
        case Client::SENDING_DATA: {
sending_data_again:
                // See if there's some data we've lost. Ideally, we should drop to a block boundary,
                // but resync will be the mux's problem.
                Stream *stream = client->stream;
                size_t bytes_to_send = stream->bytes_received - client->stream_pos;
                if (bytes_to_send == 0) {
                        return;
                }
                if (bytes_to_send > stream->backlog_size) {
                        // The client fell behind by more than the backlog can
                        // hold; skip it forward and account for the loss.
                        log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection",
                                client->remote_addr.c_str(),
                                (long long int)(bytes_to_send - stream->backlog_size));
                        client->stream_pos = stream->bytes_received - stream->backlog_size;
                        client->bytes_lost += bytes_to_send - stream->backlog_size;
                        ++client->num_loss_events;
                        bytes_to_send = stream->backlog_size;
                }

                // See if we need to split across the circular buffer.
                bool more_data = false;
                if ((client->stream_pos % stream->backlog_size) + bytes_to_send > stream->backlog_size) {
                        bytes_to_send = stream->backlog_size - (client->stream_pos % stream->backlog_size);
                        more_data = true;
                }

                // Send straight from the backlog file; sendfile() takes a local
                // offset copy, so client->stream_pos is only advanced on success.
                ssize_t ret;
                do {
                        loff_t offset = client->stream_pos % stream->backlog_size;
                        ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send);
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // We're out of socket space, so return; epoll will wake us up
                        // when there is more room.
                        // This is postcondition #4.
                        return;
                }
                if (ret == -1) {
                        // Error, close; postcondition #1.
                        log_perror("sendfile");
                        close_client(client);
                        return;
                }
                client->stream_pos += ret;
                client->bytes_sent += ret;

                if (client->stream_pos == stream->bytes_received) {
                        // We don't have any more data for this client, so put it to sleep.
                        // This is postcondition #3.
                        stream->put_client_to_sleep(client);
                } else if (more_data && size_t(ret) == bytes_to_send) {
                        // We sent the whole first segment of the circular buffer;
                        // wrap around and continue with the rest.
                        goto sending_data_again;
                }
                break;
        }
        default:
                assert(false);
        }
}
429
430 int Server::parse_request(Client *client)
431 {
432         vector<string> lines = split_lines(client->request);
433         if (lines.empty()) {
434                 return 400;  // Bad request (empty).
435         }
436
437         vector<string> request_tokens = split_tokens(lines[0]);
438         if (request_tokens.size() < 2) {
439                 return 400;  // Bad request (empty).
440         }
441         if (request_tokens[0] != "GET") {
442                 return 400;  // Should maybe be 405 instead?
443         }
444         if (streams.count(request_tokens[1]) == 0) {
445                 return 404;  // Not found.
446         }
447
448         client->stream_id = request_tokens[1];
449         client->stream = find_stream(client->stream_id);
450         if (client->stream->mark_pool != NULL) {
451                 client->fwmark = client->stream->mark_pool->get_mark();
452         } else {
453                 client->fwmark = 0;  // No mark.
454         }
455         if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) {                          
456                 if (client->fwmark != 0) {
457                         log_perror("setsockopt(SO_MARK)");
458                 }
459         }
460         client->request.clear();
461
462         return 200;  // OK!
463 }
464
465 void Server::construct_header(Client *client)
466 {
467         Stream *stream = find_stream(client->stream_id);
468         if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
469                 client->header_or_error = stream->http_header +
470                         "\r\n" +
471                         stream->stream_header;
472         } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
473                 client->header_or_error = stream->http_header +
474                         "Content-encoding: metacube\r\n" +
475                         "\r\n";
476                 if (!stream->stream_header.empty()) {
477                         metacube_block_header hdr;
478                         memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
479                         hdr.size = htonl(stream->stream_header.size());
480                         hdr.flags = htonl(METACUBE_FLAGS_HEADER);
481                         client->header_or_error.append(
482                                 string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
483                 }
484                 client->header_or_error.append(stream->stream_header);
485         } else {
486                 assert(false);
487         }
488
489         // Switch states.
490         client->state = Client::SENDING_HEADER;
491
492         epoll_event ev;
493         ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
494         ev.data.u64 = reinterpret_cast<uint64_t>(client);
495
496         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
497                 log_perror("epoll_ctl(EPOLL_CTL_MOD)");
498                 exit(1);
499         }
500 }
501         
502 void Server::construct_error(Client *client, int error_code)
503 {
504         char error[256];
505         snprintf(error, 256, "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n",
506                 error_code);
507         client->header_or_error = error;
508
509         // Switch states.
510         client->state = Client::SENDING_ERROR;
511
512         epoll_event ev;
513         ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
514         ev.data.u64 = reinterpret_cast<uint64_t>(client);
515
516         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
517                 log_perror("epoll_ctl(EPOLL_CTL_MOD)");
518                 exit(1);
519         }
520 }
521
// Remove every occurrence of `elem` from *v, keeping the relative order of
// the remaining elements (the classic erase–remove idiom).
template<class T>
void delete_from(std::vector<T> *v, T elem)
{
        v->erase(std::remove(v->begin(), v->end(), elem), v->end());
}
528         
// Fully tear down a client: deregister from epoll, detach it from its
// stream (and return its packet mark to the pool), write its access-log
// entry, close the socket, and finally erase it from the client map —
// which destroys the Client object, so that step must come last.
void Server::close_client(Client *client)
{
        if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
                log_perror("epoll_ctl(EPOLL_CTL_DEL)");
                exit(1);
        }

        // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
        if (client->stream != NULL) {
                delete_from(&client->stream->sleeping_clients, client);
                delete_from(&client->stream->to_process, client);
                if (client->stream->mark_pool != NULL) {
                        // Return the packet mark so another client can use it.
                        int fwmark = client->fwmark;
                        client->stream->mark_pool->release_mark(fwmark);
                }
        }

        // Log to access_log.
        access_log->write(client->get_stats());

        // Bye-bye! (Retry close() if interrupted by a signal.)
        int ret;
        do {
                ret = close(client->sock);
        } while (ret == -1 && errno == EINTR);

        if (ret == -1) {
                log_perror("close");
        }

        clients.erase(client->sock);
}
561         
562 Stream *Server::find_stream(const string &stream_id)
563 {
564         map<string, Stream *>::iterator it = streams.find(stream_id);
565         assert(it != streams.end());
566         return it->second;
567 }
568
569 void Server::process_queued_data()
570 {
571         MutexLock lock(&queued_data_mutex);
572
573         for (size_t i = 0; i < queued_add_clients.size(); ++i) {
574                 add_client(queued_add_clients[i]);
575         }
576         queued_add_clients.clear();     
577         
578         for (map<string, string>::iterator queued_it = queued_data.begin();
579              queued_it != queued_data.end();
580              ++queued_it) {
581                 Stream *stream = find_stream(queued_it->first);
582                 stream->add_data(queued_it->second.data(), queued_it->second.size());
583                 stream->wake_up_all_clients();
584         }
585         queued_data.clear();
586 }