Support Metacube _output_. Required splitting HTTP headers from stream headers, which...
[cubemap] / serverpool.cpp
1 #include <google/protobuf/repeated_field.h>
2
3 #include "client.h"
4 #include "server.h"
5 #include "serverpool.h"
6 #include "state.pb.h"
7
8 using namespace std;
9
10 ServerPool::ServerPool(int size)
11         : servers(new Server[size]),
12           num_servers(size),
13           clients_added(0)
14 {
15 }
16
17 ServerPool::~ServerPool()
18 {
19         delete[] servers;
20 }
21         
22 CubemapStateProto ServerPool::serialize()
23 {
24         CubemapStateProto state;
25
26         for (int i = 0; i < num_servers; ++i) {
27                 CubemapStateProto local_state = servers[i].serialize();
28
29                 // The stream state should be identical between the servers, so we only store it once.
30                 if (i == 0) {
31                         state.mutable_streams()->MergeFrom(local_state.streams());
32                 }
33                 for (int j = 0; j < local_state.clients_size(); ++j) {
34                         state.add_clients()->MergeFrom(local_state.clients(j));
35                 }
36         }
37
38         return state;
39 }
40
41 void ServerPool::add_client(int sock)
42 {
43         servers[clients_added++ % num_servers].add_client_deferred(sock);
44 }
45
46 void ServerPool::add_client_from_serialized(const ClientProto &client)
47 {
48         servers[clients_added++ % num_servers].add_client_from_serialized(client);
49 }
50
51 void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
52 {
53         for (int i = 0; i < num_servers; ++i) {
54                 servers[i].add_stream(stream_id, backlog_size, encoding);
55         }
56 }
57
58 void ServerPool::add_stream_from_serialized(const StreamProto &stream)
59 {
60         for (int i = 0; i < num_servers; ++i) {
61                 servers[i].add_stream_from_serialized(stream);
62         }
63 }
64
65 void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
66 {
67         for (int i = 0; i < num_servers; ++i) {
68                 servers[i].set_header(stream_id, http_header, stream_header);
69         }
70 }
71
72 void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
73 {
74         for (int i = 0; i < num_servers; ++i) {
75                 servers[i].add_data_deferred(stream_id, data, bytes);
76         }
77 }
78
79 void ServerPool::run()
80 {
81         for (int i = 0; i < num_servers; ++i) {
82                 servers[i].run();
83         }
84 }
85         
86 void ServerPool::stop()
87 {
88         for (int i = 0; i < num_servers; ++i) {
89                 servers[i].stop();
90         }
91 }
92         
93 vector<ClientStats> ServerPool::get_client_stats() const
94 {
95         vector<ClientStats> ret;
96         for (int i = 0; i < num_servers; ++i) {
97                 vector<ClientStats> stats = servers[i].get_client_stats();
98                 ret.insert(ret.end(), stats.begin(), stats.end());
99         }
100         return ret;
101 }
102         
103 void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
104 {
105         for (int i = 0; i < num_servers; ++i) {
106                 servers[i].set_mark_pool(stream_id, mark_pool);
107         }       
108 }
109
110 void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
111 {
112         for (int i = 0; i < num_servers; ++i) {
113                 servers[i].set_backlog_size(stream_id, new_size);
114         }       
115 }
116
117 void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
118 {
119         for (int i = 0; i < num_servers; ++i) {
120                 servers[i].set_encoding(stream_id, encoding);
121         }       
122 }