]> git.sesse.net Git - cubemap/blob - server.cpp
When closing unused inputs, remember to take down their sockets.
[cubemap] / server.cpp
1 #include <arpa/inet.h>
2 #include <assert.h>
3 #include <errno.h>
4 #include <pthread.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <sys/epoll.h>
8 #include <sys/sendfile.h>
9 #include <sys/socket.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12 #include <algorithm>
13 #include <map>
14 #include <string>
15 #include <utility>
16 #include <vector>
17
18 #include "accesslog.h"
19 #include "log.h"
20 #include "markpool.h"
21 #include "metacube.h"
22 #include "mutexlock.h"
23 #include "parse.h"
24 #include "server.h"
25 #include "state.pb.h"
26 #include "stream.h"
27
using namespace std;

// Access log shared with the rest of the program; it is defined in another
// translation unit. In this file it is only used by close_client(), which
// writes the statistics of every client that gets closed.
extern AccessLogThread *access_log;
31
32 Server::Server()
33 {
34         pthread_mutex_init(&mutex, NULL);
35         pthread_mutex_init(&queued_data_mutex, NULL);
36
37         epoll_fd = epoll_create(1024);  // Size argument is ignored.
38         if (epoll_fd == -1) {
39                 log_perror("epoll_fd");
40                 exit(1);
41         }
42 }
43
44 Server::~Server()
45 {
46         for (map<string, Stream *>::iterator stream_it = streams.begin();
47              stream_it != streams.end();
48              ++stream_it) {
49                 delete stream_it->second;
50         }
51
52         int ret;
53         do {
54                 ret = close(epoll_fd);
55         } while (ret == -1 && errno == EINTR);
56
57         if (ret == -1) {
58                 log_perror("close(epoll_fd)");
59         }
60 }
61
62 vector<ClientStats> Server::get_client_stats() const
63 {
64         vector<ClientStats> ret;
65
66         MutexLock lock(&mutex);
67         for (map<int, Client>::const_iterator client_it = clients.begin();
68              client_it != clients.end();
69              ++client_it) {
70                 ret.push_back(client_it->second.get_stats());
71         }
72         return ret;
73 }
74
75 void Server::do_work()
76 {
77         for ( ;; ) {
78                 int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
79                 if (nfds == -1 && errno == EINTR) {
80                         if (should_stop) {
81                                 return;
82                         }
83                         continue;
84                 }
85                 if (nfds == -1) {
86                         log_perror("epoll_wait");
87                         exit(1);
88                 }
89
90                 MutexLock lock(&mutex);  // We release the mutex between iterations.
91         
92                 process_queued_data();
93
94                 for (int i = 0; i < nfds; ++i) {
95                         Client *client = reinterpret_cast<Client *>(events[i].data.u64);
96
97                         if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
98                                 close_client(client);
99                                 continue;
100                         }
101
102                         process_client(client);
103                 }
104
105                 for (map<string, Stream *>::iterator stream_it = streams.begin();
106                      stream_it != streams.end();
107                      ++stream_it) {
108                         vector<Client *> to_process;
109                         swap(stream_it->second->to_process, to_process);
110                         for (size_t i = 0; i < to_process.size(); ++i) {
111                                 process_client(to_process[i]);
112                         }
113                 }
114
115                 if (should_stop) {
116                         return;
117                 }
118         }
119 }
120
121 CubemapStateProto Server::serialize()
122 {
123         // We don't serialize anything queued, so empty the queues.
124         process_queued_data();
125
126         CubemapStateProto serialized;
127         for (map<int, Client>::const_iterator client_it = clients.begin();
128              client_it != clients.end();
129              ++client_it) {
130                 serialized.add_clients()->MergeFrom(client_it->second.serialize());
131         }
132         for (map<string, Stream *>::const_iterator stream_it = streams.begin();
133              stream_it != streams.end();
134              ++stream_it) {
135                 serialized.add_streams()->MergeFrom(stream_it->second->serialize());
136         }
137         return serialized;
138 }
139
140 void Server::add_client_deferred(int sock)
141 {
142         MutexLock lock(&queued_data_mutex);
143         queued_add_clients.push_back(sock);
144 }
145
146 void Server::add_client(int sock)
147 {
148         clients.insert(make_pair(sock, Client(sock)));
149
150         // Start listening on data from this socket.
151         epoll_event ev;
152         ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
153         ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
154         if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
155                 log_perror("epoll_ctl(EPOLL_CTL_ADD)");
156                 exit(1);
157         }
158
159         process_client(&clients[sock]);
160 }
161
162 void Server::add_client_from_serialized(const ClientProto &client)
163 {
164         MutexLock lock(&mutex);
165         Stream *stream;
166         map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
167         if (stream_it == streams.end()) {
168                 stream = NULL;
169         } else {
170                 stream = stream_it->second;
171         }
172         clients.insert(make_pair(client.sock(), Client(client, stream)));
173         Client *client_ptr = &clients[client.sock()];
174
175         // Start listening on data from this socket.
176         epoll_event ev;
177         if (client.state() == Client::READING_REQUEST) {
178                 ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
179         } else {
180                 // If we don't have more data for this client, we'll be putting it into
181                 // the sleeping array again soon.
182                 ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
183         }
184         ev.data.u64 = 0;  // Keep Valgrind happy.
185         ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
186         if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
187                 log_perror("epoll_ctl(EPOLL_CTL_ADD)");
188                 exit(1);
189         }
190
191         if (client_ptr->state == Client::SENDING_DATA && 
192             client_ptr->stream_pos == client_ptr->stream->bytes_received) {
193                 client_ptr->stream->put_client_to_sleep(client_ptr);
194         } else {
195                 process_client(client_ptr);
196         }
197 }
198
199 void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
200 {
201         MutexLock lock(&mutex);
202         streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
203 }
204
205 void Server::add_stream_from_serialized(const StreamProto &stream)
206 {
207         MutexLock lock(&mutex);
208         streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
209 }
210         
211 void Server::set_backlog_size(const string &stream_id, size_t new_size)
212 {
213         MutexLock lock(&mutex);
214         assert(streams.count(stream_id) != 0);
215         streams[stream_id]->set_backlog_size(new_size);
216 }
217         
218 void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
219 {
220         MutexLock lock(&mutex);
221         assert(streams.count(stream_id) != 0);
222         streams[stream_id]->encoding = encoding;
223 }
224         
225 void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
226 {
227         MutexLock lock(&mutex);
228         find_stream(stream_id)->http_header = http_header;
229         find_stream(stream_id)->stream_header = stream_header;
230
231         // If there are clients we haven't sent anything to yet, we should give
232         // them the header, so push back into the SENDING_HEADER state.
233         for (map<int, Client>::iterator client_it = clients.begin();
234              client_it != clients.end();
235              ++client_it) {
236                 Client *client = &client_it->second;
237                 if (client->state == Client::SENDING_DATA &&
238                     client->stream_pos == 0) {
239                         construct_header(client);
240                 }
241         }
242 }
243         
244 void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
245 {
246         MutexLock lock(&mutex);
247         assert(clients.empty());
248         find_stream(stream_id)->mark_pool = mark_pool;
249 }
250
251 void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
252 {
253         MutexLock lock(&queued_data_mutex);
254         queued_data[stream_id].append(string(data, data + bytes));
255 }
256
// See the .h file for postconditions after this function.
//
// Advances one client as far as possible through its state machine:
// READING_REQUEST -> SENDING_HEADER or SENDING_ERROR -> SENDING_DATA.
// All socket calls are retried on EINTR, and EAGAIN ends the call without
// changing state. The client may be closed via close_client() on several
// paths; close_client() erases it from <clients>, so callers must not touch
// <client> afterwards unless they know it survived.
void Server::process_client(Client *client)
{
        switch (client->state) {
        case Client::READING_REQUEST: {
read_request_again:
                // Try to read more of the request.
                char buf[1024];
                int ret;
                do {
                        ret = read(client->sock, buf, sizeof(buf));
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // No more data right now. Nothing to do.
                        // This is postcondition #2.
                        return;
                }
                if (ret == -1) {
                        log_perror("read");
                        close_client(client);
                        return;
                }
                if (ret == 0) {
                        // OK, the socket is closed.
                        close_client(client);
                        return;
                }

                // Accumulates the new bytes into client->request and reports
                // whether the request is complete.
                RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret);

                switch (status) {
                case RP_OUT_OF_SPACE:
                        log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str());
                        close_client(client);
                        return;
                case RP_NOT_FINISHED_YET:
                        // OK, we don't have the entire header yet. Fine; we'll get it later.
                        // See if there's more data for us.
                        goto read_request_again;
                case RP_EXTRA_DATA:
                        log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str());
                        close_client(client);
                        return;
                case RP_FINISHED:
                        break;
                }

                assert(status == RP_FINISHED);

                // Both construct_header() and construct_error() fill in
                // header_or_error and switch the client's state.
                int error_code = parse_request(client);
                if (error_code == 200) {
                        construct_header(client);
                } else {
                        construct_error(client, error_code);
                }

                // We've changed states, so fall through.
                assert(client->state == Client::SENDING_ERROR ||
                       client->state == Client::SENDING_HEADER);
        }
        case Client::SENDING_ERROR:
        case Client::SENDING_HEADER: {
sending_header_or_error_again:
                // Write whatever part of header_or_error has not been sent yet.
                int ret;
                do {
                        ret = write(client->sock,
                                    client->header_or_error.data() + client->header_or_error_bytes_sent,
                                    client->header_or_error.size() - client->header_or_error_bytes_sent);
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // We're out of socket space, so now we're at the “low edge” of epoll's
                        // edge triggering. epoll will tell us when there is more room, so for now,
                        // just return.
                        // This is postcondition #4.
                        return;
                }

                if (ret == -1) {
                        // Error! Postcondition #1.
                        log_perror("write");
                        close_client(client);
                        return;
                }

                client->header_or_error_bytes_sent += ret;
                assert(client->header_or_error_bytes_sent <= client->header_or_error.size());

                if (client->header_or_error_bytes_sent < client->header_or_error.size()) {
                        // We haven't sent all yet. Fine; go another round.
                        goto sending_header_or_error_again;
                }

                // We're done sending the header or error! Clear it to release some memory.
                // NOTE(review): header_or_error_bytes_sent is not reset here.
                client->header_or_error.clear();

                if (client->state == Client::SENDING_ERROR) {
                        // We're done sending the error, so now close.
                        // This is postcondition #1.
                        close_client(client);
                        return;
                }

                // Start sending from the end. In other words, we won't send any of the backlog,
                // but we'll start sending immediately as we get data.
                // This is postcondition #3.
                client->state = Client::SENDING_DATA;
                client->stream_pos = client->stream->bytes_received;
                client->stream->put_client_to_sleep(client);
                return;
        }
        case Client::SENDING_DATA: {
sending_data_again:
                // See if there's some data we've lost. Ideally, we should drop to a block boundary,
                // but resync will be the mux's problem.
                Stream *stream = client->stream;
                size_t bytes_to_send = stream->bytes_received - client->stream_pos;
                if (bytes_to_send == 0) {
                        // The client already has everything the stream has received.
                        return;
                }
                if (bytes_to_send > stream->backlog_size) {
                        // The client is more than one backlog behind: skip it
                        // forward to the oldest byte still counted as backlog
                        // and account for the bytes it missed.
                        log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection",
                                client->remote_addr.c_str(),
                                (long long int)(bytes_to_send - stream->backlog_size));
                        client->stream_pos = stream->bytes_received - stream->backlog_size;
                        client->bytes_lost += bytes_to_send - stream->backlog_size;
                        ++client->num_loss_events;
                        bytes_to_send = stream->backlog_size;
                }

                // See if we need to split across the circular buffer.
                bool more_data = false;
                if ((client->stream_pos % stream->backlog_size) + bytes_to_send > stream->backlog_size) {
                        // Only send up to the end of the buffer in this round.
                        bytes_to_send = stream->backlog_size - (client->stream_pos % stream->backlog_size);
                        more_data = true;
                }

                ssize_t ret;
                do {
                        // Recomputed on every attempt, so a retry after EINTR
                        // starts from the same position in the backlog file.
                        loff_t offset = client->stream_pos % stream->backlog_size;
                        ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send);
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // We're out of socket space, so return; epoll will wake us up
                        // when there is more room.
                        // This is postcondition #4.
                        return;
                }
                if (ret == -1) {
                        // Error, close; postcondition #1.
                        log_perror("sendfile");
                        close_client(client);
                        return;
                }
                client->stream_pos += ret;
                client->bytes_sent += ret;

                if (client->stream_pos == stream->bytes_received) {
                        // We don't have any more data for this client, so put it to sleep.
                        // This is postcondition #3.
                        stream->put_client_to_sleep(client);
                } else if (more_data && size_t(ret) == bytes_to_send) {
                        // Everything up to the end of the circular buffer went
                        // out; go around again for the part that wrapped.
                        goto sending_data_again;
                }
                // Otherwise the send was short; presumably epoll reports the
                // socket as writable again later (TODO confirm against the
                // postconditions in the .h file).
                break;
        }
        default:
                assert(false);
        }
}
429
430 int Server::parse_request(Client *client)
431 {
432         vector<string> lines = split_lines(client->request);
433         if (lines.empty()) {
434                 return 400;  // Bad request (empty).
435         }
436
437         vector<string> request_tokens = split_tokens(lines[0]);
438         if (request_tokens.size() < 2) {
439                 return 400;  // Bad request (empty).
440         }
441         if (request_tokens[0] != "GET") {
442                 return 400;  // Should maybe be 405 instead?
443         }
444         if (streams.count(request_tokens[1]) == 0) {
445                 return 404;  // Not found.
446         }
447
448         client->stream_id = request_tokens[1];
449         client->stream = find_stream(client->stream_id);
450         if (client->stream->mark_pool != NULL) {
451                 client->fwmark = client->stream->mark_pool->get_mark();
452         } else {
453                 client->fwmark = 0;  // No mark.
454         }
455         if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) {                          
456                 if (client->fwmark != 0) {
457                         log_perror("setsockopt(SO_MARK)");
458                 }
459         }
460         client->request.clear();
461
462         return 200;  // OK!
463 }
464
465 void Server::construct_header(Client *client)
466 {
467         Stream *stream = find_stream(client->stream_id);
468         if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
469                 client->header_or_error = stream->http_header +
470                         "\r\n" +
471                         stream->stream_header;
472         } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
473                 metacube_block_header hdr;
474                 memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
475                 hdr.size = htonl(stream->stream_header.size());
476                 hdr.flags = htonl(METACUBE_FLAGS_HEADER);
477
478                 client->header_or_error = stream->http_header +
479                         "Content-encoding: metacube\r\n" +
480                         "\r\n" +
481                         string(reinterpret_cast<char *>(&hdr), sizeof(hdr)) +
482                         stream->stream_header;
483         } else {
484                 assert(false);
485         }
486
487         // Switch states.
488         client->state = Client::SENDING_HEADER;
489
490         epoll_event ev;
491         ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
492         ev.data.u64 = reinterpret_cast<uint64_t>(client);
493
494         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
495                 log_perror("epoll_ctl(EPOLL_CTL_MOD)");
496                 exit(1);
497         }
498 }
499         
500 void Server::construct_error(Client *client, int error_code)
501 {
502         char error[256];
503         snprintf(error, 256, "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n",
504                 error_code);
505         client->header_or_error = error;
506
507         // Switch states.
508         client->state = Client::SENDING_ERROR;
509
510         epoll_event ev;
511         ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
512         ev.data.u64 = reinterpret_cast<uint64_t>(client);
513
514         if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
515                 log_perror("epoll_ctl(EPOLL_CTL_MOD)");
516                 exit(1);
517         }
518 }
519
// Removes every element equal to <elem> from <*v>, keeping the relative
// order of the remaining elements.
template<class T>
void delete_from(std::vector<T> *v, T elem)
{
        v->erase(std::remove(v->begin(), v->end(), elem), v->end());
}
526         
527 void Server::close_client(Client *client)
528 {
529         if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
530                 log_perror("epoll_ctl(EPOLL_CTL_DEL)");
531                 exit(1);
532         }
533
534         // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
535         if (client->stream != NULL) {
536                 delete_from(&client->stream->sleeping_clients, client);
537                 delete_from(&client->stream->to_process, client);
538                 if (client->stream->mark_pool != NULL) {
539                         int fwmark = client->fwmark;
540                         client->stream->mark_pool->release_mark(fwmark);
541                 }
542         }
543
544         // Log to access_log.
545         access_log->write(client->get_stats());
546
547         // Bye-bye!
548         int ret;
549         do {
550                 ret = close(client->sock);
551         } while (ret == -1 && errno == EINTR);
552
553         if (ret == -1) {
554                 log_perror("close");
555         }
556
557         clients.erase(client->sock);
558 }
559         
560 Stream *Server::find_stream(const string &stream_id)
561 {
562         map<string, Stream *>::iterator it = streams.find(stream_id);
563         assert(it != streams.end());
564         return it->second;
565 }
566
567 void Server::process_queued_data()
568 {
569         MutexLock lock(&queued_data_mutex);
570
571         for (size_t i = 0; i < queued_add_clients.size(); ++i) {
572                 add_client(queued_add_clients[i]);
573         }
574         queued_add_clients.clear();     
575         
576         for (map<string, string>::iterator queued_it = queued_data.begin();
577              queued_it != queued_data.end();
578              ++queued_it) {
579                 Stream *stream = find_stream(queued_it->first);
580                 stream->add_data(queued_it->second.data(), queued_it->second.size());
581                 stream->wake_up_all_clients();
582         }
583         queued_data.clear();
584 }