From e1722a5c0341fd541ce57f1eed4dc76cbd3efe07 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Wed, 10 Apr 2013 23:55:24 +0200 Subject: [PATCH] Factor all the common thread starting/stopping into a common Thread class. --- Makefile | 2 +- acceptor.cpp | 23 ----------------------- acceptor.h | 14 ++++---------- input.cpp | 29 ----------------------------- input.h | 21 ++++----------------- server.cpp | 32 -------------------------------- server.h | 20 ++++---------------- stats.cpp | 23 ----------------------- stats.h | 14 +++----------- thread.cpp | 29 +++++++++++++++++++++++++++++ thread.h | 28 ++++++++++++++++++++++++++++ 11 files changed, 73 insertions(+), 162 deletions(-) create mode 100644 thread.cpp create mode 100644 thread.h diff --git a/Makefile b/Makefile index f2e37ac..647dfe9 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g LDLIBS=-lpthread -lprotobuf -OBJS=main.o server.o serverpool.o mutexlock.o input.o parse.o markpool.o acceptor.o stats.o state.pb.o +OBJS=main.o server.o serverpool.o mutexlock.o input.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o all: cubemap diff --git a/acceptor.cpp b/acceptor.cpp index 776f39b..7620b79 100644 --- a/acceptor.cpp +++ b/acceptor.cpp @@ -66,29 +66,6 @@ AcceptorThread::AcceptorThread(int server_sock) { } -void AcceptorThread::run() -{ - should_stop = false; - pthread_create(&worker_thread, NULL, &AcceptorThread::do_work_thunk, this); -} - -void AcceptorThread::stop() -{ - should_stop = true; - pthread_kill(worker_thread, SIGHUP); - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); - } -} - -void *AcceptorThread::do_work_thunk(void *arg) -{ - AcceptorThread *acceptor_thread = reinterpret_cast(arg); - acceptor_thread->do_work(); - return NULL; -} - void AcceptorThread::do_work() { while (!hupped) { diff --git a/acceptor.h b/acceptor.h index 5cd579d..9767233 100644 --- a/acceptor.h +++ b/acceptor.h @@ -1,26 +1,20 @@ #ifndef _ACCEPTOR_H #define _ACCEPTOR_H +#include "thread.h" + int create_server_socket(int port); // A thread that accepts new connections on a given socket, // and hands them off to the server pool. -class AcceptorThread { +class AcceptorThread : public Thread { public: AcceptorThread(int server_sock); - void run(); - void stop(); private: - // Recovers the this pointer, and hands over control to do_work(). - static void *do_work_thunk(void *arg); - - void do_work(); + virtual void do_work(); int server_sock; - - pthread_t worker_thread; - volatile bool should_stop; }; #endif // !defined(_ACCEPTOR_H) diff --git a/input.cpp b/input.cpp index ccc4bc0..a54e6fb 100644 --- a/input.cpp +++ b/input.cpp @@ -114,35 +114,6 @@ InputProto Input::serialize() const return serialized; } -void Input::run() -{ - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Input::do_work_thunk, this); -} - -void Input::stop() -{ - should_stop = true; - - pthread_kill(worker_thread, SIGHUP); - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); - } -} - -void *Input::do_work_thunk(void *arg) -{ - Input *input = static_cast(arg); - input->do_work(); - return NULL; -} - int Input::lookup_and_connect(const string &host, const string &port) { addrinfo *ai; diff --git a/input.h b/input.h index 28bc14b..14537c8 100644 --- a/input.h +++ b/input.h @@ -4,9 +4,11 @@ #include #include +#include "thread.h" + class InputProto; -class Input { +class Input : public Thread { public: Input(const std::string &stream_id, const std::string &url); @@ -14,20 +16,11 @@ public: Input(const InputProto &serialized); InputProto serialize() const; - // Connect to the given URL and start streaming. - void run(); - - // Stops the streaming, but lets the file descriptor stay open. - void stop(); - std::string get_url() const { return url; } private: - // Recovers the this pointer and calls do_work(). - static void *do_work_thunk(void *arg); - // Actually does the download. - void do_work(); + virtual void do_work(); // Open a socket that connects to the given host and port. Does DNS resolving. int lookup_and_connect(const std::string &host, const std::string &port); @@ -81,12 +74,6 @@ private: // The socket we are downloading on (or -1). int sock; - - // Handle to the thread that actually does the download. - pthread_t worker_thread; - - // Whether we should stop or not. - volatile bool should_stop; }; #endif // !defined(_INPUT_H) diff --git a/server.cpp b/server.cpp index aed8779..d3fbaaf 100644 --- a/server.cpp +++ b/server.cpp @@ -177,31 +177,6 @@ Server::~Server() } } -void Server::run() -{ - should_stop = false; - - // Joinable is already the default, but it's good to be certain. - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&worker_thread, &attr, Server::do_work_thunk, this); -} - -void Server::stop() -{ - { - MutexLock lock(&mutex); - should_stop = true; - } - - pthread_kill(worker_thread, SIGHUP); - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); - } -} - vector Server::get_client_stats() const { vector ret; @@ -215,13 +190,6 @@ vector Server::get_client_stats() const return ret; } -void *Server::do_work_thunk(void *arg) -{ - Server *server = static_cast(arg); - server->do_work(); - return NULL; -} - void Server::do_work() { for ( ;; ) { diff --git a/server.h b/server.h index 7979493..8d5261f 100644 --- a/server.h +++ b/server.h @@ -9,6 +9,8 @@ #include #include +#include "thread.h" + #define BACKLOG_SIZE 1048576 #define EPOLL_MAX_EVENTS 8192 #define EPOLL_TIMEOUT_MS 20 @@ -117,17 +119,11 @@ private: Stream(const Stream& other); }; -class Server { +class Server : public Thread { public: Server(); ~Server(); - // Start a new thread that handles clients. - void run(); - - // Stop the thread. - void stop(); - // Get the list of all currently connected clients. std::vector get_client_stats() const; @@ -153,8 +149,6 @@ public: void add_stream_from_serialized(const StreamProto &stream); private: - pthread_t worker_thread; - // Mutex protecting queued_data only. Note that if you want to hold both this // and below, you will need to take before this one. mutable pthread_mutex_t queued_data_mutex; @@ -175,9 +169,6 @@ private: // All variables below this line are protected by the mutex. mutable pthread_mutex_t mutex; - // If the thread should stop or not. - bool should_stop; - // Map from stream ID to stream. std::map streams; @@ -188,11 +179,8 @@ private: int epoll_fd; epoll_event events[EPOLL_MAX_EVENTS]; - // Recover the this pointer, and call do_work(). - static void *do_work_thunk(void *arg); - // The actual worker thread. - void do_work(); + virtual void do_work(); // Process a client; read and write data as far as we can. // After this call, one of these four is true: diff --git a/stats.cpp b/stats.cpp index 445bae7..1ae23e9 100644 --- a/stats.cpp +++ b/stats.cpp @@ -18,29 +18,6 @@ StatsThread::StatsThread(const std::string &stats_file, int stats_interval) { } -void StatsThread::run() -{ - should_stop = false; - pthread_create(&worker_thread, NULL, do_work_thunk, this); -} - -void StatsThread::stop() -{ - should_stop = true; - pthread_kill(worker_thread, SIGHUP); - if (pthread_join(worker_thread, NULL) == -1) { - perror("pthread_join"); - exit(1); - } -} - -void *StatsThread::do_work_thunk(void *arg) -{ - StatsThread *stats_thread = reinterpret_cast(arg); - stats_thread->do_work(); - return NULL; -} - void StatsThread::do_work() { while (!should_stop) { diff --git a/stats.h b/stats.h index c4fb17a..3f5fc0b 100644 --- a/stats.h +++ b/stats.h @@ -1,29 +1,21 @@ #ifndef _STATS_H #define _STATS_H 1 -#include +#include "thread.h" #include // A thread that regularly writes out statistics, ie. a list of all connected clients // with some information about each. -class StatsThread { +class StatsThread : public Thread { public: StatsThread(const std::string &stats_file, int stats_interval); - void run(); - void stop(); private: - // Recover the this pointer, and call do_work(). - static void *do_work_thunk(void *arg); - - void do_work(); + virtual void do_work(); std::string stats_file; int stats_interval; - - pthread_t worker_thread; - volatile bool should_stop; }; #endif // !defined(_STATS_H) diff --git a/thread.cpp b/thread.cpp new file mode 100644 index 0000000..03bfd9d --- /dev/null +++ b/thread.cpp @@ -0,0 +1,29 @@ +#include +#include +#include + +#include "thread.h" + +void Thread::run() +{ + should_stop = false; + pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this); +} + +void Thread::stop() +{ + should_stop = true; + pthread_kill(worker_thread, SIGHUP); + if (pthread_join(worker_thread, NULL) == -1) { + perror("pthread_join"); + exit(1); + } +} + +void *Thread::do_work_thunk(void *arg) +{ + Thread *thread = reinterpret_cast(arg); + thread->do_work(); + return NULL; +} + diff --git a/thread.h b/thread.h new file mode 100644 index 0000000..d9b9535 --- /dev/null +++ b/thread.h @@ -0,0 +1,28 @@ +#ifndef _THREAD_H +#define _THREAD_H + +#include + +// A rather generic thread class with start/stop functionality. +// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls), +// since signals don't stick. We'll need to figure out something more +// intelligent later, probably based on sending a signal to an fd. + +class Thread { +public: + void run(); + void stop(); + +protected: + // Recovers the this pointer, and calls do_work(). + static void *do_work_thunk(void *arg); + + virtual void do_work() = 0; + + volatile bool should_stop; + +private: + pthread_t worker_thread; +}; + +#endif // !defined(_THREAD_H) -- 2.39.2