Make backlog_size changeable across HUPs.
[cubemap] / stream.cpp
1 #include <errno.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <string>
6 #include <vector>
7
8 #include "state.pb.h"
9 #include "stream.h"
10 #include "util.h"
11
12 using namespace std;
13
14 Stream::Stream(const string &stream_id, size_t backlog_size)
15         : stream_id(stream_id),
16           data_fd(make_tempfile("")),
17           backlog_size(backlog_size),
18           bytes_received(0),
19           mark_pool(NULL)
20 {
21         if (data_fd == -1) {
22                 exit(1);
23         }
24 }
25
26 Stream::~Stream()
27 {
28         if (data_fd != -1) {
29                 int ret;
30                 do {
31                         ret = close(data_fd);
32                 } while (ret == -1 && errno == EINTR);
33                 if (ret == -1) {
34                         perror("close");
35                 }
36         }
37 }
38
39 Stream::Stream(const StreamProto &serialized)
40         : stream_id(serialized.stream_id()),
41           header(serialized.header()),
42           data_fd(make_tempfile(serialized.data())),
43           backlog_size(serialized.backlog_size()),
44           bytes_received(serialized.bytes_received()),
45           mark_pool(NULL)
46 {
47         if (data_fd == -1) {
48                 exit(1);
49         }
50 }
51
52 StreamProto Stream::serialize()
53 {
54         StreamProto serialized;
55         serialized.set_header(header);
56         if (!read_tempfile(data_fd, serialized.mutable_data())) {  // Closes data_fd.
57                 exit(1);
58         }
59         serialized.set_backlog_size(backlog_size);
60         serialized.set_bytes_received(bytes_received);
61         serialized.set_stream_id(stream_id);
62         data_fd = -1;
63         return serialized;
64 }
65         
66 void Stream::set_backlog_size(size_t new_size)
67 {
68         if (backlog_size == new_size) {
69                 return;
70         }
71
72         string existing_data;
73         if (!read_tempfile(data_fd, &existing_data)) {  // Closes data_fd.
74                 exit(1);
75         }
76
77         // Unwrap the data so it's no longer circular.
78         if (bytes_received <= backlog_size) {
79                 existing_data.resize(bytes_received);
80         } else {
81                 size_t pos = bytes_received % backlog_size;
82                 existing_data = existing_data.substr(pos, string::npos) +
83                         existing_data.substr(0, pos);
84         }
85
86         // See if we need to discard data.
87         if (new_size < existing_data.size()) {
88                 size_t to_discard = existing_data.size() - new_size;
89                 existing_data = existing_data.substr(to_discard, string::npos);
90         }
91
92         // Create a new, empty data file.
93         data_fd = make_tempfile("");
94         backlog_size = new_size;
95
96         // Now cheat a bit by rewinding, and adding all the old data back.
97         bytes_received -= existing_data.size();
98         add_data(existing_data.data(), existing_data.size());
99 }
100
101 void Stream::put_client_to_sleep(Client *client)
102 {
103         sleeping_clients.push_back(client);
104 }
105
106 void Stream::add_data(const char *data, ssize_t bytes)
107 {
108         size_t pos = bytes_received % backlog_size;
109         bytes_received += bytes;
110
111         if (pos + bytes > backlog_size) {
112                 ssize_t to_copy = backlog_size - pos;
113                 while (to_copy > 0) {
114                         int ret = pwrite(data_fd, data, to_copy, pos);
115                         if (ret == -1 && errno == EINTR) {
116                                 continue;
117                         }
118                         if (ret == -1) {
119                                 perror("pwrite");
120                                 // Dazed and confused, but trying to continue...
121                                 break;
122                         }
123                         pos += ret;
124                         data += ret;
125                         to_copy -= ret;
126                         bytes -= ret;
127                 }
128                 pos = 0;
129         }
130
131         while (bytes > 0) {
132                 int ret = pwrite(data_fd, data, bytes, pos);
133                 if (ret == -1 && errno == EINTR) {
134                         continue;
135                 }
136                 if (ret == -1) {
137                         perror("pwrite");
138                         // Dazed and confused, but trying to continue...
139                         break;
140                 }
141                 pos += ret;
142                 data += ret;
143                 bytes -= ret;
144         }
145 }
146
147 void Stream::wake_up_all_clients()
148 {
149         if (to_process.empty()) {
150                 swap(sleeping_clients, to_process);
151         } else {
152                 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
153                 sleeping_clients.clear();
154         }
155 }