Support parsing streams from config file. Also support multiple streams (includes...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 7 Apr 2013 15:42:23 +0000 (17:42 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 7 Apr 2013 15:42:23 +0000 (17:42 +0200)
Makefile
cubemap.cpp
parse.cpp [new file with mode: 0644]
parse.h [new file with mode: 0644]
server.cpp
server.h
state.proto

index 2db0a75..ca9b255 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ PROTOC=protoc
 CXXFLAGS=-Wall -O2 -g
 LDLIBS=-lcurl -lpthread -lprotobuf
 
 CXXFLAGS=-Wall -O2 -g
 LDLIBS=-lcurl -lpthread -lprotobuf
 
-OBJS=cubemap.o server.o serverpool.o mutexlock.o input.o state.pb.o
+OBJS=cubemap.o server.o serverpool.o mutexlock.o input.o parse.o state.pb.o
 
 all: cubemap
 
 
 all: cubemap
 
index ad218db..b00ca64 100644 (file)
@@ -17,6 +17,7 @@
 #include <map>
 
 #include "metacube.h"
 #include <map>
 
 #include "metacube.h"
+#include "parse.h"
 #include "server.h"
 #include "serverpool.h"
 #include "input.h"
 #include "server.h"
 #include "serverpool.h"
 #include "input.h"
@@ -171,120 +172,6 @@ CubemapStateProto read_tempfile(int state_fd)
        return state;
 }
 
        return state;
 }
 
