]> git.sesse.net Git - cubemap/blobdiff - input.cpp
Take the port from the configuration file.
[cubemap] / input.cpp
index 29e4ed3975577e998184c27cf539419b77adb801..541b5df6273bf7e59b0230acbcb1d936f679b977 100644 (file)
--- a/input.cpp
+++ b/input.cpp
@@ -15,6 +15,7 @@
 #include <map>
 
 #include "metacube.h"
+#include "mutexlock.h"
 #include "input.h"
 #include "server.h"
 
@@ -22,24 +23,64 @@ using namespace std;
 
 extern Server *servers;
 
-Input::Input(const string &stream_id)
+Input::Input(const string &stream_id, const string &url)
        : stream_id(stream_id),
+         url(url),
          has_metacube_header(false)
 {
 }
 
-void Input::run(const string &url)
+void Input::run()
+{
+       should_stop = false;
+       
+       // Joinable is already the default, but it's good to be certain.
+       pthread_attr_t attr;
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       pthread_create(&worker_thread, &attr, Input::do_work_thunk, this);
+}
+
+void Input::stop()
+{
+       should_stop = true;
+
+       if (pthread_join(worker_thread, NULL) == -1) {
+               perror("pthread_join");
+               exit(1);
+       }
+}
+
+void *Input::do_work_thunk(void *arg)
+{
+       Input *input = static_cast<Input *>(arg);
+       input->do_work();
+       return NULL;
+}
+
+void Input::do_work()
 {
        CURL *curl = curl_easy_init();
-       curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
-       curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk);
-       curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
-       curl_easy_perform(curl);
+
+       while (!should_stop) {
+               curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+               curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Input::curl_callback_thunk);
+               curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
+               curl_easy_perform(curl);
+               if (!should_stop) {
+                       printf("Transfer ended, waiting 0.2 seconds and restarting...\n");
+                       usleep(200000);
+               }
+       }
 }
 
 size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata)
 {
        Input *input = static_cast<Input *>(userdata);
+       if (input->should_stop) {
+               return 0;
+       }
+
        size_t bytes = size * nmemb;
        input->curl_callback(ptr, bytes);       
        return bytes;
@@ -91,12 +132,13 @@ void Input::curl_callback(char *ptr, size_t bytes)
                        return;
                }
 
-               process_block(pending_data.data(), size, flags);
+               process_block(pending_data.data() + sizeof(metacube_block_header), size, flags);
 
                // Consume this block. This isn't the most efficient way of dealing with things
                // should we have many blocks, but these routines don't need to be too efficient
                // anyway.
                pending_data.erase(pending_data.begin(), pending_data.begin() + sizeof(metacube_block_header) + size);
+               has_metacube_header = false;
        }
 }