Rewrite the entire internal signal handling/wakeup.
[cubemap] / serverpool.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <google/protobuf/repeated_field.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6
7 #include "client.h"
8 #include "log.h"
9 #include "server.h"
10 #include "serverpool.h"
11 #include "state.pb.h"
12 #include "util.h"
13
14 using namespace std;
15
16 ServerPool::ServerPool(int size)
17         : servers(new Server[size]),
18           num_servers(size),
19           clients_added(0)
20 {
21 }
22
23 ServerPool::~ServerPool()
24 {
25         delete[] servers;
26 }
27         
28 CubemapStateProto ServerPool::serialize()
29 {
30         CubemapStateProto state;
31
32         for (int i = 0; i < num_servers; ++i) {
33                 CubemapStateProto local_state = servers[i].serialize();
34
35                 // The stream state should be identical between the servers, so we only store it once,
36                 // save for the fds, which we keep around to distribute to the servers after re-exec.
37                 if (i == 0) {
38                         state.mutable_streams()->MergeFrom(local_state.streams());
39                 } else {
40                         assert(state.streams_size() == local_state.streams_size());
41                         for (int j = 0; j < local_state.streams_size(); ++j) {
42                                 assert(local_state.streams(j).data_fds_size() == 1);
43                                 state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
44                         }
45                 }
46                 for (int j = 0; j < local_state.clients_size(); ++j) {
47                         state.add_clients()->MergeFrom(local_state.clients(j));
48                 }
49         }
50
51         return state;
52 }
53
54 void ServerPool::add_client(int sock)
55 {
56         servers[clients_added++ % num_servers].add_client_deferred(sock);
57 }
58
59 void ServerPool::add_client_from_serialized(const ClientProto &client)
60 {
61         servers[clients_added++ % num_servers].add_client_from_serialized(client);
62 }
63
64 void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
65 {
66         for (int i = 0; i < num_servers; ++i) {
67                 servers[i].add_stream(stream_id, backlog_size, encoding);
68         }
69 }
70
71 void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
72 {
73         assert(!data_fds.empty());
74         string contents;
75         for (int i = 0; i < num_servers; ++i) {
76                 int data_fd;
77                 if (i < int(data_fds.size())) {
78                         // Reuse one of the existing file descriptors.
79                         data_fd = data_fds[i];
80                 } else {
81                         // Clone the first one.
82                         if (contents.empty()) {
83                                 if (!read_tempfile(data_fds[0], &contents)) {
84                                         exit(1);
85                                 }
86                         }
87                         data_fd = make_tempfile(contents);
88                 }
89
90                 servers[i].add_stream_from_serialized(stream, data_fd);
91         }
92
93         // Close and delete any leftovers, if the number of servers was reduced.
94         for (size_t i = num_servers; i < data_fds.size(); ++i) {
95                 safe_close(data_fds[i]);  // Implicitly deletes the file.
96         }
97 }
98
99 void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
100 {
101         for (int i = 0; i < num_servers; ++i) {
102                 servers[i].set_header(stream_id, http_header, stream_header);
103         }
104 }
105
106 void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
107 {
108         for (int i = 0; i < num_servers; ++i) {
109                 servers[i].add_data_deferred(stream_id, data, bytes);
110         }
111 }
112
113 void ServerPool::run()
114 {
115         for (int i = 0; i < num_servers; ++i) {
116                 servers[i].run();
117         }
118 }
119         
120 void ServerPool::stop()
121 {
122         for (int i = 0; i < num_servers; ++i) {
123                 servers[i].stop();
124         }
125 }
126         
127 vector<ClientStats> ServerPool::get_client_stats() const
128 {
129         vector<ClientStats> ret;
130         for (int i = 0; i < num_servers; ++i) {
131                 vector<ClientStats> stats = servers[i].get_client_stats();
132                 ret.insert(ret.end(), stats.begin(), stats.end());
133         }
134         return ret;
135 }
136         
137 void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
138 {
139         for (int i = 0; i < num_servers; ++i) {
140                 servers[i].set_mark_pool(stream_id, mark_pool);
141         }       
142 }
143
144 void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
145 {
146         for (int i = 0; i < num_servers; ++i) {
147                 servers[i].set_backlog_size(stream_id, new_size);
148         }       
149 }
150
151 void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
152 {
153         for (int i = 0; i < num_servers; ++i) {
154                 servers[i].set_encoding(stream_id, encoding);
155         }       
156 }