-// Split a line on whitespace, e.g. "foo  bar baz" -> {"foo", "bar", "baz"}.
-vector<string> split_tokens(const string &line)
-{
-       vector<string> ret;
-       string current_token;
-
-       for (size_t i = 0; i < line.size(); ++i) {
-               if (isspace(line[i])) {
-                       if (!current_token.empty()) {
-                               ret.push_back(current_token);
-                       }
-                       current_token.clear();
-               } else {
-                       current_token.push_back(line[i]);
-               }
-       }
-       if (!current_token.empty()) {
-               ret.push_back(current_token);
-       }
-       return ret;
-}
-
-struct ConfigLine {
-       string keyword;
-       vector<string> arguments;
-       map<string, string> parameters;
-};
-
-// Parse the configuration file.
-vector<ConfigLine> parse_config(const string &filename)
-{
-       vector<ConfigLine> ret;
-
-       FILE *fp = fopen(filename.c_str(), "r");
-       if (fp == NULL) {
-               perror(filename.c_str());
-               exit(1);
-       }
-
-       char buf[4096];
-       while (!feof(fp)) {
-               if (fgets(buf, sizeof(buf), fp) == NULL) {
-                       break;
-               }
-
-               // Chop off the string at the first #, \r or \n.
-               buf[strcspn(buf, "#\r\n")] = 0;
-
-               // Remove all whitespace from the end of the string.
-               size_t len = strlen(buf);
-               while (len > 0 && isspace(buf[len - 1])) {
-                       buf[--len] = 0;
-               }
-
-               // If the line is now all blank, ignore it.
-               if (len == 0) {
-                       continue;
-               }
-
-               vector<string> tokens = split_tokens(buf);
-               assert(!tokens.empty());
-               
-               ConfigLine line;
-               line.keyword = tokens[0];
-
-               for (size_t i = 1; i < tokens.size(); ++i) {
-                       // foo=bar is a parameter; anything else is an argument.
-                       size_t equals_pos = tokens[i].find_first_of('=');
-                       if (equals_pos == string::npos) {
-                               line.arguments.push_back(tokens[i]);
-                       } else {
-                               string key = tokens[i].substr(0, equals_pos);
-                               string value = tokens[i].substr(equals_pos + 1, string::npos);
-                               line.parameters.insert(make_pair(key, value));
-                       }
-               }
-
-               ret.push_back(line);
-       }
-
-       fclose(fp);
-       return ret;
-}
-
-// Note: Limits are inclusive.
-int fetch_config_int(const vector<ConfigLine> &config, const string &keyword, int min_limit, int max_limit)
-{
-       bool value_found = false;
-       int value = -1;
-       for (unsigned i = 0; i < config.size(); ++i) {
-               if (config[i].keyword != keyword) {
-                       continue;
-               }
-               if (config[i].parameters.size() > 0 ||
-                   config[i].arguments.size() != 1) {
-                       fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
-                       exit(1);
-               }
-               value_found = true;
-               value = atoi(config[i].arguments[0].c_str());  // TODO: verify int validity.
-       }
-       if (!value_found) {
-               fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n",
-                       keyword.c_str());
-               exit(1);
-       }
-       if (value < min_limit || value > max_limit) {
-               fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n",
-                       keyword.c_str(), value, min_limit, max_limit);
-               exit(1);
-       }
-       return value;
-}
-
 int main(int argc, char **argv)
 {
        fprintf(stderr, "\nCubemap starting.\n");
 int main(int argc, char **argv)
 {
        fprintf(stderr, "\nCubemap starting.\n");
@@ -304,6 +191,7 @@ int main(int argc, char **argv)
                CubemapStateProto loaded_state = read_tempfile(state_fd);
 
                // Deserialize the streams.
                CubemapStateProto loaded_state = read_tempfile(state_fd);
 
                // Deserialize the streams.
+               // TODO: Pick up new streams from the configuration file.
                for (int i = 0; i < loaded_state.streams_size(); ++i) {
                        servers->add_stream_from_serialized(loaded_state.streams(i));
                }
                for (int i = 0; i < loaded_state.streams_size(); ++i) {
                        servers->add_stream_from_serialized(loaded_state.streams(i));
                }
@@ -319,9 +207,19 @@ int main(int argc, char **argv)
                old_port = loaded_state.port();
 
                fprintf(stderr, "done.\n");
                old_port = loaded_state.port();
 
                fprintf(stderr, "done.\n");
-       } else{
-               // TODO: This should come from the config file.
-               servers->add_stream(STREAM_ID);
+       } else {
+               // Find all streams in the configuration file, and create them.
+               for (unsigned i = 0; i < config.size(); ++i) {
+                       if (config[i].keyword != "stream") {
+                               continue;
+                       }
+                       if (config[i].arguments.size() != 1) {
+                               fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n");
+                               exit(1);
+                       }
+                       string stream_id = config[i].arguments[0];
+                       servers->add_stream(stream_id);
+               }
        }
 
        // Open a new server socket if we do not already have one, or if we changed ports.
        }
 
        // Open a new server socket if we do not already have one, or if we changed ports.
@@ -339,9 +237,26 @@ int main(int argc, char **argv)
        pthread_t acceptor_thread;
        pthread_create(&acceptor_thread, NULL, acceptor_thread_run, reinterpret_cast<void *>(server_sock));
 
        pthread_t acceptor_thread;
        pthread_create(&acceptor_thread, NULL, acceptor_thread_run, reinterpret_cast<void *>(server_sock));
 
-       // TODO: This should come from the config file.
-       Input input(STREAM_ID, STREAM_URL);
-       input.run();
+       // Find all streams in the configuration file, and create inputs for them.
+       vector<Input *> inputs;
+       for (unsigned i = 0; i < config.size(); ++i) {
+               if (config[i].keyword != "stream") {
+                       continue;
+               }
+               assert(config[i].arguments.size() == 1);
+               string stream_id = config[i].arguments[0];
+
+               if (config[i].parameters.count("src") == 0) {
+                       fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n",
+                               stream_id.c_str());
+                       continue;
+               }
+
+               string src = config[i].parameters["src"];
+               Input *input = new Input(stream_id, src);
+               input->run();
+               inputs.push_back(input);
+       }
 
        signal(SIGHUP, hup);
 
 
        signal(SIGHUP, hup);
 
