Send backlog file descriptors around instead of going through the protobuf. Much...
[cubemap] / serverpool.cpp
1 #include <unistd.h>
2 #include <errno.h>
3
4 #include "client.h"
5 #include "log.h"
6 #include "server.h"
7 #include "serverpool.h"
8 #include "state.pb.h"
9 #include "util.h"
10
11 using namespace std;
12
13 ServerPool::ServerPool(int size)
14         : servers(new Server[size]),
15           num_servers(size),
16           clients_added(0)
17 {
18 }
19
20 ServerPool::~ServerPool()
21 {
22         delete[] servers;
23 }
24         
25 CubemapStateProto ServerPool::serialize()
26 {
27         CubemapStateProto state;
28
29         for (int i = 0; i < num_servers; ++i) {
30                 CubemapStateProto local_state = servers[i].serialize();
31
32                 // The stream state should be identical between the servers, so we only store it once,
33                 // save for the fds, which we keep around to distribute to the servers after re-exec.
34                 if (i == 0) {
35                         state.mutable_streams()->MergeFrom(local_state.streams());
36                 } else {
37                         assert(state.streams_size() == local_state.streams_size());
38                         for (int j = 0; j < local_state.streams_size(); ++j) {
39                                 assert(local_state.streams(j).data_fds_size() == 1);
40                                 state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
41                         }
42                 }
43                 for (int j = 0; j < local_state.clients_size(); ++j) {
44                         state.add_clients()->MergeFrom(local_state.clients(j));
45                 }
46         }
47
48         return state;
49 }
50
51 void ServerPool::add_client(int sock)
52 {
53         servers[clients_added++ % num_servers].add_client_deferred(sock);
54 }
55
56 void ServerPool::add_client_from_serialized(const ClientProto &client)
57 {
58         servers[clients_added++ % num_servers].add_client_from_serialized(client);
59 }
60
61 void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
62 {
63         for (int i = 0; i < num_servers; ++i) {
64                 servers[i].add_stream(stream_id, backlog_size, encoding);
65         }
66 }
67
68 void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
69 {
70         assert(!data_fds.empty());
71         string contents;
72         for (int i = 0; i < num_servers; ++i) {
73                 int data_fd;
74                 if (i < int(data_fds.size())) {
75                         // Reuse one of the existing file descriptors.
76                         data_fd = data_fds[i];
77                 } else {
78                         // Clone the first one.
79                         if (contents.empty()) {
80                                 if (!read_tempfile(data_fds[0], &contents)) {
81                                         exit(1);
82                                 }
83                         }
84                         data_fd = make_tempfile(contents);
85                 }
86
87                 servers[i].add_stream_from_serialized(stream, data_fd);
88         }
89
90         // Close and delete any leftovers, if the number of servers was reduced.
91         for (size_t i = num_servers; i < data_fds.size(); ++i) {
92                 int ret;
93                 do {
94                         ret = close(data_fds[i]);  // Implicitly deletes the file.
95                 } while (ret == -1 && errno == EINTR);
96
97                 if (ret == -1) {
98                         log_perror("close");
99                         // Can still continue.
100                 }
101         }
102 }
103
104 void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
105 {
106         for (int i = 0; i < num_servers; ++i) {
107                 servers[i].set_header(stream_id, http_header, stream_header);
108         }
109 }
110
111 void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
112 {
113         for (int i = 0; i < num_servers; ++i) {
114                 servers[i].add_data_deferred(stream_id, data, bytes);
115         }
116 }
117
118 void ServerPool::run()
119 {
120         for (int i = 0; i < num_servers; ++i) {
121                 servers[i].run();
122         }
123 }
124         
125 void ServerPool::stop()
126 {
127         for (int i = 0; i < num_servers; ++i) {
128                 servers[i].stop();
129         }
130 }
131         
132 vector<ClientStats> ServerPool::get_client_stats() const
133 {
134         vector<ClientStats> ret;
135         for (int i = 0; i < num_servers; ++i) {
136                 vector<ClientStats> stats = servers[i].get_client_stats();
137                 ret.insert(ret.end(), stats.begin(), stats.end());
138         }
139         return ret;
140 }
141         
142 void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
143 {
144         for (int i = 0; i < num_servers; ++i) {
145                 servers[i].set_mark_pool(stream_id, mark_pool);
146         }       
147 }
148
149 void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
150 {
151         for (int i = 0; i < num_servers; ++i) {
152                 servers[i].set_backlog_size(stream_id, new_size);
153         }       
154 }
155
156 void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
157 {
158         for (int i = 0; i < num_servers; ++i) {
159                 servers[i].set_encoding(stream_id, encoding);
160         }       
161 }