]> git.sesse.net Git - cubemap/blob - stream.cpp
d57157822aa93939c9250940bec1cf4132f212cb
[cubemap] / stream.cpp
1 #include <errno.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <string>
6 #include <vector>
7
8 #include "state.pb.h"
9 #include "stream.h"
10 #include "util.h"
11
12 using namespace std;
13
14 Stream::Stream(const string &stream_id, size_t backlog_size)
15         : stream_id(stream_id),
16           data_fd(make_tempfile("")),
17           backlog_size(backlog_size),
18           bytes_received(0),
19           mark_pool(NULL)
20 {
21         if (data_fd == -1) {
22                 exit(1);
23         }
24 }
25
26 Stream::~Stream()
27 {
28         if (data_fd != -1) {
29                 int ret;
30                 do {
31                         ret = close(data_fd);
32                 } while (ret == -1 && errno == EINTR);
33                 if (ret == -1) {
34                         perror("close");
35                 }
36         }
37 }
38
39 Stream::Stream(const StreamProto &serialized)
40         : stream_id(serialized.stream_id()),
41           header(serialized.header()),
42           data_fd(make_tempfile(serialized.data())),
43           backlog_size(serialized.backlog_size()),
44           bytes_received(serialized.bytes_received()),
45           mark_pool(NULL)
46 {
47         if (data_fd == -1) {
48                 exit(1);
49         }
50 }
51
52 StreamProto Stream::serialize()
53 {
54         StreamProto serialized;
55         serialized.set_header(header);
56         if (!read_tempfile(data_fd, serialized.mutable_data())) {  // Closes data_fd.
57                 exit(1);
58         }
59         serialized.set_backlog_size(backlog_size);
60         serialized.set_bytes_received(bytes_received);
61         serialized.set_stream_id(stream_id);
62         data_fd = -1;
63         return serialized;
64 }
65
66 void Stream::put_client_to_sleep(Client *client)
67 {
68         sleeping_clients.push_back(client);
69 }
70
71 void Stream::add_data(const char *data, ssize_t bytes)
72 {
73         size_t pos = bytes_received % backlog_size;
74         bytes_received += bytes;
75
76         if (pos + bytes > backlog_size) {
77                 ssize_t to_copy = backlog_size - pos;
78                 while (to_copy > 0) {
79                         int ret = pwrite(data_fd, data, to_copy, pos);
80                         if (ret == -1 && errno == EINTR) {
81                                 continue;
82                         }
83                         if (ret == -1) {
84                                 perror("pwrite");
85                                 // Dazed and confused, but trying to continue...
86                                 break;
87                         }
88                         pos += ret;
89                         data += ret;
90                         to_copy -= ret;
91                         bytes -= ret;
92                 }
93                 pos = 0;
94         }
95
96         while (bytes > 0) {
97                 int ret = pwrite(data_fd, data, bytes, pos);
98                 if (ret == -1 && errno == EINTR) {
99                         continue;
100                 }
101                 if (ret == -1) {
102                         perror("pwrite");
103                         // Dazed and confused, but trying to continue...
104                         break;
105                 }
106                 pos += ret;
107                 data += ret;
108                 bytes -= ret;
109         }
110
111         wake_up_all_clients();
112 }
113
114 void Stream::wake_up_all_clients()
115 {
116         if (to_process.empty()) {
117                 swap(sleeping_clients, to_process);
118         } else {
119                 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
120                 sleeping_clients.clear();
121         }
122 }