#include <map>
#include "metacube.h"
+#include "mutexlock.h"
#include "input.h"
#include "server.h"
extern Server *servers;
-Input::Input(const string &stream_id)
+Input::Input(const string &stream_id, const string &url)
: stream_id(stream_id),
+ url(url),
has_metacube_header(false)
{
}
-void Input::run(const string &url)
+void Input::run()
+{
+ should_stop = false;
+
+ // Joinable is already the default, but it's good to be certain.
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ pthread_create(&worker_thread, &attr, Input::do_work_thunk, this);
+}
+
+void Input::stop()
+{
+ should_stop = true;
+
+ if (pthread_join(worker_thread, NULL) == -1) {
+ perror("pthread_join");
+ exit(1);
+ }
+}
+
+void *Input::do_work_thunk(void *arg)
+{
+ Input *input = static_cast<Input *>(arg);
+ input->do_work();
+ return NULL;
+}
+
+void Input::do_work()
{
CURL *curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
size_t Input::curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata)
{
Input *input = static_cast<Input *>(userdata);
+ if (input->should_stop) {
+ return 0;
+ }
+
size_t bytes = size * nmemb;
input->curl_callback(ptr, bytes);
return bytes;
class Input {
public:
- Input(const std::string &stream_id);
+ Input(const std::string &stream_id, const std::string &url);
// Connect to the given URL and start streaming.
- // WARNING: Currently this blocks; it does not run in a separate thread!
- void run(const std::string &url);
+ void run();
+
+ // Stop streaming. NOTE: Does not currently work!
+ void stop();
private:
+ // Recovers the this pointer and calls do_work().
+ static void *do_work_thunk(void *arg);
+
+ // Actually does the download.
+ void do_work();
+
// Recovers the this pointer and calls curl_callback().
static size_t curl_callback_thunk(char *ptr, size_t size, size_t nmemb, void *userdata);
void drop_pending_data(size_t num_bytes);
std::string stream_id;
+ std::string url;
// Data we have received but not fully processed yet.
std::vector<char> pending_data;
// If <pending_data> starts with a Metacube header,
// this is true.
bool has_metacube_header;
+
+ pthread_t worker_thread;
+
+ // Whether we should stop or not.
+ volatile bool should_stop;
};
#endif // !defined(_INPUT_H)
return serialized;
}
+Stream::Stream(const string &stream_id)
+ : stream_id(stream_id),
+ data(new char[BACKLOG_SIZE]),
+ data_size(0)
+{
+ memset(data, 0, BACKLOG_SIZE);
+}
+
+Stream::~Stream()
+{
+ delete[] data;
+}
+
+Stream::Stream(const StreamProto &serialized)
+ : header(serialized.header()),
+ data(new char[BACKLOG_SIZE]),
+ data_size(serialized.data_size())
+{
+ assert(serialized.data().size() == BACKLOG_SIZE);
+ memcpy(data, serialized.data().data(), BACKLOG_SIZE);
+}
+
+StreamProto Stream::serialize() const
+{
+ StreamProto serialized;
+ serialized.set_header(header);
+ serialized.set_data(string(data, data + BACKLOG_SIZE));
+ serialized.set_data_size(data_size);
+ return serialized;
+}
+
Server::Server()
{
pthread_mutex_init(&mutex, NULL);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&worker_thread, &attr, Server::do_work_thunk, this);
}
-
+
void Server::stop()
{
{
}
}
}
-
+
+CubemapStateProto Server::serialize() const
+{
+ CubemapStateProto serialized;
+ for (map<int, Client>::const_iterator client_it = clients.begin();
+ client_it != clients.end();
+ ++client_it) {
+ serialized.add_clients()->MergeFrom(client_it->second.serialize());
+ }
+ for (map<string, Stream *>::const_iterator stream_it = streams.begin();
+ stream_it != streams.end();
+ ++stream_it) {
+ serialized.add_streams()->MergeFrom(stream_it->second->serialize());
+ }
+ return serialized;
+}
+
void Server::add_client(int sock)
{
MutexLock lock(&mutex);
// Start listening on data from this socket.
epoll_event ev;
ev.events = EPOLLIN | EPOLLRDHUP;
+ ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = sock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev) == -1) {
perror("epoll_ctl(EPOLL_CTL_ADD)");
void Server::add_stream(const string &stream_id)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream_id, Stream()));
+ streams.insert(make_pair(stream_id, new Stream(stream_id)));
}
void Server::set_header(const string &stream_id, const string &header)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id].header = header;
+ find_stream(stream_id)->header = header;
}
void Server::add_data(const string &stream_id, const char *data, size_t bytes)
}
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- Stream *stream = &streams[stream_id];
+ Stream *stream = find_stream(stream_id);
size_t pos = stream->data_size % BACKLOG_SIZE;
stream->data_size += bytes;
// Start sending from the end. In other words, we won't send any of the backlog,
// but we'll start sending immediately as we get data.
client->state = Client::SENDING_DATA;
- client->bytes_sent = streams[client->stream_id].data_size;
+ client->bytes_sent = find_stream(client->stream_id)->data_size;
break;
}
case Client::SENDING_DATA: {
// See if there's some data we've lost. Ideally, we should drop to a block boundary,
// but resync will be the mux's problem.
- const Stream &stream = streams[client->stream_id];
+ const Stream &stream = *find_stream(client->stream_id);
size_t bytes_to_send = stream.data_size - client->bytes_sent;
if (bytes_to_send > BACKLOG_SIZE) {
fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n",
client->sock,
(long long int)(bytes_to_send - BACKLOG_SIZE));
- client->bytes_sent = streams[client->stream_id].data_size - BACKLOG_SIZE;
+ client->bytes_sent = find_stream(client->stream_id)->data_size - BACKLOG_SIZE;
bytes_to_send = BACKLOG_SIZE;
}
// Construct the header.
client->header = "HTTP/1.0 200 OK\r\n Content-type: video/x-flv\r\nCache-Control: no-cache\r\nContent-type: todo/fixme\r\n\r\n" +
- streams[client->stream_id].header;
+ find_stream(client->stream_id)->header;
// Switch states.
client->state = Client::SENDING_HEADER;
epoll_event ev;
ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = client->sock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
{
epoll_event ev;
ev.events = EPOLLRDHUP;
+ ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = client->sock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
for (unsigned i = 0; i < sleeping_clients.size(); ++i) {
epoll_event ev;
ev.events = EPOLLOUT | EPOLLRDHUP;
+ ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.fd = sleeping_clients[i];
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sleeping_clients[i], &ev) == -1) {
perror("epoll_ctl(EPOLL_CTL_MOD)");
}
sleeping_clients.clear();
}
+
+Stream *Server::find_stream(const string &stream_id)
+{
+ map<string, Stream *>::iterator it = streams.find(stream_id);
+ assert(it != streams.end());
+ return it->second;
+}
#define MAX_CLIENT_REQUEST 16384
class ClientProto;
+class CubemapStateProto;
+class StreamProto;
struct Client {
Client() {}
};
struct Stream {
+ Stream(const std::string &stream_id);
+ ~Stream();
+
+ // Serialization/deserialization.
+ Stream(const StreamProto &serialized);
+ StreamProto serialize() const;
+
+ std::string stream_id;
+
// The HTTP response header, plus the video stream header (if any).
std::string header;
// The stream data itself, stored in a circular buffer.
- char data[BACKLOG_SIZE];
+ char *data;
// How many bytes <data> contains. Can very well be larger than BACKLOG_SIZE,
// since the buffer wraps.
size_t data_size;
+
+private:
+ Stream(const Stream& other);
};
class Server {
// Stop the thread.
void stop();
+ CubemapStateProto serialize() const;
+
void add_client(int sock);
void add_stream(const std::string &stream_id);
void set_header(const std::string &stream_id, const std::string &header);
bool should_stop;
// Map from stream ID to stream.
- std::map<std::string, Stream> streams;
+ std::map<std::string, Stream *> streams;
// Map from file descriptor to client.
std::map<int, Client> clients;
// We have more data, so mark all clients that are sleeping as ready to go.
void wake_up_all_clients();
+
+ // TODO: This function should probably die.
+ Stream *find_stream(const std::string &stream_id);
};
#endif // !defined(_SERVER_H)