CXXFLAGS=-Wall -O2 -g
LDLIBS=-lcurl -lpthread -lprotobuf
-OBJS=cubemap.o server.o mutexlock.o input.o state.pb.o
+OBJS=cubemap.o server.o serverpool.o mutexlock.o input.o state.pb.o
all: cubemap
#include "metacube.h"
#include "server.h"
+#include "serverpool.h"
#include "input.h"
#include "state.pb.h"
using namespace std;
-Server *servers = NULL;
+ServerPool *servers = NULL;
volatile bool hupped = false;
void hup(int ignored)
}
// Pick a server, round-robin, and hand over the socket to it.
- servers[num_accepted % NUM_SERVERS].add_client(sock);
+ servers->add_client(sock);
++num_accepted;
}
}
exit(1);
}
- // Create the servers.
- servers = new Server[NUM_SERVERS];
+ servers = new ServerPool(NUM_SERVERS);
int server_sock = -1, old_port = -1;
if (argc == 4 && strcmp(argv[2], "-state") == 0) {
// Deserialize the streams.
for (int i = 0; i < loaded_state.streams_size(); ++i) {
- for (int j = 0; j < NUM_SERVERS; ++j) {
- servers[j].add_stream_from_serialized(loaded_state.streams(i));
- }
+ servers->add_stream_from_serialized(loaded_state.streams(i));
}
// Put back the existing clients. It doesn't matter which server we
// allocate them to, so just do round-robin.
for (int i = 0; i < loaded_state.clients_size(); ++i) {
- servers[i % NUM_SERVERS].add_client_from_serialized(loaded_state.clients(i));
+ servers->add_client_from_serialized(loaded_state.clients(i));
}
// Deserialize the server socket.
fprintf(stderr, "done.\n");
} else{
// TODO: This should come from the config file.
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].add_stream(STREAM_ID);
- }
+ servers->add_stream(STREAM_ID);
}
// Open a new server socket if we do not already have one, or if we changed ports.
server_sock = create_server_socket(port);
}
- // Start up all the servers!
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].run();
- }
+ servers->run();
pthread_t acceptor_thread;
pthread_create(&acceptor_thread, NULL, acceptor_thread_run, reinterpret_cast<void *>(server_sock));
CubemapStateProto state;
state.set_server_sock(server_sock);
state.set_port(port);
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].stop();
+ for (int i = 0; i < NUM_SERVERS; ++i) {
+ servers->get_server(i)->stop();
- CubemapStateProto local_state = servers[i].serialize();
+ CubemapStateProto local_state = servers->get_server(i)->serialize();
// The stream state should be identical between the servers, so we only store it once.
if (i == 0) {
state.add_clients()->MergeFrom(local_state.clients(j));
}
}
- delete[] servers;
+ delete servers;
fprintf(stderr, "Serializing state and re-execing...\n");
int state_fd = make_tempfile(state);
#include "mutexlock.h"
#include "input.h"
#include "server.h"
+#include "serverpool.h"
using namespace std;
-extern Server *servers;
+extern ServerPool *servers;
Input::Input(const string &stream_id, const string &url)
: stream_id(stream_id),
{
if (flags & METACUBE_FLAGS_HEADER) {
string header(data, data + size);
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].set_header(stream_id, header);
- }
+ servers->set_header(stream_id, header);
} else {
- for (int i = 0; i < NUM_SERVERS; ++i) {
- servers[i].add_data(stream_id, data, size);
- }
+ servers->add_data(stream_id, data, size);
}
}
#include <stdint.h>
#include <pthread.h>
+#include <sys/epoll.h>
#include <string>
#include <map>
+#include <vector>
-#define NUM_SERVERS 4
#define BACKLOG_SIZE 1048576
#define EPOLL_MAX_EVENTS 8192
#define EPOLL_TIMEOUT_MS 20
--- /dev/null
+#include "serverpool.h"
+
+using namespace std;
+
+ServerPool::ServerPool(int size)
+ : servers(new Server[size]),
+ num_servers(size),
+ clients_added(0)
+{
+}
+
+ServerPool::~ServerPool()
+{
+ delete[] servers;
+}
+
+void ServerPool::add_client(int sock)
+{
+ servers[clients_added++ % num_servers].add_client(sock);
+}
+
+void ServerPool::add_client_from_serialized(const ClientProto &client)
+{
+ servers[clients_added++ % num_servers].add_client_from_serialized(client);
+}
+
+void ServerPool::add_stream(const std::string &stream_id)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].add_stream(stream_id);
+ }
+}
+
+void ServerPool::add_stream_from_serialized(const StreamProto &stream)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].add_stream_from_serialized(stream);
+ }
+}
+
+void ServerPool::set_header(const std::string &stream_id, const std::string &header)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_header(stream_id, header);
+ }
+}
+
+void ServerPool::add_data(const std::string &stream_id, const char *data, size_t bytes)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].add_data(stream_id, data, bytes);
+ }
+}
+
+void ServerPool::run()
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].run();
+ }
+}
--- /dev/null
+#ifndef _SERVERPOOL_H
+#define _SERVERPOOL_H 1
+
+#include "server.h"
+
+// Provides services such as load-balancing between a number of Server instances.
+class ServerPool {
+public:
+ ServerPool(int num_servers);
+ ~ServerPool();
+
+ // Accessor. Only to be used in rare situations, really.
+ // The ServerPool retains ownership.
+ Server *get_server(int num) { return &servers[num]; }
+
+ // Picks a server (round-robin) and allocates the given client to it.
+ void add_client(int sock);
+ void add_client_from_serialized(const ClientProto &client);
+
+ // Adds the given stream to all the servers.
+ void add_stream(const std::string &stream_id);
+ void add_stream_from_serialized(const StreamProto &stream);
+
+ // Adds the given data to all the servers.
+ void set_header(const std::string &stream_id, const std::string &header);
+ void add_data(const std::string &stream_id, const char *data, size_t bytes);
+
+ // Starts all the servers.
+ void run();
+
+private:
+ Server *servers;
+ int num_servers, clients_added;
+
+ ServerPool(const ServerPool &);
+};
+
+#endif // !defined(_SERVERPOOL_H)