]> git.sesse.net Git - nageru/blob - httpd.cpp
Write 1.4.0 changelog.
[nageru] / httpd.cpp
1 #include "httpd.h"
2
3 #include <assert.h>
4 #include <byteswap.h>
5 #include <endian.h>
6 #include <microhttpd.h>
7 #include <netinet/in.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <sys/time.h>
11 #include <time.h>
12 #include <memory>
13
14 #include "defs.h"
15 #include "metacube2.h"
16
17 struct MHD_Connection;
18 struct MHD_Response;
19
20 using namespace std;
21
22 HTTPD::HTTPD()
23 {
24 }
25
26 HTTPD::~HTTPD()
27 {
28         if (mhd) {
29                 MHD_quiesce_daemon(mhd);
30                 for (Stream *stream : streams) {
31                         stream->stop();
32                 }
33                 MHD_stop_daemon(mhd);
34         }
35 }
36
37 void HTTPD::start(int port)
38 {
39         mhd = MHD_start_daemon(MHD_USE_THREAD_PER_CONNECTION | MHD_USE_POLL_INTERNALLY | MHD_USE_DUAL_STACK,
40                                port,
41                                nullptr, nullptr,
42                                &answer_to_connection_thunk, this,
43                                MHD_OPTION_NOTIFY_COMPLETED, nullptr, this,
44                                MHD_OPTION_END);
45         if (mhd == nullptr) {
46                 fprintf(stderr, "Warning: Could not open HTTP server. (Port already in use?)\n");
47         }
48 }
49
50 void HTTPD::add_data(const char *buf, size_t size, bool keyframe)
51 {
52         unique_lock<mutex> lock(streams_mutex);
53         for (Stream *stream : streams) {
54                 stream->add_data(buf, size, keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER);
55         }
56 }
57
58 int HTTPD::answer_to_connection_thunk(void *cls, MHD_Connection *connection,
59                                       const char *url, const char *method,
60                                       const char *version, const char *upload_data,
61                                       size_t *upload_data_size, void **con_cls)
62 {
63         HTTPD *httpd = (HTTPD *)cls;
64         return httpd->answer_to_connection(connection, url, method, version, upload_data, upload_data_size, con_cls);
65 }
66
67 int HTTPD::answer_to_connection(MHD_Connection *connection,
68                                 const char *url, const char *method,
69                                 const char *version, const char *upload_data,
70                                 size_t *upload_data_size, void **con_cls)
71 {
72         // See if the URL ends in “.metacube”.
73         HTTPD::Stream::Framing framing;
74         if (strstr(url, ".metacube") == url + strlen(url) - strlen(".metacube")) {
75                 framing = HTTPD::Stream::FRAMING_METACUBE;
76         } else {
77                 framing = HTTPD::Stream::FRAMING_RAW;
78         }
79
80         HTTPD::Stream *stream = new HTTPD::Stream(this, framing);
81         stream->add_data(header.data(), header.size(), Stream::DATA_TYPE_HEADER);
82         {
83                 unique_lock<mutex> lock(streams_mutex);
84                 streams.insert(stream);
85         }
86         *con_cls = stream;
87
88         // Does not strictly have to be equal to MUX_BUFFER_SIZE.
89         MHD_Response *response = MHD_create_response_from_callback(
90                 (size_t)-1, MUX_BUFFER_SIZE, &HTTPD::Stream::reader_callback_thunk, stream, &HTTPD::free_stream);
91
92         // TODO: Content-type?
93         if (framing == HTTPD::Stream::FRAMING_METACUBE) {
94                 MHD_add_response_header(response, "Content-encoding", "metacube");
95         }
96         int ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
97         MHD_destroy_response(response);  // Only decreases the refcount; actual free is after the request is done.
98
99         return ret;
100 }
101
102 void HTTPD::free_stream(void *cls)
103 {
104         HTTPD::Stream *stream = (HTTPD::Stream *)cls;
105         HTTPD *httpd = stream->get_parent();
106         {
107                 unique_lock<mutex> lock(httpd->streams_mutex);
108                 delete stream;
109                 httpd->streams.erase(stream);
110         }
111 }
112
113 ssize_t HTTPD::Stream::reader_callback_thunk(void *cls, uint64_t pos, char *buf, size_t max)
114 {
115         HTTPD::Stream *stream = (HTTPD::Stream *)cls;
116         return stream->reader_callback(pos, buf, max);
117 }
118
119 ssize_t HTTPD::Stream::reader_callback(uint64_t pos, char *buf, size_t max)
120 {
121         unique_lock<mutex> lock(buffer_mutex);
122         has_buffered_data.wait(lock, [this]{ return should_quit || !buffered_data.empty(); });
123         if (should_quit) {
124                 return 0;
125         }
126
127         ssize_t ret = 0;
128         while (max > 0 && !buffered_data.empty()) {
129                 const string &s = buffered_data.front();
130                 assert(s.size() > used_of_buffered_data);
131                 size_t len = s.size() - used_of_buffered_data;
132                 if (max >= len) {
133                         // Consume the entire (rest of the) string.
134                         memcpy(buf, s.data() + used_of_buffered_data, len);
135                         buf += len;
136                         ret += len;
137                         max -= len;
138                         buffered_data.pop_front();
139                         used_of_buffered_data = 0;
140                 } else {
141                         // We don't need the entire string; just use the first part of it.
142                         memcpy(buf, s.data() + used_of_buffered_data, max);
143                         buf += max;
144                         used_of_buffered_data += max;
145                         ret += max;
146                         max = 0;
147                 }
148         }
149
150         return ret;
151 }
152
153 void HTTPD::Stream::add_data(const char *buf, size_t buf_size, HTTPD::Stream::DataType data_type)
154 {
155         if (buf_size == 0) {
156                 return;
157         }
158         if (data_type == DATA_TYPE_KEYFRAME) {
159                 seen_keyframe = true;
160         } else if (data_type == DATA_TYPE_OTHER && !seen_keyframe) {
161                 // Start sending only once we see a keyframe.
162                 return;
163         }
164
165         unique_lock<mutex> lock(buffer_mutex);
166
167         if (framing == FRAMING_METACUBE) {
168                 metacube2_block_header hdr;
169                 memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
170                 hdr.size = htonl(buf_size);
171                 int flags = 0;
172                 if (data_type == DATA_TYPE_HEADER) {
173                         flags |= METACUBE_FLAGS_HEADER;
174                 } else if (data_type == DATA_TYPE_OTHER) {
175                         flags |= METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
176                 }
177                 hdr.flags = htons(flags);
178                 hdr.csum = htons(metacube2_compute_crc(&hdr));
179                 buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
180         }
181         buffered_data.emplace_back(buf, buf_size);
182
183         // Send a Metacube2 timestamp every keyframe.
184         if (framing == FRAMING_METACUBE && data_type == DATA_TYPE_KEYFRAME) {
185                 timespec now;
186                 clock_gettime(CLOCK_REALTIME, &now);
187
188                 metacube2_timestamp_packet packet;
189                 packet.type = htobe64(METACUBE_METADATA_TYPE_ENCODER_TIMESTAMP);
190                 packet.tv_sec = htobe64(now.tv_sec);
191                 packet.tv_nsec = htobe64(now.tv_nsec);
192
193                 metacube2_block_header hdr;
194                 memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
195                 hdr.size = htonl(sizeof(packet));
196                 hdr.flags = htons(METACUBE_FLAGS_METADATA);
197                 hdr.csum = htons(metacube2_compute_crc(&hdr));
198                 buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
199                 buffered_data.emplace_back((char *)&packet, sizeof(packet));
200         }
201
202         has_buffered_data.notify_all(); 
203 }
204
205 void HTTPD::Stream::stop()
206 {
207         unique_lock<mutex> lock(buffer_mutex);
208         should_quit = true;
209         has_buffered_data.notify_all();
210 }