From 1e15bf9054e65adfce268578f4e474c980ef6443 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 20 Apr 2013 16:30:16 +0200 Subject: [PATCH] Move iovecs around instead of having single data buffers. Hopefully a tad more efficient, although it does mean somewhat more malloc-ing. --- stream.cpp | 126 ++++++++++++++++++++++++++++++++++++----------------- stream.h | 11 +++-- 2 files changed, 93 insertions(+), 44 deletions(-) diff --git a/stream.cpp b/stream.cpp index 4be673c..17b5949 100644 --- a/stream.cpp +++ b/stream.cpp @@ -111,7 +111,13 @@ void Stream::set_backlog_size(size_t new_size) // Now cheat a bit by rewinding, and adding all the old data back. bytes_received -= existing_data.size(); - add_data_raw(existing_data.data(), existing_data.size()); + iovec iov; + iov.iov_base = const_cast(existing_data.data()); + iov.iov_len = existing_data.size(); + + vector iovs; + iovs.push_back(iov); + add_data_raw(iovs); } void Stream::put_client_to_sleep(Client *client) @@ -119,65 +125,101 @@ void Stream::put_client_to_sleep(Client *client) sleeping_clients.push_back(client); } -void Stream::add_data_raw(const char *data, ssize_t bytes) +// Return a new set of iovecs that contains only the first bytes of . +vector collect_iovecs(const vector &data, size_t bytes_wanted) { - size_t pos = bytes_received % backlog_size; - bytes_received += bytes; - - if (pos + bytes > backlog_size) { - ssize_t to_copy = backlog_size - pos; - while (to_copy > 0) { - int ret = pwrite(data_fd, data, to_copy, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - log_perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - to_copy -= ret; - bytes -= ret; + vector ret; + for (size_t i = 0; i < data.size() && bytes_wanted > 0; ++i) { + if (data[i].iov_len <= bytes_wanted) { + // Consume the entire iovec. + ret.push_back(data[i]); + bytes_wanted -= data[i].iov_len; + } else { + // Take only parts of this iovec. + iovec iov; + iov.iov_base = data[i].iov_base; + iov.iov_len = bytes_wanted; + ret.push_back(iov); + bytes_wanted = 0; } - pos = 0; } + return ret; +} - while (bytes > 0) { - int ret = pwrite(data_fd, data, bytes, pos); - if (ret == -1 && errno == EINTR) { - continue; +// Return a new set of iovecs that contains all of except the first bytes. +vector remove_iovecs(const vector &data, size_t bytes_wanted) +{ + vector ret; + size_t i; + for (i = 0; i < data.size() && bytes_wanted > 0; ++i) { + if (data[i].iov_len <= bytes_wanted) { + // Consume the entire iovec. + bytes_wanted -= data[i].iov_len; + } else { + // Take only parts of this iovec. + iovec iov; + iov.iov_base = reinterpret_cast(data[i].iov_base) + bytes_wanted; + iov.iov_len = data[i].iov_len - bytes_wanted; + ret.push_back(iov); + bytes_wanted = 0; } + } + + // Add the rest of the iovecs unchanged. + ret.insert(ret.end(), data.begin() + i, data.end()); + return ret; +} + +void Stream::add_data_raw(const vector &orig_data) +{ + vector data = orig_data; + while (!data.empty()) { + size_t pos = bytes_received % backlog_size; + + // Collect as many iovecs as we can before we hit the point + // where the circular buffer wraps around. + vector to_write = collect_iovecs(data, backlog_size - pos); + ssize_t ret; + do { + ret = pwritev(data_fd, to_write.data(), to_write.size(), pos); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { log_perror("pwrite"); // Dazed and confused, but trying to continue... - break; + return; } - pos += ret; - data += ret; - bytes -= ret; + bytes_received += ret; + + // Remove the data that was actually written from the set of iovecs. + data = remove_iovecs(data, ret); } } void Stream::add_data_deferred(const char *data, size_t bytes) { - if (encoding == Stream::STREAM_ENCODING_RAW) { - queued_data.append(string(data, data + bytes)); - } else if (encoding == STREAM_ENCODING_METACUBE) { + if (encoding == Stream::STREAM_ENCODING_METACUBE) { + // Add a Metacube block header before the data. metacube_block_header hdr; memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync)); hdr.size = htonl(bytes); hdr.flags = htonl(0); - char *block = new char[bytes + sizeof(hdr)]; - memcpy(block, &hdr, sizeof(hdr)); - memcpy(block + sizeof(hdr), data, bytes); - queued_data.append(string(block, block + bytes + sizeof(hdr))); - delete[] block; + iovec iov; + iov.iov_base = new char[sizeof(hdr)]; + memcpy(iov.iov_base, &hdr, sizeof(hdr)); + iov.iov_len = sizeof(hdr); + queued_data.push_back(iov); } else { - assert(false); + assert(encoding == Stream::STREAM_ENCODING_RAW); } + + // Add the data itself. + iovec iov; + iov.iov_base = new char[bytes]; + memcpy(iov.iov_base, data, bytes); + iov.iov_len = bytes; + queued_data.push_back(iov); } void Stream::process_queued_data() @@ -186,7 +228,11 @@ void Stream::process_queued_data() return; } - add_data_raw(queued_data.data(), queued_data.size()); + add_data_raw(queued_data); + for (size_t i = 0; i < queued_data.size(); ++i) { + char *data = reinterpret_cast(queued_data[i].iov_base); + delete[] data; + } queued_data.clear(); // We have more data, so wake up all clients. diff --git a/stream.h b/stream.h index 97451f0..33fb55e 100644 --- a/stream.h +++ b/stream.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -70,7 +71,8 @@ struct Stream { MarkPool *mark_pool; // Queued data, if any. Protected by the owning Server's . - std::string queued_data; + // The data pointers in the iovec are owned by us. + std::vector queued_data; // Put client to sleep, since there is no more data for it; we will on // longer listen on POLLOUT until we get more data. Also, it will be put @@ -78,6 +80,7 @@ struct Stream { void put_client_to_sleep(Client *client); // Add more data to , adding Metacube headers if needed. + // Does not take ownership of . // You should hold the owning Server's . void add_data_deferred(const char *data, size_t bytes); @@ -89,9 +92,9 @@ private: Stream(const Stream& other); // Adds data directly to the stream file descriptor, without adding headers or - // going through . You should hold the owning Server's - // . - void add_data_raw(const char *data, ssize_t bytes); + // going through . + // You should hold the owning Server's . + void add_data_raw(const std::vector &data); }; #endif // !defined(_STREAM_H) -- 2.39.2