@@ -349,7 +264,11 @@ int main(int argc, char **argv)
                usleep(100000);
        }
 
                usleep(100000);
        }
 
-       input.stop();
+       // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec.
+       for (size_t i = 0; i < inputs.size(); ++i) {
+               inputs[i]->stop();
+               delete inputs[i];  // TODO: serialize instead of using libcurl.
+       }
 
        CubemapStateProto state;
        state.set_server_sock(server_sock);
 
        CubemapStateProto state;
        state.set_server_sock(server_sock);
diff --git a/parse.cpp b/parse.cpp
new file mode 100644 (file)
index 0000000..0747afb
--- /dev/null
+++ b/parse.cpp
@@ -0,0 +1,143 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <assert.h>
+#include <vector>
+#include <string>
+
+#include "parse.h"
+
+using namespace std;
+
+vector<string> split_tokens(const string &line)
+{
+       vector<string> ret;
+       string current_token;
+
+       for (size_t i = 0; i < line.size(); ++i) {
+               if (isspace(line[i])) {
+                       if (!current_token.empty()) {
+                               ret.push_back(current_token);
+                       }
+                       current_token.clear();
+               } else {
+                       current_token.push_back(line[i]);
+               }
+       }
+       if (!current_token.empty()) {
+               ret.push_back(current_token);
+       }
+       return ret;
+}
+
+vector<string> split_lines(const string &str)
+{
+       vector<string> ret;
+       string current_line;
+
+       for (size_t i = 0; i < str.size(); ++i) {
+               // Skip \r if followed by an \n.
+               if (str[i] == '\r' && i < str.size() - 1 && str[i + 1] == '\n') {
+                       continue;
+               }
+
+               // End of the current line?
+               if (str[i] == '\n') {
+                       if (!current_line.empty()) {
+                               ret.push_back(current_line);
+                       }
+                       current_line.clear();
+               } else {
+                       current_line.push_back(str[i]);
+               }
+       }
+       if (!current_line.empty()) {
+               ret.push_back(current_line);
+       }
+       return ret;
+}
+
+vector<ConfigLine> parse_config(const string &filename)
+{
+       vector<ConfigLine> ret;
+
+       FILE *fp = fopen(filename.c_str(), "r");
+       if (fp == NULL) {
+               perror(filename.c_str());
+               exit(1);
+       }
+
+       char buf[4096];
+       while (!feof(fp)) {
+               if (fgets(buf, sizeof(buf), fp) == NULL) {
+                       break;
+               }
+
+               // Chop off the string at the first #, \r or \n.
+               buf[strcspn(buf, "#\r\n")] = 0;
+
+               // Remove all whitespace from the end of the string.
+               size_t len = strlen(buf);
+               while (len > 0 && isspace(buf[len - 1])) {
+                       buf[--len] = 0;
+               }
+
+               // If the line is now all blank, ignore it.
+               if (len == 0) {
+                       continue;
+               }
+
+               vector<string> tokens = split_tokens(buf);
+               assert(!tokens.empty());
+               
+               ConfigLine line;
+               line.keyword = tokens[0];
+
+               for (size_t i = 1; i < tokens.size(); ++i) {
+                       // foo=bar is a parameter; anything else is an argument.
+                       size_t equals_pos = tokens[i].find_first_of('=');
+                       if (equals_pos == string::npos) {
+                               line.arguments.push_back(tokens[i]);
+                       } else {
+                               string key = tokens[i].substr(0, equals_pos);
+                               string value = tokens[i].substr(equals_pos + 1, string::npos);
+                               line.parameters.insert(make_pair(key, value));
+                       }
+               }
+
+               ret.push_back(line);
+       }
+
+       fclose(fp);
+       return ret;
+}
+
+int fetch_config_int(const vector<ConfigLine> &config, const string &keyword, int min_limit, int max_limit)
+{
+       bool value_found = false;
+       int value = -1;
+       for (unsigned i = 0; i < config.size(); ++i) {
+               if (config[i].keyword != keyword) {
+                       continue;
+               }
+               if (config[i].parameters.size() > 0 ||
+                   config[i].arguments.size() != 1) {
+                       fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str());
+                       exit(1);
+               }
+               value_found = true;
+               value = atoi(config[i].arguments[0].c_str());  // TODO: verify int validity.
+       }
+       if (!value_found) {
+               fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n",
+                       keyword.c_str());
+               exit(1);
+       }
+       if (value < min_limit || value > max_limit) {
+               fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n",
+                       keyword.c_str(), value, min_limit, max_limit);
+               exit(1);
+       }
+       return value;
+}
diff --git a/parse.h b/parse.h
new file mode 100644 (file)
index 0000000..83c5d2a
--- /dev/null
+++ b/parse.h
@@ -0,0 +1,28 @@
+#ifndef _PARSE_H
+#define _PARSE_H
+
+// Various routines that deal with parsing; both configuration files and HTTP requests.
+
+#include <map>
+#include <vector>
+#include <string>
+
+struct ConfigLine {
+       std::string keyword;
+       std::vector<std::string> arguments;
+       std::map<std::string, std::string> parameters;
+};
+
+// Split a line on whitespace, e.g. "foo  bar baz" -> {"foo", "bar", "baz"}.
+std::vector<std::string> split_tokens(const std::string &line);
+
+// Split a string on \n or \r\n, e.g. "foo\nbar\r\n\nbaz\r\n\r\n" -> {"foo", "bar", "baz"}.
+std::vector<std::string> split_lines(const std::string &str);
+
+// Parse the configuration file.
+std::vector<ConfigLine> parse_config(const std::string &filename);
+
+// Note: Limits are inclusive.
+int fetch_config_int(const std::vector<ConfigLine> &config, const std::string &keyword, int min_limit, int max_limit);
+
+#endif  // !defined(_PARSE_H)
index 5f155c0..7e43522 100644 (file)
@@ -18,6 +18,7 @@
 #include "metacube.h"
 #include "server.h"
 #include "mutexlock.h"
 #include "metacube.h"
 #include "server.h"
 #include "mutexlock.h"
