// Creates a stream named stream_id with the given backlog size. data_fd is
// obtained from make_tempfile(""), so the stream starts with a new data file
// created from empty contents.
14 Stream::Stream(const string &stream_id, size_t backlog_size)
15 : stream_id(stream_id),
16 data_fd(make_tempfile("")),
17 backlog_size(backlog_size),
// NOTE(review): the remainder of the initializer list and the beginning of
// the constructor body are not part of this excerpt. The line below closes a
// do/while whose (unseen) body is repeated for as long as ret is -1 and errno
// is EINTR, i.e. the operation is retried when interrupted by a signal.
32 } while (ret == -1 && errno == EINTR);
// Rebuilds a stream from its serialized form. stream_id, header, backlog_size
// and bytes_received are copied straight from the message; data_fd is created
// by handing the serialized data to make_tempfile().
// NOTE(review): presumably make_tempfile() returns a descriptor for a
// temporary file pre-filled with that data -- confirm against its definition.
39 Stream::Stream(const StreamProto &serialized)
40 : stream_id(serialized.stream_id()),
41 header(serialized.header()),
42 data_fd(make_tempfile(serialized.data())),
43 backlog_size(serialized.backlog_size()),
44 bytes_received(serialized.bytes_received()),
// Captures this stream in a StreamProto: header, the contents of the data
// file, backlog_size, bytes_received and stream_id.
// NOTE(review): read_tempfile() closes data_fd (see the comment on that
// call), so the stream presumably must not write to data_fd after being
// serialized -- confirm with callers.
52 StreamProto Stream::serialize()
54 StreamProto serialized;
55 serialized.set_header(header);
// Read the data file's contents into the message's data field. What happens
// when read_tempfile() reports failure is not visible in this excerpt.
56 if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd.
59 serialized.set_backlog_size(backlog_size);
60 serialized.set_bytes_received(bytes_received);
61 serialized.set_stream_id(stream_id);
// Changes the backlog size to new_size. The existing backlog is read out of
// the old data file, put back into chronological order, trimmed from the
// front if it no longer fits, and written into a fresh data file.
66 void Stream::set_backlog_size(size_t new_size)
// Guard for the case where the size is unchanged (the branch body is not
// visible in this excerpt).
68 if (backlog_size == new_size) {
// Pull the current backlog into existing_data. The failure branch is not
// visible in this excerpt.
73 if (!read_tempfile(data_fd, &existing_data)) { // Closes data_fd.
77 // Unwrap the data so it's no longer circular.
// If no more than backlog_size bytes were ever received, nothing has wrapped:
// the valid data is simply the first bytes_received bytes.
78 if (bytes_received <= backlog_size) {
79 existing_data.resize(bytes_received);
// pos is the offset at which the next write would land (add_data() computes
// its write offset the same way). Concatenating [pos, end) followed by
// [0, pos) therefore lines the bytes up in the order they were received.
81 size_t pos = bytes_received % backlog_size;
82 existing_data = existing_data.substr(pos, string::npos) +
83 existing_data.substr(0, pos);
86 // See if we need to discard data.
// Dropping the first to_discard bytes keeps exactly the last new_size bytes.
87 if (new_size < existing_data.size()) {
88 size_t to_discard = existing_data.size() - new_size;
89 existing_data = existing_data.substr(to_discard, string::npos);
92 // Create a new, empty data file.
93 data_fd = make_tempfile("");
94 backlog_size = new_size;
96 // Now cheat a bit by rewinding, and adding all the old data back.
// add_data() advances bytes_received by the number of bytes it is given, so
// subtracting that amount first leaves bytes_received unchanged overall.
97 bytes_received -= existing_data.size();
98 add_data(existing_data.data(), existing_data.size());
// Appends client to sleeping_clients. wake_up_all_clients() later moves the
// entries of that list over to to_process.
101 void Stream::put_client_to_sleep(Client *client)
103 sleeping_clients.push_back(client);
// Writes `bytes` bytes from `data` into the data file with pwrite() and
// advances bytes_received by the same amount. The file is used as a circular
// buffer of backlog_size bytes: the write offset is bytes_received modulo
// backlog_size.
// NOTE(review): several lines of this function (error handling, advancing of
// data/pos/bytes, loop headers) are not visible in this excerpt.
106 void Stream::add_data(const char *data, ssize_t bytes)
// Offset in the data file where this write starts.
// NOTE(review): a backlog_size of 0 would make this a modulo by zero;
// presumably callers guarantee a non-zero size -- confirm.
108 size_t pos = bytes_received % backlog_size;
109 bytes_received += bytes;
// The write would run past the end of the buffer: first write only the
// to_copy bytes that still fit between pos and backlog_size.
111 if (pos + bytes > backlog_size) {
112 ssize_t to_copy = backlog_size - pos;
113 while (to_copy > 0) {
// NOTE(review): pwrite() returns ssize_t; storing the result in an int
// narrows it on platforms where ssize_t is wider than int.
114 int ret = pwrite(data_fd, data, to_copy, pos);
// Special-cases a pwrite() that was interrupted by a signal (the branch body
// is not visible here; presumably it retries).
115 if (ret == -1 && errno == EINTR) {
120 // Dazed and confused, but trying to continue...
// Write of the remaining data. NOTE(review): this is presumably inside a
// loop over the remaining bytes, with pos and data updated after the wrapped
// part above -- those lines are not visible in this excerpt.
132 int ret = pwrite(data_fd, data, bytes, pos);
133 if (ret == -1 && errno == EINTR) {
138 // Dazed and confused, but trying to continue...
// Moves every client in sleeping_clients over to to_process, leaving
// sleeping_clients empty.
147 void Stream::wake_up_all_clients()
// When to_process is empty, exchanging the two containers hands over all
// sleeping clients and leaves sleeping_clients empty, without copying the
// elements one by one.
149 if (to_process.empty()) {
150 swap(sleeping_clients, to_process);
// Otherwise the sleeping clients are appended after the entries already
// waiting in to_process, and the sleeping list is emptied.
152 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
153 sleeping_clients.clear();