14 Stream::Stream(const string &stream_id, size_t backlog_size)
15 : stream_id(stream_id),
16 data_fd(make_tempfile("")),
17 backlog_size(backlog_size),
32 } while (ret == -1 && errno == EINTR);
39 Stream::Stream(const StreamProto &serialized)
40 : stream_id(serialized.stream_id()),
41 header(serialized.header()),
42 data_fd(make_tempfile(serialized.data())),
43 backlog_size(serialized.backlog_size()),
44 bytes_received(serialized.bytes_received()),
52 StreamProto Stream::serialize()
54 StreamProto serialized;
55 serialized.set_header(header);
56 if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd.
59 serialized.set_backlog_size(backlog_size);
60 serialized.set_bytes_received(bytes_received);
61 serialized.set_stream_id(stream_id);
66 void Stream::put_client_to_sleep(Client *client)
68 sleeping_clients.push_back(client);
71 void Stream::wake_up_all_clients()
73 if (to_process.empty()) {
74 swap(sleeping_clients, to_process);
76 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
77 sleeping_clients.clear();