X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=8282a562863deea72c1294cb55f8e0a0100f6a8d;hp=789d8eabd14c88c67524b632b7dbe1debbe410fd;hb=4075cd5b3568e68b28c60019ad137b34445c0cf3;hpb=a1906d9fe3e565bbd874df72e8552fa7daf9fb9a diff --git a/server.cpp b/server.cpp index 789d8ea..8282a56 100644 --- a/server.cpp +++ b/server.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -14,7 +15,10 @@ #include #include +#include "accesslog.h" +#include "log.h" #include "markpool.h" +#include "metacube.h" #include "mutexlock.h" #include "parse.h" #include "server.h" @@ -23,6 +27,8 @@ using namespace std; +extern AccessLogThread *access_log; + Server::Server() { pthread_mutex_init(&mutex, NULL); @@ -30,20 +36,26 @@ Server::Server() epoll_fd = epoll_create(1024); // Size argument is ignored. if (epoll_fd == -1) { - perror("epoll_fd"); + log_perror("epoll_fd"); exit(1); } } Server::~Server() { + for (map::iterator stream_it = streams.begin(); + stream_it != streams.end(); + ++stream_it) { + delete stream_it->second; + } + int ret; do { ret = close(epoll_fd); } while (ret == -1 && errno == EINTR); if (ret == -1) { - perror("close(epoll_fd)"); + log_perror("close(epoll_fd)"); } } @@ -71,7 +83,7 @@ void Server::do_work() continue; } if (nfds == -1) { - perror("epoll_wait"); + log_perror("epoll_wait"); exit(1); } @@ -140,7 +152,7 @@ void Server::add_client(int sock) ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; ev.data.u64 = reinterpret_cast(&clients[sock]); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_ADD)"); + log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } @@ -172,7 +184,7 @@ void Server::add_client_from_serialized(const ClientProto &client) ev.data.u64 = 0; // Keep Valgrind happy. ev.data.u64 = reinterpret_cast(client_ptr); if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_ADD)"); + log_perror("epoll_ctl(EPOLL_CTL_ADD)"); exit(1); } @@ -184,10 +196,10 @@ void Server::add_client_from_serialized(const ClientProto &client) } } -void Server::add_stream(const string &stream_id, size_t backlog_size) +void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding) { MutexLock lock(&mutex); - streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size))); + streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); } void Server::add_stream_from_serialized(const StreamProto &stream) @@ -196,17 +208,25 @@ void Server::add_stream_from_serialized(const StreamProto &stream) streams.insert(make_pair(stream.stream_id(), new Stream(stream))); } -void Server::set_backlog_size(const std::string &stream_id, size_t new_size) +void Server::set_backlog_size(const string &stream_id, size_t new_size) { MutexLock lock(&mutex); assert(streams.count(stream_id) != 0); streams[stream_id]->set_backlog_size(new_size); } -void Server::set_header(const string &stream_id, const string &header) +void Server::set_encoding(const string &stream_id, Stream::Encoding encoding) { MutexLock lock(&mutex); - find_stream(stream_id)->header = header; + assert(streams.count(stream_id) != 0); + streams[stream_id]->encoding = encoding; +} + +void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header) +{ + MutexLock lock(&mutex); + find_stream(stream_id)->http_header = http_header; + find_stream(stream_id)->stream_header = stream_header; // If there are clients we haven't sent anything to yet, we should give // them the header, so push back into the SENDING_HEADER state. @@ -221,7 +241,7 @@ void Server::set_header(const string &stream_id, const string &header) } } -void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool) +void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool) { MutexLock lock(&mutex); assert(clients.empty()); @@ -253,7 +273,7 @@ read_request_again: return; } if (ret == -1) { - perror("read"); + log_perror("read"); close_client(client); return; } @@ -267,7 +287,7 @@ read_request_again: switch (status) { case RP_OUT_OF_SPACE: - fprintf(stderr, "WARNING: fd %d sent overlong request!\n", client->sock); + log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str()); close_client(client); return; case RP_NOT_FINISHED_YET: @@ -275,7 +295,7 @@ read_request_again: // See if there's more data for us. goto read_request_again; case RP_EXTRA_DATA: - fprintf(stderr, "WARNING: fd %d had junk data after request!\n", client->sock); + log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str()); close_client(client); return; case RP_FINISHED: @@ -315,7 +335,7 @@ sending_header_or_error_again: if (ret == -1) { // Error! Postcondition #1. - perror("write"); + log_perror("write"); close_client(client); return; } @@ -356,8 +376,8 @@ sending_data_again: return; } if (bytes_to_send > stream->backlog_size) { - fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n", - client->sock, + log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection", + client->remote_addr.c_str(), (long long int)(bytes_to_send - stream->backlog_size)); client->stream_pos = stream->bytes_received - stream->backlog_size; client->bytes_lost += bytes_to_send - stream->backlog_size; @@ -386,7 +406,7 @@ sending_data_again: } if (ret == -1) { // Error, close; postcondition #1. - perror("sendfile"); + log_perror("sendfile"); close_client(client); return; } @@ -397,7 +417,7 @@ sending_data_again: // We don't have any more data for this client, so put it to sleep. // This is postcondition #3. stream->put_client_to_sleep(client); - } else if (more_data && ret == bytes_to_send) { + } else if (more_data && size_t(ret) == bytes_to_send) { goto sending_data_again; } break; @@ -434,7 +454,7 @@ int Server::parse_request(Client *client) } if (setsockopt(client->sock, SOL_SOCKET, SO_MARK, &client->fwmark, sizeof(client->fwmark)) == -1) { if (client->fwmark != 0) { - perror("setsockopt(SO_MARK)"); + log_perror("setsockopt(SO_MARK)"); } } client->request.clear(); @@ -444,7 +464,27 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - client->header_or_error = find_stream(client->stream_id)->header; + Stream *stream = find_stream(client->stream_id); + if (stream->encoding == Stream::STREAM_ENCODING_RAW) { + client->header_or_error = stream->http_header + + "\r\n" + + stream->stream_header; + } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) { + client->header_or_error = stream->http_header + + "Content-encoding: metacube\r\n" + + "\r\n"; + if (!stream->stream_header.empty()) { + metacube_block_header hdr; + memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); + hdr.size = htonl(stream->stream_header.size()); + hdr.flags = htonl(METACUBE_FLAGS_HEADER); + client->header_or_error.append( + string(reinterpret_cast(&hdr), sizeof(hdr))); + } + client->header_or_error.append(stream->stream_header); + } else { + assert(false); + } // Switch states. client->state = Client::SENDING_HEADER; @@ -454,7 +494,7 @@ void Server::construct_header(Client *client) ev.data.u64 = reinterpret_cast(client); if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_MOD)"); + log_perror("epoll_ctl(EPOLL_CTL_MOD)"); exit(1); } } @@ -474,7 +514,7 @@ void Server::construct_error(Client *client, int error_code) ev.data.u64 = reinterpret_cast(client); if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { - perror("epoll_ctl(EPOLL_CTL_MOD)"); + log_perror("epoll_ctl(EPOLL_CTL_MOD)"); exit(1); } } @@ -489,7 +529,7 @@ void delete_from(vector *v, T elem) void Server::close_client(Client *client) { if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) { - perror("epoll_ctl(EPOLL_CTL_DEL)"); + log_perror("epoll_ctl(EPOLL_CTL_DEL)"); exit(1); } @@ -503,6 +543,9 @@ void Server::close_client(Client *client) } } + // Log to access_log. + access_log->write(client->get_stats()); + // Bye-bye! int ret; do { @@ -510,7 +553,7 @@ void Server::close_client(Client *client) } while (ret == -1 && errno == EINTR); if (ret == -1) { - perror("close"); + log_perror("close"); } clients.erase(client->sock);