+#include "parse.h"
 #include "state.pb.h"
 
 using namespace std;
 #include "state.pb.h"
 
 using namespace std;
@@ -25,7 +26,7 @@ using namespace std;
 Client::Client(int sock)
        : sock(sock),
          state(Client::READING_REQUEST),
 Client::Client(int sock)
        : sock(sock),
          state(Client::READING_REQUEST),
-         header_bytes_sent(0),
+         header_or_error_bytes_sent(0),
          bytes_sent(0)
 {
        request.reserve(1024);
          bytes_sent(0)
 {
        request.reserve(1024);
@@ -36,8 +37,8 @@ Client::Client(const ClientProto &serialized)
          state(State(serialized.state())),
          request(serialized.request()),
          stream_id(serialized.stream_id()),
          state(State(serialized.state())),
          request(serialized.request()),
          stream_id(serialized.stream_id()),
-         header(serialized.header()),
-         header_bytes_sent(serialized.header_bytes_sent()),
+         header_or_error(serialized.header_or_error()),
+         header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()),
          bytes_sent(serialized.bytes_sent())
 {
 }
          bytes_sent(serialized.bytes_sent())
 {
 }
@@ -49,8 +50,8 @@ ClientProto Client::serialize() const
        serialized.set_state(state);
        serialized.set_request(request);
        serialized.set_stream_id(stream_id);
        serialized.set_state(state);
        serialized.set_request(request);
        serialized.set_stream_id(stream_id);
-       serialized.set_header(header);
-       serialized.set_header_bytes_sent(serialized.header_bytes_sent());
+       serialized.set_header_or_error(header_or_error);
+       serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent());
        serialized.set_bytes_sent(bytes_sent);
        return serialized;
 }
        serialized.set_bytes_sent(bytes_sent);
        return serialized;
 }
