]> git.sesse.net Git - cubemap/blob - stream.cpp
Move Client and Stream into their own files.
[cubemap] / stream.cpp
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <errno.h>
4 #include <algorithm>
5 #include <string>
6 #include <vector>
7
8 #include "stream.h"
9 #include "util.h"
10 #include "state.pb.h"
11
12 using namespace std;
13
14 Stream::Stream(const string &stream_id, size_t backlog_size)
15         : stream_id(stream_id),
16           data_fd(make_tempfile("")),
17           backlog_size(backlog_size),
18           bytes_received(0),
19           mark_pool(NULL)
20 {
21         if (data_fd == -1) {
22                 exit(1);
23         }
24 }
25
26 Stream::~Stream()
27 {
28         if (data_fd != -1) {
29                 int ret;
30                 do {
31                         ret = close(data_fd);
32                 } while (ret == -1 && errno == EINTR);
33                 if (ret == -1) {
34                         perror("close");
35                 }
36         }
37 }
38
39 Stream::Stream(const StreamProto &serialized)
40         : stream_id(serialized.stream_id()),
41           header(serialized.header()),
42           data_fd(make_tempfile(serialized.data())),
43           backlog_size(serialized.backlog_size()),
44           bytes_received(serialized.bytes_received()),
45           mark_pool(NULL)
46 {
47         if (data_fd == -1) {
48                 exit(1);
49         }
50 }
51
52 StreamProto Stream::serialize()
53 {
54         StreamProto serialized;
55         serialized.set_header(header);
56         if (!read_tempfile(data_fd, serialized.mutable_data())) {  // Closes data_fd.
57                 exit(1);
58         }
59         serialized.set_backlog_size(backlog_size);
60         serialized.set_bytes_received(bytes_received);
61         serialized.set_stream_id(stream_id);
62         data_fd = -1;
63         return serialized;
64 }
65
66 void Stream::put_client_to_sleep(Client *client)
67 {
68         sleeping_clients.push_back(client);
69 }
70
71 void Stream::wake_up_all_clients()
72 {
73         if (to_process.empty()) {
74                 swap(sleeping_clients, to_process);
75         } else {
76                 to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
77                 sleeping_clients.clear();
78         }
79 }