Factor all the common thread starting/stopping into a common Thread class.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 10 Apr 2013 21:55:24 +0000 (23:55 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Wed, 10 Apr 2013 21:55:24 +0000 (23:55 +0200)
Makefile
acceptor.cpp
acceptor.h
input.cpp
input.h
server.cpp
server.h
stats.cpp
stats.h
thread.cpp [new file with mode: 0644]
thread.h [new file with mode: 0644]

index f2e37ac..647dfe9 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ PROTOC=protoc
 CXXFLAGS=-Wall -O2 -g
 LDLIBS=-lpthread -lprotobuf
 
-OBJS=main.o server.o serverpool.o mutexlock.o input.o parse.o markpool.o acceptor.o stats.o state.pb.o
+OBJS=main.o server.o serverpool.o mutexlock.o input.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o
 
 all: cubemap
 
index 776f39b..7620b79 100644 (file)
@@ -66,29 +66,6 @@ AcceptorThread::AcceptorThread(int server_sock)
 {
 }
 
-void AcceptorThread::run()
-{
-       should_stop = false;
-       pthread_create(&worker_thread, NULL, &AcceptorThread::do_work_thunk, this);
-}
-
-void AcceptorThread::stop()
-{
-       should_stop = true;
-       pthread_kill(worker_thread, SIGHUP);
-       if (pthread_join(worker_thread, NULL) == -1) {
-               perror("pthread_join");
-               exit(1);
-       }
-}
-
-void *AcceptorThread::do_work_thunk(void *arg)
-{
-       AcceptorThread *acceptor_thread = reinterpret_cast<AcceptorThread *>(arg);
-       acceptor_thread->do_work();
-       return NULL;
-}
-
 void AcceptorThread::do_work()
 {
        while (!hupped) {
index 5cd579d..9767233 100644 (file)
@@ -1,26 +1,20 @@
 #ifndef _ACCEPTOR_H
 #define _ACCEPTOR_H
 
+#include "thread.h"
+
 int create_server_socket(int port);
 
 // A thread that accepts new connections on a given socket,
 // and hands them off to the server pool.
-class AcceptorThread {
+class AcceptorThread : public Thread {
 public:
        AcceptorThread(int server_sock);
-       void run();
-       void stop();
 
 private:
-       // Recovers the this pointer, and hands over control to do_work().
-       static void *do_work_thunk(void *arg);
-
-       void do_work();
+       virtual void do_work();
 
        int server_sock;
-
-       pthread_t worker_thread;
-       volatile bool should_stop;
 };
 
 #endif  // !defined(_ACCEPTOR_H)
index ccc4bc0..a54e6fb 100644 (file)
--- a/input.cpp
+++ b/input.cpp
@@ -114,35 +114,6 @@ InputProto Input::serialize() const
        return serialized;
 }
 
-void Input::run()
-{
-       should_stop = false;
-       
-       // Joinable is already the default, but it's good to be certain.
-       pthread_attr_t attr;
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-       pthread_create(&worker_thread, &attr, Input::do_work_thunk, this);
-}
-
-void Input::stop()
-{
-       should_stop = true;
-
-       pthread_kill(worker_thread, SIGHUP);
-       if (pthread_join(worker_thread, NULL) == -1) {
-               perror("pthread_join");
-               exit(1);
-       }
-}
-
-void *Input::do_work_thunk(void *arg)
-{
-       Input *input = static_cast<Input *>(arg);
-       input->do_work();
-       return NULL;
-}
-
 int Input::lookup_and_connect(const string &host, const string &port)
 {
        addrinfo *ai;
diff --git a/input.h b/input.h
index 28bc14b..14537c8 100644 (file)
--- a/input.h
+++ b/input.h
@@ -4,9 +4,11 @@
 #include <vector>
 #include <string>
 
+#include "thread.h"
+
 class InputProto;
 
-class Input {
+class Input : public Thread {
 public:
        Input(const std::string &stream_id, const std::string &url);
 
@@ -14,20 +16,11 @@ public:
        Input(const InputProto &serialized);
        InputProto serialize() const;
 
-       // Connect to the given URL and start streaming.
-       void run();
-
-       // Stops the streaming, but lets the file descriptor stay open.
-       void stop();
-
        std::string get_url() const { return url; }
 
 private:
-       // Recovers the this pointer and calls do_work().
-       static void *do_work_thunk(void *arg);
-
        // Actually does the download.
-       void do_work();
+       virtual void do_work();
        
        // Open a socket that connects to the given host and port. Does DNS resolving.
        int lookup_and_connect(const std::string &host, const std::string &port);
@@ -81,12 +74,6 @@ private:
 
        // The socket we are downloading on (or -1).
        int sock;       
-
-       // Handle to the thread that actually does the download.
-       pthread_t worker_thread;
-
-       // Whether we should stop or not.
-       volatile bool should_stop;
 };
 
 #endif  // !defined(_INPUT_H)
index aed8779..d3fbaaf 100644 (file)
@@ -177,31 +177,6 @@ Server::~Server()
        }
 }
 
-void Server::run()
-{
-       should_stop = false;
-       
-       // Joinable is already the default, but it's good to be certain.
-       pthread_attr_t attr;
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-       pthread_create(&worker_thread, &attr, Server::do_work_thunk, this);
-}
-       
-void Server::stop()
-{
-       {
-               MutexLock lock(&mutex);
-               should_stop = true;
-       }
-
-       pthread_kill(worker_thread, SIGHUP);
-       if (pthread_join(worker_thread, NULL) == -1) {
-               perror("pthread_join");
-               exit(1);
-       }
-}
-       
 vector<ClientStats> Server::get_client_stats() const
 {
        vector<ClientStats> ret;
@@ -215,13 +190,6 @@ vector<ClientStats> Server::get_client_stats() const
        return ret;
 }
 
-void *Server::do_work_thunk(void *arg)
-{
-       Server *server = static_cast<Server *>(arg);
-       server->do_work();
-       return NULL;
-}
-
 void Server::do_work()
 {
        for ( ;; ) {
index 7979493..8d5261f 100644 (file)
--- a/server.h
+++ b/server.h
@@ -9,6 +9,8 @@
 #include <map>
 #include <vector>
 
+#include "thread.h"
+
 #define BACKLOG_SIZE 1048576
 #define EPOLL_MAX_EVENTS 8192
 #define EPOLL_TIMEOUT_MS 20
@@ -117,17 +119,11 @@ private:
        Stream(const Stream& other);
 };
 
-class Server {
+class Server : public Thread {
 public:
        Server();
        ~Server();
 
-       // Start a new thread that handles clients.
-       void run();
-
-       // Stop the thread.
-       void stop();
-       
        // Get the list of all currently connected clients.     
        std::vector<ClientStats> get_client_stats() const;
 
@@ -153,8 +149,6 @@ public:
        void add_stream_from_serialized(const StreamProto &stream);
 
 private:
-       pthread_t worker_thread;
-
        // Mutex protecting queued_data only. Note that if you want to hold both this
        // and <mutex> below, you will need to take <mutex> before this one.
        mutable pthread_mutex_t queued_data_mutex;
@@ -175,9 +169,6 @@ private:
        // All variables below this line are protected by the mutex.
        mutable pthread_mutex_t mutex;
 
-       // If the thread should stop or not.
-       bool should_stop;       
-
        // Map from stream ID to stream.
        std::map<std::string, Stream *> streams;
 
@@ -188,11 +179,8 @@ private:
        int epoll_fd;
        epoll_event events[EPOLL_MAX_EVENTS];
 
-       // Recover the this pointer, and call do_work().
-       static void *do_work_thunk(void *arg);
-
        // The actual worker thread.
-       void do_work();
+       virtual void do_work();
 
        // Process a client; read and write data as far as we can.
        // After this call, one of these four is true:
index 445bae7..1ae23e9 100644 (file)
--- a/stats.cpp
+++ b/stats.cpp
@@ -18,29 +18,6 @@ StatsThread::StatsThread(const std::string &stats_file, int stats_interval)
 {
 }
 
-void StatsThread::run()
-{
-       should_stop = false;
-       pthread_create(&worker_thread, NULL, do_work_thunk, this);
-}
-
-void StatsThread::stop()
-{
-       should_stop = true;
-       pthread_kill(worker_thread, SIGHUP);
-       if (pthread_join(worker_thread, NULL) == -1) {
-               perror("pthread_join");
-               exit(1);
-       }
-}
-       
-void *StatsThread::do_work_thunk(void *arg)
-{
-       StatsThread *stats_thread = reinterpret_cast<StatsThread *>(arg);
-       stats_thread->do_work();
-       return NULL;
-}
-
 void StatsThread::do_work()
 {
        while (!should_stop) {
diff --git a/stats.h b/stats.h
index c4fb17a..3f5fc0b 100644 (file)
--- a/stats.h
+++ b/stats.h
@@ -1,29 +1,21 @@
 #ifndef _STATS_H
 #define _STATS_H 1
 
-#include <pthread.h>
+#include "thread.h"
 #include <string>
 
 // A thread that regularly writes out statistics, ie. a list of all connected clients
 // with some information about each.
 
-class StatsThread {
+class StatsThread : public Thread {
 public:
        StatsThread(const std::string &stats_file, int stats_interval);
-       void run();
-       void stop();
 
 private:
-       // Recover the this pointer, and call do_work().
-       static void *do_work_thunk(void *arg);
-
-       void do_work();
+       virtual void do_work();
 
        std::string stats_file;
        int stats_interval;
-
-       pthread_t worker_thread;
-       volatile bool should_stop;
 };
        
 #endif  // !defined(_STATS_H)
diff --git a/thread.cpp b/thread.cpp
new file mode 100644 (file)
index 0000000..03bfd9d
--- /dev/null
@@ -0,0 +1,29 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <signal.h>
+
+#include "thread.h"
+
+void Thread::run()
+{
+       should_stop = false;
+       pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
+}
+
+void Thread::stop()
+{
+       should_stop = true;
+       pthread_kill(worker_thread, SIGHUP);
+       if (pthread_join(worker_thread, NULL) == -1) {
+               perror("pthread_join");
+               exit(1);
+       }
+}
+
+void *Thread::do_work_thunk(void *arg)
+{
+       Thread *thread = reinterpret_cast<Thread *>(arg);
+       thread->do_work();
+       return NULL;
+}
+
diff --git a/thread.h b/thread.h
new file mode 100644 (file)
index 0000000..d9b9535
--- /dev/null
+++ b/thread.h
@@ -0,0 +1,28 @@
+#ifndef _THREAD_H
+#define _THREAD_H
+
+#include <pthread.h>
+
+// A rather generic thread class with start/stop functionality.
+// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
+// since signals don't stick. We'll need to figure out something more
+// intelligent later, probably based on sending a signal to an fd.
+
+class Thread {
+public:
+       void run();
+       void stop();
+
+protected:
+       // Recovers the this pointer, and calls do_work().
+       static void *do_work_thunk(void *arg);
+
+       virtual void do_work() = 0;
+
+       volatile bool should_stop;
+
+private:
+       pthread_t worker_thread;
+};
+
+#endif  // !defined(_THREAD_H)