+#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <utility>
#include <vector>
+#include "accesslog.h"
#include "log.h"
#include "markpool.h"
+#include "metacube.h"
#include "mutexlock.h"
#include "parse.h"
#include "server.h"
using namespace std;
+extern AccessLogThread *access_log;
+
Server::Server()
{
pthread_mutex_init(&mutex, NULL);
Server::~Server()
{
+ for (map<string, Stream *>::iterator stream_it = streams.begin();
+ stream_it != streams.end();
+ ++stream_it) {
+ delete stream_it->second;
+ }
+
int ret;
do {
ret = close(epoll_fd);
}
}
-void Server::add_stream(const string &stream_id, size_t backlog_size)
+void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size)));
+ streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
}
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+ streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
}
-void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+void Server::set_backlog_size(const string &stream_id, size_t new_size)
{
MutexLock lock(&mutex);
assert(streams.count(stream_id) != 0);
streams[stream_id]->set_backlog_size(new_size);
}
-void Server::set_header(const string &stream_id, const string &header)
+void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- find_stream(stream_id)->header = header;
+ assert(streams.count(stream_id) != 0);
+ streams[stream_id]->encoding = encoding;
+}
+
+void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+{
+ MutexLock lock(&mutex);
+ find_stream(stream_id)->http_header = http_header;
+ find_stream(stream_id)->stream_header = stream_header;
// If there are clients we haven't sent anything to yet, we should give
// them the header, so push back into the SENDING_HEADER state.
}
}
-void Server::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
{
MutexLock lock(&mutex);
assert(clients.empty());
switch (status) {
case RP_OUT_OF_SPACE:
- log(WARNING, "fd %d sent overlong request!", client->sock);
+ log(WARNING, "[%s] Client sent overlong request!", client->remote_addr.c_str());
close_client(client);
return;
case RP_NOT_FINISHED_YET:
// See if there's more data for us.
goto read_request_again;
case RP_EXTRA_DATA:
- log(WARNING, "fd %d had junk data after request!", client->sock);
+ log(WARNING, "[%s] Junk data after request!", client->remote_addr.c_str());
close_client(client);
return;
case RP_FINISHED:
return;
}
if (bytes_to_send > stream->backlog_size) {
- log(WARNING, "fd %d lost %lld bytes, maybe too slow connection",
- client->sock,
+ log(WARNING, "[%s] Client lost %lld bytes, maybe too slow connection",
+ client->remote_addr.c_str(),
(long long int)(bytes_to_send - stream->backlog_size));
client->stream_pos = stream->bytes_received - stream->backlog_size;
client->bytes_lost += bytes_to_send - stream->backlog_size;
void Server::construct_header(Client *client)
{
- client->header_or_error = find_stream(client->stream_id)->header;
+ Stream *stream = find_stream(client->stream_id);
+ if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
+ client->header_or_error = stream->http_header +
+ "\r\n" +
+ stream->stream_header;
+ } else if (stream->encoding == Stream::STREAM_ENCODING_METACUBE) {
+ client->header_or_error = stream->http_header +
+ "Content-encoding: metacube\r\n" +
+ "\r\n";
+ if (!stream->stream_header.empty()) {
+ metacube_block_header hdr;
+ memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(stream->stream_header.size());
+ hdr.flags = htonl(METACUBE_FLAGS_HEADER);
+ client->header_or_error.append(
+ string(reinterpret_cast<char *>(&hdr), sizeof(hdr)));
+ }
+ client->header_or_error.append(stream->stream_header);
+ } else {
+ assert(false);
+ }
// Switch states.
client->state = Client::SENDING_HEADER;
}
}
+ // Log to access_log.
+ access_log->write(client->get_stats());
+
// Bye-bye!
int ret;
do {