-#include <arpa/inet.h>
+#include <assert.h>
#include <errno.h>
-#include <stdio.h>
+#include <netinet/in.h>
#include <stdlib.h>
+#include <string.h>
#include <unistd.h>
#include <string>
#include <vector>
-#include "state.pb.h"
#include "log.h"
#include "metacube.h"
+#include "state.pb.h"
#include "stream.h"
#include "util.h"
Stream::~Stream()
{
if (data_fd != -1) {
- int ret;
- do {
- ret = close(data_fd);
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- log_perror("close");
- }
+ safe_close(data_fd);
}
}
-Stream::Stream(const StreamProto &serialized)
+Stream::Stream(const StreamProto &serialized, int data_fd)
: stream_id(serialized.stream_id()),
http_header(serialized.http_header()),
stream_header(serialized.stream_header()),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
- data_fd(make_tempfile(serialized.data())),
+ data_fd(data_fd),
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
mark_pool(NULL)
StreamProto serialized;
serialized.set_http_header(http_header);
serialized.set_stream_header(stream_header);
- if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd.
- exit(1);
- }
+ serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
serialized.set_bytes_received(bytes_received);
serialized.set_stream_id(stream_id);
}
string existing_data;
- if (!read_tempfile(data_fd, &existing_data)) { // Closes data_fd.
+ if (!read_tempfile_and_close(data_fd, &existing_data)) {
exit(1);
}
// Create a new, empty data file.
data_fd = make_tempfile("");
+ if (data_fd == -1) {
+ exit(1);
+ }
backlog_size = new_size;
// Now cheat a bit by rewinding, and adding all the old data back.
bytes_received -= existing_data.size();
- add_data(existing_data.data(), existing_data.size());
+ add_data_raw(existing_data.data(), existing_data.size());
}
void Stream::put_client_to_sleep(Client *client)
sleeping_clients.push_back(client);
}
-void Stream::add_data(const char *data, ssize_t bytes)
-{
- if (encoding == Stream::STREAM_ENCODING_RAW) {
- add_data_raw(data, bytes);
- } else if (encoding == STREAM_ENCODING_METACUBE) {
- metacube_block_header hdr;
- memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
- hdr.size = htonl(bytes);
- hdr.flags = htonl(0);
-
- char *block = new char[bytes + sizeof(hdr)];
- memcpy(block, &hdr, sizeof(hdr));
- memcpy(block + sizeof(hdr), data, bytes);
- add_data_raw(block, bytes + sizeof(hdr));
- delete[] block;
- } else {
- assert(false);
- }
-}
-
void Stream::add_data_raw(const char *data, ssize_t bytes)
{
size_t pos = bytes_received % backlog_size;
}
}
-void Stream::wake_up_all_clients()
+void Stream::add_data_deferred(const char *data, size_t bytes)
{
+ if (encoding == Stream::STREAM_ENCODING_RAW) {
+ queued_data.append(string(data, data + bytes));
+ } else if (encoding == STREAM_ENCODING_METACUBE) {
+ metacube_block_header hdr;
+ memcpy(hdr.sync, METACUBE_SYNC, sizeof(hdr.sync));
+ hdr.size = htonl(bytes);
+ hdr.flags = htonl(0);
+
+ char *block = new char[bytes + sizeof(hdr)];
+ memcpy(block, &hdr, sizeof(hdr));
+ memcpy(block + sizeof(hdr), data, bytes);
+ queued_data.append(string(block, block + bytes + sizeof(hdr)));
+ delete[] block;
+ } else {
+ assert(false);
+ }
+}
+
+void Stream::process_queued_data()
+{
+ if (queued_data.empty()) {
+ return;
+ }
+
+ add_data_raw(queued_data.data(), queued_data.size());
+ queued_data.clear();
+
+ // We have more data, so wake up all clients.
if (to_process.empty()) {
swap(sleeping_clients, to_process);
} else {