]> git.sesse.net Git - cubemap/blob - serverpool.cpp
Refer to streams internally mostly by an index, not the stream_id.
[cubemap] / serverpool.cpp
1 #include <assert.h>
2 #include <errno.h>
3 #include <google/protobuf/repeated_field.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6
7 #include "client.h"
8 #include "log.h"
9 #include "server.h"
10 #include "serverpool.h"
11 #include "state.pb.h"
12 #include "util.h"
13
14 using namespace std;
15
16 ServerPool::ServerPool(int size)
17         : servers(new Server[size]),
18           num_servers(size),
19           clients_added(0)
20 {
21 }
22
23 ServerPool::~ServerPool()
24 {
25         delete[] servers;
26 }
27         
28 CubemapStateProto ServerPool::serialize()
29 {
30         CubemapStateProto state;
31
32         for (int i = 0; i < num_servers; ++i) {
33                 CubemapStateProto local_state = servers[i].serialize();
34
35                 // The stream state should be identical between the servers, so we only store it once,
36                 // save for the fds, which we keep around to distribute to the servers after re-exec.
37                 if (i == 0) {
38                         state.mutable_streams()->MergeFrom(local_state.streams());
39                 } else {
40                         assert(state.streams_size() == local_state.streams_size());
41                         for (int j = 0; j < local_state.streams_size(); ++j) {
42                                 assert(local_state.streams(j).data_fds_size() == 1);
43                                 state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
44                         }
45                 }
46                 for (int j = 0; j < local_state.clients_size(); ++j) {
47                         state.add_clients()->MergeFrom(local_state.clients(j));
48                 }
49         }
50
51         return state;
52 }
53
54 void ServerPool::add_client(int sock)
55 {
56         servers[clients_added++ % num_servers].add_client_deferred(sock);
57 }
58
59 void ServerPool::add_client_from_serialized(const ClientProto &client)
60 {
61         servers[clients_added++ % num_servers].add_client_from_serialized(client);
62 }
63
64 int ServerPool::lookup_stream_by_url(const std::string &url) const
65 {
66         assert(servers != NULL);
67         return servers[0].lookup_stream_by_url(url);
68 }
69
70 int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
71 {
72         int stream_index = -1;
73         for (int i = 0; i < num_servers; ++i) {
74                 int stream_index2 = servers[i].add_stream(url, backlog_size, encoding);
75                 if (i == 0) {
76                         stream_index = stream_index2;
77                 } else {
78                         // Verify that all servers have this under the same stream index.
79                         assert(stream_index == stream_index2);
80                 }
81         }
82         return stream_index;
83 }
84
85 int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
86 {
87         assert(!data_fds.empty());
88         string contents;
89         int stream_index = -1;
90         for (int i = 0; i < num_servers; ++i) {
91                 int data_fd;
92                 if (i < int(data_fds.size())) {
93                         // Reuse one of the existing file descriptors.
94                         data_fd = data_fds[i];
95                 } else {
96                         // Clone the first one.
97                         if (contents.empty()) {
98                                 if (!read_tempfile(data_fds[0], &contents)) {
99                                         exit(1);
100                                 }
101                         }
102                         data_fd = make_tempfile(contents);
103                 }
104
105                 int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd);
106                 if (i == 0) {
107                         stream_index = stream_index2;
108                 } else {
109                         // Verify that all servers have this under the same stream index.
110                         assert(stream_index == stream_index2);
111                 }
112         }
113
114         // Close and delete any leftovers, if the number of servers was reduced.
115         for (size_t i = num_servers; i < data_fds.size(); ++i) {
116                 safe_close(data_fds[i]);  // Implicitly deletes the file.
117         }
118
119         return stream_index;
120 }
121
122 void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header)
123 {
124         for (int i = 0; i < num_servers; ++i) {
125                 servers[i].set_header(stream_index, http_header, stream_header);
126         }
127 }
128
129 void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
130 {
131         for (int i = 0; i < num_servers; ++i) {
132                 servers[i].add_data_deferred(stream_index, data, bytes);
133         }
134 }
135
136 void ServerPool::run()
137 {
138         for (int i = 0; i < num_servers; ++i) {
139                 servers[i].run();
140         }
141 }
142         
143 void ServerPool::stop()
144 {
145         for (int i = 0; i < num_servers; ++i) {
146                 servers[i].stop();
147         }
148 }
149         
150 vector<ClientStats> ServerPool::get_client_stats() const
151 {
152         vector<ClientStats> ret;
153         for (int i = 0; i < num_servers; ++i) {
154                 vector<ClientStats> stats = servers[i].get_client_stats();
155                 ret.insert(ret.end(), stats.begin(), stats.end());
156         }
157         return ret;
158 }
159         
160 void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool)
161 {
162         for (int i = 0; i < num_servers; ++i) {
163                 servers[i].set_mark_pool(stream_index, mark_pool);
164         }       
165 }
166
167 void ServerPool::set_backlog_size(int stream_index, size_t new_size)
168 {
169         for (int i = 0; i < num_servers; ++i) {
170                 servers[i].set_backlog_size(stream_index, new_size);
171         }       
172 }
173
174 void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding)
175 {
176         for (int i = 0; i < num_servers; ++i) {
177                 servers[i].set_encoding(stream_index, encoding);
178         }       
179 }