@@ -319,35 +320,44 @@ void Server::process_client(Client *client)
                        return;
                }
 
                        return;
                }
 
-               parse_request(client);
-               construct_header(client);
+               int error_code = parse_request(client);
+               if (error_code == 200) {
+                       construct_header(client);
+               } else {
+                       construct_error(client, error_code);
+               }
                break;
        }
                break;
        }
+       case Client::SENDING_ERROR:
        case Client::SENDING_HEADER: {
                int ret = write(client->sock,
        case Client::SENDING_HEADER: {
                int ret = write(client->sock,
-                               client->header.data() + client->header_bytes_sent,
-                               client->header.size() - client->header_bytes_sent);
+                               client->header_or_error.data() + client->header_or_error_bytes_sent,
+                               client->header_or_error.size() - client->header_or_error_bytes_sent);
                if (ret == -1) {
                        perror("write");
                        close_client(client);
                        return;
                }
                
                if (ret == -1) {
                        perror("write");
                        close_client(client);
                        return;
                }
                
-               client->header_bytes_sent += ret;
-               assert(client->header_bytes_sent <= client->header.size());
+               client->header_or_error_bytes_sent += ret;
+               assert(client->header_or_error_bytes_sent <= client->header_or_error.size());
 
 
-               if (client->header_bytes_sent < client->header.size()) {
+               if (client->header_or_error_bytes_sent < client->header_or_error.size()) {
                        // We haven't sent all yet. Fine; we'll do that later.
                        return;
                }
 
                        // We haven't sent all yet. Fine; we'll do that later.
                        return;
                }
 
-               // We're done sending the header! Clear the entire header to release some memory.
-               client->header.clear();
+               // We're done sending the header or error! Clear it to release some memory.
+               client->header_or_error.clear();
 
 
-               // Start sending from the end. In other words, we won't send any of the backlog,
-               // but we'll start sending immediately as we get data.
-               client->state = Client::SENDING_DATA;
-               client->bytes_sent = find_stream(client->stream_id)->data_size;
+               if (client->state == Client::SENDING_ERROR) {
+                       close_client(client);
+               } else {
+                       // Start sending from the end. In other words, we won't send any of the backlog,
+                       // but we'll start sending immediately as we get data.
+                       client->state = Client::SENDING_DATA;
+                       client->bytes_sent = find_stream(client->stream_id)->data_size;
+               }
                break;
        }
        case Client::SENDING_DATA: {
                break;
        }
        case Client::SENDING_DATA: {
@@ -399,16 +409,33 @@ void Server::process_client(Client *client)
        }
 }
 
        }
 }
 
-void Server::parse_request(Client *client)
+int Server::parse_request(Client *client)
 {
 {
-       // TODO: Actually parse the request. :-)
-       client->stream_id = "stream";
+       vector<string> lines = split_lines(client->request);
+       if (lines.empty()) {
+               return 400;  // Bad request (empty).
+       }
+
+       vector<string> request_tokens = split_tokens(lines[0]);
+       if (request_tokens.size() < 2) {
+               return 400;  // Bad request (empty).
+       }
+       if (request_tokens[0] != "GET") {
+               return 400;  // Should maybe be 405 instead?
+       }
+       if (streams.count(request_tokens[1]) == 0) {
+               return 404;  // Not found.
+       }
+
+       client->stream_id = request_tokens[1];
        client->request.clear();
        client->request.clear();
+
+       return 200;  // OK!
 }
 
 void Server::construct_header(Client *client)
 {
 }
 
 void Server::construct_header(Client *client)
 {
-       client->header = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" +
+       client->header_or_error = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" +
                find_stream(client->stream_id)->header;
 
        // Switch states.
                find_stream(client->stream_id)->header;
 
        // Switch states.
@@ -425,6 +452,27 @@ void Server::construct_header(Client *client)
        }
 }
        
        }
 }
        
