// cubemap server.cpp
// (From commit: "Explicitly SIGHUP threads to kill them out of syscalls when we want to join them.")
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <assert.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
#include <sys/uio.h>
#include <time.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <vector>
#include <string>
#include <map>
#include <algorithm>

#include "metacube.h"
#include "server.h"
#include "mutexlock.h"
#include "parse.h"
#include "state.pb.h"

using namespace std;

Client::Client(int sock)
        : sock(sock),
          connect_time(time(NULL)),
          state(Client::READING_REQUEST),
          stream(NULL),
          header_or_error_bytes_sent(0),
          bytes_sent(0)
{
        request.reserve(1024);

        // Find the remote address, and convert it to ASCII.
        sockaddr_in6 addr;
        socklen_t addr_len = sizeof(addr);

        if (getpeername(sock, reinterpret_cast<sockaddr *>(&addr), &addr_len) == -1) {
                perror("getpeername");
                remote_addr = "";
        } else {
                char buf[INET6_ADDRSTRLEN];
                if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) {
                        perror("inet_ntop");
                        remote_addr = "";
                } else {
                        remote_addr = buf;
                }
        }
}

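// Recreate a client from an earlier snapshot (see Client::serialize() below);
// used together with Server::add_client_from_serialized() when previously
// serialized server state is being restored.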
Client::Client(const ClientProto &serialized, Stream *stream)
        : sock(serialized.sock()),
          remote_addr(serialized.remote_addr()),
          connect_time(serialized.connect_time()),
          state(State(serialized.state())),
          request(serialized.request()),
          stream_id(serialized.stream_id()),
          stream(stream),
          header_or_error(serialized.header_or_error()),
          header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()),
          bytes_sent(serialized.bytes_sent())
{
}

ClientProto Client::serialize() const
{
        ClientProto serialized;
        serialized.set_sock(sock);
        serialized.set_remote_addr(remote_addr);
        serialized.set_connect_time(connect_time);
        serialized.set_state(state);
        serialized.set_request(request);
        serialized.set_stream_id(stream_id);
        serialized.set_header_or_error(header_or_error);
        serialized.set_header_or_error_bytes_sent(header_or_error_bytes_sent);
        serialized.set_bytes_sent(bytes_sent);
        return serialized;
}

ClientStats Client::get_stats() const
{
        ClientStats stats;
        stats.stream_id = stream_id;
        stats.remote_addr = remote_addr;
        stats.connect_time = connect_time;
        stats.bytes_sent = bytes_sent;
        return stats;
}

Stream::Stream(const string &stream_id)
        : stream_id(stream_id),
          data(new char[BACKLOG_SIZE]),
          data_size(0)
{
        memset(data, 0, BACKLOG_SIZE);
}

Stream::~Stream()
{
        delete[] data;
}

Stream::Stream(const StreamProto &serialized)
        : stream_id(serialized.stream_id()),
          header(serialized.header()),
          data(new char[BACKLOG_SIZE]),
          data_size(serialized.data_size())
{
        assert(serialized.data().size() == BACKLOG_SIZE);
        memcpy(data, serialized.data().data(), BACKLOG_SIZE);
}

StreamProto Stream::serialize() const
{
        StreamProto serialized;
        serialized.set_header(header);
        serialized.set_data(string(data, data + BACKLOG_SIZE));
        serialized.set_data_size(data_size);
        serialized.set_stream_id(stream_id);
        return serialized;
}

void Stream::put_client_to_sleep(Client *client)
{
        sleeping_clients.push_back(client);
}

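// Move all sleeping clients over to the to_process list. When to_process is
// empty (the common case), a swap() moves the whole vector in O(1) instead of
// copying the elements one by one.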
void Stream::wake_up_all_clients()
{
        if (to_process.empty()) {
                swap(sleeping_clients, to_process);
        } else {
                to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
                sleeping_clients.clear();
        }
}

Server::Server()
{
        pthread_mutex_init(&mutex, NULL);
        pthread_mutex_init(&queued_data_mutex, NULL);

        epoll_fd = epoll_create(1024);  // Size argument is ignored.
        if (epoll_fd == -1) {
                perror("epoll_create");
                exit(1);
        }
}

Server::~Server()
{
        int ret;
        do {
                ret = close(epoll_fd);
        } while (ret == -1 && errno == EINTR);

        if (ret == -1) {
                perror("close(epoll_fd)");
        }
}

void Server::run()
{
        should_stop = false;

        // Joinable is already the default, but it's good to be certain.
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        pthread_create(&worker_thread, &attr, Server::do_work_thunk, this);
}

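// Ask the worker thread to quit, then wait for it. The SIGHUP kicks the
// thread out of a blocking epoll_wait() (which then fails with EINTR), so
// the join doesn't have to wait for the poll timeout to expire.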
void Server::stop()
{
        {
                MutexLock lock(&mutex);
                should_stop = true;
        }

        pthread_kill(worker_thread, SIGHUP);

        // pthread_join() returns an error number instead of setting errno.
        int err = pthread_join(worker_thread, NULL);
        if (err != 0) {
                fprintf(stderr, "pthread_join: %s\n", strerror(err));
                exit(1);
        }
}

vector<ClientStats> Server::get_client_stats() const
{
        vector<ClientStats> ret;

        MutexLock lock(&mutex);
        for (map<int, Client>::const_iterator client_it = clients.begin();
             client_it != clients.end();
             ++client_it) {
                ret.push_back(client_it->second.get_stats());
        }
        return ret;
}

void *Server::do_work_thunk(void *arg)
{
        Server *server = static_cast<Server *>(arg);
        server->do_work();
        return NULL;
}

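// The worker thread's main loop: wait for epoll events, fold in any deferred
// work (new clients, new stream data), then service every client that either
// had a socket event or was woken up because its stream got new data.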
void Server::do_work()
{
        for ( ;; ) {
                int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS);
                if (nfds == -1 && errno == EINTR) {
                        if (should_stop) {
                                return;
                        }
                        continue;
                }
                if (nfds == -1) {
                        perror("epoll_wait");
                        exit(1);
                }

                MutexLock lock(&mutex);  // We release the mutex between iterations.

                process_queued_data();

                for (int i = 0; i < nfds; ++i) {
                        int fd = events[i].data.fd;
                        assert(clients.count(fd) != 0);
                        Client *client = &clients[fd];

                        if (events[i].events & (EPOLLERR | EPOLLRDHUP | EPOLLHUP)) {
                                close_client(client);
                                continue;
                        }

                        process_client(client);
                }

                for (map<string, Stream *>::iterator stream_it = streams.begin();
                     stream_it != streams.end();
                     ++stream_it) {
                        vector<Client *> to_process;
                        swap(stream_it->second->to_process, to_process);
                        for (size_t i = 0; i < to_process.size(); ++i) {
                                process_client(to_process[i]);
                        }
                }

                if (should_stop) {
                        return;
                }
        }
}

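// Snapshot the entire server state (all clients and streams) as a protobuf,
// so it can later be restored via the add_*_from_serialized() methods.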
CubemapStateProto Server::serialize()
{
        // We don't serialize anything queued, so empty the queues.
        process_queued_data();

        CubemapStateProto serialized;
        for (map<int, Client>::const_iterator client_it = clients.begin();
             client_it != clients.end();
             ++client_it) {
                serialized.add_clients()->MergeFrom(client_it->second.serialize());
        }
        for (map<string, Stream *>::const_iterator stream_it = streams.begin();
             stream_it != streams.end();
             ++stream_it) {
                serialized.add_streams()->MergeFrom(stream_it->second->serialize());
        }
        return serialized;
}

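// The _deferred() variants only take the (cheap) queued_data_mutex and push
// work onto a queue; the worker thread picks the work up later in
// process_queued_data() while holding the main mutex. This keeps callers in
// other threads from ever having to block on the main mutex.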
void Server::add_client_deferred(int sock)
{
        MutexLock lock(&queued_data_mutex);
        queued_add_clients.push_back(sock);
}

void Server::add_client(int sock)
{
        clients.insert(make_pair(sock, Client(sock)));

        // Start listening on data from this socket.
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
        ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.fd = sock;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
                perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }

        process_client(&clients[sock]);
}

void Server::add_client_from_serialized(const ClientProto &client)
{
        MutexLock lock(&mutex);
        Stream *stream = find_stream(client.stream_id());
        clients.insert(make_pair(client.sock(), Client(client, stream)));
        Client *client_ptr = &clients[client.sock()];

        // Start listening on data from this socket.
        epoll_event ev;
        if (client.state() == Client::READING_REQUEST) {
                ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
        } else {
                // If we don't have more data for this client, we'll be putting it into
                // the sleeping array again soon.
                ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        }
        ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.fd = client.sock();
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
                perror("epoll_ctl(EPOLL_CTL_ADD)");
                exit(1);
        }

        if (client_ptr->state == Client::SENDING_DATA &&
            client_ptr->bytes_sent == client_ptr->stream->data_size) {
                client_ptr->stream->put_client_to_sleep(client_ptr);
        } else {
                process_client(client_ptr);
        }
}

void Server::add_stream(const string &stream_id)
{
        MutexLock lock(&mutex);
        streams.insert(make_pair(stream_id, new Stream(stream_id)));
}

void Server::add_stream_from_serialized(const StreamProto &stream)
{
        MutexLock lock(&mutex);
        streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
}

void Server::set_header(const string &stream_id, const string &header)
{
        MutexLock lock(&mutex);
        find_stream(stream_id)->header = header;

        // If there are clients we haven't sent anything to yet, we should give
        // them the header, so push them back into the SENDING_HEADER state.
        for (map<int, Client>::iterator client_it = clients.begin();
             client_it != clients.end();
             ++client_it) {
                Client *client = &client_it->second;
                if (client->state == Client::SENDING_DATA &&
                    client->bytes_sent == 0) {
                        construct_header(client);
                }
        }
}

void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
{
        MutexLock lock(&queued_data_mutex);
        queued_data[stream_id].append(string(data, data + bytes));
}

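// Append data to a stream's backlog, treating stream->data as a ring buffer
// of BACKLOG_SIZE bytes. data_size counts every byte ever added, so
// (data_size % BACKLOG_SIZE) is the current write position; a write that
// would run past the end is split into two memcpy()s that wrap around to the
// start. For example, with BACKLOG_SIZE = 16, data_size = 14 and bytes = 5,
// the first two bytes land at offsets 14-15 and the remaining three at 0-2.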
void Server::add_data(const string &stream_id, const char *data, size_t bytes)
{
        Stream *stream = find_stream(stream_id);
        size_t pos = stream->data_size % BACKLOG_SIZE;
        stream->data_size += bytes;

        if (pos + bytes > BACKLOG_SIZE) {
                size_t to_copy = BACKLOG_SIZE - pos;
                memcpy(stream->data + pos, data, to_copy);
                data += to_copy;
                bytes -= to_copy;
                pos = 0;
        }

        memcpy(stream->data + pos, data, bytes);
        stream->wake_up_all_clients();
}

// See the .h file for postconditions after this function.
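// In short, the client ends up in one of four situations: (1) closed,
// (2) waiting for epoll to report more incoming request data, (3) asleep on
// its stream until new data arrives, or (4) waiting for epoll to report more
// socket buffer space.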
void Server::process_client(Client *client)
{
        switch (client->state) {
        case Client::READING_REQUEST: {
read_request_again:
                // Try to read more of the request.
                char buf[1024];
                int ret;
                do {
                        ret = read(client->sock, buf, sizeof(buf));
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // No more data right now. Nothing to do.
                        // This is postcondition #2.
                        return;
                }
                if (ret == -1) {
                        perror("read");
                        close_client(client);
                        return;
                }
                if (ret == 0) {
                        // OK, the socket is closed.
                        close_client(client);
                        return;
                }

                RequestParseStatus status = wait_for_double_newline(&client->request, buf, ret);

                switch (status) {
                case RP_OUT_OF_SPACE:
                        fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock);
                        close_client(client);
                        return;
                case RP_NOT_FINISHED_YET:
                        // OK, we don't have the entire header yet. Fine; we'll get it later.
                        // See if there's more data for us.
                        goto read_request_again;
                case RP_EXTRA_DATA:
                        fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock);
                        close_client(client);
                        return;
                case RP_FINISHED:
                        break;
                }

                assert(status == RP_FINISHED);

                int error_code = parse_request(client);
                if (error_code == 200) {
                        construct_header(client);
                } else {
                        construct_error(client, error_code);
                }

                // We've changed states, so fall through.
                assert(client->state == Client::SENDING_ERROR ||
                       client->state == Client::SENDING_HEADER);
        }
        case Client::SENDING_ERROR:
        case Client::SENDING_HEADER: {
sending_header_or_error_again:
                int ret;
                do {
                        ret = write(client->sock,
                                    client->header_or_error.data() + client->header_or_error_bytes_sent,
                                    client->header_or_error.size() - client->header_or_error_bytes_sent);
                } while (ret == -1 && errno == EINTR);

                if (ret == -1 && errno == EAGAIN) {
                        // We're out of socket space, so now we're at the “low edge” of epoll's
                        // edge triggering. epoll will tell us when there is more room, so for now,
                        // just return.
                        // This is postcondition #4.
                        return;
                }

                if (ret == -1) {
                        // Error! Postcondition #1.
                        perror("write");
                        close_client(client);
                        return;
                }

                client->header_or_error_bytes_sent += ret;
                assert(client->header_or_error_bytes_sent <= client->header_or_error.size());

                if (client->header_or_error_bytes_sent < client->header_or_error.size()) {
                        // We haven't sent all of it yet. Fine; go another round.
                        goto sending_header_or_error_again;
                }

                // We're done sending the header or error! Clear it to release some memory.
                client->header_or_error.clear();

                if (client->state == Client::SENDING_ERROR) {
                        // We're done sending the error, so now close.
                        // This is postcondition #1.
                        close_client(client);
                        return;
                }

                // Start sending from the end. In other words, we won't send any of the backlog,
                // but we'll start sending immediately as we get data.
                // This is postcondition #3.
                client->state = Client::SENDING_DATA;
                client->bytes_sent = client->stream->data_size;
                client->stream->put_client_to_sleep(client);
                return;
        }
        case Client::SENDING_DATA: {
                // See if there's some data we've lost. Ideally, we should drop to a block boundary,
                // but resync will be the mux's problem.
                Stream *stream = client->stream;
                size_t bytes_to_send = stream->data_size - client->bytes_sent;
                if (bytes_to_send == 0) {
                        return;
                }
                if (bytes_to_send > BACKLOG_SIZE) {
                        fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n",
                                client->sock,
                                (long long int)(bytes_to_send - BACKLOG_SIZE));
                        client->bytes_sent = stream->data_size - BACKLOG_SIZE;
                        bytes_to_send = BACKLOG_SIZE;
                }

                // See if we need to split across the circular buffer.
                ssize_t ret;
                if ((client->bytes_sent % BACKLOG_SIZE) + bytes_to_send > BACKLOG_SIZE) {
                        size_t bytes_first_part = BACKLOG_SIZE - (client->bytes_sent % BACKLOG_SIZE);

                        iovec iov[2];
                        iov[0].iov_base = const_cast<char *>(stream->data + (client->bytes_sent % BACKLOG_SIZE));
                        iov[0].iov_len = bytes_first_part;

                        iov[1].iov_base = const_cast<char *>(stream->data);
                        iov[1].iov_len = bytes_to_send - bytes_first_part;

                        do {
                                ret = writev(client->sock, iov, 2);
                        } while (ret == -1 && errno == EINTR);
                } else {
                        do {
                                ret = write(client->sock,
                                            stream->data + (client->bytes_sent % BACKLOG_SIZE),
                                            bytes_to_send);
                        } while (ret == -1 && errno == EINTR);
                }
                if (ret == -1 && errno == EAGAIN) {
                        // We're out of socket space, so return; epoll will wake us up
                        // when there is more room.
                        // This is postcondition #4.
                        return;
                }
                if (ret == -1) {
                        // Error, close; postcondition #1.
                        perror("write/writev");
                        close_client(client);
                        return;
                }
                client->bytes_sent += ret;

                if (client->bytes_sent == stream->data_size) {
                        // We don't have any more data for this client, so put it to sleep.
                        // This is postcondition #3.
                        stream->put_client_to_sleep(client);
                } else {
                        // XXX: Do we need to go another round here to explicitly
                        // get the EAGAIN?
                }
                break;
        }
        default:
                assert(false);
        }
}

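// Parse the HTTP request in client->request and return an HTTP status code:
// 200 if it was a valid GET for a known stream (in which case the client is
// bound to that stream), or an error code such as 400 or 404 otherwise.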
int Server::parse_request(Client *client)
{
        vector<string> lines = split_lines(client->request);
        if (lines.empty()) {
                return 400;  // Bad request (empty).
        }

        vector<string> request_tokens = split_tokens(lines[0]);
        if (request_tokens.size() < 2) {
                return 400;  // Bad request (too few tokens).
        }
        if (request_tokens[0] != "GET") {
                return 400;  // Should maybe be 405 instead?
        }
        if (streams.count(request_tokens[1]) == 0) {
                return 404;  // Not found.
        }

        client->stream_id = request_tokens[1];
        client->stream = find_stream(client->stream_id);
        client->request.clear();

        return 200;  // OK!
}

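// Queue the stream header for sending and switch the client to
// SENDING_HEADER, flipping its epoll registration from "readable" to
// "writable" events.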
void Server::construct_header(Client *client)
{
        client->header_or_error = find_stream(client->stream_id)->header;

        // Switch states.
        client->state = Client::SENDING_HEADER;

        epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.fd = client->sock;

        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
                perror("epoll_ctl(EPOLL_CTL_MOD)");
                exit(1);
        }
}

void Server::construct_error(Client *client, int error_code)
{
        char error[256];
        snprintf(error, sizeof(error),
                 "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n",
                 error_code);
        client->header_or_error = error;

        // Switch states.
        client->state = Client::SENDING_ERROR;

        epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
        ev.data.u64 = 0;  // Keep Valgrind happy.
        ev.data.fd = client->sock;

        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
                perror("epoll_ctl(EPOLL_CTL_MOD)");
                exit(1);
        }
}

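// Remove all occurrences of elem from the vector, using the standard
// erase-remove idiom: remove() compacts the surviving elements to the front,
// then erase() drops the leftover tail.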
template<class T>
void delete_from(vector<T> *v, T elem)
{
        typename vector<T>::iterator new_end = remove(v->begin(), v->end(), elem);
        v->erase(new_end, v->end());
}

void Server::close_client(Client *client)
{
        if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
                perror("epoll_ctl(EPOLL_CTL_DEL)");
                exit(1);
        }

        // This client could be sleeping, so we'll need to fix that. (Argh, O(n).)
        if (client->stream != NULL) {
                delete_from(&client->stream->sleeping_clients, client);
                delete_from(&client->stream->to_process, client);
        }

        // Bye-bye!
        int ret;
        do {
                ret = close(client->sock);
        } while (ret == -1 && errno == EINTR);

        if (ret == -1) {
                perror("close");
        }

        clients.erase(client->sock);
}

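// Look up a stream by id. The stream must exist; presumably the caller
// already holds the mutex that protects the streams map.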
Stream *Server::find_stream(const string &stream_id)
{
        map<string, Stream *>::iterator it = streams.find(stream_id);
        assert(it != streams.end());
        return it->second;
}

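// Drain the queues filled by add_client_deferred() and add_data_deferred();
// typically called from the worker thread's loop with the main mutex held.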
void Server::process_queued_data()
{
        MutexLock lock(&queued_data_mutex);

        for (size_t i = 0; i < queued_add_clients.size(); ++i) {
                add_client(queued_add_clients[i]);
        }
        queued_add_clients.clear();

        for (map<string, string>::iterator queued_it = queued_data.begin();
             queued_it != queued_data.end();
             ++queued_it) {
                add_data(queued_it->first, queued_it->second.data(), queued_it->second.size());
        }
        queued_data.clear();
}