From ae994771c0747d43bd1ed422224f4caacb95ca9f Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sat, 13 Apr 2013 22:35:45 +0200 Subject: [PATCH] Move Server:add_data() into Stream, where it more logically belongs. --- server.cpp | 47 ++--------------------------------------------- server.h | 1 - stream.cpp | 43 +++++++++++++++++++++++++++++++++++++++++++ stream.h | 7 +++++-- 4 files changed, 50 insertions(+), 48 deletions(-) diff --git a/server.cpp b/server.cpp index dfd5b97..266d418 100644 --- a/server.cpp +++ b/server.cpp @@ -225,50 +225,6 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t queued_data[stream_id].append(string(data, data + bytes)); } -void Server::add_data(const string &stream_id, const char *data, ssize_t bytes) -{ - Stream *stream = find_stream(stream_id); - size_t pos = stream->bytes_received % stream->backlog_size; - stream->bytes_received += bytes; - - if (pos + bytes > stream->backlog_size) { - ssize_t to_copy = stream->backlog_size - pos; - while (to_copy > 0) { - int ret = pwrite(stream->data_fd, data, to_copy, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - to_copy -= ret; - bytes -= ret; - } - pos = 0; - } - - while (bytes > 0) { - int ret = pwrite(stream->data_fd, data, bytes, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - bytes -= ret; - } - - stream->wake_up_all_clients(); -} - // See the .h file for postconditions after this function. void Server::process_client(Client *client) { @@ -569,7 +525,8 @@ void Server::process_queued_data() for (map::iterator queued_it = queued_data.begin(); queued_it != queued_data.end(); ++queued_it) { - add_data(queued_it->first, queued_it->second.data(), queued_it->second.size()); + Stream *stream = find_stream(queued_it->first); + stream->add_data(queued_it->second.data(), queued_it->second.size()); } queued_data.clear(); } diff --git a/server.h b/server.h index e8114cf..94fdb67 100644 --- a/server.h +++ b/server.h @@ -123,7 +123,6 @@ private: void process_queued_data(); void add_client(int sock); - void add_data(const std::string &stream_id, const char *data, ssize_t bytes); }; #endif // !defined(_SERVER_H) diff --git a/stream.cpp b/stream.cpp index 080d3bf..d571578 100644 --- a/stream.cpp +++ b/stream.cpp @@ -68,6 +68,49 @@ void Stream::put_client_to_sleep(Client *client) sleeping_clients.push_back(client); } +void Stream::add_data(const char *data, ssize_t bytes) +{ + size_t pos = bytes_received % backlog_size; + bytes_received += bytes; + + if (pos + bytes > backlog_size) { + ssize_t to_copy = backlog_size - pos; + while (to_copy > 0) { + int ret = pwrite(data_fd, data, to_copy, pos); + if (ret == -1 && errno == EINTR) { + continue; + } + if (ret == -1) { + perror("pwrite"); + // Dazed and confused, but trying to continue... + break; + } + pos += ret; + data += ret; + to_copy -= ret; + bytes -= ret; + } + pos = 0; + } + + while (bytes > 0) { + int ret = pwrite(data_fd, data, bytes, pos); + if (ret == -1 && errno == EINTR) { + continue; + } + if (ret == -1) { + perror("pwrite"); + // Dazed and confused, but trying to continue... + break; + } + pos += ret; + data += ret; + bytes -= ret; + } + + wake_up_all_clients(); +} + void Stream::wake_up_all_clients() { if (to_process.empty()) { diff --git a/stream.h b/stream.h index ea13f40..a57c6d8 100644 --- a/stream.h +++ b/stream.h @@ -60,11 +60,14 @@ struct Stream { // in the list of clients to wake up when we do. void put_client_to_sleep(Client *client); - // We have more data, so mark all clients that are sleeping as ready to go. - void wake_up_all_clients(); + // Add more input data to the stream, and wake up all clients that are sleeping. + void add_data(const char *data, ssize_t bytes); private: Stream(const Stream& other); + + // We have more data, so mark all clients that are sleeping as ready to go. + void wake_up_all_clients(); }; #endif // !defined(_STREAM_H) -- 2.39.2