}
}
}
- if (serialized.has_connect_time_old()) {
- // Do a rough conversion from time() to monotonic time.
- if (clock_gettime(CLOCK_MONOTONIC_COARSE, &connect_time) == -1) {
- log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
- return;
- }
- connect_time.tv_sec += serialized.connect_time_old() - time(NULL);
- } else {
- connect_time.tv_sec = serialized.connect_time_sec();
- connect_time.tv_nsec = serialized.connect_time_nsec();
- }
+ connect_time.tv_sec = serialized.connect_time_sec();
+ connect_time.tv_nsec = serialized.connect_time_nsec();
}
ClientProto Client::serialize() const
#include <sys/time.h>
#include <sys/wait.h>
#include <unistd.h>
+#include <algorithm>
#include <map>
#include <set>
#include <string>
volatile bool hupped = false;
volatile bool stopped = false;
+namespace {
+
+struct OrderByConnectionTime {
+ bool operator() (const ClientProto &a, const ClientProto &b) const {
+ if (a.connect_time_sec() != b.connect_time_sec())
+ return a.connect_time_sec() < b.connect_time_sec();
+ return a.connect_time_nsec() < b.connect_time_nsec();
+ }
+};
+
+} // namespace
+
struct InputWithRefcount {
Input *input;
int refcount;
// Find all streams in the configuration file, create them, and connect to the inputs.
create_streams(config, deserialized_urls, &inputs);
vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
+
+ // Convert old-style timestamps to new-style timestamps for all clients;
+ // this simplifies the sort below.
+ {
+ timespec now_monotonic;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &now_monotonic) == -1) {
+ log(ERROR, "clock_gettime(CLOCK_MONOTONIC_COARSE) failed.");
+ exit(1);
+ }
+ long delta_sec = now_monotonic.tv_sec - time(NULL);
+
+ for (int i = 0; i < loaded_state.clients_size(); ++i) {
+ ClientProto* client = loaded_state.mutable_clients(i);
+ if (client->has_connect_time_old()) {
+ client->set_connect_time_sec(client->connect_time_old() + delta_sec);
+ client->set_connect_time_nsec(now_monotonic.tv_nsec);
+ client->clear_connect_time_old();
+ }
+ }
+ }
// Put back the existing clients. It doesn't matter which server we
- // allocate them to, so just do round-robin.
+ // allocate them to, so just do round-robin. However, we need to sort them
+ // by connection time first, since add_client_serialized() expects that.
+ sort(loaded_state.mutable_clients()->begin(),
+ loaded_state.mutable_clients()->end(),
+ OrderByConnectionTime());
for (int i = 0; i < loaded_state.clients_size(); ++i) {
if (deleted_urls.count(loaded_state.clients(i).url()) != 0) {
safe_close(loaded_state.clients(i).sock());
extern AccessLogThread *access_log;
+namespace {
+
+inline bool is_equal(timespec a, timespec b)
+{
+ return a.tv_sec == b.tv_sec &&
+ a.tv_nsec == b.tv_nsec;
+}
+
+inline bool is_earlier(timespec a, timespec b)
+{
+ if (a.tv_sec != b.tv_sec)
+ return a.tv_sec < b.tv_sec;
+ return a.tv_nsec < b.tv_nsec;
+}
+
+} // namespace
+
Server::Server()
{
pthread_mutex_init(&mutex, NULL);
process_queued_data();
+ // Process each client where we have socket activity.
for (int i = 0; i < nfds; ++i) {
Client *client = reinterpret_cast<Client *>(events[i].data.u64);
process_client(client);
}
+ // Process each client where its stream has new data,
+ // even if there was no socket activity.
for (size_t i = 0; i < streams.size(); ++i) {
vector<Client *> to_process;
swap(streams[i]->to_process, to_process);
process_client(to_process[i]);
}
}
+
+ // Finally, go through each client to see if it's timed out
+ // in the READING_REQUEST state. (Seemingly there are clients
+ // that can hold sockets up for days at a time without sending
+ // anything at all.)
+ timespec timeout_time;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &timeout_time) == -1) {
+ log_perror("clock_gettime(CLOCK_MONOTONIC_COARSE)");
+ continue;
+ }
+ timeout_time.tv_sec -= REQUEST_READ_TIMEOUT_SEC;
+ while (!clients_ordered_by_connect_time.empty()) {
+ pair<timespec, int> &connect_time_and_fd = clients_ordered_by_connect_time.front();
+
+ // See if we have reached the end of clients to process.
+ if (is_earlier(timeout_time, connect_time_and_fd.first)) {
+ break;
+ }
+
+ // If this client doesn't exist anymore, just ignore it
+ // (it was deleted earlier).
+ std::map<int, Client>::iterator client_it = clients.find(connect_time_and_fd.second);
+ if (client_it == clients.end()) {
+ clients_ordered_by_connect_time.pop();
+ continue;
+ }
+ Client *client = &client_it->second;
+ if (!is_equal(client->connect_time, connect_time_and_fd.first)) {
+ // Another client has taken this fd in the meantime.
+ clients_ordered_by_connect_time.pop();
+ continue;
+ }
+
+ if (client->state != Client::READING_REQUEST) {
+ // Only READING_REQUEST can time out.
+ clients_ordered_by_connect_time.pop();
+ continue;
+ }
+
+ // OK, it timed out.
+ close_client(client);
+ clients_ordered_by_connect_time.pop();
+ }
}
}
assert(ret.second == true); // Should not already exist.
Client *client_ptr = &ret.first->second;
+ // Connection timestamps must be nondecreasing.
+ assert(clients_ordered_by_connect_time.empty() ||
+ !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+ clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, sock));
+
// Start listening on data from this socket.
epoll_event ev;
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
assert(ret.second == true); // Should not already exist.
Client *client_ptr = &ret.first->second;
+ // Connection timestamps must be nondecreasing.
+ assert(clients_ordered_by_connect_time.empty() ||
+ !is_earlier(client_ptr->connect_time, clients_ordered_by_connect_time.back().first));
+ clients_ordered_by_connect_time.push(make_pair(client_ptr->connect_time, client.sock()));
+
// Start listening on data from this socket.
epoll_event ev;
if (client.state() == Client::READING_REQUEST) {
#include <sys/types.h>
#include <time.h>
#include <map>
+#include <queue>
#include <string>
#include <vector>
#define EPOLL_MAX_EVENTS 8192
#define EPOLL_TIMEOUT_MS 20
#define MAX_CLIENT_REQUEST 16384
+#define REQUEST_READ_TIMEOUT_SEC 60
class CubemapStateProto;
class StreamProto;
// Map from file descriptor to client.
std::map<int, Client> clients;
+ // A list of all clients, ordered by the time they connected (first element),
+ // and their file descriptor (second element). It is ordered by connection time
+ // (and thus also by read timeout time) so that we can read clients from the
+ // start and stop processing once we get to one that isn't ready to be
+ // timed out yet (which makes each processing run amortized O(1)).
+ //
+ // Note that when we delete a client, we don't update this queue.
+ // This means that when reading it, we need to check if the client it
+ // describes is still exists (ie., that the fd still exists, and that
+ // the timespec matches).
+ std::queue<std::pair<timespec, int> > clients_ordered_by_connect_time;
+
// Used for epoll implementation (obviously).
int epoll_fd;
epoll_event events[EPOLL_MAX_EVENTS];