-Server::Server()
-{
- pthread_mutex_init(&mutex, NULL);
-
- epoll_fd = epoll_create(1024); // Size argument is ignored.
- if (epoll_fd == -1) {
- perror("epoll_fd");
- exit(1);
- }
-}
-
-void Server::run()
-{
- pthread_t thread;
- pthread_create(&thread, NULL, Server::do_work_thunk, this);
-}
-
-void *Server::do_work_thunk(void *arg)
-{
- Server *server = static_cast<Server *>(arg);
- server->do_work();
- return NULL;
-}
-
-void Server::do_work()
-{
- for ( ;; ) {
- MutexLock lock(&mutex);
- printf("server thread running\n");
- sleep(1);
- }
-}
-
-void Server::add_client(int sock)
-{
- MutexLock lock(&mutex);
- Client new_client;
- new_client.state = Client::READING_REQUEST;
- new_client.header_bytes_sent = 0;
- new_client.bytes_sent = 0;
-
- clients.insert(make_pair(sock, new_client));
-
- // Start listening on data from this socket.
- epoll_event ev;
- ev.events = EPOLLIN;
- ev.data.fd = sock;
- if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
- perror("epoll_ctl(EPOLL_CTL_ADD)");
- exit(1);
- }
-}
-
-void Server::add_stream(const string &stream_id)
-{
- // TODO
-}
-
-void Server::set_header(const string &stream_id, const string &header)
-{
- // TODO
- printf("got header! %lu bytes\n", header.size());
-}
-
-void Server::add_data(const string &stream_id, const char *data, size_t bytes)
-{
- // TODO
-}
-
-class Input {
-public:
- Input(const string &stream_id);
-
- // Connect to the given URL and start streaming.
- // WARNING: Currently this blocks; it does not run in a separate thread!
- void run(const string &url);
-
-private:
- // Recovers the this pointer and calls curl_callback().
- static size_t curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata);
-
- // Stores the given data, looks for Metacube blocks (skipping data if needed),
- // and calls process_block() for each one.
- void curl_callback(char *ptr, size_t bytes);
- void process_block(const char *data, uint32_t size, uint32_t flags);
-
- // Drops <num_bytes> bytes from the head of <pending_data>,
- // and outputs a warning.
- void drop_pending_data(size_t num_bytes);
-
- string stream_id;
-
- // Data we have received but not fully processed yet.
- vector<char> pending_data;
-
- // If <pending_data> starts with a Metacube header,
- // this is true.
- bool has_metacube_header;
-};
-
-Input::Input(const string &stream_id)
- : stream_id(stream_id),
- has_metacube_header(false)
-{
-}
-
-void Input::run(const string &url)
-{
- CURL *curl = curl_easy_init();
- curl_easy_setopt(curl, CURLOPT_URL, STREAM_URL);
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
- curl_easy_perform(curl);
-}
-
-size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata)