- string src = config[i].parameters["src"];
- Input *input = NULL;
- if (deserialized_inputs.count(stream_id) != 0) {
- input = deserialized_inputs[stream_id];
- if (input->get_url() != src) {
- fprintf(stderr, "INFO: Stream '%s' has changed URL from '%s' to '%s', restarting input.\n",
- stream_id.c_str(), input->get_url().c_str(), src.c_str());
- delete input;
- input = NULL;
+ serialize_start.tv_sec = loaded_state.serialize_start_sec();
+ serialize_start.tv_usec = loaded_state.serialize_start_usec();
+
+ // Deserialize the streams.
+ map<string, string> stream_headers_for_url; // See below.
+ for (int i = 0; i < loaded_state.streams_size(); ++i) {
+ const StreamProto &stream = loaded_state.streams(i);
+
+ if (deleted_urls.count(stream.url()) != 0) {
+ // Delete the stream backlogs.
+ for (int j = 0; j < stream.data_fds_size(); ++j) {
+ safe_close(stream.data_fds(j));
+ }
+ } else {
+ vector<int> data_fds;
+ for (int j = 0; j < stream.data_fds_size(); ++j) {
+ data_fds.push_back(stream.data_fds(j));
+ }
+
+ // Older versions stored the data once in the protobuf instead of
+ // sending around file descriptors.
+ if (data_fds.empty() && stream.has_data()) {
+ data_fds.push_back(make_tempfile(stream.data()));
+ }
+
+ servers->add_stream_from_serialized(stream, data_fds);
+ deserialized_urls.insert(stream.url());
+
+ stream_headers_for_url.insert(make_pair(stream.url(), stream.stream_header()));