Revert "Rewrite the entire internal signal handling/wakeup."
[cubemap] / serverpool.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <google/protobuf/repeated_field.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6
7 #include "client.h"
8 #include "log.h"
9 #include "server.h"
10 #include "serverpool.h"
11 #include "state.pb.h"
12 #include "util.h"
13
14 using namespace std;
15
16 ServerPool::ServerPool(int size)
17         : servers(new Server[size]),
18           num_servers(size),
19           clients_added(0)
20 {
21 }
22
23 ServerPool::~ServerPool()
24 {
25         delete[] servers;
26 }
27         
28 CubemapStateProto ServerPool::serialize()
29 {
30         CubemapStateProto state;
31
32         for (int i = 0; i < num_servers; ++i) {
33                 CubemapStateProto local_state = servers[i].serialize();
34
35                 // The stream state should be identical between the servers, so we only store it once,
36                 // save for the fds, which we keep around to distribute to the servers after re-exec.
37                 if (i == 0) {
38                         state.mutable_streams()->MergeFrom(local_state.streams());
39                 } else {
40                         assert(state.streams_size() == local_state.streams_size());
41                         for (int j = 0; j < local_state.streams_size(); ++j) {
42                                 assert(local_state.streams(j).data_fds_size() == 1);
43                                 state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
44                         }
45                 }
46                 for (int j = 0; j < local_state.clients_size(); ++j) {
47                         state.add_clients()->MergeFrom(local_state.clients(j));
48                 }
49         }
50
51         return state;
52 }
53
54 void ServerPool::add_client(int sock)
55 {
56         servers[clients_added++ % num_servers].add_client_deferred(sock);
57 }
58
59 void ServerPool::add_client_from_serialized(const ClientProto &client)
60 {
61         servers[clients_added++ % num_servers].add_client_from_serialized(client);
62 }
63
64 void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
65 {
66         for (int i = 0; i < num_servers; ++i) {
67                 servers[i].add_stream(stream_id, backlog_size, encoding);
68         }
69 }
70
71 void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
72 {
73         assert(!data_fds.empty());
74         string contents;
75         for (int i = 0; i < num_servers; ++i) {
76                 int data_fd;
77                 if (i < int(data_fds.size())) {
78                         // Reuse one of the existing file descriptors.
79                         data_fd = data_fds[i];
80                 } else {
81                         // Clone the first one.
82                         if (contents.empty()) {
83                                 if (!read_tempfile(data_fds[0], &contents)) {
84                                         exit(1);
85                                 }
86                         }
87                         data_fd = make_tempfile(contents);
88                 }
89
90                 servers[i].add_stream_from_serialized(stream, data_fd);
91         }
92
93         // Close and delete any leftovers, if the number of servers was reduced.
94         for (size_t i = num_servers; i < data_fds.size(); ++i) {
95                 int ret;
96                 do {
97                         ret = close(data_fds[i]);  // Implicitly deletes the file.
98                 } while (ret == -1 && errno == EINTR);
99
100                 if (ret == -1) {
101                         log_perror("close");
102                         // Can still continue.
103                 }
104         }
105 }
106
107 void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
108 {
109         for (int i = 0; i < num_servers; ++i) {
110                 servers[i].set_header(stream_id, http_header, stream_header);
111         }
112 }
113
114 void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
115 {
116         for (int i = 0; i < num_servers; ++i) {
117                 servers[i].add_data_deferred(stream_id, data, bytes);
118         }
119 }
120
121 void ServerPool::run()
122 {
123         for (int i = 0; i < num_servers; ++i) {
124                 servers[i].run();
125         }
126 }
127         
128 void ServerPool::stop()
129 {
130         for (int i = 0; i < num_servers; ++i) {
131                 servers[i].stop();
132         }
133 }
134         
135 vector<ClientStats> ServerPool::get_client_stats() const
136 {
137         vector<ClientStats> ret;
138         for (int i = 0; i < num_servers; ++i) {
139                 vector<ClientStats> stats = servers[i].get_client_stats();
140                 ret.insert(ret.end(), stats.begin(), stats.end());
141         }
142         return ret;
143 }
144         
145 void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
146 {
147         for (int i = 0; i < num_servers; ++i) {
148                 servers[i].set_mark_pool(stream_id, mark_pool);
149         }       
150 }
151
152 void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
153 {
154         for (int i = 0; i < num_servers; ++i) {
155                 servers[i].set_backlog_size(stream_id, new_size);
156         }       
157 }
158
159 void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
160 {
161         for (int i = 0; i < num_servers; ++i) {
162                 servers[i].set_encoding(stream_id, encoding);
163         }       
164 }