#include <assert.h>
#include <errno.h>
+#include <limits.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
-#include <unistd.h>
+#include <sys/types.h>
#include <string>
#include <vector>
using namespace std;
-Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
- : stream_id(stream_id),
+Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+ : url(url),
encoding(encoding),
data_fd(make_tempfile("")),
backlog_size(backlog_size),
}
Stream::Stream(const StreamProto &serialized, int data_fd)
- : stream_id(serialized.stream_id()),
+ : url(serialized.url()),
http_header(serialized.http_header()),
stream_header(serialized.stream_header()),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
serialized.set_bytes_received(bytes_received);
- serialized.set_stream_id(stream_id);
+ serialized.set_url(url);
data_fd = -1;
return serialized;
}
vector<iovec> collect_iovecs(const vector<iovec> &data, size_t bytes_wanted)
{
vector<iovec> ret;
- for (size_t i = 0; i < data.size() && bytes_wanted > 0; ++i) {
+ size_t max_iovecs = std::min<size_t>(data.size(), IOV_MAX);
+ for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
if (data[i].iov_len <= bytes_wanted) {
// Consume the entire iovec.
ret.push_back(data[i]);
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
- log_perror("pwrite");
+ log_perror("pwritev");
// Dazed and confused, but trying to continue...
return;
}
hdr.flags = htonl(0);
iovec iov;
- iov.iov_base = new char[sizeof(hdr)];
+ iov.iov_base = new char[bytes + sizeof(hdr)];
+ iov.iov_len = bytes + sizeof(hdr);
+
memcpy(iov.iov_base, &hdr, sizeof(hdr));
- iov.iov_len = sizeof(hdr);
+ memcpy(reinterpret_cast<char *>(iov.iov_base) + sizeof(hdr), data, bytes);
+
+ queued_data.push_back(iov);
+ } else if (encoding == Stream::STREAM_ENCODING_RAW) {
+ // Just add the data itself.
+ iovec iov;
+ iov.iov_base = new char[bytes];
+ memcpy(iov.iov_base, data, bytes);
+ iov.iov_len = bytes;
+
queued_data.push_back(iov);
} else {
- assert(encoding == Stream::STREAM_ENCODING_RAW);
+ assert(false);
}
-
- // Add the data itself.
- iovec iov;
- iov.iov_base = new char[bytes];
- memcpy(iov.iov_base, data, bytes);
- iov.iov_len = bytes;
- queued_data.push_back(iov);
}
void Stream::process_queued_data()