X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=server.cpp;h=949d6a4d6958eff742c6c75783ed18f48d7f49c2;hp=dfd5b97dffb4e67c38c8a585df77269686e873a1;hb=1071b76d8e60d51d88d4e1310e7d6261b628a454;hpb=488f28bf7070f44469a006ed4a9d4c423788d175 diff --git a/server.cpp b/server.cpp index dfd5b97..949d6a4 100644 --- a/server.cpp +++ b/server.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include @@ -175,7 +174,7 @@ void Server::add_client_from_serialized(const ClientProto &client) } if (client_ptr->state == Client::SENDING_DATA && - client_ptr->bytes_sent == client_ptr->stream->bytes_received) { + client_ptr->stream_pos == client_ptr->stream->bytes_received) { client_ptr->stream->put_client_to_sleep(client_ptr); } else { process_client(client_ptr); @@ -194,6 +193,13 @@ void Server::add_stream_from_serialized(const StreamProto &stream) streams.insert(make_pair(stream.stream_id(), new Stream(stream))); } +void Server::set_backlog_size(const std::string &stream_id, size_t new_size) +{ + MutexLock lock(&mutex); + assert(streams.count(stream_id) != 0); + streams[stream_id]->set_backlog_size(new_size); +} + void Server::set_header(const string &stream_id, const string &header) { MutexLock lock(&mutex); @@ -206,7 +212,7 @@ void Server::set_header(const string &stream_id, const string &header) ++client_it) { Client *client = &client_it->second; if (client->state == Client::SENDING_DATA && - client->bytes_sent == 0) { + client->stream_pos == 0) { construct_header(client); } } @@ -225,50 +231,6 @@ void Server::add_data_deferred(const string &stream_id, const char *data, size_t queued_data[stream_id].append(string(data, data + bytes)); } -void Server::add_data(const string &stream_id, const char *data, ssize_t bytes) -{ - Stream *stream = find_stream(stream_id); - size_t pos = stream->bytes_received % stream->backlog_size; - stream->bytes_received += bytes; - - if (pos + bytes > stream->backlog_size) { - ssize_t to_copy = stream->backlog_size - pos; - while (to_copy > 0) { - int ret = pwrite(stream->data_fd, data, to_copy, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - to_copy -= ret; - bytes -= ret; - } - pos = 0; - } - - while (bytes > 0) { - int ret = pwrite(stream->data_fd, data, bytes, pos); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret == -1) { - perror("pwrite"); - // Dazed and confused, but trying to continue... - break; - } - pos += ret; - data += ret; - bytes -= ret; - } - - stream->wake_up_all_clients(); -} - // See the .h file for postconditions after this function. void Server::process_client(Client *client) { @@ -377,7 +339,7 @@ sending_header_or_error_again: // but we'll start sending immediately as we get data. // This is postcondition #3. client->state = Client::SENDING_DATA; - client->bytes_sent = client->stream->bytes_received; + client->stream_pos = client->stream->bytes_received; client->stream->put_client_to_sleep(client); return; } @@ -386,7 +348,7 @@ sending_data_again: // See if there's some data we've lost. Ideally, we should drop to a block boundary, // but resync will be the mux's problem. Stream *stream = client->stream; - size_t bytes_to_send = stream->bytes_received - client->bytes_sent; + size_t bytes_to_send = stream->bytes_received - client->stream_pos; if (bytes_to_send == 0) { return; } @@ -394,20 +356,20 @@ sending_data_again: fprintf(stderr, "WARNING: fd %d lost %lld bytes, maybe too slow connection\n", client->sock, (long long int)(bytes_to_send - stream->backlog_size)); - client->bytes_sent = stream->bytes_received - stream->backlog_size; + client->stream_pos = stream->bytes_received - stream->backlog_size; bytes_to_send = stream->backlog_size; } // See if we need to split across the circular buffer. bool more_data = false; - if ((client->bytes_sent % stream->backlog_size) + bytes_to_send > stream->backlog_size) { - bytes_to_send = stream->backlog_size - (client->bytes_sent % stream->backlog_size); + if ((client->stream_pos % stream->backlog_size) + bytes_to_send > stream->backlog_size) { + bytes_to_send = stream->backlog_size - (client->stream_pos % stream->backlog_size); more_data = true; } ssize_t ret; do { - loff_t offset = client->bytes_sent % stream->backlog_size; + loff_t offset = client->stream_pos % stream->backlog_size; ret = sendfile(client->sock, stream->data_fd, &offset, bytes_to_send); } while (ret == -1 && errno == EINTR); @@ -423,9 +385,9 @@ sending_data_again: close_client(client); return; } - client->bytes_sent += ret; + client->stream_pos += ret; - if (client->bytes_sent == stream->bytes_received) { + if (client->stream_pos == stream->bytes_received) { // We don't have any more data for this client, so put it to sleep. // This is postcondition #3. stream->put_client_to_sleep(client); @@ -569,7 +531,9 @@ void Server::process_queued_data() for (map::iterator queued_it = queued_data.begin(); queued_it != queued_data.end(); ++queued_it) { - add_data(queued_it->first, queued_it->second.data(), queued_it->second.size()); + Stream *stream = find_stream(queued_it->first); + stream->add_data(queued_it->second.data(), queued_it->second.size()); + stream->wake_up_all_clients(); } queued_data.clear(); }