]> git.sesse.net Git - cubemap/commitdiff
Make backlog_size changeable across HUPs.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 13 Apr 2013 20:47:24 +0000 (22:47 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 13 Apr 2013 20:47:24 +0000 (22:47 +0200)
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
stream.cpp
stream.h

index 8ac7802bb30454f690822e1d7761ed06b8080bf9..07273141fe04b5ec17bf0e3de7394deb414b501a 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -146,6 +146,8 @@ void create_streams(const Config &config,
                const StreamConfig &stream_config = config.streams[i];
                if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
                        servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
+               } else {
+                       servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
                }
                expecting_stream_ids.erase(stream_config.stream_id);
 
index 266d41809765075caaa750be0fd06809d707481d..2c8661ab87395164033588669fac728f515ba556 100644 (file)
@@ -194,6 +194,13 @@ void Server::add_stream_from_serialized(const StreamProto &stream)
        streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
 }
        
+void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+{
+       MutexLock lock(&mutex);
+       assert(streams.count(stream_id) != 0);
+       streams[stream_id]->set_backlog_size(new_size);
+}
+       
 void Server::set_header(const string &stream_id, const string &header)
 {
        MutexLock lock(&mutex);
@@ -527,6 +534,7 @@ void Server::process_queued_data()
             ++queued_it) {
                Stream *stream = find_stream(queued_it->first);
                stream->add_data(queued_it->second.data(), queued_it->second.size());
+               stream->wake_up_all_clients();
        }
        queued_data.clear();
 }
index 94fdb67e735e996c46f8c0472f570366bd97a556..2d227b6e1797ec331c7a6e966ec412d5f6c7a79b 100644 (file)
--- a/server.h
+++ b/server.h
@@ -53,6 +53,7 @@ public:
        void add_client_from_serialized(const ClientProto &client);
        void add_stream(const std::string &stream_id, size_t bytes_received);
        void add_stream_from_serialized(const StreamProto &stream);
+       void set_backlog_size(const std::string &stream_id, size_t new_size);
 
 private:
        // Mutex protecting queued_data only. Note that if you want to hold both this
index ee3587f79ef7dbf3ffad312229554a849d3725ed..352f7191f209a50630ede4c96a8aecb1ae33cd8a 100644 (file)
@@ -106,3 +106,10 @@ void ServerPool::set_mark_pool(const std::string &stream_id, MarkPool *mark_pool
                servers[i].set_mark_pool(stream_id, mark_pool);
        }       
 }
+
+void ServerPool::set_backlog_size(const std::string &stream_id, size_t new_size)
+{
+       for (int i = 0; i < num_servers; ++i) {
+               servers[i].set_backlog_size(stream_id, new_size);
+       }       
+}
index 106877a65cfdc48086aeabaf8dc2100059b34f7a..ba60d7c0d9b0705e01c372ed9f4d71d9db99e35d 100644 (file)
@@ -36,6 +36,9 @@ public:
        // Connects the given stream to the given mark pool for all the servers.
        void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
 
+       // Changes the given stream's backlog size on all the servers.
+       void set_backlog_size(const std::string &stream_id, size_t new_size);
+
        // Starts all the servers.
        void run();
 
index d57157822aa93939c9250940bec1cf4132f212cb..ab10e35390fd9d6a16abe261e2d28f1f7f9b4e9f 100644 (file)
@@ -62,6 +62,41 @@ StreamProto Stream::serialize()
        data_fd = -1;
        return serialized;
 }
+       
+void Stream::set_backlog_size(size_t new_size)
+{
+       if (backlog_size == new_size) {
+               return;
+       }
+
+       string existing_data;
+       if (!read_tempfile(data_fd, &existing_data)) {  // Closes data_fd.
+               exit(1);
+       }
+
+       // Unwrap the data so it's no longer circular.
+       if (bytes_received <= backlog_size) {
+               existing_data.resize(bytes_received);
+       } else {
+               size_t pos = bytes_received % backlog_size;
+               existing_data = existing_data.substr(pos, string::npos) +
+                       existing_data.substr(0, pos);
+       }
+
+       // See if we need to discard data.
+       if (new_size < existing_data.size()) {
+               size_t to_discard = existing_data.size() - new_size;
+               existing_data = existing_data.substr(to_discard, string::npos);
+       }
+
+       // Create a new, empty data file.
+       data_fd = make_tempfile("");
+       backlog_size = new_size;
+
+       // Now cheat a bit by rewinding, and adding all the old data back.
+       bytes_received -= existing_data.size();
+       add_data(existing_data.data(), existing_data.size());
+}
 
 void Stream::put_client_to_sleep(Client *client)
 {
@@ -107,8 +142,6 @@ void Stream::add_data(const char *data, ssize_t bytes)
                data += ret;
                bytes -= ret;
        }
-
-       wake_up_all_clients();
 }
 
 void Stream::wake_up_all_clients()
index a57c6d88e7ff0955a68e0d02fceeb0910bd7491f..7fabd193d1249016af37a821a5d67b3fce33ef91 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -21,6 +21,9 @@ struct Stream {
        Stream(const StreamProto &serialized);
        StreamProto serialize();
 
+       // Changes the backlog size, restructuring the data as needed.
+       void set_backlog_size(size_t new_size);
+
        std::string stream_id;
 
        // The HTTP response header, plus the video stream header (if any).
@@ -60,14 +63,15 @@ struct Stream {
        // in the list of clients to wake up when we do.
        void put_client_to_sleep(Client *client);
 
-       // Add more input data to the stream, and wake up all clients that are sleeping.
+       // Add more input data to the stream. You should probably call wake_up_all_clients()
+       // after that.
        void add_data(const char *data, ssize_t bytes);
 
-private:
-       Stream(const Stream& other);
-
        // We have more data, so mark all clients that are sleeping as ready to go.
        void wake_up_all_clients();
+
+private:
+       Stream(const Stream& other);
 };
 
 #endif  // !defined(_STREAM_H)