+void Server::construct_error(Client *client, int error_code)
+{
+       char error[256];
+       snprintf(error, 256, "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n",
+               error_code);
+       client->header_or_error = error;
+
+       // Switch states.
+       client->state = Client::SENDING_ERROR;
+
+       epoll_event ev;
+       ev.events = EPOLLOUT | EPOLLRDHUP;
+       ev.data.u64 = 0;  // Keep Valgrind happy.
+       ev.data.fd = client->sock;
+
+       if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) {
+               perror("epoll_ctl(EPOLL_CTL_MOD)");
+               exit(1);
+       }
+}
+       
 void Server::close_client(Client *client)
 {
        if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
 void Server::close_client(Client *client)
 {
        if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) {
index 2cb078e..cc1b78d 100644 (file)
--- a/server.h
+++ b/server.h
@@ -28,7 +28,7 @@ struct Client {
        // The file descriptor associated with this socket.
        int sock;
 
        // The file descriptor associated with this socket.
        int sock;
 
-       enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA };
+       enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR };
        State state;
 
        // The HTTP request, as sent by the client. If we are in READING_REQUEST,
        State state;
 
        // The HTTP request, as sent by the client. If we are in READING_REQUEST,
@@ -41,11 +41,12 @@ struct Client {
 
        // The header we want to send. This is nominally a copy of Stream::header,
        // but since that might change on reconnects etc., we keep a local copy here.
 
        // The header we want to send. This is nominally a copy of Stream::header,
        // but since that might change on reconnects etc., we keep a local copy here.
-       // Only relevant for SENDING_HEADER; blank otherwise.
-       std::string header;
+       // Only relevant for SENDING_HEADER or SENDING_ERROR; blank otherwise.
+       std::string header_or_error;
 
 
-       // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER.
-       size_t header_bytes_sent;
+       // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER
+       // or SENDING_ERROR.
+       size_t header_or_error_bytes_sent;
 
        // Number of bytes we've sent of data. Only relevant for SENDING_DATA.
        size_t bytes_sent;
 
        // Number of bytes we've sent of data. Only relevant for SENDING_DATA.
        size_t bytes_sent;
@@ -132,13 +133,17 @@ private:
        // Close a given client socket, and clean up after it.
        void close_client(Client *client);
 
        // Close a given client socket, and clean up after it.
        void close_client(Client *client);
 
-       // Parse the HTTP request.
-       void parse_request(Client *client);
+       // Parse the HTTP request. Returns a HTTP status code (200/400/404).
+       int parse_request(Client *client);
 
        // Construct the HTTP header, and set the client into
        // the SENDING_HEADER state.
        void construct_header(Client *client);
 
 
        // Construct the HTTP header, and set the client into
        // the SENDING_HEADER state.
        void construct_header(Client *client);
 
+       // Construct a generic error with the given line, and set the client into
+       // the SENDING_ERROR state.
+       void construct_error(Client *client, int error_code);
+
        // Put client to sleep, since there is no more data for it; we will on
        // longer listen on POLLOUT until we get more data. Also, it will be put
        // in the list of clients to wake up when we do.
        // Put client to sleep, since there is no more data for it; we will on
        // longer listen on POLLOUT until we get more data. Also, it will be put
        // in the list of clients to wake up when we do.
index d33d689..7d114da 100644 (file)
@@ -4,8 +4,8 @@ message ClientProto {
        optional int32 state = 2;
        optional bytes request = 3;
        optional string stream_id = 4;
        optional int32 state = 2;
        optional bytes request = 3;
        optional string stream_id = 4;
-       optional bytes header = 5;
-       optional int64 header_bytes_sent = 6;
+       optional bytes header_or_error = 5;
+       optional int64 header_or_error_bytes_sent = 6;
        optional int64 bytes_sent = 7;
 };
 
        optional int64 bytes_sent = 7;
 };