Inputs no longer know about stream_id, only the stream index.
This is faster (we previously spent maybe 5% of our time in find_stream),
and it will make it easier to add non-HTTP outputs in the future.
Within (HTTP) clients, stream_id is also gone; it has been replaced by
"url", which works in exactly the same way. url is now mostly used during
re-exec, though, so it is less important than it used to be.
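
For reference, the new addressing pattern boils down to the sketch below
(a condensed illustration of the code in this patch, not a verbatim
excerpt; locking, the Stream class itself and the ServerPool fan-out are
left out):

    #include <map>
    #include <string>
    #include <vector>

    class Stream;  // See stream.h.

    class Server {
    public:
        // Done once per client request and at config/deserialization time;
        // everything after that just passes the integer index around.
        int lookup_stream_by_url(const std::string &url) const
        {
            std::map<std::string, int>::const_iterator it = url_map.find(url);
            if (it == url_map.end()) {
                return -1;
            }
            return it->second;
        }

    private:
        std::vector<Stream *> streams;       // All streams, by index.
        std::map<std::string, int> url_map;  // URL -> index into <streams>.
    };

Inputs and the ServerPool only store these indices (stream_indices), so the
data path (add_data) no longer does any string map lookups.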
fprintf(logfp, "%llu %s %s %d %llu %llu %llu\n",
(long long unsigned)(writes[i].connect_time),
writes[i].remote_addr.c_str(),
- writes[i].stream_id.c_str(),
+ writes[i].url.c_str(),
int(now - writes[i].connect_time),
(long long unsigned)(writes[i].bytes_sent),
(long long unsigned)(writes[i].bytes_lost),
connect_time(serialized.connect_time()),
state(State(serialized.state())),
request(serialized.request()),
- stream_id(serialized.stream_id()),
+ url(serialized.url()),
stream(stream),
header_or_error(serialized.header_or_error()),
header_or_error_bytes_sent(serialized.header_or_error_bytes_sent()),
serialized.set_connect_time(connect_time);
serialized.set_state(state);
serialized.set_request(request);
- serialized.set_stream_id(stream_id);
+ serialized.set_url(url);
serialized.set_header_or_error(header_or_error);
serialized.set_header_or_error_bytes_sent(header_or_error_bytes_sent);
serialized.set_stream_pos(stream_pos);
ClientStats Client::get_stats() const
{
ClientStats stats;
- if (stream_id.empty()) {
- stats.stream_id = "-";
+ if (url.empty()) {
+ stats.url = "-";
} else {
- stats.stream_id = stream_id;
+ stats.url = url;
}
stats.sock = sock;
stats.fwmark = fwmark;
// Digested statistics for writing to logs etc.
struct ClientStats {
- std::string stream_id;
+ std::string url;
int sock;
int fwmark;
std::string remote_addr;
// What stream we're connecting to; parsed from <request>.
// Not relevant for READING_REQUEST.
- std::string stream_id;
+ std::string url;
Stream *stream;
// The header we want to send. This is nominally a copy of Stream::header,
}
StreamConfig stream;
- stream.stream_id = line.arguments[0];
+ stream.url = line.arguments[0];
map<string, string>::const_iterator src_it = line.parameters.find("src");
if (src_it == line.parameters.end()) {
log(WARNING, "stream '%s' has no src= attribute, clients will not get any data.",
- stream.stream_id.c_str());
+ stream.url.c_str());
} else {
stream.src = src_it->second;
// TODO: Verify that the URL is parseable?
};
struct StreamConfig {
- std::string stream_id;
+ std::string url; // As seen by the client.
std::string src; // Can be empty.
size_t backlog_size;
int mark_pool; // -1 for none.
http_header.append(it->first + ": " + it->second + "\r\n");
}
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header, "");
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, "");
}
return true;
request_bytes_sent = 0;
response.clear();
pending_data.clear();
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], "", "");
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], "", "");
}
{
char *inner_data = pending_data.data() + sizeof(metacube_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
string header(inner_data, inner_data + size);
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->set_header(stream_ids[i], http_header, header);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->set_header(stream_indices[i], http_header, header);
}
} else {
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->add_data(stream_ids[i], inner_data, size);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->add_data(stream_indices[i], inner_data, size);
}
}
virtual std::string get_url() const { return url; }
- virtual void add_destination(const std::string &stream_id)
+ virtual void add_destination(int stream_index)
{
- stream_ids.push_back(stream_id);
+ stream_indices.push_back(stream_index);
}
private:
};
State state;
- std::vector<std::string> stream_ids;
+ std::vector<int> stream_indices;
// The URL and its parsed components.
std::string url;
virtual InputProto serialize() const = 0;
virtual std::string get_url() const = 0;
virtual void close_socket() = 0;
- virtual void add_destination(const std::string &stream_id) = 0;
+ virtual void add_destination(int stream_index) = 0;
};
#endif // !defined(_INPUT_H)
}
void create_streams(const Config &config,
- const set<string> &deserialized_stream_ids,
+ const set<string> &deserialized_urls,
multimap<string, InputWithRefcount> *inputs)
{
for (unsigned i = 0; i < config.mark_pools.size(); ++i) {
mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to));
}
- set<string> expecting_stream_ids = deserialized_stream_ids;
+ set<string> expecting_urls = deserialized_urls;
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
- if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
- servers->add_stream(stream_config.stream_id,
- stream_config.backlog_size,
- Stream::Encoding(stream_config.encoding));
+ int stream_index;
+ if (deserialized_urls.count(stream_config.url) == 0) {
+ stream_index = servers->add_stream(stream_config.url,
+ stream_config.backlog_size,
+ Stream::Encoding(stream_config.encoding));
} else {
- servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
- servers->set_encoding(stream_config.stream_id,
+ stream_index = servers->lookup_stream_by_url(stream_config.url);
+ assert(stream_index != -1);
+ servers->set_backlog_size(stream_index, stream_config.backlog_size);
+ servers->set_encoding(stream_index,
Stream::Encoding(stream_config.encoding));
}
- expecting_stream_ids.erase(stream_config.stream_id);
+ expecting_urls.erase(stream_config.url);
if (stream_config.mark_pool != -1) {
- servers->set_mark_pool(stream_config.stream_id,
- mark_pools[stream_config.mark_pool]);
+ servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]);
}
string src = stream_config.src;
if (!src.empty()) {
multimap<string, InputWithRefcount>::iterator input_it = inputs->find(src);
assert(input_it != inputs->end());
- input_it->second.input->add_destination(stream_config.stream_id);
+ input_it->second.input->add_destination(stream_index);
++input_it->second.refcount;
}
}
// Warn about any streams we've lost.
// TODO: Make an option (delete=yes?) to actually shut down streams.
- for (set<string>::const_iterator stream_it = expecting_stream_ids.begin();
- stream_it != expecting_stream_ids.end();
+ for (set<string>::const_iterator stream_it = expecting_urls.begin();
+ stream_it != expecting_urls.end();
++stream_it) {
- string stream_id = *stream_it;
+ string url = *stream_it;
log(WARNING, "stream '%s' disappeared from the configuration file. "
"It will not be deleted, but clients will not get any new inputs.",
- stream_id.c_str());
+ url.c_str());
}
}
CubemapStateProto loaded_state;
struct timeval serialize_start;
- set<string> deserialized_stream_ids;
+ set<string> deserialized_urls;
map<int, Acceptor *> deserialized_acceptors;
multimap<string, InputWithRefcount> inputs; // multimap due to older versions without deduplication.
if (state_fd != -1) {
}
servers->add_stream_from_serialized(stream, data_fds);
- deserialized_stream_ids.insert(stream.stream_id());
+ deserialized_urls.insert(stream.url());
}
// Deserialize the inputs. Note that we don't actually add them to any stream yet.
create_config_inputs(config, &inputs);
// Find all streams in the configuration file, create them, and connect to the inputs.
- create_streams(config, deserialized_stream_ids, &inputs);
+ create_streams(config, deserialized_urls, &inputs);
vector<Acceptor *> acceptors = create_acceptors(config, &deserialized_acceptors);
// Put back the existing clients. It doesn't matter which server we
Server::~Server()
{
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- delete stream_it->second;
+ for (size_t i = 0; i < streams.size(); ++i) {
+ delete streams[i];
}
safe_close(epoll_fd);
process_client(client);
}
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
+ for (size_t i = 0; i < streams.size(); ++i) {
vector<Client *> to_process;
- swap(stream_it->second->to_process, to_process);
+ swap(streams[i]->to_process, to_process);
for (size_t i = 0; i < to_process.size(); ++i) {
process_client(to_process[i]);
}
++client_it) {
serialized.add_clients()->MergeFrom(client_it->second.serialize());
}
- for (map<string, Stream *>::const_iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- serialized.add_streams()->MergeFrom(stream_it->second->serialize());
+ for (size_t i = 0; i < streams.size(); ++i) {
+ serialized.add_streams()->MergeFrom(streams[i]->serialize());
}
return serialized;
}
{
MutexLock lock(&mutex);
Stream *stream;
- map<string, Stream *>::iterator stream_it = streams.find(client.stream_id());
- if (stream_it == streams.end()) {
+ int stream_index = lookup_stream_by_url(client.url());
+ if (stream_index == -1) {
+ assert(client.state() != Client::SENDING_DATA);
stream = NULL;
} else {
- stream = stream_it->second;
+ stream = streams[stream_index];
}
pair<map<int, Client>::iterator, bool> ret =
clients.insert(make_pair(client.sock(), Client(client, stream)));
// the sleeping array again soon.
ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
}
- ev.data.u64 = 0; // Keep Valgrind happy.
ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
log_perror("epoll_ctl(EPOLL_CTL_ADD)");
}
}
-void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int Server::lookup_stream_by_url(const std::string &url) const
+{
+ map<string, int>::const_iterator url_it = url_map.find(url);
+ if (url_it == url_map.end()) {
+ return -1;
+ }
+ return url_it->second;
+}
+
+int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
+ url_map.insert(make_pair(url, streams.size()));
+ streams.push_back(new Stream(url, backlog_size, encoding));
+ return streams.size() - 1;
}
-void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
+ url_map.insert(make_pair(stream.url(), streams.size()));
+ streams.push_back(new Stream(stream, data_fd));
+ return streams.size() - 1;
}
-void Server::set_backlog_size(const string &stream_id, size_t new_size)
+void Server::set_backlog_size(int stream_index, size_t new_size)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id]->set_backlog_size(new_size);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->set_backlog_size(new_size);
}
-void Server::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void Server::set_encoding(int stream_index, Stream::Encoding encoding)
{
MutexLock lock(&mutex);
- assert(streams.count(stream_id) != 0);
- streams[stream_id]->encoding = encoding;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->encoding = encoding;
}
-void Server::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void Server::set_header(int stream_index, const string &http_header, const string &stream_header)
{
MutexLock lock(&mutex);
- find_stream(stream_id)->http_header = http_header;
- find_stream(stream_id)->stream_header = stream_header;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->http_header = http_header;
+ streams[stream_index]->stream_header = stream_header;
// If there are clients we haven't sent anything to yet, we should give
// them the header, so push back into the SENDING_HEADER state.
}
}
-void Server::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void Server::set_mark_pool(int stream_index, MarkPool *mark_pool)
{
MutexLock lock(&mutex);
assert(clients.empty());
- find_stream(stream_id)->mark_pool = mark_pool;
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->mark_pool = mark_pool;
}
-void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes)
{
MutexLock lock(&queued_data_mutex);
- find_stream(stream_id)->add_data_deferred(data, bytes);
+ assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
+ streams[stream_index]->add_data_deferred(data, bytes);
}
// See the .h file for postconditions after this function.
if (request_tokens[0] != "GET") {
return 400; // Should maybe be 405 instead?
}
- if (streams.count(request_tokens[1]) == 0) {
+
+ map<string, int>::const_iterator url_map_it = url_map.find(request_tokens[1]);
+ if (url_map_it == url_map.end()) {
return 404; // Not found.
}
- client->stream_id = request_tokens[1];
- client->stream = find_stream(client->stream_id);
+ client->url = request_tokens[1];
+ client->stream = streams[url_map_it->second];
if (client->stream->mark_pool != NULL) {
client->fwmark = client->stream->mark_pool->get_mark();
} else {
void Server::construct_header(Client *client)
{
- Stream *stream = find_stream(client->stream_id);
+ Stream *stream = client->stream;
if (stream->encoding == Stream::STREAM_ENCODING_RAW) {
client->header_or_error = stream->http_header +
"\r\n" +
clients.erase(client->sock);
}
-Stream *Server::find_stream(const string &stream_id)
-{
- map<string, Stream *>::iterator it = streams.find(stream_id);
- assert(it != streams.end());
- return it->second;
-}
-
void Server::process_queued_data()
{
MutexLock lock(&queued_data_mutex);
add_client(queued_add_clients[i]);
}
queued_add_clients.clear();
-
- for (map<string, Stream *>::iterator stream_it = streams.begin();
- stream_it != streams.end();
- ++stream_it) {
- stream_it->second->process_queued_data();
+
+ for (size_t i = 0; i < streams.size(); ++i) {
+ streams[i]->process_queued_data();
}
}
std::vector<ClientStats> get_client_stats() const;
// Set header (both HTTP header and any stream headers) for the given stream.
- void set_header(const std::string &stream_id,
+ void set_header(int stream_index,
const std::string &http_header,
const std::string &stream_header);
// Set that the given stream should use the given mark pool from now on.
// NOTE: This should be set before any clients are connected!
- void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
+ void set_mark_pool(int stream_index, MarkPool *mark_pool);
// These will be deferred until the next time an iteration in do_work() happens,
// and the order between them is undefined.
// XXX: header should ideally be ordered with respect to data.
void add_client_deferred(int sock);
- void add_data_deferred(const std::string &stream_id, const char *data, size_t bytes);
+ void add_data_deferred(int stream_index, const char *data, size_t bytes);
// These should not be called while running, since that would violate
// threading assumptions (ie., that epoll is only called from one thread
// at the same time).
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
- void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding);
- void add_stream_from_serialized(const StreamProto &stream, int data_fd);
- void set_backlog_size(const std::string &stream_id, size_t new_size);
- void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
+ int add_stream(const std::string &url, size_t bytes_received, Stream::Encoding encoding);
+ int add_stream_from_serialized(const StreamProto &stream, int data_fd);
+ int lookup_stream_by_url(const std::string &url) const;
+ void set_backlog_size(int stream_index, size_t new_size);
+ void set_encoding(int stream_index, Stream::Encoding encoding);
private:
// Mutex protecting queued_add_clients and streams[..]->queued_data.
// All variables below this line are protected by the mutex.
mutable pthread_mutex_t mutex;
- // Map from stream ID to stream.
- std::map<std::string, Stream *> streams;
+ // All streams.
+ std::vector<Stream *> streams;
+
+ // Map from URL to index into <streams>.
+ std::map<std::string, int> url_map;
// Map from file descriptor to client.
std::map<int, Client> clients;
// the SENDING_ERROR state.
void construct_error(Client *client, int error_code);
- // TODO: This function should probably die.
- Stream *find_stream(const std::string &stream_id);
-
void process_queued_data();
void add_client(int sock);
servers[clients_added++ % num_servers].add_client_from_serialized(client);
}
-void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream::Encoding encoding)
+int ServerPool::lookup_stream_by_url(const std::string &url) const
{
+ assert(servers != NULL);
+ return servers[0].lookup_stream_by_url(url);
+}
+
+int ServerPool::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+{
+ int stream_index = -1;
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_stream(stream_id, backlog_size, encoding);
+ int stream_index2 = servers[i].add_stream(url, backlog_size, encoding);
+ if (i == 0) {
+ stream_index = stream_index2;
+ } else {
+ // Verify that all servers have this under the same stream index.
+ assert(stream_index == stream_index2);
+ }
}
+ return stream_index;
}
-void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
+int ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
{
assert(!data_fds.empty());
string contents;
+ int stream_index = -1;
for (int i = 0; i < num_servers; ++i) {
int data_fd;
if (i < int(data_fds.size())) {
data_fd = make_tempfile(contents);
}
- servers[i].add_stream_from_serialized(stream, data_fd);
+ int stream_index2 = servers[i].add_stream_from_serialized(stream, data_fd);
+ if (i == 0) {
+ stream_index = stream_index2;
+ } else {
+ // Verify that all servers have this under the same stream index.
+ assert(stream_index == stream_index2);
+ }
}
// Close and delete any leftovers, if the number of servers was reduced.
for (size_t i = num_servers; i < data_fds.size(); ++i) {
safe_close(data_fds[i]); // Implicitly deletes the file.
}
+
+ return stream_index;
}
-void ServerPool::set_header(const string &stream_id, const string &http_header, const string &stream_header)
+void ServerPool::set_header(int stream_index, const string &http_header, const string &stream_header)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].set_header(stream_id, http_header, stream_header);
+ servers[i].set_header(stream_index, http_header, stream_header);
}
}
-void ServerPool::add_data(const string &stream_id, const char *data, size_t bytes)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_data_deferred(stream_id, data, bytes);
+ servers[i].add_data_deferred(stream_index, data, bytes);
}
}
return ret;
}
-void ServerPool::set_mark_pool(const string &stream_id, MarkPool *mark_pool)
+void ServerPool::set_mark_pool(int stream_index, MarkPool *mark_pool)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].set_mark_pool(stream_id, mark_pool);
+ servers[i].set_mark_pool(stream_index, mark_pool);
}
}
-void ServerPool::set_backlog_size(const string &stream_id, size_t new_size)
+void ServerPool::set_backlog_size(int stream_index, size_t new_size)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].set_backlog_size(stream_id, new_size);
+ servers[i].set_backlog_size(stream_index, new_size);
}
}
-void ServerPool::set_encoding(const string &stream_id, Stream::Encoding encoding)
+void ServerPool::set_encoding(int stream_index, Stream::Encoding encoding)
{
for (int i = 0; i < num_servers; ++i) {
- servers[i].set_encoding(stream_id, encoding);
+ servers[i].set_encoding(stream_index, encoding);
}
}
void add_client(int sock);
void add_client_from_serialized(const ClientProto &client);
- // Adds the given stream to all the servers.
- void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding);
- void add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
+ // Adds the given stream to all the servers. Returns the stream index.
+ int add_stream(const std::string &url, size_t backlog_size, Stream::Encoding encoding);
+ int add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
+
+ // Returns the stream index for the given URL (e.g. /foo.ts). Returns -1 on failure.
+ int lookup_stream_by_url(const std::string &url) const;
// Sets the header / adds data for the given stream on all the servers.
- void set_header(const std::string &stream_id,
+ void set_header(int stream_index,
const std::string &http_header,
const std::string &stream_header);
- void add_data(const std::string &stream_id, const char *data, size_t bytes);
+ void add_data(int stream_index, const char *data, size_t bytes);
// Connects the given stream to the given mark pool for all the servers.
- void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
+ void set_mark_pool(int stream_index, MarkPool *mark_pool);
// Changes the given stream's backlog size on all the servers.
- void set_backlog_size(const std::string &stream_id, size_t new_size);
+ void set_backlog_size(int stream_index, size_t new_size);
// Changes the given stream's encoding type on all the servers.
- void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
+ void set_encoding(int stream_index, Stream::Encoding encoding);
// Starts all the servers.
void run();
optional int64 connect_time = 9;
optional int32 state = 2;
optional bytes request = 3;
- optional string stream_id = 4;
+ optional string url = 4;
optional bytes header_or_error = 5;
optional int64 header_or_error_bytes_sent = 6;
optional int64 stream_pos = 7;
repeated int32 data_fds = 8;
optional int64 backlog_size = 5 [default=1048576];
optional int64 bytes_received = 3;
- optional string stream_id = 4;
+ optional string url = 4;
// Older versions stored the HTTP and video headers together in this field.
optional bytes header = 1;
client_stats[i].remote_addr.c_str(),
client_stats[i].sock,
client_stats[i].fwmark,
- client_stats[i].stream_id.c_str(),
+ client_stats[i].url.c_str(),
int(now - client_stats[i].connect_time),
(long long unsigned)(client_stats[i].bytes_sent),
(long long unsigned)(client_stats[i].bytes_lost),
using namespace std;
-Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
- : stream_id(stream_id),
+Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+ : url(url),
encoding(encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
}
Stream::Stream(const StreamProto &serialized, int data_fd)
- : stream_id(serialized.stream_id()),
+ : url(serialized.url()),
http_header(serialized.http_header()),
stream_header(serialized.stream_header()),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
serialized.set_bytes_received(bytes_received);
- serialized.set_stream_id(stream_id);
+ serialized.set_url(url);
data_fd = -1;
return serialized;
}
// Changes the backlog size, restructuring the data as needed.
void set_backlog_size(size_t new_size);
- std::string stream_id;
+ std::string url;
// The HTTP response header, without the trailing double newline.
std::string http_header;
"Connection: close\r\n";
}
-void UDPInput::add_destination(const string &stream_id)
+void UDPInput::add_destination(int stream_index)
{
- stream_ids.push_back(stream_id);
- servers->set_header(stream_id, http_header, "");
+ stream_indices.push_back(stream_index);
+ servers->set_header(stream_index, http_header, "");
}
void UDPInput::do_work()
continue;
}
- for (size_t i = 0; i < stream_ids.size(); ++i) {
- servers->add_data(stream_ids[i], buf, ret);
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->add_data(stream_indices[i], buf, ret);
}
}
}
virtual std::string get_url() const { return url; }
virtual void close_socket();
- virtual void add_destination(const std::string &stream_id);
+ virtual void add_destination(int stream_index);
private:
// Actually gets the packets.
// Create the HTTP header.
void construct_header();
- std::vector<std::string> stream_ids;
+ std::vector<int> stream_indices;
// The URL and its parsed components.
std::string url;