if (encoding == Input::INPUT_ENCODING_RAW) {
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], ptr, bytes, SUITABLE_FOR_STREAM_START);
+ servers->add_data(stream_indices[i], ptr, bytes, /*metacube_flags=*/0);
}
return;
}
for (size_t i = 0; i < stream_indices.size(); ++i) {
servers->set_header(stream_indices[i], http_header, stream_header);
}
- } else {
- StreamStartSuitability suitable_for_stream_start;
- if (flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) {
- suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
- } else {
- suitable_for_stream_start = SUITABLE_FOR_STREAM_START;
- }
- for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], inner_data, size, suitable_for_stream_start);
- }
+ }
+ for (size_t i = 0; i < stream_indices.size(); ++i) {
+ servers->add_data(stream_indices[i], inner_data, size, flags);
}
// Consume the block. This isn't the most efficient way of dealing with things
ping_url_map[url] = allow_origin;
}
-void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
+void Server::add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
{
assert(stream_index >= 0 && stream_index < ssize_t(streams.size()));
- streams[stream_index]->add_data_deferred(data, bytes, suitable_for_stream_start);
+ streams[stream_index]->add_data_deferred(data, bytes, metacube_flags);
}
// See the .h file for postconditions after this function.
// and the order between them are undefined.
// XXX: header should ideally be ordered with respect to data.
void add_client_deferred(int sock);
- void add_data_deferred(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
+ void add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);
// These should not be called while running, since that would violate
// threading assumptions (ie., that epoll is only called from one thread
}
}
-void ServerPool::add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
+void ServerPool::add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags)
{
assert(stream_index >= 0 && stream_index < ssize_t(num_http_streams + udp_streams.size()));
// HTTP stream.
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_data_deferred(stream_index, data, bytes, suitable_for_stream_start);
+ servers[i].add_data_deferred(stream_index, data, bytes, metacube_flags);
}
}
void set_header(int stream_index,
const std::string &http_header,
const std::string &stream_header);
- void add_data(int stream_index, const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
+ void add_data(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);
// Sets the max pacing rate for all the servers.
void set_pacing_rate(int stream_index, uint32_t pacing_rate);
DataElement data_element;
data_element.data.iov_base = const_cast<char *>(existing_data.data());
data_element.data.iov_len = existing_data.size();
- data_element.suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START; // Ignored by add_data_raw().
+ data_element.metacube_flags = 0; // Ignored by add_data_raw().
vector<DataElement> data_elements;
data_elements.push_back(data_element);
Stream::DataElement data_element;
data_element.data.iov_base = reinterpret_cast<char *>(data[i].data.iov_base) + bytes_wanted;
data_element.data.iov_len = data[i].data.iov_len - bytes_wanted;
- data_element.suitable_for_stream_start = NOT_SUITABLE_FOR_STREAM_START;
+ data_element.metacube_flags = METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
ret.push_back(data_element);
bytes_wanted = 0;
}
}
}
-void Stream::add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start)
+void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags)
{
+ // For regular output, we don't want to send the client twice
+ // (it's already sent out together with the HTTP header).
+ // However, for Metacube output, we need to send it so that
+ // the Cubemap instance in the other end has a chance to update it.
+ // It may come twice in its stream, but Cubemap doesn't care.
+ if (encoding == Stream::STREAM_ENCODING_RAW &&
+ (metacube_flags & METACUBE_FLAGS_HEADER) != 0) {
+ return;
+ }
+
MutexLock lock(&queued_data_mutex);
- assert(suitable_for_stream_start == SUITABLE_FOR_STREAM_START ||
- suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START);
DataElement data_element;
- data_element.suitable_for_stream_start = suitable_for_stream_start;
+ data_element.metacube_flags = metacube_flags;
if (encoding == Stream::STREAM_ENCODING_METACUBE) {
// Add a Metacube block header before the data.
metacube2_block_header hdr;
memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
hdr.size = htonl(bytes);
- hdr.flags = htons(0);
- if (suitable_for_stream_start == NOT_SUITABLE_FOR_STREAM_START) {
- hdr.flags |= htons(METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START);
- }
+ hdr.flags = htons(metacube_flags);
hdr.csum = htons(metacube2_compute_crc(&hdr));
data_element.data.iov_base = new char[bytes + sizeof(hdr)];
static const int minimum_start_point_distance = 10240;
size_t byte_position = bytes_received;
for (size_t i = 0; i < queued_data_copy.size(); ++i) {
- if (queued_data_copy[i].suitable_for_stream_start == SUITABLE_FOR_STREAM_START) {
+ if ((queued_data_copy[i].metacube_flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0) {
size_t num_points = suitable_starting_points.size();
if (num_points >= 2 &&
suitable_starting_points[num_points - 1] - suitable_starting_points[num_points - 2] < minimum_start_point_distance) {
class StreamProto;
struct Client;
-enum StreamStartSuitability {
- NOT_SUITABLE_FOR_STREAM_START,
- SUITABLE_FOR_STREAM_START,
-};
-
struct Stream {
// Must be in sync with StreamConfig::Encoding.
enum Encoding { STREAM_ENCODING_RAW = 0, STREAM_ENCODING_METACUBE };
// The data pointers in the iovec are owned by us.
struct DataElement {
iovec data;
- StreamStartSuitability suitable_for_stream_start;
+ uint16_t metacube_flags;
};
std::vector<DataElement> queued_data;
// Add more data to <queued_data>, adding Metacube headers if needed.
// Does not take ownership of <data>.
- void add_data_deferred(const char *data, size_t bytes, StreamStartSuitability suitable_for_stream_start);
+ void add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags);
// Add queued data to the stream, if any.
// You should hold the owning Server's <mutex>.
}
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->add_data(stream_indices[i], packet_buf, ret, SUITABLE_FOR_STREAM_START);
+ servers->add_data(stream_indices[i], packet_buf, ret, /*metacube_flags=*/0);
}
}
}