]> git.sesse.net Git - cubemap/blobdiff - stream.cpp
Never send more than IOV_MAX (1024) iovecs in pwritev.
[cubemap] / stream.cpp
index 17b5949172a531cae5a37eb4f8948dbf177bb4a9..e8d65db1a8cfda924226ad1838d087ad01a176a9 100644 (file)
@@ -1,9 +1,10 @@
 #include <assert.h>
 #include <errno.h>
+#include <limits.h>
 #include <netinet/in.h>
 #include <stdlib.h>
 #include <string.h>
-#include <unistd.h>
+#include <sys/types.h>
 #include <string>
 #include <vector>
 
@@ -15,8 +16,8 @@
 
 using namespace std;
 
-Stream::Stream(const string &stream_id, size_t backlog_size, Encoding encoding)
-       : stream_id(stream_id),
+Stream::Stream(const string &url, size_t backlog_size, Encoding encoding)
+       : url(url),
          encoding(encoding),
          data_fd(make_tempfile("")),
           backlog_size(backlog_size),
@@ -36,7 +37,7 @@ Stream::~Stream()
 }
 
 Stream::Stream(const StreamProto &serialized, int data_fd)
-       : stream_id(serialized.stream_id()),
+       : url(serialized.url()),
          http_header(serialized.http_header()),
          stream_header(serialized.stream_header()),
          encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
@@ -71,7 +72,7 @@ StreamProto Stream::serialize()
        serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
        serialized.set_bytes_received(bytes_received);
-       serialized.set_stream_id(stream_id);
+       serialized.set_url(url);
        data_fd = -1;
        return serialized;
 }
@@ -129,7 +130,8 @@ void Stream::put_client_to_sleep(Client *client)
 vector<iovec> collect_iovecs(const vector<iovec> &data, size_t bytes_wanted)
 {
        vector<iovec> ret;
-       for (size_t i = 0; i < data.size() && bytes_wanted > 0; ++i) {
+       size_t max_iovecs = std::min<size_t>(data.size(), IOV_MAX);
+       for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
                if (data[i].iov_len <= bytes_wanted) {
                        // Consume the entire iovec.
                        ret.push_back(data[i]);
@@ -185,7 +187,7 @@ void Stream::add_data_raw(const vector<iovec> &orig_data)
                } while (ret == -1 && errno == EINTR);
 
                if (ret == -1) {
-                       log_perror("pwrite");
+                       log_perror("pwritev");
                        // Dazed and confused, but trying to continue...
                        return;
                }
@@ -206,20 +208,24 @@ void Stream::add_data_deferred(const char *data, size_t bytes)
                hdr.flags = htonl(0);
 
                iovec iov;
-               iov.iov_base = new char[sizeof(hdr)];
+               iov.iov_base = new char[bytes + sizeof(hdr)];
+               iov.iov_len = bytes + sizeof(hdr);
+
                memcpy(iov.iov_base, &hdr, sizeof(hdr));
-               iov.iov_len = sizeof(hdr);
+               memcpy(reinterpret_cast<char *>(iov.iov_base) + sizeof(hdr), data, bytes);
+
+               queued_data.push_back(iov);
+       } else if (encoding == Stream::STREAM_ENCODING_RAW) {
+               // Just add the data itself.
+               iovec iov;
+               iov.iov_base = new char[bytes];
+               memcpy(iov.iov_base, data, bytes);
+               iov.iov_len = bytes;
+
                queued_data.push_back(iov);
        } else {
-               assert(encoding == Stream::STREAM_ENCODING_RAW);
+               assert(false);
        }
-
-       // Add the data itself.
-       iovec iov;
-       iov.iov_base = new char[bytes];
-       memcpy(iov.iov_base, data, bytes);
-       iov.iov_len = bytes;
-       queued_data.push_back(iov);
 }
 
 void Stream::process_queued_data()