X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=input.cpp;h=6a53c22dc7a31cf3219aa70603a0a0446f0f5e0a;hp=29e4ed3975577e998184c27cf539419b77adb801;hb=b59fa7ce2d47f135ea027548cc89f937a5fa875b;hpb=519ddcdf0458032a2024d7acc57642fe27829dc0 diff --git a/input.cpp b/input.cpp index 29e4ed3..6a53c22 100644 --- a/input.cpp +++ b/input.cpp @@ -15,31 +15,73 @@ #include #include "metacube.h" +#include "mutexlock.h" #include "input.h" #include "server.h" +#include "serverpool.h" using namespace std; -extern Server *servers; +extern ServerPool *servers; -Input::Input(const string &stream_id) +Input::Input(const string &stream_id, const string &url) : stream_id(stream_id), + url(url), has_metacube_header(false) { } -void Input::run(const string &url) +void Input::run() +{ + should_stop = false; + + // Joinable is already the default, but it's good to be certain. + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_create(&worker_thread, &attr, Input::do_work_thunk, this); +} + +void Input::stop() +{ + should_stop = true; + + if (pthread_join(worker_thread, NULL) == -1) { + perror("pthread_join"); + exit(1); + } +} + +void *Input::do_work_thunk(void *arg) +{ + Input *input = static_cast(arg); + input->do_work(); + return NULL; +} + +void Input::do_work() { CURL *curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); - curl_easy_perform(curl); + + while (!should_stop) { + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); + curl_easy_perform(curl); + if (!should_stop) { + printf("Transfer ended, waiting 0.2 seconds and restarting...\n"); + usleep(200000); + } + } } size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata) { Input *input = static_cast(userdata); + if (input->should_stop) { + return 0; + } + size_t bytes = size * nmemb; input->curl_callback(ptr, bytes); return bytes; @@ -91,12 +133,13 @@ void Input::curl_callback(char *ptr, size_t bytes) return; } - process_block(pending_data.data(), size, flags); + process_block(pending_data.data() + sizeof(metacube_block_header), size, flags); // Consume this block. This isn't the most efficient way of dealing with things // should we have many blocks, but these routines don't need to be too efficient // anyway. pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size); + has_metacube_header = false; } } @@ -104,13 +147,9 @@ void Input::process_block(const char *data, uint32_t size, uint32_t flags) { if (flags & METACUBE_FLAGS_HEADER) { string header(data, data + size); - for (int i = 0; i < NUM_SERVERS; ++i) { - servers[i].set_header(stream_id, header); - } + servers->set_header(stream_id, header); } else { - for (int i = 0; i < NUM_SERVERS; ++i) { - servers[i].add_data(stream_id, data, size); - } + servers->add_data(stream_id, data, size); } }