}
Stream::Stream(const StreamProto &serialized)
- : header(serialized.header()),
+ : stream_id(serialized.stream_id()),
+ header(serialized.header()),
data(new char[BACKLOG_SIZE]),
data_size(serialized.data_size())
{
serialized.set_header(header);
serialized.set_data(string(data, data + BACKLOG_SIZE));
serialized.set_data_size(data_size);
+ serialized.set_stream_id(stream_id);
return serialized;
}
}
}
+Server::~Server()
+{
+ close(epoll_fd);
+}
+
void Server::run()
{
should_stop = false;
exit(1);
}
}
-
+
+void Server::add_client_from_serialized(const ClientProto &client)
+{
+ MutexLock lock(&mutex);
+ clients.insert(make_pair(client.sock(), Client(client)));
+
+ // Start listening on data from this socket.
+ epoll_event ev;
+ if (client.state() == Client::READING_REQUEST) {
+ ev.events = EPOLLIN | EPOLLRDHUP;
+ } else {
+ // If we don't have more data for this client, we'll be putting it into
+ // the sleeping array again soon.
+ ev.events = EPOLLOUT | EPOLLRDHUP;
+ }
+ ev.data.u64 = 0; // Keep Valgrind happy.
+ ev.data.fd = client.sock();
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
+ perror("epoll_ctl(EPOLL_CTL_ADD)");
+ exit(1);
+ }
+}
+
void Server::add_stream(const string &stream_id)
{
MutexLock lock(&mutex);
streams.insert(make_pair(stream_id, new Stream(stream_id)));
}
+
+void Server::add_stream_from_serialized(const StreamProto &stream)
+{
+ MutexLock lock(&mutex);
+ streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+}
void Server::set_header(const string &stream_id, const string &header)
{