X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=main.cpp;h=32d654cee654bef1bea675585c1a0c23250e3129;hp=abc7617d0ee3826cbde7ec0b119e0e0dfa91238f;hb=52d37b2815f4b144fbac899b7f9548353a6fbe3c;hpb=9dfb5deaface1a58a72f0640acc6b3c698098f47 diff --git a/main.cpp b/main.cpp index abc7617..32d654c 100644 --- a/main.cpp +++ b/main.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -22,7 +23,6 @@ #include "input.h" #include "input_stats.h" #include "log.h" -#include "markpool.h" #include "sa_compare.h" #include "serverpool.h" #include "state.pb.h" @@ -35,10 +35,21 @@ using namespace std; AccessLogThread *access_log = NULL; ServerPool *servers = NULL; -vector mark_pools; volatile bool hupped = false; volatile bool stopped = false; +namespace { + +struct OrderByConnectionTime { + bool operator() (const ClientProto &a, const ClientProto &b) const { + if (a.connect_time_sec() != b.connect_time_sec()) + return a.connect_time_sec() < b.connect_time_sec(); + return a.connect_time_nsec() < b.connect_time_nsec(); + } +}; + +} // namespace + struct InputWithRefcount { Input *input; int refcount; @@ -151,11 +162,6 @@ void create_streams(const Config &config, const set &deserialized_urls, multimap *inputs) { - for (unsigned i = 0; i < config.mark_pools.size(); ++i) { - const MarkPoolConfig &mp_config = config.mark_pools[i]; - mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); - } - // HTTP streams. set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { @@ -173,19 +179,17 @@ void create_streams(const Config &config, if (deserialized_urls.count(stream_config.url) == 0) { stream_index = servers->add_stream(stream_config.url, stream_config.backlog_size, + stream_config.prebuffering_bytes, Stream::Encoding(stream_config.encoding)); } else { stream_index = servers->lookup_stream_by_url(stream_config.url); assert(stream_index != -1); servers->set_backlog_size(stream_index, stream_config.backlog_size); + servers->set_prebuffering_bytes(stream_index, stream_config.prebuffering_bytes); servers->set_encoding(stream_index, Stream::Encoding(stream_config.encoding)); } - if (stream_config.mark_pool != -1) { - servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); - } - servers->set_pacing_rate(stream_index, stream_config.pacing_rate); string src = stream_config.src; @@ -212,11 +216,11 @@ void create_streams(const Config &config, // UDP streams. for (unsigned i = 0; i < config.udpstreams.size(); ++i) { const UDPStreamConfig &udpstream_config = config.udpstreams[i]; - MarkPool *mark_pool = NULL; - if (udpstream_config.mark_pool != -1) { - mark_pool = mark_pools[udpstream_config.mark_pool]; - } - int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool, udpstream_config.pacing_rate); + int stream_index = servers->add_udpstream( + udpstream_config.dst, + udpstream_config.pacing_rate, + udpstream_config.ttl, + udpstream_config.multicast_iface_index); string src = udpstream_config.src; if (!src.empty()) { @@ -393,7 +397,7 @@ start: if (state_fd != -1) { log(INFO, "Deserializing state from previous process..."); string serialized; - if (!read_tempfile(state_fd, &serialized)) { + if (!read_tempfile_and_close(state_fd, &serialized)) { exit(1); } if (!loaded_state.ParseFromString(serialized)) { @@ -439,7 +443,7 @@ start: // Deserialize the acceptors. for (int i = 0; i < loaded_state.acceptors_size(); ++i) { - sockaddr_in6 sin6 = ExtractAddressFromAcceptorProto(loaded_state.acceptors(i)); + sockaddr_in6 sin6 = extract_address_from_acceptor_proto(loaded_state.acceptors(i)); deserialized_acceptors.insert(make_pair( sin6, new Acceptor(loaded_state.acceptors(i)))); @@ -454,10 +458,33 @@ start: // Find all streams in the configuration file, create them, and connect to the inputs. create_streams(config, deserialized_urls, &inputs); vector acceptors = create_acceptors(config, &deserialized_acceptors); + + // Convert old-style timestamps to new-style timestamps for all clients; + // this simplifies the sort below. + { + timespec now_monotonic; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now_monotonic) == -1) { + log(ERROR, "clock_gettime(CLOCK_MONOTONIC_COARSE) failed."); + exit(1); + } + long delta_sec = now_monotonic.tv_sec - time(NULL); + + for (int i = 0; i < loaded_state.clients_size(); ++i) { + ClientProto* client = loaded_state.mutable_clients(i); + if (client->has_connect_time_old()) { + client->set_connect_time_sec(client->connect_time_old() + delta_sec); + client->set_connect_time_nsec(now_monotonic.tv_nsec); + client->clear_connect_time_old(); + } + } + } // Put back the existing clients. It doesn't matter which server we - // allocate them to, so just do round-robin. However, we need to add - // them after the mark pools have been set up. + // allocate them to, so just do round-robin. However, we need to sort them + // by connection time first, since add_client_serialized() expects that. + sort(loaded_state.mutable_clients()->begin(), + loaded_state.mutable_clients()->end(), + OrderByConnectionTime()); for (int i = 0; i < loaded_state.clients_size(); ++i) { if (deleted_urls.count(loaded_state.clients(i).url()) != 0) { safe_close(loaded_state.clients(i).sock()); @@ -554,11 +581,6 @@ start: } delete servers; - for (unsigned i = 0; i < mark_pools.size(); ++i) { - delete mark_pools[i]; - } - mark_pools.clear(); - access_log->stop(); delete access_log; shut_down_logging();