// Constructs a new stream with the given id and backlog size.
// data_fd is obtained from make_tempfile(""), i.e. the backing file is
// requested with empty initial contents.
15 Stream::Stream(const string &stream_id, size_t backlog_size)
16 : stream_id(stream_id),
17 data_fd(make_tempfile("")),
18 backlog_size(backlog_size),
// NOTE(review): the start of this loop is not visible in this excerpt.
// Whatever call produces ret is repeated for as long as it fails with EINTR.
33 } while (ret == -1 && errno == EINTR);
// Constructs a stream from its serialized StreamProto representation.
// The id, header, backlog size and received-byte counter are copied from the
// message. data_fd comes from make_tempfile(serialized.data()) -- presumably
// a temporary file pre-filled with the saved backlog bytes (confirm against
// make_tempfile).
40 Stream::Stream(const StreamProto &serialized)
41 : stream_id(serialized.stream_id()),
42 header(serialized.header()),
43 data_fd(make_tempfile(serialized.data())),
44 backlog_size(serialized.backlog_size()),
45 bytes_received(serialized.bytes_received()),
// Builds a StreamProto holding this stream's header, backlog data,
// backlog_size, bytes_received and stream_id.
// Note that reading the backlog closes data_fd (see below), so the stream's
// file descriptor is no longer usable after this call.
53 StreamProto Stream::serialize()
55 StreamProto serialized;
56 serialized.set_header(header);
// Presumably reads the whole backing file into the proto's data field; a
// false return is the failure case (its handling is not visible here).
57 if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd.
60 serialized.set_backlog_size(backlog_size);
61 serialized.set_bytes_received(bytes_received);
62 serialized.set_stream_id(stream_id);
// Changes the backlog size to new_size while carrying over the existing data.
// The old backing file is read out, its circular contents are put in
// chronological order and trimmed to the newest new_size bytes if necessary.
// The result is then written into a fresh backing file through add_data().
67 void Stream::set_backlog_size(size_t new_size)
// Nothing to do when the size is unchanged (the branch body is not visible
// here; presumably an early return).
69 if (backlog_size == new_size) {
// Pull the current backlog into existing_data; a false return is the failure
// case (its handling is not visible here).
74 if (!read_tempfile(data_fd, &existing_data)) { // Closes data_fd.
78 // Unwrap the data so it's no longer circular.
// No more than backlog_size bytes were ever received, so the valid data is
// the first bytes_received bytes of the file.
79 if (bytes_received <= backlog_size) {
80 existing_data.resize(bytes_received);
// Otherwise pos is the current write offset, which is also where the oldest
// byte lives. Rotate so the oldest data comes first.
82 size_t pos = bytes_received % backlog_size;
83 existing_data = existing_data.substr(pos, string::npos) +
84 existing_data.substr(0, pos);
87 // See if we need to discard data.
// When shrinking, drop bytes from the front (the oldest ones) so that only
// the newest new_size bytes remain.
88 if (new_size < existing_data.size()) {
89 size_t to_discard = existing_data.size() - new_size;
90 existing_data = existing_data.substr(to_discard, string::npos);
93 // Create a new, empty data file.
94 data_fd = make_tempfile("");
95 backlog_size = new_size;
97 // Now cheat a bit by rewinding, and adding all the old data back.
// add_data() increments bytes_received by the amount written, so subtracting
// that amount first leaves the counter at its original value afterwards.
98 bytes_received -= existing_data.size();
99 add_data(existing_data.data(), existing_data.size());
// Appends client to this stream's sleeping_clients list.
// wake_up_all_clients() later moves the entries of that list to to_process.
102 void Stream::put_client_to_sleep(Client *client)
104 sleeping_clients.push_back(client);
// Writes bytes bytes from data into the backing file at the current position
// in the circular backlog, and advances bytes_received by bytes.
// NOTE(review): several lines of this function are not visible in this
// excerpt (loop headers, the EINTR branch bodies, pointer/offset updates).
// The comments below describe only what the visible lines establish.
107 void Stream::add_data(const char *data, ssize_t bytes)
// File offset at which this write starts.
// NOTE(review): a backlog_size of 0 would make this modulo undefined --
// confirm that callers never pass 0.
109 size_t pos = bytes_received % backlog_size;
110 bytes_received += bytes;
// The write would run past the end of the backlog, so first handle the
// to_copy bytes that still fit between pos and the end of the file.
112 if (pos + bytes > backlog_size) {
113 ssize_t to_copy = backlog_size - pos;
114 while (to_copy > 0) {
// NOTE(review): pwrite returns ssize_t; it is stored in an int here.
115 int ret = pwrite(data_fd, data, to_copy, pos);
// An interrupted write is singled out here (body not visible; presumably
// the write is retried).
116 if (ret == -1 && errno == EINTR) {
// Other failures are logged (the guarding condition is not visible) and, per
// the comment below, processing carries on rather than aborting.
120 log_perror("pwrite");
121 // Dazed and confused, but trying to continue...
// Writes bytes bytes at pos, with the same EINTR and error handling as above.
// The enclosing control flow is not visible in this excerpt.
133 int ret = pwrite(data_fd, data, bytes, pos);
134 if (ret == -1 && errno == EINTR) {
138 log_perror("pwrite");
139 // Dazed and confused, but trying to continue...
// Moves every client in sleeping_clients to to_process, leaving
// sleeping_clients empty.
148 void Stream::wake_up_all_clients()
// When to_process is empty, exchanging the two containers transfers all
// sleepers without copying individual elements.
150 if (to_process.empty()) {
151 swap(sleeping_clients, to_process);
// Otherwise (the else line is not visible in this excerpt) append the
// sleepers after the clients already queued, then empty the sleeping list.
153 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
154 sleeping_clients.clear();