From: Steinar H. Gunderson Date: Sun, 7 Apr 2013 15:42:23 +0000 (+0200) Subject: Support parsing streams from config file. Also support multiple streams (includes... X-Git-Tag: 1.0.0~188 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=17d773d2d45d495704e974b9246eccb21faa8635 Support parsing streams from config file. Also support multiple streams (includes parsing HTTP), with HTTP error messages and all. --- diff --git a/Makefile b/Makefile index 2db0a75..ca9b255 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROTOC=protoc CXXFLAGS=-Wall -O2 -g LDLIBS=-lcurl -lpthread -lprotobuf -OBJS=cubemap.o server.o serverpool.o mutexlock.o input.o state.pb.o +OBJS=cubemap.o server.o serverpool.o mutexlock.o input.o parse.o state.pb.o all: cubemap diff --git a/cubemap.cpp b/cubemap.cpp index ad218db..b00ca64 100644 --- a/cubemap.cpp +++ b/cubemap.cpp @@ -17,6 +17,7 @@ #include #include "metacube.h" +#include "parse.h" #include "server.h" #include "serverpool.h" #include "input.h" @@ -171,120 +172,6 @@ CubemapStateProto read_tempfile(int state_fd) return state; } -// Split a line on whitespace, e.g. "foo bar baz" -> {"foo", "bar", "baz"}. -vector split_tokens(const string &line) -{ - vector ret; - string current_token; - - for (size_t i = 0; i < line.size(); ++i) { - if (isspace(line[i])) { - if (!current_token.empty()) { - ret.push_back(current_token); - } - current_token.clear(); - } else { - current_token.push_back(line[i]); - } - } - if (!current_token.empty()) { - ret.push_back(current_token); - } - return ret; -} - -struct ConfigLine { - string keyword; - vector arguments; - map parameters; -}; - -// Parse the configuration file. -vector parse_config(const string &filename) -{ - vector ret; - - FILE *fp = fopen(filename.c_str(), "r"); - if (fp == NULL) { - perror(filename.c_str()); - exit(1); - } - - char buf[4096]; - while (!feof(fp)) { - if (fgets(buf, sizeof(buf), fp) == NULL) { - break; - } - - // Chop off the string at the first #, \r or \n. - buf[strcspn(buf, "#\r\n")] = 0; - - // Remove all whitespace from the end of the string. - size_t len = strlen(buf); - while (len > 0 && isspace(buf[len - 1])) { - buf[--len] = 0; - } - - // If the line is now all blank, ignore it. - if (len == 0) { - continue; - } - - vector tokens = split_tokens(buf); - assert(!tokens.empty()); - - ConfigLine line; - line.keyword = tokens[0]; - - for (size_t i = 1; i < tokens.size(); ++i) { - // foo=bar is a parameter; anything else is an argument. - size_t equals_pos = tokens[i].find_first_of('='); - if (equals_pos == string::npos) { - line.arguments.push_back(tokens[i]); - } else { - string key = tokens[i].substr(0, equals_pos); - string value = tokens[i].substr(equals_pos + 1, string::npos); - line.parameters.insert(make_pair(key, value)); - } - } - - ret.push_back(line); - } - - fclose(fp); - return ret; -} - -// Note: Limits are inclusive. -int fetch_config_int(const vector &config, const string &keyword, int min_limit, int max_limit) -{ - bool value_found = false; - int value = -1; - for (unsigned i = 0; i < config.size(); ++i) { - if (config[i].keyword != keyword) { - continue; - } - if (config[i].parameters.size() > 0 || - config[i].arguments.size() != 1) { - fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); - exit(1); - } - value_found = true; - value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. - } - if (!value_found) { - fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", - keyword.c_str()); - exit(1); - } - if (value < min_limit || value > max_limit) { - fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n", - keyword.c_str(), value, min_limit, max_limit); - exit(1); - } - return value; -} - int main(int argc, char **argv) { fprintf(stderr, "\nCubemap starting.\n"); @@ -304,6 +191,7 @@ int main(int argc, char **argv) CubemapStateProto loaded_state = read_tempfile(state_fd); // Deserialize the streams. + // TODO: Pick up new streams from the configuration file. for (int i = 0; i < loaded_state.streams_size(); ++i) { servers->add_stream_from_serialized(loaded_state.streams(i)); } @@ -319,9 +207,19 @@ int main(int argc, char **argv) old_port = loaded_state.port(); fprintf(stderr, "done.\n"); - } else{ - // TODO: This should come from the config file. - servers->add_stream(STREAM_ID); + } else { + // Find all streams in the configuration file, and create them. + for (unsigned i = 0; i < config.size(); ++i) { + if (config[i].keyword != "stream") { + continue; + } + if (config[i].arguments.size() != 1) { + fprintf(stderr, "ERROR: 'stream' takes exactly one argument\n"); + exit(1); + } + string stream_id = config[i].arguments[0]; + servers->add_stream(stream_id); + } } // Open a new server socket if we do not already have one, or if we changed ports. @@ -339,9 +237,26 @@ int main(int argc, char **argv) pthread_t acceptor_thread; pthread_create(&acceptor_thread, NULL, acceptor_thread_run, reinterpret_cast(server_sock)); - // TODO: This should come from the config file. - Input input(STREAM_ID, STREAM_URL); - input.run(); + // Find all streams in the configuration file, and create inputs for them. + vector inputs; + for (unsigned i = 0; i < config.size(); ++i) { + if (config[i].keyword != "stream") { + continue; + } + assert(config[i].arguments.size() == 1); + string stream_id = config[i].arguments[0]; + + if (config[i].parameters.count("src") == 0) { + fprintf(stderr, "WARNING: stream '%s' has no src= attribute, clients will not get any data.\n", + stream_id.c_str()); + continue; + } + + string src = config[i].parameters["src"]; + Input *input = new Input(stream_id, src); + input->run(); + inputs.push_back(input); + } signal(SIGHUP, hup); @@ -349,7 +264,11 @@ int main(int argc, char **argv) usleep(100000); } - input.stop(); + // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. + for (size_t i = 0; i < inputs.size(); ++i) { + inputs[i]->stop(); + delete inputs[i]; // TODO: serialize instead of using libcurl. + } CubemapStateProto state; state.set_server_sock(server_sock); diff --git a/parse.cpp b/parse.cpp new file mode 100644 index 0000000..0747afb --- /dev/null +++ b/parse.cpp @@ -0,0 +1,143 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "parse.h" + +using namespace std; + +vector split_tokens(const string &line) +{ + vector ret; + string current_token; + + for (size_t i = 0; i < line.size(); ++i) { + if (isspace(line[i])) { + if (!current_token.empty()) { + ret.push_back(current_token); + } + current_token.clear(); + } else { + current_token.push_back(line[i]); + } + } + if (!current_token.empty()) { + ret.push_back(current_token); + } + return ret; +} + +vector split_lines(const string &str) +{ + vector ret; + string current_line; + + for (size_t i = 0; i < str.size(); ++i) { + // Skip \r if followed by an \n. + if (str[i] == '\r' && i < str.size() - 1 && str[i + 1] == '\n') { + continue; + } + + // End of the current line? + if (str[i] == '\n') { + if (!current_line.empty()) { + ret.push_back(current_line); + } + current_line.clear(); + } else { + current_line.push_back(str[i]); + } + } + if (!current_line.empty()) { + ret.push_back(current_line); + } + return ret; +} + +vector parse_config(const string &filename) +{ + vector ret; + + FILE *fp = fopen(filename.c_str(), "r"); + if (fp == NULL) { + perror(filename.c_str()); + exit(1); + } + + char buf[4096]; + while (!feof(fp)) { + if (fgets(buf, sizeof(buf), fp) == NULL) { + break; + } + + // Chop off the string at the first #, \r or \n. + buf[strcspn(buf, "#\r\n")] = 0; + + // Remove all whitespace from the end of the string. + size_t len = strlen(buf); + while (len > 0 && isspace(buf[len - 1])) { + buf[--len] = 0; + } + + // If the line is now all blank, ignore it. + if (len == 0) { + continue; + } + + vector tokens = split_tokens(buf); + assert(!tokens.empty()); + + ConfigLine line; + line.keyword = tokens[0]; + + for (size_t i = 1; i < tokens.size(); ++i) { + // foo=bar is a parameter; anything else is an argument. + size_t equals_pos = tokens[i].find_first_of('='); + if (equals_pos == string::npos) { + line.arguments.push_back(tokens[i]); + } else { + string key = tokens[i].substr(0, equals_pos); + string value = tokens[i].substr(equals_pos + 1, string::npos); + line.parameters.insert(make_pair(key, value)); + } + } + + ret.push_back(line); + } + + fclose(fp); + return ret; +} + +int fetch_config_int(const vector &config, const string &keyword, int min_limit, int max_limit) +{ + bool value_found = false; + int value = -1; + for (unsigned i = 0; i < config.size(); ++i) { + if (config[i].keyword != keyword) { + continue; + } + if (config[i].parameters.size() > 0 || + config[i].arguments.size() != 1) { + fprintf(stderr, "ERROR: '%s' takes one argument and no parameters\n", keyword.c_str()); + exit(1); + } + value_found = true; + value = atoi(config[i].arguments[0].c_str()); // TODO: verify int validity. + } + if (!value_found) { + fprintf(stderr, "ERROR: Missing '%s' statement in config file.\n", + keyword.c_str()); + exit(1); + } + if (value < min_limit || value > max_limit) { + fprintf(stderr, "ERROR: '%s' is set to %d, must be in [%d,%d]\n", + keyword.c_str(), value, min_limit, max_limit); + exit(1); + } + return value; +} diff --git a/parse.h b/parse.h new file mode 100644 index 0000000..83c5d2a --- /dev/null +++ b/parse.h @@ -0,0 +1,28 @@ +#ifndef _PARSE_H +#define _PARSE_H + +// Various routines that deal with parsing; both configuration files and HTTP requests. + +#include +#include +#include + +struct ConfigLine { + std::string keyword; + std::vector arguments; + std::map parameters; +}; + +// Split a line on whitespace, e.g. "foo bar baz" -> {"foo", "bar", "baz"}. +std::vector split_tokens(const std::string &line); + +// Split a string on \n or \r\n, e.g. "foo\nbar\r\n\nbaz\r\n\r\n" -> {"foo", "bar", "baz"}. +std::vector split_lines(const std::string &str); + +// Parse the configuration file. +std::vector parse_config(const std::string &filename); + +// Note: Limits are inclusive. +int fetch_config_int(const std::vector &config, const std::string &keyword, int min_limit, int max_limit); + +#endif // !defined(_PARSE_H) diff --git a/server.cpp b/server.cpp index 5f155c0..7e43522 100644 --- a/server.cpp +++ b/server.cpp @@ -18,6 +18,7 @@ #include "metacube.h" #include "server.h" #include "mutexlock.h" +#include "parse.h" #include "state.pb.h" using namespace std; @@ -25,7 +26,7 @@ using namespace std; Client::Client(int sock) : sock(sock), state(Client::READING_REQUEST), - header_bytes_sent(0), + header_or_error_bytes_sent(0), bytes_sent(0) { request.reserve(1024); @@ -36,8 +37,8 @@ Client::Client(const ClientProto &serialized) state(State(serialized.state())), request(serialized.request()), stream_id(serialized.stream_id()), - header(serialized.header()), - header_bytes_sent(serialized.header_bytes_sent()), + header_or_error(serialized.header_or_error()), + header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()), bytes_sent(serialized.bytes_sent()) { } @@ -49,8 +50,8 @@ ClientProto Client::serialize() const serialized.set_state(state); serialized.set_request(request); serialized.set_stream_id(stream_id); - serialized.set_header(header); - serialized.set_header_bytes_sent(serialized.header_bytes_sent()); + serialized.set_header_or_error(header_or_error); + serialized.set_header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()); serialized.set_bytes_sent(bytes_sent); return serialized; } @@ -319,35 +320,44 @@ void Server::process_client(Client *client) return; } - parse_request(client); - construct_header(client); + int error_code = parse_request(client); + if (error_code == 200) { + construct_header(client); + } else { + construct_error(client, error_code); + } break; } + case Client::SENDING_ERROR: case Client::SENDING_HEADER: { int ret = write(client->sock, - client->header.data() + client->header_bytes_sent, - client->header.size() - client->header_bytes_sent); + client->header_or_error.data() + client->header_or_error_bytes_sent, + client->header_or_error.size() - client->header_or_error_bytes_sent); if (ret == -1) { perror("write"); close_client(client); return; } - client->header_bytes_sent += ret; - assert(client->header_bytes_sent <= client->header.size()); + client->header_or_error_bytes_sent += ret; + assert(client->header_or_error_bytes_sent <= client->header_or_error.size()); - if (client->header_bytes_sent < client->header.size()) { + if (client->header_or_error_bytes_sent < client->header_or_error.size()) { // We haven't sent all yet. Fine; we'll do that later. return; } - // We're done sending the header! Clear the entire header to release some memory. - client->header.clear(); + // We're done sending the header or error! Clear it to release some memory. + client->header_or_error.clear(); - // Start sending from the end. In other words, we won't send any of the backlog, - // but we'll start sending immediately as we get data. - client->state = Client::SENDING_DATA; - client->bytes_sent = find_stream(client->stream_id)->data_size; + if (client->state == Client::SENDING_ERROR) { + close_client(client); + } else { + // Start sending from the end. In other words, we won't send any of the backlog, + // but we'll start sending immediately as we get data. + client->state = Client::SENDING_DATA; + client->bytes_sent = find_stream(client->stream_id)->data_size; + } break; } case Client::SENDING_DATA: { @@ -399,16 +409,33 @@ void Server::process_client(Client *client) } } -void Server::parse_request(Client *client) +int Server::parse_request(Client *client) { - // TODO: Actually parse the request. :-) - client->stream_id = "stream"; + vector lines = split_lines(client->request); + if (lines.empty()) { + return 400; // Bad request (empty). + } + + vector request_tokens = split_tokens(lines[0]); + if (request_tokens.size() < 2) { + return 400; // Bad request (empty). + } + if (request_tokens[0] != "GET") { + return 400; // Should maybe be 405 instead? + } + if (streams.count(request_tokens[1]) == 0) { + return 404; // Not found. + } + + client->stream_id = request_tokens[1]; client->request.clear(); + + return 200; // OK! } void Server::construct_header(Client *client) { - client->header = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" + + client->header_or_error = "HTTP/1.0 200 OK\r\nContent-type: video/x-flv\r\nCache-Control: no-cache\r\n\r\n" + find_stream(client->stream_id)->header; // Switch states. @@ -425,6 +452,27 @@ void Server::construct_header(Client *client) } } +void Server::construct_error(Client *client, int error_code) +{ + char error[256]; + snprintf(error, 256, "HTTP/1.0 %d Error\r\nContent-type: text/plain\r\n\r\nSomething went wrong. Sorry.\r\n", + error_code); + client->header_or_error = error; + + // Switch states. + client->state = Client::SENDING_ERROR; + + epoll_event ev; + ev.events = EPOLLOUT | EPOLLRDHUP; + ev.data.u64 = 0; // Keep Valgrind happy. + ev.data.fd = client->sock; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client->sock, &ev) == -1) { + perror("epoll_ctl(EPOLL_CTL_MOD)"); + exit(1); + } +} + void Server::close_client(Client *client) { if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->sock, NULL) == -1) { diff --git a/server.h b/server.h index 2cb078e..cc1b78d 100644 --- a/server.h +++ b/server.h @@ -28,7 +28,7 @@ struct Client { // The file descriptor associated with this socket. int sock; - enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA }; + enum State { READING_REQUEST, SENDING_HEADER, SENDING_DATA, SENDING_ERROR }; State state; // The HTTP request, as sent by the client. If we are in READING_REQUEST, @@ -41,11 +41,12 @@ struct Client { // The header we want to send. This is nominally a copy of Stream::header, // but since that might change on reconnects etc., we keep a local copy here. - // Only relevant for SENDING_HEADER; blank otherwise. - std::string header; + // Only relevant for SENDING_HEADER or SENDING_ERROR; blank otherwise. + std::string header_or_error; - // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER. - size_t header_bytes_sent; + // Number of bytes we've sent of the header. Only relevant for SENDING_HEADER + // or SENDING_ERROR. + size_t header_or_error_bytes_sent; // Number of bytes we've sent of data. Only relevant for SENDING_DATA. size_t bytes_sent; @@ -132,13 +133,17 @@ private: // Close a given client socket, and clean up after it. void close_client(Client *client); - // Parse the HTTP request. - void parse_request(Client *client); + // Parse the HTTP request. Returns a HTTP status code (200/400/404). + int parse_request(Client *client); // Construct the HTTP header, and set the client into // the SENDING_HEADER state. void construct_header(Client *client); + // Construct a generic error with the given line, and set the client into + // the SENDING_ERROR state. + void construct_error(Client *client, int error_code); + // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put // in the list of clients to wake up when we do. diff --git a/state.proto b/state.proto index d33d689..7d114da 100644 --- a/state.proto +++ b/state.proto @@ -4,8 +4,8 @@ message ClientProto { optional int32 state = 2; optional bytes request = 3; optional string stream_id = 4; - optional bytes header = 5; - optional int64 header_bytes_sent = 6; + optional bytes header_or_error = 5; + optional int64 header_or_error_bytes_sent = 6; optional int64 bytes_sent = 7; };