CXXFLAGS=-Wall -O2 -g
LDLIBS=-lpthread -lprotobuf
-OBJS=main.o server.o serverpool.o mutexlock.o input.o parse.o markpool.o acceptor.o stats.o state.pb.o
+OBJS=main.o server.o serverpool.o mutexlock.o input.o parse.o markpool.o acceptor.o stats.o thread.o state.pb.o
all: cubemap
{
}
-void AcceptorThread::run()
-{
- should_stop = false;
- pthread_create(&worker_thread, NULL, &AcceptorThread::do_work_thunk, this);
-}
-
-void AcceptorThread::stop()
-{
- should_stop = true;
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- perror("pthread_join");
- exit(1);
- }
-}
-
-void *AcceptorThread::do_work_thunk(void *arg)
-{
- AcceptorThread *acceptor_thread = reinterpret_cast<AcceptorThread *>(arg);
- acceptor_thread->do_work();
- return NULL;
-}
-
void AcceptorThread::do_work()
{
while (!hupped) {
#ifndef _ACCEPTOR_H
#define _ACCEPTOR_H
+#include "thread.h"
+
int create_server_socket(int port);
// A thread that accepts new connections on a given socket,
// and hands them off to the server pool.
-class AcceptorThread {
+class AcceptorThread : public Thread {
public:
AcceptorThread(int server_sock);
- void run();
- void stop();
private:
- // Recovers the this pointer, and hands over control to do_work().
- static void *do_work_thunk(void *arg);
-
- void do_work();
+ virtual void do_work();
int server_sock;
-
- pthread_t worker_thread;
- volatile bool should_stop;
};
#endif // !defined(_ACCEPTOR_H)
return serialized;
}
-void Input::run()
-{
- should_stop = false;
-
- // Joinable is already the default, but it's good to be certain.
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_create(&worker_thread, &attr, Input::do_work_thunk, this);
-}
-
-void Input::stop()
-{
- should_stop = true;
-
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- perror("pthread_join");
- exit(1);
- }
-}
-
-void *Input::do_work_thunk(void *arg)
-{
- Input *input = static_cast<Input *>(arg);
- input->do_work();
- return NULL;
-}
-
int Input::lookup_and_connect(const string &host, const string &port)
{
addrinfo *ai;
#include <vector>
#include <string>
+#include "thread.h"
+
class InputProto;
-class Input {
+class Input : public Thread {
public:
Input(const std::string &stream_id, const std::string &url);
Input(const InputProto &serialized);
InputProto serialize() const;
- // Connect to the given URL and start streaming.
- void run();
-
- // Stops the streaming, but lets the file descriptor stay open.
- void stop();
-
std::string get_url() const { return url; }
private:
- // Recovers the this pointer and calls do_work().
- static void *do_work_thunk(void *arg);
-
// Actually does the download.
- void do_work();
+ virtual void do_work();
// Open a socket that connects to the given host and port. Does DNS resolving.
int lookup_and_connect(const std::string &host, const std::string &port);
// The socket we are downloading on (or -1).
int sock;
-
- // Handle to the thread that actually does the download.
- pthread_t worker_thread;
-
- // Whether we should stop or not.
- volatile bool should_stop;
};
#endif // !defined(_INPUT_H)
}
}
-void Server::run()
-{
- should_stop = false;
-
- // Joinable is already the default, but it's good to be certain.
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_create(&worker_thread, &attr, Server::do_work_thunk, this);
-}
-
-void Server::stop()
-{
- {
- MutexLock lock(&mutex);
- should_stop = true;
- }
-
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- perror("pthread_join");
- exit(1);
- }
-}
-
vector<ClientStats> Server::get_client_stats() const
{
vector<ClientStats> ret;
return ret;
}
-void *Server::do_work_thunk(void *arg)
-{
- Server *server = static_cast<Server *>(arg);
- server->do_work();
- return NULL;
-}
-
void Server::do_work()
{
for ( ;; ) {
#include <map>
#include <vector>
+#include "thread.h"
+
#define BACKLOG_SIZE 1048576
#define EPOLL_MAX_EVENTS 8192
#define EPOLL_TIMEOUT_MS 20
Stream(const Stream& other);
};
-class Server {
+class Server : public Thread {
public:
Server();
~Server();
- // Start a new thread that handles clients.
- void run();
-
- // Stop the thread.
- void stop();
-
// Get the list of all currently connected clients.
std::vector<ClientStats> get_client_stats() const;
void add_stream_from_serialized(const StreamProto &stream);
private:
- pthread_t worker_thread;
-
// Mutex protecting queued_data only. Note that if you want to hold both this
// and <mutex> below, you will need to take <mutex> before this one.
mutable pthread_mutex_t queued_data_mutex;
// All variables below this line are protected by the mutex.
mutable pthread_mutex_t mutex;
- // If the thread should stop or not.
- bool should_stop;
-
// Map from stream ID to stream.
std::map<std::string, Stream *> streams;
int epoll_fd;
epoll_event events[EPOLL_MAX_EVENTS];
- // Recover the this pointer, and call do_work().
- static void *do_work_thunk(void *arg);
-
// The actual worker thread.
- void do_work();
+ virtual void do_work();
// Process a client; read and write data as far as we can.
// After this call, one of these four is true:
{
}
-void StatsThread::run()
-{
- should_stop = false;
- pthread_create(&worker_thread, NULL, do_work_thunk, this);
-}
-
-void StatsThread::stop()
-{
- should_stop = true;
- pthread_kill(worker_thread, SIGHUP);
- if (pthread_join(worker_thread, NULL) == -1) {
- perror("pthread_join");
- exit(1);
- }
-}
-
-void *StatsThread::do_work_thunk(void *arg)
-{
- StatsThread *stats_thread = reinterpret_cast<StatsThread *>(arg);
- stats_thread->do_work();
- return NULL;
-}
-
void StatsThread::do_work()
{
while (!should_stop) {
#ifndef _STATS_H
#define _STATS_H 1
-#include <pthread.h>
+#include "thread.h"
#include <string>
// A thread that regularly writes out statistics, ie. a list of all connected clients
// with some information about each.
-class StatsThread {
+class StatsThread : public Thread {
public:
StatsThread(const std::string &stats_file, int stats_interval);
- void run();
- void stop();
private:
- // Recover the this pointer, and call do_work().
- static void *do_work_thunk(void *arg);
-
- void do_work();
+ virtual void do_work();
std::string stats_file;
int stats_interval;
-
- pthread_t worker_thread;
- volatile bool should_stop;
};
#endif // !defined(_STATS_H)
--- /dev/null
+#include <stdio.h>
+#include <stdlib.h>
+#include <signal.h>
+
+#include "thread.h"
+
+void Thread::run()
+{
+ should_stop = false;
+ pthread_create(&worker_thread, NULL, &Thread::do_work_thunk, this);
+}
+
+void Thread::stop()
+{
+ should_stop = true;
+ pthread_kill(worker_thread, SIGHUP);
+ if (pthread_join(worker_thread, NULL) == -1) {
+ perror("pthread_join");
+ exit(1);
+ }
+}
+
+void *Thread::do_work_thunk(void *arg)
+{
+ Thread *thread = reinterpret_cast<Thread *>(arg);
+ thread->do_work();
+ return NULL;
+}
+
--- /dev/null
+#ifndef _THREAD_H
+#define _THREAD_H
+
+#include <pthread.h>
+
+// A rather generic thread class with start/stop functionality.
+// NOTE: stop is somewhat racy (there's no guaranteed breakout from syscalls),
+// since signals don't stick. We'll need to figure out something more
+// intelligent later, probably based on sending a signal to an fd.
+
+class Thread {
+public:
+ void run();
+ void stop();
+
+protected:
+ // Recovers the this pointer, and calls do_work().
+ static void *do_work_thunk(void *arg);
+
+ virtual void do_work() = 0;
+
+ volatile bool should_stop;
+
+private:
+ pthread_t worker_thread;
+};
+
+#endif // !defined(_THREAD_H)