}
Stream::Stream(const StreamProto &serialized)
- : header(serialized.header()),
+ : stream_id(serialized.stream_id()),
+ header(serialized.header()),
data(new char[BACKLOG_SIZE]),
data_size(serialized.data_size())
{
serialized.set_header(header);
serialized.set_data(string(data, data + BACKLOG_SIZE));
serialized.set_data_size(data_size);
+ serialized.set_stream_id(stream_id);
return serialized;
}
}
}
+Server::~Server()
+{
+ close(epoll_fd);
+}
+
void Server::run()
{
should_stop = false;
exit(1);
}
}
-
+
+void Server::add_client_from_serialized(const ClientProto &client)
+{
+ MutexLock lock(&mutex);
+ clients.insert(make_pair(client.sock(), Client(client)));
+
+ // Start listening on data from this socket.
+ epoll_event ev;
+ if (client.state() == Client::READING_REQUEST) {
+ ev.events = EPOLLIN | EPOLLRDHUP;
+ } else {
+ // If we don't have more data for this client, we'll be putting it into
+ // the sleeping array again soon.
+ ev.events = EPOLLOUT | EPOLLRDHUP;
+ }
+ ev.data.u64 = 0; // Keep Valgrind happy.
+ ev.data.fd = client.sock();
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
+ perror("epoll_ctl(EPOLL_CTL_ADD)");
+ exit(1);
+ }
+}
+
void Server::add_stream(const string &stream_id)
{
MutexLock lock(&mutex);
streams.insert(make_pair(stream_id, new Stream(stream_id)));
}
+
+void Server::add_stream_from_serialized(const StreamProto &stream)
+{
+ MutexLock lock(&mutex);
+ streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+}
void Server::set_header(const string &stream_id, const string &header)
{
MutexLock lock(&mutex);
find_stream(stream_id)->header = header;
+
+ // If there are clients we haven't sent anything to yet, we should give
+ // them the header, so push back into the SENDING_HEADER state.
+ for (map<int, Client>::iterator client_it = clients.begin();
+ client_it != clients.end();
+ ++client_it) {
+ Client *client = &client_it->second;
+ if (client->state == Client::SENDING_DATA &&
+ client->bytes_sent == 0) {
+ construct_header(client);
+ }
+ }
}
void Server::add_data(const string &stream_id, const char *data, size_t bytes)
}
parse_request(client);
+ construct_header(client);
break;
}
case Client::SENDING_HEADER: {
// TODO: Actually parse the request. :-)
client->stream_id = "stream";
client->request.clear();
+}
- // Construct the header.
- client->header = "HTTP/1.0 200 OK\r\n Content-type: video/x-flv\r\nCache-Control: no-cache\r\nContent-type: todo/fixme\r\n\r\n" +
+void Server::construct_header(Client *client)
+{
+ client->header = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" +
find_stream(client->stream_id)->header;
// Switch states.