+
+ if (is_tls) {
+ assert(tls_server_contexts.count(acceptor));
+ client_ptr->tls_context = tls_accept(tls_server_contexts[acceptor]);
+ if (client_ptr->tls_context == nullptr) {
+ log(ERROR, "tls_accept() failed");
+ close_client(client_ptr);
+ return;
+ }
+ tls_make_exportable(client_ptr->tls_context, 1);
+ }
+
+ process_client(client_ptr);
+}
+
+void Server::add_client_from_serialized(const ClientProto &client, const vector<shared_ptr<const string>> &short_responses)
+{
+ lock_guard<mutex> lock(mu);
+ Stream *stream;
+ int stream_index = lookup_stream_by_url(client.url());
+ if (stream_index == -1) {
+ assert(client.state() != Client::SENDING_DATA);
+ stream = nullptr;
+ } else {
+ stream = streams[stream_index].get();
+ }
+ auto inserted = clients.insert(make_pair(client.sock(), Client(client, short_responses, stream)));
+ assert(inserted.second == true); // Should not already exist.
+ Client *client_ptr = &inserted.first->second;
+
+ // Connection timestamps must be nondecreasing.
+ assert(clients_ordered_by_connect_time.empty() ||
+ !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+ clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, client.sock()));
+
+ // Start listening on data from this socket.
+ epoll_event ev;
+ if (client.state() == Client::READING_REQUEST) {
+ // See the corresponding comment in Server::add_client().
+ if (client.has_tls_context()) {
+ ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
+ } else {
+ ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
+ }
+ } else {
+ // If we don't have more data for this client, we'll be putting it into
+ // the sleeping array again soon.
+ ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
+ }
+ ev.data.ptr = client_ptr;
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
+ log_perror("epoll_ctl(EPOLL_CTL_ADD)");
+ exit(1);
+ }
+
+ if (client_ptr->state == Client::WAITING_FOR_KEYFRAME ||
+ client_ptr->state == Client::PREBUFFERING ||
+ (client_ptr->state == Client::SENDING_DATA &&
+ client_ptr->stream_pos == client_ptr->stream->bytes_received)) {
+ client_ptr->stream->put_client_to_sleep(client_ptr);
+ } else {
+ process_client(client_ptr);
+ }
+}
+
+void Server::start_client_timeout_timer(Client *client)
+{
+ // Connection timestamps must be nondecreasing. I can't find any guarantee
+ // that even the monotonic clock can't go backwards by a small amount
+ // (think switching between CPUs with non-synchronized TSCs), so if
+ // this actually should happen, we hack around it by fudging
+ // connect_time.
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &client->connect_time) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ } else {
+ if (!clients_ordered_by_connect_time.empty() &&
+ is_earlier(client->connect_time, clients_ordered_by_connect_time.back().first)) {
+ client->connect_time = clients_ordered_by_connect_time.back().first;
+ }
+ clients_ordered_by_connect_time.push(make_pair(client->connect_time, client->sock));
+ }
+}
+
+int Server::lookup_stream_by_url(const string &url) const
+{
+ const auto stream_url_it = stream_url_map.find(url);
+ if (stream_url_it == stream_url_map.end()) {
+ return -1;
+ }
+ return stream_url_it->second;
+}
+
+int Server::add_stream(const string &url,
+ const string &hls_url,
+ size_t backlog_size,
+ size_t prebuffering_bytes,
+ Stream::Encoding encoding,
+ Stream::Encoding src_encoding,
+ unsigned hls_frag_duration,
+ size_t hls_backlog_margin,
+ const string &allow_origin)
+{
+ lock_guard<mutex> lock(mu);
+ stream_url_map.insert(make_pair(url, streams.size()));
+ if (!hls_url.empty()) {
+ stream_hls_url_map.insert(make_pair(hls_url, streams.size()));
+ }
+ streams.emplace_back(new Stream(url, backlog_size, prebuffering_bytes, encoding, src_encoding, hls_frag_duration, hls_backlog_margin, allow_origin));
+ return streams.size() - 1;
+}
+
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+{
+ lock_guard<mutex> lock(mu);
+ stream_url_map.insert(make_pair(stream.url(), streams.size()));
+ // stream_hls_url_map will be updated in register_hls_url(), since it is not part
+ // of the serialized state (it will always be picked out from the configuration).
+ streams.emplace_back(new Stream(stream, data_fd));
+ return streams.size() - 1;
+}
+
+void Server::add_hls_zombie_from_serialized(const HLSZombieProto &zombie_proto)
+{
+ lock_guard<mutex> lock(mu);
+ HLSZombie zombie;
+ zombie.remote_addr = zombie_proto.remote_addr();
+ zombie.url = zombie_proto.url();
+ zombie.referer = zombie_proto.referer();
+ zombie.user_agent = zombie_proto.user_agent();
+ zombie.expires.tv_sec = zombie_proto.expires_sec();
+ zombie.expires.tv_nsec = zombie_proto.expires_nsec();
+ hls_zombies[zombie_proto.key()] = move(zombie);
+}
+
+void Server::set_backlog_size(int stream_index, size_t new_size)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->set_backlog_size(new_size);
+}
+
+void Server::set_prebuffering_bytes(int stream_index, size_t new_amount)
+{
+ lock_guard<mutex> lock(mu);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->prebuffering_bytes = new_amount;