From: Steinar H. Gunderson Date: Sat, 13 Apr 2013 20:02:18 +0000 (+0200) Subject: Move Client and Stream into their own files. X-Git-Tag: 1.0.0~124 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=195dc469133d0daed6ac69cdef373dc8dade9637 Move Client and Stream into their own files. --- diff --git a/Makefile b/Makefile index d7209aa..77fe962 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g LDLIBS=-lpthread -lprotobuf -OBJS=main.o server.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o thread.o util.o state.pb.o +OBJS=main.o client.o server.o stream.o serverpool.o mutexlock.o input.o httpinput.o udpinput.o parse.o config.o markpool.o acceptor.o stats.o thread.o util.o state.pb.o all: cubemap diff --git a/client.cpp b/client.cpp new file mode 100644 index 0000000..be47e0e --- /dev/null +++ b/client.cpp @@ -0,0 +1,90 @@ +#include +#include +#include +#include +#include + +#include "client.h" +#include "markpool.h" +#include "stream.h" +#include "state.pb.h" + +using namespace std; + +Client::Client(int sock) + : sock(sock), + fwmark(0), + connect_time(time(NULL)), + state(Client::READING_REQUEST), + stream(NULL), + header_or_error_bytes_sent(0), + bytes_sent(0) +{ + request.reserve(1024); + + // Find the remote address, and convert it to ASCII. + sockaddr_in6 addr; + socklen_t addr_len = sizeof(addr); + + if (getpeername(sock, reinterpret_cast(&addr), &addr_len) == -1) { + perror("getpeername"); + remote_addr = ""; + } else { + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) { + perror("inet_ntop"); + remote_addr = ""; + } else { + remote_addr = buf; + } + } +} + +Client::Client(const ClientProto &serialized, Stream *stream) + : sock(serialized.sock()), + remote_addr(serialized.remote_addr()), + connect_time(serialized.connect_time()), + state(State(serialized.state())), + request(serialized.request()), + stream_id(serialized.stream_id()), + stream(stream), + header_or_error(serialized.header_or_error()), + header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), + bytes_sent(serialized.bytes_sent()) +{ + if (stream->mark_pool != NULL) { + fwmark = stream->mark_pool->get_mark(); + } else { + fwmark = 0; // No mark. + } + if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { + if (fwmark != 0) { + perror("setsockopt(SO_MARK)"); + } + } +} + +ClientProto Client::serialize() const +{ + ClientProto serialized; + serialized.set_sock(sock); + serialized.set_remote_addr(remote_addr); + serialized.set_connect_time(connect_time); + serialized.set_state(state); + serialized.set_request(request); + serialized.set_stream_id(stream_id); + serialized.set_header_or_error(header_or_error); + serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()); + serialized.set_bytes_sent(bytes_sent); + return serialized; +} + +ClientStats Client::get_stats() const +{ + ClientStats stats; + stats.stream_id = stream_id; + stats.remote_addr = remote_addr; + stats.connect_time = connect_time; + stats.bytes_sent = bytes_sent; + return stats; +} diff --git a/client.h b/client.h new file mode 100644 index 0000000..87fa718 --- /dev/null +++ b/client.h @@ -0,0 +1,65 @@ +#ifndef _CLIENT_H +#define _CLIENT_H 1 + +// A Client represents a single connection from a client (watching a single stream). + +#include +#include + +class ClientProto; +class Stream; + +// Digested statistics for writing to logs etc. +struct ClientStats { + std::string stream_id; + std::string remote_addr; + time_t connect_time; + size_t bytes_sent; +}; + +struct Client { + Client() {} + Client(int sock); + + // Serialization/deserialization. + Client(const ClientProto &serialized, Stream *stream); + ClientProto serialize() const; + + ClientStats get_stats() const; + + // The file descriptor associated with this socket. + int sock; + + // The fwmark associated with this socket (or 0). + int fwmark; + + // Some information only used for logging. + std::string remote_addr; + time_t connect_time; + + enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR }; + State state; + + // The HTTP request, as sent by the client. If we are in READING_REQUEST, + // this might not be finished. + std::string request; + + // What stream we're connecting to; parsed from . + // Not relevant for READING_REQUEST. + std::string stream_id; + Stream *stream; + + // The header we want to send. This is nominally a copy of Stream::header, + // but since that might change on reconnects etc., we keep a local copy here. + // Only relevant for SENDING_HEADER or SENDING_ERROR; blank otherwise. + std::string header_or_error; + + // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER + // or SENDING_ERROR. + size_t header_or_error_bytes_sent; + + // Number of bytes we've sent of data. Only relevant for SENDING_DATA. + size_t bytes_sent; +}; + +#endif // !defined(_CLIENT_H) diff --git a/server.cpp b/server.cpp index 0c36e49..64819f4 100644 --- a/server.cpp +++ b/server.cpp @@ -19,160 +19,14 @@ #include #include "markpool.h" -#include "metacube.h" +#include "parse.h" #include "server.h" +#include "stream.h" #include "mutexlock.h" -#include "parse.h" -#include "util.h" #include "state.pb.h" using namespace std; -Client::Client(int sock) - : sock(sock), - fwmark(0), - connect_time(time(NULL)), - state(Client::READING_REQUEST), - stream(NULL), - header_or_error_bytes_sent(0), - bytes_sent(0) -{ - request.reserve(1024); - - // Find the remote address, and convert it to ASCII. - sockaddr_in6 addr; - socklen_t addr_len = sizeof(addr); - - if (getpeername(sock, reinterpret_cast(&addr), &addr_len) == -1) { - perror("getpeername"); - remote_addr = ""; - } else { - char buf[INET6_ADDRSTRLEN]; - if (inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf)) == NULL) { - perror("inet_ntop"); - remote_addr = ""; - } else { - remote_addr = buf; - } - } -} - -Client::Client(const ClientProto &serialized, Stream *stream) - : sock(serialized.sock()), - remote_addr(serialized.remote_addr()), - connect_time(serialized.connect_time()), - state(State(serialized.state())), - request(serialized.request()), - stream_id(serialized.stream_id()), - stream(stream), - header_or_error(serialized.header_or_error()), - header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), - bytes_sent(serialized.bytes_sent()) -{ - if (stream->mark_pool != NULL) { - fwmark = stream->mark_pool->get_mark(); - } else { - fwmark = 0; // No mark. - } - if (setsockopt(sock, SOL_SOCKET, SO_MARK, &fwmark, sizeof(fwmark)) == -1) { - if (fwmark != 0) { - perror("setsockopt(SO_MARK)"); - } - } -} - -ClientProto Client::serialize() const -{ - ClientProto serialized; - serialized.set_sock(sock); - serialized.set_remote_addr(remote_addr); - serialized.set_connect_time(connect_time); - serialized.set_state(state); - serialized.set_request(request); - serialized.set_stream_id(stream_id); - serialized.set_header_or_error(header_or_error); - serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()); - serialized.set_bytes_sent(bytes_sent); - return serialized; -} - -ClientStats Client::get_stats() const -{ - ClientStats stats; - stats.stream_id = stream_id; - stats.remote_addr = remote_addr; - stats.connect_time = connect_time; - stats.bytes_sent = bytes_sent; - return stats; -} - -Stream::Stream(const string &stream_id, size_t backlog_size) - : stream_id(stream_id), - data_fd(make_tempfile("")), - backlog_size(backlog_size), - bytes_received(0), - mark_pool(NULL) -{ - if (data_fd == -1) { - exit(1); - } -} - -Stream::~Stream() -{ - if (data_fd != -1) { - int ret; - do { - ret = close(data_fd); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - perror("close"); - } - } -} - -Stream::Stream(const StreamProto &serialized) - : stream_id(serialized.stream_id()), - header(serialized.header()), - data_fd(make_tempfile(serialized.data())), - backlog_size(serialized.backlog_size()), - bytes_received(serialized.bytes_received()), - mark_pool(NULL) -{ - if (data_fd == -1) { - exit(1); - } -} - -StreamProto Stream::serialize() -{ - StreamProto serialized; - serialized.set_header(header); - if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd. - exit(1); - } - serialized.set_backlog_size(backlog_size); - serialized.set_bytes_received(bytes_received); - serialized.set_stream_id(stream_id); - data_fd = -1; - return serialized; -} - -void Stream::put_client_to_sleep(Client *client) -{ - sleeping_clients.push_back(client); -} - -void Stream::wake_up_all_clients() -{ - if (to_process.empty()) { - swap(sleeping_clients, to_process); - } else { - to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end()); - sleeping_clients.clear(); - } -} - Server::Server() { pthread_mutex_init(&mutex, NULL); diff --git a/server.h b/server.h index 7910f4c..343adf9 100644 --- a/server.h +++ b/server.h @@ -9,125 +9,17 @@ #include #include +#include "client.h" #include "thread.h" #define EPOLL_MAX_EVENTS 8192 #define EPOLL_TIMEOUT_MS 20 #define MAX_CLIENT_REQUEST 16384 -class ClientProto; class CubemapStateProto; class MarkPool; -class Stream; class StreamProto; -// Digested statistics for writing to logs etc. -struct ClientStats { - std::string stream_id; - std::string remote_addr; - time_t connect_time; - size_t bytes_sent; -}; - -struct Client { - Client() {} - Client(int sock); - - // Serialization/deserialization. - Client(const ClientProto &serialized, Stream *stream); - ClientProto serialize() const; - - ClientStats get_stats() const; - - // The file descriptor associated with this socket. - int sock; - - // The fwmark associated with this socket (or 0). - int fwmark; - - // Some information only used for logging. - std::string remote_addr; - time_t connect_time; - - enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR }; - State state; - - // The HTTP request, as sent by the client. If we are in READING_REQUEST, - // this might not be finished. - std::string request; - - // What stream we're connecting to; parsed from . - // Not relevant for READING_REQUEST. - std::string stream_id; - Stream *stream; - - // The header we want to send. This is nominally a copy of Stream::header, - // but since that might change on reconnects etc., we keep a local copy here. - // Only relevant for SENDING_HEADER or SENDING_ERROR; blank otherwise. - std::string header_or_error; - - // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER - // or SENDING_ERROR. - size_t header_or_error_bytes_sent; - - // Number of bytes we've sent of data. Only relevant for SENDING_DATA. - size_t bytes_sent; -}; - -struct Stream { - Stream(const std::string &stream_id, size_t backlog_size); - ~Stream(); - - // Serialization/deserialization. - Stream(const StreamProto &serialized); - StreamProto serialize(); - - std::string stream_id; - - // The HTTP response header, plus the video stream header (if any). - std::string header; - - // The stream data itself, stored in a circular buffer. - // - // We store our data in a file, so that we can send the data to the - // kernel only once (with write()). We then use sendfile() for each - // client, which effectively zero-copies it out of the kernel's buffer - // cache. This is significantly more efficient than doing write() from - // a userspace memory buffer, since the latter makes the kernel copy - // the same data from userspace many times. - int data_fd; - - // How many bytes can hold (the buffer size). - size_t backlog_size; - - // How many bytes this stream have received. Can very well be larger - // than , since the buffer wraps. - size_t bytes_received; - - // Clients that are in SENDING_DATA, but that we don't listen on, - // because we currently don't have any data for them. - // See put_client_to_sleep() and wake_up_all_clients(). - std::vector sleeping_clients; - - // Clients that we recently got data for (when they were in - // ). - std::vector to_process; - - // What pool to fetch marks from, or NULL. - MarkPool *mark_pool; - - // Put client to sleep, since there is no more data for it; we will on - // longer listen on POLLOUT until we get more data. Also, it will be put - // in the list of clients to wake up when we do. - void put_client_to_sleep(Client *client); - - // We have more data, so mark all clients that are sleeping as ready to go. - void wake_up_all_clients(); - -private: - Stream(const Stream& other); -}; - class Server : public Thread { public: Server(); diff --git a/stream.cpp b/stream.cpp new file mode 100644 index 0000000..322391e --- /dev/null +++ b/stream.cpp @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include +#include + +#include "stream.h" +#include "util.h" +#include "state.pb.h" + +using namespace std; + +Stream::Stream(const string &stream_id, size_t backlog_size) + : stream_id(stream_id), + data_fd(make_tempfile("")), + backlog_size(backlog_size), + bytes_received(0), + mark_pool(NULL) +{ + if (data_fd == -1) { + exit(1); + } +} + +Stream::~Stream() +{ + if (data_fd != -1) { + int ret; + do { + ret = close(data_fd); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + perror("close"); + } + } +} + +Stream::Stream(const StreamProto &serialized) + : stream_id(serialized.stream_id()), + header(serialized.header()), + data_fd(make_tempfile(serialized.data())), + backlog_size(serialized.backlog_size()), + bytes_received(serialized.bytes_received()), + mark_pool(NULL) +{ + if (data_fd == -1) { + exit(1); + } +} + +StreamProto Stream::serialize() +{ + StreamProto serialized; + serialized.set_header(header); + if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd. + exit(1); + } + serialized.set_backlog_size(backlog_size); + serialized.set_bytes_received(bytes_received); + serialized.set_stream_id(stream_id); + data_fd = -1; + return serialized; +} + +void Stream::put_client_to_sleep(Client *client) +{ + sleeping_clients.push_back(client); +} + +void Stream::wake_up_all_clients() +{ + if (to_process.empty()) { + swap(sleeping_clients, to_process); + } else { + to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end()); + sleeping_clients.clear(); + } +} diff --git a/stream.h b/stream.h new file mode 100644 index 0000000..256946c --- /dev/null +++ b/stream.h @@ -0,0 +1,69 @@ +#ifndef _STREAM_H +#define _STREAM_H 1 + +// Representation of a single, muxed (we only really care about bytes/blocks) stream. +// Fed by Input, sent out by Server (to Client). + +#include +#include +#include + +class Client; +class MarkPool; +class StreamProto; + +struct Stream { + Stream(const std::string &stream_id, size_t backlog_size); + ~Stream(); + + // Serialization/deserialization. + Stream(const StreamProto &serialized); + StreamProto serialize(); + + std::string stream_id; + + // The HTTP response header, plus the video stream header (if any). + std::string header; + + // The stream data itself, stored in a circular buffer. + // + // We store our data in a file, so that we can send the data to the + // kernel only once (with write()). We then use sendfile() for each + // client, which effectively zero-copies it out of the kernel's buffer + // cache. This is significantly more efficient than doing write() from + // a userspace memory buffer, since the latter makes the kernel copy + // the same data from userspace many times. + int data_fd; + + // How many bytes can hold (the buffer size). + size_t backlog_size; + + // How many bytes this stream have received. Can very well be larger + // than , since the buffer wraps. + size_t bytes_received; + + // Clients that are in SENDING_DATA, but that we don't listen on, + // because we currently don't have any data for them. + // See put_client_to_sleep() and wake_up_all_clients(). + std::vector sleeping_clients; + + // Clients that we recently got data for (when they were in + // ). + std::vector to_process; + + // What pool to fetch marks from, or NULL. + MarkPool *mark_pool; + + // Put client to sleep, since there is no more data for it; we will on + // longer listen on POLLOUT until we get more data. Also, it will be put + // in the list of clients to wake up when we do. + void put_client_to_sleep(Client *client); + + // We have more data, so mark all clients that are sleeping as ready to go. + void wake_up_all_clients(); + +private: + Stream(const Stream& other); +}; + +#endif // !defined(_STREAM_H)