Identify UDPInput error messages by the stream, too.
[cubemap] / stream.cpp
1 #include <errno.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <string>
6 #include <vector>
7
8 #include "state.pb.h"
9 #include "log.h"
10 #include "stream.h"
11 #include "util.h"
12
13 using namespace std;
14
15 Stream::Stream(const string &stream_id, size_t backlog_size)
16         : stream_id(stream_id),
17           data_fd(make_tempfile("")),
18           backlog_size(backlog_size),
19           bytes_received(0),
20           mark_pool(NULL)
21 {
22         if (data_fd == -1) {
23                 exit(1);
24         }
25 }
26
27 Stream::~Stream()
28 {
29         if (data_fd != -1) {
30                 int ret;
31                 do {
32                         ret = close(data_fd);
33                 } while (ret == -1 && errno == EINTR);
34                 if (ret == -1) {
35                         log_perror("close");
36                 }
37         }
38 }
39
40 Stream::Stream(const StreamProto &serialized)
41         : stream_id(serialized.stream_id()),
42           header(serialized.header()),
43           data_fd(make_tempfile(serialized.data())),
44           backlog_size(serialized.backlog_size()),
45           bytes_received(serialized.bytes_received()),
46           mark_pool(NULL)
47 {
48         if (data_fd == -1) {
49                 exit(1);
50         }
51 }
52
53 StreamProto Stream::serialize()
54 {
55         StreamProto serialized;
56         serialized.set_header(header);
57         if (!read_tempfile(data_fd, serialized.mutable_data())) {  // Closes data_fd.
58                 exit(1);
59         }
60         serialized.set_backlog_size(backlog_size);
61         serialized.set_bytes_received(bytes_received);
62         serialized.set_stream_id(stream_id);
63         data_fd = -1;
64         return serialized;
65 }
66         
67 void Stream::set_backlog_size(size_t new_size)
68 {
69         if (backlog_size == new_size) {
70                 return;
71         }
72
73         string existing_data;
74         if (!read_tempfile(data_fd, &existing_data)) {  // Closes data_fd.
75                 exit(1);
76         }
77
78         // Unwrap the data so it's no longer circular.
79         if (bytes_received <= backlog_size) {
80                 existing_data.resize(bytes_received);
81         } else {
82                 size_t pos = bytes_received % backlog_size;
83                 existing_data = existing_data.substr(pos, string::npos) +
84                         existing_data.substr(0, pos);
85         }
86
87         // See if we need to discard data.
88         if (new_size < existing_data.size()) {
89                 size_t to_discard = existing_data.size() - new_size;
90                 existing_data = existing_data.substr(to_discard, string::npos);
91         }
92
93         // Create a new, empty data file.
94         data_fd = make_tempfile("");
95         backlog_size = new_size;
96
97         // Now cheat a bit by rewinding, and adding all the old data back.
98         bytes_received -= existing_data.size();
99         add_data(existing_data.data(), existing_data.size());
100 }
101
102 void Stream::put_client_to_sleep(Client *client)
103 {
104         sleeping_clients.push_back(client);
105 }
106
107 void Stream::add_data(const char *data, ssize_t bytes)
108 {
109         size_t pos = bytes_received % backlog_size;
110         bytes_received += bytes;
111
112         if (pos + bytes > backlog_size) {
113                 ssize_t to_copy = backlog_size - pos;
114                 while (to_copy > 0) {
115                         int ret = pwrite(data_fd, data, to_copy, pos);
116                         if (ret == -1 && errno == EINTR) {
117                                 continue;
118                         }
119                         if (ret == -1) {
120                                 log_perror("pwrite");
121                                 // Dazed and confused, but trying to continue...
122                                 break;
123                         }
124                         pos += ret;
125                         data += ret;
126                         to_copy -= ret;
127                         bytes -= ret;
128                 }
129                 pos = 0;
130         }
131
132         while (bytes > 0) {
133                 int ret = pwrite(data_fd, data, bytes, pos);
134                 if (ret == -1 && errno == EINTR) {
135                         continue;
136                 }
137                 if (ret == -1) {
138                         log_perror("pwrite");
139                         // Dazed and confused, but trying to continue...
140                         break;
141                 }
142                 pos += ret;
143                 data += ret;
144                 bytes -= ret;
145         }
146 }
147
148 void Stream::wake_up_all_clients()
149 {
150         if (to_process.empty()) {
151                 swap(sleeping_clients, to_process);
152         } else {
153                 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
154                 sleeping_clients.clear();
155         }
156 }