Support configurable BACKLOG_SIZE (per-stream). No support for changing across restar...
[cubemap] / serverpool.cpp
1 #include "serverpool.h"
2 #include "state.pb.h"
3
4 using namespace std;
5
6 ServerPool::ServerPool(int size)
7         : servers(new Server[size]),
8           num_servers(size),
9           clients_added(0)
10 {
11 }
12
13 ServerPool::~ServerPool()
14 {
15         delete[] servers;
16 }
17         
18 CubemapStateProto ServerPool::serialize()
19 {
20         CubemapStateProto state;
21
22         for (int i = 0; i < num_servers; ++i) {
23                 CubemapStateProto local_state = servers[i].serialize();
24
25                 // The stream state should be identical between the servers, so we only store it once.
26                 if (i == 0) {
27                         state.mutable_streams()->MergeFrom(local_state.streams());
28                 }
29                 for (int j = 0; j < local_state.clients_size(); ++j) {
30                         state.add_clients()->MergeFrom(local_state.clients(j));
31                 }
32         }
33
34         return state;
35 }
36
37 void ServerPool::add_client(int sock)
38 {
39         servers[clients_added++ % num_servers].add_client_deferred(sock);
40 }
41
42 void ServerPool::add_client_from_serialized(const ClientProto &client)
43 {
44         servers[clients_added++ % num_servers].add_client_from_serialized(client);
45 }
46
47 void ServerPool::add_stream(const std::string &stream_id, size_t backlog_size)
48 {
49         for (int i = 0; i < num_servers; ++i) {
50                 servers[i].add_stream(stream_id, backlog_size);
51         }
52 }
53
54 void ServerPool::add_stream_from_serialized(const StreamProto &stream)
55 {
56         for (int i = 0; i < num_servers; ++i) {
57                 servers[i].add_stream_from_serialized(stream);
58         }
59 }
60
61 void ServerPool::set_header(const std::string &stream_id, const std::string &header)
62 {
63         for (int i = 0; i < num_servers; ++i) {
64                 servers[i].set_header(stream_id, header);
65         }
66 }
67
68 void ServerPool::add_data(const std::string &stream_id, const char *data, size_t bytes)
69 {
70         for (int i = 0; i < num_servers; ++i) {
71                 servers[i].add_data_deferred(stream_id, data, bytes);
72         }
73 }
74
75 void ServerPool::run()
76 {
77         for (int i = 0; i < num_servers; ++i) {
78                 servers[i].run();
79         }
80 }
81         
82 void ServerPool::stop()
83 {
84         for (int i = 0; i < num_servers; ++i) {
85                 servers[i].stop();
86         }
87 }
88         
89 vector<ClientStats> ServerPool::get_client_stats() const
90 {
91         vector<ClientStats> ret;
92         for (int i = 0; i < num_servers; ++i) {
93                 vector<ClientStats> stats = servers[i].get_client_stats();
94                 ret.insert(ret.end(), stats.begin(), stats.end());
95         }
96         return ret;
97 }
98         
99 void ServerPool::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
100 {
101         for (int i = 0; i < num_servers; ++i) {
102                 servers[i].set_mark_pool(stream_id, mark_pool);
103         }       
104 }