-#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
+#include <netinet/in.h>
#include <pthread.h>
+#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/socket.h>
void Server::add_client(int sock)
{
- clients.insert(make_pair(sock, Client(sock)));
+ pair<map<int, Client>::iterator, bool> ret =
+ clients.insert(make_pair(sock, Client(sock)));
+ assert(ret.second == true); // Should not already exist.
+ Client *client_ptr = &ret.first->second;
// Start listening on data from this socket.
epoll_event ev;
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
- ev.data.u64 = reinterpret_cast<uint64_t>(&clients[sock]);
+ ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
log_perror("epoll_ctl(EPOLL_CTL_ADD)");
exit(1);
}
- process_client(&clients[sock]);
+ process_client(client_ptr);
}
void Server::add_client_from_serialized(const ClientProto &client)
} else {
stream = stream_it->second;
}
- clients.insert(make_pair(client.sock(), Client(client, stream)));
- Client *client_ptr = &clients[client.sock()];
+ pair<map<int, Client>::iterator, bool> ret =
+ clients.insert(make_pair(client.sock(), Client(client, stream)));
+ assert(ret.second == true); // Should not already exist.
+ Client *client_ptr = &ret.first->second;
// Start listening on data from this socket.
epoll_event ev;
streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
}
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+ streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
}
void Server::set_backlog_size(const string &stream_id, size_t new_size)
return;
}
if (bytes_to_send > stream->backlog_size) {
- log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection",
- client->remote_addr.c_str(),
- (long long int)(bytes_to_send - stream->backlog_size));
+ size_t bytes_lost = bytes_to_send - stream->backlog_size;
client->stream_pos = stream->bytes_received - stream->backlog_size;
- client->bytes_lost += bytes_to_send - stream->backlog_size;
+ client->bytes_lost += bytes_lost;
++client->num_loss_events;
bytes_to_send = stream->backlog_size;
+
+ double loss_fraction = double(client->bytes_lost) / double(client->bytes_lost + client->bytes_sent);
+ log(WARNING, "[%s] Client lost %lld bytes (total loss: %.2f%%), maybe too slow connection",
+ client->remote_addr.c_str(),
+ (long long int)(bytes_lost),
+ 100.0 * loss_fraction);
}
// See if we need to split across the circular buffer.
"\r\n" +
stream->stream_header;
} else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
- metacube_block_header hdr;
- memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
- hdr.size = htonl(stream->stream_header.size());
- hdr.flags = htonl(METACUBE_FLAGS_HEADER);
-
client->header_or_error = stream->http_header +
"Content-encoding: metacube\r\n" +
- "\r\n" +
- string(reinterpret_cast<char *>(&hdr), sizeof(hdr)) +
- stream->stream_header;
+ "\r\n";
+ if (!stream->stream_header.empty()) {
+ metacube_block_header hdr;
+ memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(stream->stream_header.size());
+ hdr.flags = htonl(METACUBE_FLAGS_HEADER);
+ client->header_or_error.append(
+ string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+ }
+ client->header_or_error.append(stream->stream_header);
} else {
assert(false);
}