X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=6e1005e1090ccd851c5690f3f6f4174c6c00f8ab;hp=f06685ab02224ea654b85d82f09d35a5dc456a5c;hb=5ab36b04b0c12058394335e891398c494df513d2;hpb=c2c9f6441f9ae8091a39aea0340417d5915e1ac9 diff --git a/server.cpp b/server.cpp index f06685a..6e1005e 100644 --- a/server.cpp +++ b/server.cpp @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include #include @@ -24,16 +26,36 @@ using namespace std; Client::Client(int sock) : sock(sock), + connect_time(time(NULL)), state(Client::READING_REQUEST), stream(NULL), header_or_error_bytes_sent(0), bytes_sent(0) { request.reserve(1024); + + // Find the remote address, and convert it to ASCII. + sockaddr_in6 addr; + socklen_t addr_len = sizeof(addr); + + if (getpeername(sock, reinterpret_cast(&addr), &addr_len) == -1) { + perror("getpeername"); + remote_addr = ""; + } else { + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) { + perror("inet_ntop"); + remote_addr = ""; + } else { + remote_addr = buf; + } + } } Client::Client(const ClientProto &serialized, Stream *stream) : sock(serialized.sock()), + remote_addr(serialized.remote_addr()), + connect_time(serialized.connect_time()), state(State(serialized.state())), request(serialized.request()), stream_id(serialized.stream_id()), @@ -48,6 +70,8 @@ ClientProto Client::serialize() const { ClientProto serialized; serialized.set_sock(sock); + serialized.set_remote_addr(remote_addr); + serialized.set_connect_time(connect_time); serialized.set_state(state); serialized.set_request(request); serialized.set_stream_id(stream_id); @@ -56,6 +80,16 @@ ClientProto Client::serialize() const serialized.set_bytes_sent(bytes_sent); return serialized; } + +ClientStats Client::get_stats() const +{ + ClientStats stats; + stats.stream_id = stream_id; + stats.remote_addr = remote_addr; + stats.connect_time = connect_time; + stats.bytes_sent = bytes_sent; + return stats; +} Stream::Stream(const string &stream_id) : stream_id(stream_id), @@ -147,11 +181,25 @@ void Server::stop() should_stop = true; } + pthread_kill(worker_thread, SIGHUP); if (pthread_join(worker_thread, NULL) == -1) { perror("pthread_join"); exit(1); } } + +vector Server::get_client_stats() const +{ + vector ret; + + MutexLock lock(&mutex); + for (map::const_iterator client_it = clients.begin(); + client_it != clients.end(); + ++client_it) { + ret.push_back(client_it->second.get_stats()); + } + return ret; +} void *Server::do_work_thunk(void *arg) { @@ -165,6 +213,9 @@ void Server::do_work() for ( ;; ) { int nfds = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, EPOLL_TIMEOUT_MS); if (nfds == -1 && errno == EINTR) { + if (should_stop) { + return; + } continue; } if (nfds == -1) { @@ -174,10 +225,6 @@ void Server::do_work() MutexLock lock(&mutex); // We release the mutex between iterations. - if (should_stop) { - return; - } - process_queued_data(); for (int i = 0; i < nfds; ++i) { @@ -196,11 +243,15 @@ void Server::do_work() for (map::iterator stream_it = streams.begin(); stream_it != streams.end(); ++stream_it) { - Stream *stream = stream_it->second; - for (size_t i = 0; i < stream->to_process.size(); ++i) { - process_client(stream->to_process[i]); + vector to_process; + swap(stream_it->second->to_process, to_process); + for (size_t i = 0; i < to_process.size(); ++i) { + process_client(to_process[i]); } - stream->to_process.clear(); + } + + if (should_stop) { + return; } } } @@ -538,8 +589,7 @@ int Server::parse_request(Client *client) void Server::construct_header(Client *client) { - client->header_or_error = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" + - find_stream(client->stream_id)->header; + client->header_or_error = find_stream(client->stream_id)->header; // Switch states. client->state = Client::SENDING_HEADER; @@ -575,6 +625,13 @@ void Server::construct_error(Client *client, int error_code) exit(1); } } + +template +void delete_from(vector *v, T elem) +{ + typename vector::iterator new_end = remove(v->begin(), v->end(), elem); + v->erase(new_end, v->end()); +} void Server::close_client(Client *client) { @@ -585,12 +642,8 @@ void Server::close_client(Client *client) // This client could be sleeping, so we'll need to fix that. (Argh, O(n).) if (client->stream != NULL) { - vector::iterator new_end = - remove(client->stream->sleeping_clients.begin(), - client->stream->sleeping_clients.end(), - client); - client->stream->sleeping_clients.erase( - new_end, client->stream->sleeping_clients.end()); + delete_from(&client->stream->sleeping_clients, client); + delete_from(&client->stream->to_process, client); } // Bye-bye!