Make backlog_size changeable across HUPs.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 13 Apr 2013 20:47:24 +0000 (22:47 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 13 Apr 2013 20:47:24 +0000 (22:47 +0200)
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
stream.cpp
stream.h

index 8ac7802..0727314 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -146,6 +146,8 @@ void create_streams(const Config &config,
                const StreamConfig &stream_config = config.streams[i];
                if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
                        servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
                const StreamConfig &stream_config = config.streams[i];
                if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
                        servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
+               } else {
+                       servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
                }
                expecting_stream_ids.erase(stream_config.stream_id);
 
                }
                expecting_stream_ids.erase(stream_config.stream_id);
 
index 266d418..2c8661a 100644 (file)
@@ -194,6 +194,13 @@ void Server::add_stream_from_serialized(const StreamProto &stream)
        streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
 }
        
        streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
 }
        
+void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+{
+       MutexLock lock(&mutex);
+       assert(streams.count(stream_id) != 0);
+       streams[stream_id]->set_backlog_size(new_size);
+}
+       
 void Server::set_header(const string &stream_id, const string &header)
 {
        MutexLock lock(&mutex);
 void Server::set_header(const string &stream_id, const string &header)
 {
        MutexLock lock(&mutex);
@@ -527,6 +534,7 @@ void Server::process_queued_data()
             ++queued_it) {
                Stream *stream = find_stream(queued_it->first);
                stream->add_data(queued_it->second.data(), queued_it->second.size());
             ++queued_it) {
                Stream *stream = find_stream(queued_it->first);
                stream->add_data(queued_it->second.data(), queued_it->second.size());
+               stream->wake_up_all_clients();
        }
        queued_data.clear();
 }
        }
        queued_data.clear();
 }
index 94fdb67..2d227b6 100644 (file)
--- a/server.h
+++ b/server.h
@@ -53,6 +53,7 @@ public:
        void add_client_from_serialized(const ClientProto &client);
        void add_stream(const std::string &stream_id, size_t bytes_received);
        void add_stream_from_serialized(const StreamProto &stream);
        void add_client_from_serialized(const ClientProto &client);
        void add_stream(const std::string &stream_id, size_t bytes_received);
        void add_stream_from_serialized(const StreamProto &stream);
+       void set_backlog_size(const std::string &stream_id, size_t new_size);
 
 private:
        // Mutex protecting queued_data only. Note that if you want to hold both this
 
 private:
        // Mutex protecting queued_data only. Note that if you want to hold both this
index ee3587f..352f719 100644 (file)
@@ -106,3 +106,10 @@ void ServerPool::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool
                servers[i].set_mark_pool(stream_id, mark_pool);
        }       
 }
                servers[i].set_mark_pool(stream_id, mark_pool);
        }       
 }
+
+void ServerPool::set_backlog_size(const std::string &stream_id, size_t new_size)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_backlog_size(stream_id, new_size);
+       }       
+}
index 106877a..ba60d7c 100644 (file)
@@ -36,6 +36,9 @@ public:
        // Connects the given stream to the given mark pool for all the servers.
        void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
 
        // Connects the given stream to the given mark pool for all the servers.
        void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
 
+       // Changes the given stream's backlog size on all the servers.
+       void set_backlog_size(const std::string &stream_id, size_t new_size);
+
        // Starts all the servers.
        void run();
 
        // Starts all the servers.
        void run();
 
index d571578..ab10e35 100644 (file)
@@ -62,6 +62,41 @@ StreamProto Stream::serialize()
        data_fd = -1;
        return serialized;
 }
        data_fd = -1;
        return serialized;
 }
+       
+void Stream::set_backlog_size(size_t new_size)
+{
+       if (backlog_size == new_size) {
+               return;
+       }
+
+       string existing_data;
+       if (!read_tempfile(data_fd, &existing_data)) {  // Closes data_fd.
+               exit(1);
+       }
+
+       // Unwrap the data so it's no longer circular.
+       if (bytes_received <= backlog_size) {
+               existing_data.resize(bytes_received);
+       } else {
+               size_t pos = bytes_received % backlog_size;
+               existing_data = existing_data.substr(pos, string::npos) +
+                       existing_data.substr(0, pos);
+       }
+
+       // See if we need to discard data.
+       if (new_size < existing_data.size()) {
+               size_t to_discard = existing_data.size() - new_size;
+               existing_data = existing_data.substr(to_discard, string::npos);
+       }
+
+       // Create a new, empty data file.
+       data_fd = make_tempfile("");
+       backlog_size = new_size;
+
+       // Now cheat a bit by rewinding, and adding all the old data back.
+       bytes_received -= existing_data.size();
+       add_data(existing_data.data(), existing_data.size());
+}
 
 void Stream::put_client_to_sleep(Client *client)
 {
 
 void Stream::put_client_to_sleep(Client *client)
 {
@@ -107,8 +142,6 @@ void Stream::add_data(const char *data, ssize_t bytes)
                data += ret;
                bytes -= ret;
        }
                data += ret;
                bytes -= ret;
        }
-
-       wake_up_all_clients();
 }
 
 void Stream::wake_up_all_clients()
 }
 
 void Stream::wake_up_all_clients()
index a57c6d8..7fabd19 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -21,6 +21,9 @@ struct Stream {
        Stream(const StreamProto &serialized);
        StreamProto serialize();
 
        Stream(const StreamProto &serialized);
        StreamProto serialize();
 
+       // Changes the backlog size, restructuring the data as needed.
+       void set_backlog_size(size_t new_size);
+
        std::string stream_id;
 
        // The HTTP response header, plus the video stream header (if any).
        std::string stream_id;
 
        // The HTTP response header, plus the video stream header (if any).
@@ -60,14 +63,15 @@ struct Stream {
        // in the list of clients to wake up when we do.
        void put_client_to_sleep(Client *client);
 
        // in the list of clients to wake up when we do.
        void put_client_to_sleep(Client *client);
 
-       // Add more input data to the stream, and wake up all clients that are sleeping.
+       // Add more input data to the stream. You should probably call wake_up_all_clients()
+       // after that.
        void add_data(const char *data, ssize_t bytes);
 
        void add_data(const char *data, ssize_t bytes);
 
-private:
-       Stream(const Stream& other);
-
        // We have more data, so mark all clients that are sleeping as ready to go.
        void wake_up_all_clients();
        // We have more data, so mark all clients that are sleeping as ready to go.
        void wake_up_all_clients();
+
+private:
+       Stream(const Stream& other);
 };
 
 #endif  // !defined(_STREAM_H)
 };
 
 #endif  // !defined(_STREAM_H)