+
+ process_client(client_ptr);
+}
+
+void Server::add_client_from_serialized(const ClientProto &client)
+{
+ MutexLock lock(&mutex);
+ Stream *stream;
+ int stream_index = lookup_stream_by_url(client.url());
+ if (stream_index == -1) {
+ assert(client.state() != Client::SENDING_DATA);
+ stream = NULL;
+ } else {
+ stream = streams[stream_index];
+ }
+ pair<map<int, Client>::iterator, bool> ret =
+ clients.insert(make_pair(client.sock(), Client(client, stream)));
+ assert(ret.second == true); // Should not already exist.
+ Client *client_ptr = &ret.first->second;
+
+ // Start listening on data from this socket.
+ epoll_event ev;
+ if (client.state() == Client::READING_REQUEST) {
+ ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
+ } else {
+ // If we don't have more data for this client, we'll be putting it into
+ // the sleeping array again soon.
+ ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
+ }
+ ev.data.u64 = reinterpret_cast<uint64_t>(client_ptr);
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client.sock(), &ev) == -1) {
+ log_perror("epoll_ctl(EPOLL_CTL_ADD)");
+ exit(1);
+ }
+
+ if (client_ptr->state == Client::SENDING_DATA &&
+ client_ptr->stream_pos == client_ptr->stream->bytes_received) {
+ client_ptr->stream->put_client_to_sleep(client_ptr);
+ } else {
+ process_client(client_ptr);
+ }
+}
+
+int Server::lookup_stream_by_url(const std::string &url) const
+{
+ map<string, int>::const_iterator url_it = url_map.find(url);
+ if (url_it == url_map.end()) {
+ return -1;
+ }
+ return url_it->second;
+}
+
+int Server::add_stream(const string &url, size_t backlog_size, Stream::Encoding encoding)
+{
+ MutexLock lock(&mutex);
+ url_map.insert(make_pair(url, streams.size()));
+ streams.push_back(new Stream(url, backlog_size, encoding));
+ return streams.size() - 1;
+}
+
+int Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
+{
+ MutexLock lock(&mutex);
+ url_map.insert(make_pair(stream.url(), streams.size()));
+ streams.push_back(new Stream(stream, data_fd));
+ return streams.size() - 1;