void Server::add_data_deferred(const string &stream_id, const char *data, size_t bytes)
{
MutexLock lock(&queued_data_mutex);
- queued_data[stream_id].append(string(data, data + bytes));
+ find_stream(stream_id)->add_data_deferred(data, bytes);
}
// See the .h file for postconditions after this function.
}
queued_add_clients.clear();
- for (map<string, string>::iterator queued_it = queued_data.begin();
- queued_it != queued_data.end();
- ++queued_it) {
- Stream *stream = find_stream(queued_it->first);
- stream->add_data(queued_it->second.data(), queued_it->second.size());
- stream->wake_up_all_clients();
+ for (map<string, Stream *>::iterator stream_it = streams.begin();
+ stream_it != streams.end();
+ ++stream_it) {
+ stream_it->second->process_queued_data();
}
- queued_data.clear();
}
void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
private:
- // Mutex protecting queued_data only. Note that if you want to hold both this
- // and <mutex> below, you will need to take <mutex> before this one.
+ // Mutex protecting queued_add_clients and streams[..]->queued_data.
+ // Note that if you want to hold both this and <mutex> below,
+ // you will need to take <mutex> before this one.
mutable pthread_mutex_t queued_data_mutex;
// Deferred commands that should be run from the do_work() thread as soon as possible.
//
// Protected by <queued_data_mutex>.
std::vector<int> queued_add_clients;
- std::map<std::string, std::string> queued_data;
// All variables below this line are protected by the mutex.
mutable pthread_mutex_t mutex;
sleeping_clients.push_back(client);
}
-void Stream::add_data(const char *data, ssize_t bytes)
-{
- if (encoding == Stream::STREAM_ENCODING_RAW) {
- add_data_raw(data, bytes);
- } else if (encoding == STREAM_ENCODING_METACUBE) {
- metacube_block_header hdr;
- memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
- hdr.size = htonl(bytes);
- hdr.flags = htonl(0);
-
- char *block = new char[bytes + sizeof(hdr)];
- memcpy(block, &hdr, sizeof(hdr));
- memcpy(block + sizeof(hdr), data, bytes);
- add_data_raw(block, bytes + sizeof(hdr));
- delete[] block;
- } else {
- assert(false);
- }
-}
-
void Stream::add_data_raw(const char *data, ssize_t bytes)
{
size_t pos = bytes_received % backlog_size;
}
}
-void Stream::wake_up_all_clients()
+void Stream::add_data_deferred(const char *data, size_t bytes)
+{
+ if (encoding == Stream::STREAM_ENCODING_RAW) {
+ queued_data.append(string(data, data + bytes));
+ } else if (encoding == STREAM_ENCODING_METACUBE) {
+ metacube_block_header hdr;
+ memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(bytes);
+ hdr.flags = htonl(0);
+
+ char *block = new char[bytes + sizeof(hdr)];
+ memcpy(block, &hdr, sizeof(hdr));
+ memcpy(block + sizeof(hdr), data, bytes);
+ queued_data.append(string(block, block + bytes + sizeof(hdr)));
+ delete[] block;
+ } else {
+ assert(false);
+ }
+}
+
+void Stream::process_queued_data()
{
+ if (queued_data.empty()) {
+ return;
+ }
+
+ add_data_raw(queued_data.data(), queued_data.size());
+ queued_data.clear();
+
+ // We have more data, so wake up all clients.
if (to_process.empty()) {
swap(sleeping_clients, to_process);
} else {
// What pool to fetch marks from, or NULL.
MarkPool *mark_pool;
+ // Queued data, if any. Protected by the owning Server's <queued_data_mutex>.
+ std::string queued_data;
+
// Put client to sleep, since there is no more data for it; we will on
// longer listen on POLLOUT until we get more data. Also, it will be put
// in the list of clients to wake up when we do.
void put_client_to_sleep(Client *client);
- // Add more input data to the stream. You should probably call wake_up_all_clients()
- // after that.
- void add_data(const char *data, ssize_t bytes);
+ // Add more data to <queued_data>, adding Metacube headers if needed.
+ // You should hold the owning Server's <queued_data_mutex>.
+ void add_data_deferred(const char *data, size_t bytes);
- // We have more data, so mark all clients that are sleeping as ready to go.
- void wake_up_all_clients();
+ // Add queued data to the stream, if any.
+ // You should hold the owning Server's <mutex> _and_ <queued_data_mutex>.
+ void process_queued_data();
private:
Stream(const Stream& other);
+ // Adds data directly to the stream file descriptor, without adding headers or
+ // going through <queued_data>. You should hold the owning Server's
+ // <mutex>.
void add_data_raw(const char *data, ssize_t bytes);
};