X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.h;h=2cb078e3fc418bdd6b47665bba31635b3af6e4e2;hp=f1cc7a0dbfc0d56116739ebccda95829d8cfe577;hb=b59fa7ce2d47f135ea027548cc89f937a5fa875b;hpb=eb33692c9edee93e0883cd9f980d48dc7e17d801 diff --git a/server.h b/server.h index f1cc7a0..2cb078e 100644 --- a/server.h +++ b/server.h @@ -3,16 +3,28 @@ #include #include +#include #include #include +#include -#define NUM_SERVERS 4 #define BACKLOG_SIZE 1048576 #define EPOLL_MAX_EVENTS 8192 #define EPOLL_TIMEOUT_MS 20 #define MAX_CLIENT_REQUEST 16384 +class ClientProto; +class CubemapStateProto; +class StreamProto; + struct Client { + Client() {} + Client(int sock); + + // Serialization/deserialization. + Client(const ClientProto &serialized); + ClientProto serialize() const; + // The file descriptor associated with this socket. int sock; @@ -21,9 +33,9 @@ struct Client { // The HTTP request, as sent by the client. If we are in READING_REQUEST, // this might not be finished. - std::string client_request; + std::string request; - // What stream we're connecting to; parsed from client_request. + // What stream we're connecting to; parsed from . // Not relevant for READING_REQUEST. std::string stream_id; @@ -40,42 +52,62 @@ struct Client { }; struct Stream { + Stream(const std::string &stream_id); + ~Stream(); + + // Serialization/deserialization. + Stream(const StreamProto &serialized); + StreamProto serialize() const; + + std::string stream_id; + // The HTTP response header, plus the video stream header (if any). std::string header; // The stream data itself, stored in a circular buffer. - char data[BACKLOG_SIZE]; + char *data; // How many bytes contains. Can very well be larger than BACKLOG_SIZE, // since the buffer wraps. size_t data_size; + +private: + Stream(const Stream& other); }; class Server { public: Server(); + ~Server(); // Start a new thread that handles clients. void run(); + + // Stop the thread. + void stop(); + + CubemapStateProto serialize() const; + void add_client(int sock); + void add_client_from_serialized(const ClientProto &client); + void add_stream(const std::string &stream_id); + void add_stream_from_serialized(const StreamProto &stream); + void set_header(const std::string &stream_id, const std::string &header); void add_data(const std::string &stream_id, const char *data, size_t bytes); private: - void process_client(Client *client); - - // Close a given client socket, and clean up after it. - void close_client(Client *client); - - // Parse the HTTP request, construct the header, and set the client into - // the SENDING_HEADER state. - void parse_request(Client *client); + pthread_t worker_thread; + // All variables below this line are protected by the mutex. pthread_mutex_t mutex; + // If the thread should stop or not. + bool should_stop; + // Map from stream ID to stream. - std::map streams; + std::map streams; // Map from file descriptor to client. std::map clients; @@ -84,11 +116,39 @@ private: int epoll_fd; epoll_event events[EPOLL_MAX_EVENTS]; + // Clients that are in SENDING_DATA, but that we don't listen on, + // because we currently don't have any data for them. + // See put_client_to_sleep() and wake_up_all_clients(). + std::vector sleeping_clients; + // Recover the this pointer, and call do_work(). static void *do_work_thunk(void *arg); // The actual worker thread. void do_work(); + + void process_client(Client *client); + + // Close a given client socket, and clean up after it. + void close_client(Client *client); + + // Parse the HTTP request. + void parse_request(Client *client); + + // Construct the HTTP header, and set the client into + // the SENDING_HEADER state. + void construct_header(Client *client); + + // Put client to sleep, since there is no more data for it; we will on + // longer listen on POLLOUT until we get more data. Also, it will be put + // in the list of clients to wake up when we do. + void put_client_to_sleep(Client *client); + + // We have more data, so mark all clients that are sleeping as ready to go. + void wake_up_all_clients(); + + // TODO: This function should probably die. + Stream *find_stream(const std::string &stream_id); }; #endif // !defined(_SERVER_H)