<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
<ClInclude Include="concurrency\com_context.h" />\r
<ClInclude Include="concurrency\executor.h" />\r
+ <ClInclude Include="concurrency\message.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<ClInclude Include="exception\exceptions.h" />\r
<ClInclude Include="exception\win32_exception.h" />\r
<ClInclude Include="utility\move_on_copy.h">\r
<Filter>source\utility</Filter>\r
</ClInclude>\r
+ <ClInclude Include="concurrency\message.h">\r
+ <Filter>source\concurrency</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
--- /dev/null
+#pragma once\r
+\r
+#include "../memory/safe_ptr.h"\r
+\r
+#include <agents.h>\r
+#include <semaphore.h>\r
+\r
+#include <boost/noncopyable.hpp>\r
+\r
+namespace caspar {\r
+ \r
+// RAII guard for one slot of a shared Concurrency::semaphore: the slot is\r
+// acquired (blocking) on construction and released on destruction. Move-only\r
+// (boost::noncopyable); a moved-from token owns nothing and releases nothing.\r
+class token : boost::noncopyable\r
+{\r
+	std::shared_ptr<Concurrency::semaphore> semaphore_;\r
+public:\r
+\r
+	// Blocks until a slot is available on 'semaphore', then holds it.\r
+	token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
+		: semaphore_(semaphore)\r
+	{\r
+		semaphore_->acquire();\r
+	}\r
+\r
+	// Transfers the held slot; 'other' is left empty.\r
+	token(token&& other)\r
+		: semaphore_(std::move(other.semaphore_))\r
+	{\r
+	}\r
+\r
+	// Releases the slot, unless this token was moved from.\r
+	~token()\r
+	{\r
+		if(semaphore_)\r
+			semaphore_->release();\r
+	}\r
+};\r
+\r
+// Couples a payload with a flow-control token. The token travels with the\r
+// payload through the pipeline stages and frees its semaphore slot when the\r
+// last message holding it is destroyed, bounding the number of frames in\r
+// flight end-to-end.\r
+template<typename T>\r
+class message\r
+{\r
+	T payload_;\r
+	token token_;\r
+public:\r
+	message(const T& payload, token&& token)\r
+		: payload_(payload)\r
+		, token_(std::move(token))\r
+	{\r
+	}\r
+	\r
+	// Wraps a new payload in a new message, moving this message's token into\r
+	// it; afterwards this message no longer throttles anything.\r
+	template<typename U>\r
+	safe_ptr<message<U>> transfer(const U& payload)\r
+	{\r
+		return make_safe<message<U>>(payload, std::move(token_));\r
+	}\r
+\r
+	T& value() {return payload_;}\r
+	const T& value() const {return payload_;}\r
+};\r
+\r
+}
\ No newline at end of file
\r
#include "output.h"\r
\r
-#include "../video_channel_context.h"\r
-\r
#include "../video_format.h"\r
#include "../mixer/gpu/ogl_device.h"\r
#include "../mixer/read_frame.h"\r
\r
#include <tbb/mutex.h>\r
\r
+#include <concrt_extras.h>\r
+#include <concurrent_vector.h>\r
+\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace core {\r
+ \r
+// Carries a consumer to a scheduled worker task for (potentially slow)\r
+// destruction, plus an event that is signalled once destruction completed.\r
+struct destruction_context\r
+{\r
+	std::shared_ptr<frame_consumer> consumer;\r
+	Concurrency::event event;\r
+\r
+	destruction_context(std::shared_ptr<frame_consumer>&& consumer) \r
+		: consumer(std::move(consumer)) // move the rvalue: copying would leave a second owner alive and defeat the unique() check in destroy_consumer\r
+	{\r
+	}\r
+};\r
+\r
+// Scheduler task: takes ownership of the heap-allocated destruction_context\r
+// and destroys the consumer off the render path. Destruction only happens\r
+// here if this is the last reference (unique()); otherwise it is logged and\r
+// deferred to wherever the remaining reference dies. Signals 'event' when\r
+// finished, just before the context itself is freed.\r
+void __cdecl destroy_consumer(LPVOID lpParam)\r
+{\r
+	auto destruction = std::unique_ptr<destruction_context>(static_cast<destruction_context*>(lpParam));\r
+	\r
+	try\r
+	{		\r
+		if(destruction->consumer.unique())\r
+		{\r
+			// Oversubscribe: a blocking consumer destructor must not starve\r
+			// the cooperative scheduler of a worker thread.\r
+			Concurrency::scoped_oversubcription_token oversubscribe;\r
+			destruction->consumer.reset();\r
+		}\r
+		else\r
+			CASPAR_LOG(warning) << destruction->consumer->print() << " Not destroyed asynchronously.";		\r
+	}\r
+	catch(...)\r
+	{\r
+		CASPAR_LOG_CURRENT_EXCEPTION();\r
+	}\r
+	\r
+	destruction->event.set();\r
+}\r
+\r
+// Schedules asynchronous destruction and waits (bounded to 1s) for it to\r
+// complete, logging a warning on timeout instead of deadlocking.\r
+void __cdecl destroy_and_wait_consumer(LPVOID lpParam)\r
+{\r
+	try\r
+	{\r
+		auto destruction = static_cast<destruction_context*>(lpParam);\r
+		Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, lpParam);\r
+		// NOTE(review): destroy_consumer frees the context right after\r
+		// setting the event, so this wait can race with the context's\r
+		// destruction (potential use-after-free on 'event') — confirm, and\r
+		// consider keeping the event alive separately (e.g. shared_ptr) or\r
+		// letting the waiter own the context.\r
+		if(destruction->event.wait(1000) == Concurrency::COOPERATIVE_WAIT_TIMEOUT)\r
+			CASPAR_LOG(warning) << " Potential destruction deadlock detected. Might leak resources.";\r
+	}\r
+	catch(...)\r
+	{\r
+		CASPAR_LOG_CURRENT_EXCEPTION();\r
+	}\r
+}\r
\r
struct output::implementation\r
{ \r
typedef std::pair<safe_ptr<read_frame>, safe_ptr<read_frame>> fill_and_key;\r
\r
- video_channel_context& channel_;\r
- const std::function<void()> restart_channel_;\r
+ const video_format_desc format_desc_;\r
\r
std::map<int, safe_ptr<frame_consumer>> consumers_;\r
typedef std::map<int, safe_ptr<frame_consumer>>::value_type layer_t;\r
\r
high_prec_timer timer_;\r
+\r
+ critical_section mutex_;\r
+ std::shared_ptr<Scheduler> scheduler_;\r
+ call<safe_ptr<message<safe_ptr<read_frame>>>> output_;\r
\r
public:\r
+	// Wires 'output_' (a call block invoking execute()) to 'source', so every\r
+	// mixed frame message is pushed into execute() by the agents runtime\r
+	// instead of being delivered through an explicit execute() call.\r
-	implementation(video_channel_context& video_channel, const std::function<void()>& restart_channel) \r
-		: channel_(video_channel)\r
-		, restart_channel_(restart_channel)\r
+	implementation(output::source_t& source, const video_format_desc& format_desc) \r
+		: format_desc_(format_desc)\r
+		//, scheduler_(Scheduler::Create(SchedulerPolicy(1, ContextPriority, THREAD_PRIORITY_ABOVE_NORMAL)), [](Scheduler* p){p->Release();})\r
+		, output_(std::bind(&implementation::execute, this, std::placeholders::_1))\r
	{\r
+		source.link_target(&output_);\r
	}	\r
\r
	void add(int index, safe_ptr<frame_consumer>&& consumer)\r
	{		\r
+		// Drop any previous consumer on this layer before (re)initializing.\r
-		channel_.execution().invoke([&]\r
		{\r
+			critical_section::scoped_lock lock(mutex_);\r
			consumers_.erase(index);\r
-		});\r
-\r
-		consumer->initialize(channel_.get_format_desc());\r
+		}\r
\r
+		// Initialize outside the lock: initialize() may be slow or throw.\r
-		channel_.execution().invoke([&]\r
+		consumer->initialize(format_desc_);\r
+		\r
		{\r
+			critical_section::scoped_lock lock(mutex_);\r
			consumers_.insert(std::make_pair(index, consumer));\r
\r
			CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";\r
-		});\r
+		}\r
	}\r
\r
	void remove(int index)\r
	{\r
-		channel_.execution().invoke([&]\r
+		// Erase under the lock; actual destruction happens wherever the last\r
+		// reference dies (see the asynchronous-destroy path in execute()).\r
+		critical_section::scoped_lock lock(mutex_);\r
+		auto it = consumers_.find(index);\r
+		if(it != consumers_.end())\r
		{\r
-			auto it = consumers_.find(index);\r
-			if(it != consumers_.end())\r
-			{\r
-				CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
-				consumers_.erase(it);\r
-			}\r
-		});\r
+			CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
+			consumers_.erase(it);\r
+		}\r
	}\r
\r
+	// Fans one mixed frame out to all consumers in parallel. Consumers that\r
+	// signal completion (send() == false) or fail recovery are collected and\r
+	// destroyed asynchronously so a slow destructor cannot stall the channel.\r
-	void execute(const safe_ptr<read_frame>& frame)\r
-	{			\r
-		if(!has_synchronization_clock())\r
-			timer_.tick(1.0/channel_.get_format_desc().fps);\r
+	void execute(const safe_ptr<message<safe_ptr<read_frame>>>& msg)\r
+	{	\r
+		auto frame = msg->value();\r
\r
-		if(frame->image_size() != channel_.get_format_desc().size)\r
-		{\r
-			timer_.tick(1.0/channel_.get_format_desc().fps);\r
-			return;\r
+		critical_section::scoped_lock lock(mutex_);		\r
+\r
+		// No consumer provides a clock (or the frame is the wrong size):\r
+		// pace the channel ourselves, oversubscribing while sleeping.\r
+		if(!has_synchronization_clock() || frame->image_size() != format_desc_.size)\r
+		{	\r
+			scoped_oversubcription_token oversubscribe;\r
+			timer_.tick(1.0/format_desc_.fps);\r
		}\r
\r
-		auto it = consumers_.begin();\r
-		while(it != consumers_.end())\r
-		{\r
-			auto consumer = it->second;\r
-\r
-			if(consumer->get_video_format_desc() != channel_.get_format_desc())\r
-				consumer->initialize(channel_.get_format_desc());\r
-\r
+		// concurrent_vector: push_back may run concurrently from the\r
+		// parallel_for_each tasks below (std::vector would be a data race).\r
+		Concurrency::concurrent_vector<int> removables;		\r
+		Concurrency::parallel_for_each(consumers_.begin(), consumers_.end(), [&](const decltype(*consumers_.begin())& pair)\r
+		{		\r
			try\r
			{\r
-				if(consumer->send(frame))\r
-					++it;\r
-				else\r
-					consumers_.erase(it++);\r
+				scoped_oversubcription_token oversubscribe;\r
+				if(!pair.second->send(frame))\r
+					removables.push_back(pair.first);\r
			}\r
			catch(...)\r
-			{\r
+			{		\r
				CASPAR_LOG_CURRENT_EXCEPTION();\r
-				CASPAR_LOG(warning) << "Trying to restart consumer: " << consumer->print() << L".";\r
+				CASPAR_LOG(error) << "Consumer error. Trying to recover:" << pair.second->print();\r
				try\r
				{\r
-					consumer->initialize(channel_.get_format_desc());\r
-					consumer->send(frame);\r
+					pair.second->initialize(format_desc_);\r
+					pair.second->send(frame);\r
				}\r
				catch(...)\r
-				{	\r
-					CASPAR_LOG_CURRENT_EXCEPTION();	\r
-					CASPAR_LOG(warning) << "Consumer restart failed, trying to restart channel: " << consumer->print() << L".";	\r
-\r
-					try\r
-					{\r
-						restart_channel_();\r
-						consumer->initialize(channel_.get_format_desc());\r
-						consumer->send(frame);\r
-					}\r
-					catch(...)\r
-					{\r
-						CASPAR_LOG_CURRENT_EXCEPTION();\r
-						CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
-						consumers_.erase(it++);\r
-					}\r
+				{\r
+					removables.push_back(pair.first);	\r
+					CASPAR_LOG_CURRENT_EXCEPTION();\r
+					CASPAR_LOG(error) << "Failed to recover consumer: " << pair.second->print() << L". Removing it.";\r
				}\r
			}\r
+		});\r
+\r
+		// Erase failed/finished consumers and hand them to a worker task\r
+		// for asynchronous destruction.\r
+		BOOST_FOREACH(auto& removable, removables)\r
+		{\r
+			std::shared_ptr<frame_consumer> consumer = consumers_.find(removable)->second;\r
+			consumers_.erase(removable);		\r
+			Concurrency::CurrentScheduler::ScheduleTask(destroy_consumer, new destruction_context(std::move(consumer)));\r
		}\r
	}\r
\r
}\r
};\r
\r
+// Pimpl forwarders; output now receives frames via the agents 'source'\r
+// network rather than an explicit execute() call.\r
-output::output(video_channel_context& video_channel, const std::function<void()>& restart_channel) : impl_(new implementation(video_channel, restart_channel)){}\r
+output::output(output::source_t& source, const video_format_desc& format_desc) : impl_(new implementation(source, format_desc)){}\r
void output::add(int index, safe_ptr<frame_consumer>&& consumer){impl_->add(index, std::move(consumer));}\r
void output::remove(int index){impl_->remove(index);}\r
-void output::execute(const safe_ptr<read_frame>& frame) {impl_->execute(frame); }\r
}}
\ No newline at end of file
\r
#include "../consumer/frame_consumer.h"\r
\r
+#include <common/concurrency/message.h>\r
#include <common/memory/safe_ptr.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
+\r
namespace caspar { namespace core {\r
\r
class video_channel_context;\r
class output : boost::noncopyable\r
{\r
public:\r
- explicit output(video_channel_context& video_channel, const std::function<void()>& restart_channel);\r
+ typedef Concurrency::ISource<safe_ptr<message<safe_ptr<read_frame>>>> source_t;\r
+\r
+ explicit output(source_t& source, const video_format_desc& format_desc);\r
\r
void add(int index, safe_ptr<frame_consumer>&& consumer);\r
void remove(int index);\r
-\r
- void execute(const safe_ptr<read_frame>& frame); // nothrow\r
private:\r
struct implementation;\r
safe_ptr<implementation> impl_;\r
<ClInclude Include="mixer\image\blend_modes.h" />\r
<ClInclude Include="mixer\image\image_shader.h" />\r
<ClInclude Include="video_channel.h" />\r
- <ClInclude Include="video_channel_context.h" />\r
<ClInclude Include="consumer\output.h" />\r
<ClInclude Include="consumer\frame_consumer.h" />\r
<ClInclude Include="mixer\audio\audio_mixer.h" />\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">Create</PrecompiledHeader>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Develop|Win32'">Create</PrecompiledHeader>\r
</ClCompile>\r
- <ClCompile Include="video_channel_context.cpp" />\r
<ClCompile Include="video_format.cpp">\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Profile|Win32'">StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">StdAfx.h</PrecompiledHeaderFile>\r
<ClInclude Include="video_channel.h">\r
<Filter>source</Filter>\r
</ClInclude>\r
- <ClInclude Include="video_channel_context.h">\r
- <Filter>source</Filter>\r
- </ClInclude>\r
<ClInclude Include="mixer\image\blending_glsl.h">\r
<Filter>source\mixer\image</Filter>\r
</ClInclude>\r
<ClCompile Include="video_channel.cpp">\r
<Filter>source</Filter>\r
</ClCompile>\r
- <ClCompile Include="video_channel_context.cpp">\r
- <Filter>source</Filter>\r
- </ClCompile>\r
<ClCompile Include="video_format.cpp">\r
<Filter>source</Filter>\r
</ClCompile>\r
#include "../gpu/ogl_device.h"\r
#include "../gpu/host_buffer.h"\r
#include "../gpu/device_buffer.h"\r
-#include "../../video_channel_context.h"\r
\r
#include <common/exception/exceptions.h>\r
#include <common/gl/gl_check.h>\r
#include "audio/audio_mixer.h"\r
#include "image/image_mixer.h"\r
\r
-#include "../video_channel_context.h"\r
-\r
#include <common/exception/exceptions.h>\r
#include <common/concurrency/executor.h>\r
#include <common/utility/tweener.h>\r
\r
#include <unordered_map>\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace core {\r
\r
template<typename T>\r
\r
struct mixer::implementation : boost::noncopyable\r
{ \r
- video_channel_context& channel_;\r
+ critical_section mutex_;\r
+ Concurrency::transformer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>, \r
+ safe_ptr<message<safe_ptr<core::read_frame>>>> mixer_;\r
+\r
+ const video_format_desc format_desc_;\r
+ ogl_device& ogl_;\r
\r
audio_mixer audio_mixer_;\r
image_mixer image_mixer_;\r
\r
std::queue<std::pair<boost::unique_future<safe_ptr<host_buffer>>, core::audio_buffer>> buffer_;\r
\r
- const size_t buffer_size_;\r
\r
public:\r
+	// Wires this mixer as a transformer between the stage's output ('source')\r
+	// and the channel output ('target'); mix() runs for every stage message.\r
-	implementation(video_channel_context& video_channel) \r
-		: channel_(video_channel)\r
-		, audio_mixer_(channel_.get_format_desc())\r
-		, image_mixer_(channel_.ogl(), channel_.get_format_desc())\r
-		, buffer_size_(env::properties().get("configuration.producers.buffer-depth", 1))\r
+	implementation(mixer::source_t& source, mixer::target_t& target, const video_format_desc& format_desc, ogl_device& ogl) \r
+		: format_desc_(format_desc)\r
+		, ogl_(ogl)\r
+		, audio_mixer_(format_desc)\r
+		, image_mixer_(ogl, format_desc)\r
+		, mixer_(std::bind(&implementation::mix, this, std::placeholders::_1), &target)\r
	{	\r
-		CASPAR_LOG(info) << print() << L" Successfully initialized . Buffer-depth: " << buffer_size_;	\r
+		CASPAR_LOG(info) << print() << L" Successfully initialized.";	\r
+		source.link_target(&mixer_);\r
	}\r
- \r
+	// Composites one map of layer frames into a single read_frame message.\r
+	// Runs on the agents runtime via the 'mixer_' transformer.\r
-	safe_ptr<read_frame> execute(const std::map<int, safe_ptr<core::basic_frame>>& frames)\r
-	{			\r
-		try\r
+	\r
+	safe_ptr<message<safe_ptr<core::read_frame>>> mix(const safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>& msg)\r
+	{		\r
+		auto frames = msg->value();\r
+\r
+		critical_section::scoped_lock lock(mutex_);\r
+\r
+		BOOST_FOREACH(auto& frame, frames)\r
		{\r
-			BOOST_FOREACH(auto& frame, frames)\r
-			{\r
-				auto blend_it = blend_modes_.find(frame.first);\r
-				image_mixer_.begin_layer(blend_it != blend_modes_.end() ? blend_it->second : blend_mode::normal);\r
+			auto blend_it = blend_modes_.find(frame.first);\r
+			image_mixer_.begin_layer(blend_it != blend_modes_.end() ? blend_it->second : blend_mode::normal);\r
				\r
-				auto frame1 = make_safe<core::basic_frame>(frame.second);\r
-				frame1->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
-\r
-				if(channel_.get_format_desc().field_mode != core::field_mode::progressive)\r
-				{				\r
-					auto frame2 = make_safe<core::basic_frame>(frame.second);\r
-					frame2->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
-					frame1 = core::basic_frame::interlace(frame1, frame2, channel_.get_format_desc().field_mode);\r
-				}\r
+			auto frame1 = make_safe<core::basic_frame>(frame.second);\r
+			frame1->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
+\r
+			// Interlaced formats: tick the transform twice and weave fields.\r
+			if(format_desc_.field_mode != core::field_mode::progressive)\r
+			{				\r
+				auto frame2 = make_safe<core::basic_frame>(frame.second);\r
+				frame2->get_frame_transform() = transforms_[frame.first].fetch_and_tick(1);\r
+				frame1 = core::basic_frame::interlace(frame1, frame2, format_desc_.field_mode);\r
+			}\r
					\r
-				frame1->accept(audio_mixer_);		\r
-				frame1->accept(image_mixer_);\r
\r
+			frame1->accept(audio_mixer_);		\r
+			frame1->accept(image_mixer_);\r
\r
-				image_mixer_.end_layer();\r
-			}\r
+			image_mixer_.end_layer();\r
+		}\r
\r
-			auto image = image_mixer_.render();\r
-			auto audio = audio_mixer_.mix();\r
+		auto image = image_mixer_.render();\r
+		auto audio = audio_mixer_.mix();\r
			\r
-			buffer_.push(std::make_pair(std::move(image), audio));\r
+		buffer_.push(std::make_pair(std::move(image), audio));\r
\r
-			if(buffer_.size()-1 < buffer_size_)	\r
-				return make_safe<read_frame>();\r
-		\r
-			auto res = std::move(buffer_.front());\r
-			buffer_.pop();\r
+		// NOTE(review): pipeline depth is now hard-coded to 2 frames; the old\r
+		// "configuration.producers.buffer-depth" setting is ignored — confirm\r
+		// this is intentional.\r
+		if(buffer_.size() < 2)\r
+			return msg->transfer(make_safe<core::read_frame>());	\r
\r
-			return make_safe<read_frame>(channel_.ogl(), channel_.get_format_desc().size, std::move(res.first.get()), std::move(res.second));	\r
-		}\r
-		catch(...)\r
+		auto res = std::move(buffer_.front());\r
+		buffer_.pop();\r
+\r
+		// Oversubscribe while blocking on the async GPU->host transfer.\r
+		auto buffer = [&]() -> safe_ptr<core::host_buffer>\r
		{\r
-			CASPAR_LOG(error) << L"[mixer] Error detected.";\r
-			throw;\r
-		}	\r
+			scoped_oversubcription_token oversubscribe;\r
+			return std::move(res.first.get());\r
+		}();\r
+\r
+		auto frame = make_safe<read_frame>(ogl_, format_desc_.size, std::move(buffer), std::move(res.second));\r
+\r
+		return msg->transfer<safe_ptr<core::read_frame>>(std::move(frame));	\r
	}\r
- \r
+ \r
safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc)\r
{ \r
return image_mixer_.create_frame(tag, desc);\r
\r
	void set_transform(int index, const frame_transform& transform, unsigned int mix_duration, const std::wstring& tween)\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			auto src = transforms_[index].fetch();\r
-			auto dst = transform;\r
-			transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
-		}, high_priority);\r
+		// Start a tween from the current value to 'transform', serialized\r
+		// against mix() by the mixer lock.\r
+		critical_section::scoped_lock lock(mutex_);\r
+\r
+		auto src = transforms_[index].fetch();\r
+		auto dst = transform;\r
+		transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
	}\r
\r
	void apply_transform(int index, const std::function<frame_transform(frame_transform)>& transform, unsigned int mix_duration, const std::wstring& tween)\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			auto src = transforms_[index].fetch();\r
-			auto dst = transform(src);\r
-			transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
-		}, high_priority);\r
+		// Like set_transform(), but the destination is computed from the\r
+		// current value by the supplied functor; guarded by the mixer lock.\r
+		critical_section::scoped_lock lock(mutex_);\r
+\r
+		auto src = transforms_[index].fetch();\r
+		auto dst = transform(src);\r
+		transforms_[index] = tweened_transform<frame_transform>(src, dst, mix_duration, tween);\r
	}\r
\r
	void clear_transforms()\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			transforms_.clear();\r
-			blend_modes_.clear();\r
-		}, high_priority);\r
+		// Reset all tweens and blend modes atomically with respect to mix().\r
+		critical_section::scoped_lock lock(mutex_);\r
+\r
+		transforms_.clear();\r
+		blend_modes_.clear();\r
	}\r
\r
	void set_blend_mode(int index, blend_mode::type value)\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			blend_modes_[index] = value;\r
-		}, high_priority);\r
+		// Guarded by the mixer lock; picked up by mix() on the next frame.\r
+		critical_section::scoped_lock lock(mutex_);\r
+\r
+		blend_modes_[index] = value;\r
	}\r
\r
std::wstring print() const\r
}\r
};\r
\r
+// Pimpl forwarders; execute() removed — mixing is now driven by the agents\r
+// transformer wired up in the implementation constructor.\r
-mixer::mixer(video_channel_context& video_channel) : impl_(new implementation(video_channel)){}\r
-safe_ptr<core::read_frame> mixer::execute(const std::map<int, safe_ptr<core::basic_frame>>& frames){ return impl_->execute(frames);}\r
-core::video_format_desc mixer::get_video_format_desc() const { return impl_->channel_.get_format_desc(); }\r
+mixer::mixer(mixer::source_t& source, mixer::target_t& target, const video_format_desc& format_desc, ogl_device& ogl) : impl_(new implementation(source, target, format_desc, ogl)){}\r
+core::video_format_desc mixer::get_video_format_desc() const { return impl_->format_desc_; }\r
safe_ptr<core::write_frame> mixer::create_frame(const void* tag, const core::pixel_format_desc& desc){ return impl_->create_frame(tag, desc); }		\r
boost::unique_future<safe_ptr<write_frame>> mixer::create_frame2(const void* video_stream_tag, const pixel_format_desc& desc){ return impl_->create_frame2(video_stream_tag, desc); }	\r
void mixer::set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration, const std::wstring& tween){impl_->set_transform(index, transform, mix_duration, tween);}\r
\r
#include "../producer/frame/frame_factory.h"\r
\r
+#include <common/concurrency/message.h>\r
#include <common/memory/safe_ptr.h>\r
\r
+#include <agents.h>\r
+\r
#include <map>\r
\r
namespace caspar { \r
struct frame_transform;\r
class video_channel_context;;\r
struct pixel_format;\r
+class ogl_device;\r
\r
class mixer : public core::frame_factory\r
{\r
public: \r
+ \r
+ typedef Concurrency::ISource<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> source_t;\r
+ typedef Concurrency::ITarget<safe_ptr<message<safe_ptr<core::read_frame>>>> target_t;\r
\r
- explicit mixer(video_channel_context& video_channel);\r
- \r
- safe_ptr<core::read_frame> execute(const std::map<int, safe_ptr<core::basic_frame>>& frames); // nothrow\r
- \r
+ explicit mixer(source_t& source, target_t& target, const video_format_desc& format_desc, ogl_device& ogl);\r
+ \r
safe_ptr<core::write_frame> create_frame(const void* tag, const core::pixel_format_desc& desc); \r
boost::unique_future<safe_ptr<write_frame>> create_frame2(const void* video_stream_tag, const pixel_format_desc& desc);\r
\r
core::video_format_desc get_video_format_desc() const; // nothrow\r
-\r
-\r
+ \r
void set_frame_transform(int index, const core::frame_transform& transform, unsigned int mix_duration = 0, const std::wstring& tween = L"linear");\r
void apply_frame_transform(int index, const std::function<core::frame_transform(core::frame_transform)>& transform, unsigned int mix_duration = 0, const std::wstring& tween = L"linear");\r
void clear_transforms();\r
\r
#include "layer.h"\r
\r
-#include "../video_channel_context.h"\r
-\r
#include <core/producer/frame/basic_frame.h>\r
#include <core/producer/frame/frame_factory.h>\r
\r
\r
#include <boost/foreach.hpp>\r
\r
+#include <agents.h>\r
#include <ppl.h>\r
\r
#include <map>\r
#include <set>\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace core {\r
\r
-struct stage::implementation : boost::noncopyable\r
+struct stage::implementation : public agent, public boost::noncopyable\r
{ \r
+ overwrite_buffer<bool> is_running_;\r
+ Concurrency::critical_section mutex_;\r
+ stage::target_t& target_;\r
std::map<int, layer> layers_; \r
- video_channel_context& channel_;\r
+ safe_ptr<semaphore> semaphore_;\r
public:\r
+	// Starts the agent immediately; frames are produced in run() and pushed\r
+	// to 'target'. The semaphore (3 slots) bounds frames in flight.\r
-	implementation(video_channel_context& video_channel)  \r
-		: channel_(video_channel)\r
+	implementation(stage::target_t& target)  \r
+		: target_(target)\r
+		, semaphore_(make_safe<semaphore>(3))\r
	{\r
+		start();\r
	}\r
-	\r
-	std::map<int, safe_ptr<basic_frame>> execute()\r
-	{		\r
-		try\r
+\r
+	// Stops the loop, releases one extra semaphore slot so a run() blocked\r
+	// acquiring a token wakes up, then waits for the agent to finish.\r
+	~implementation()\r
+	{\r
+		send(is_running_, false);\r
+		semaphore_->release();\r
+		agent::wait(this);\r
+	}\r
+\r
+	// Agent main loop: renders all layers in parallel and sends the frame\r
+	// map downstream, throttled by the token/semaphore backpressure.\r
+	virtual void run()\r
+	{\r
+		send(is_running_, true);\r
+		while(is_running_.value())\r
		{\r
-			std::map<int, safe_ptr<basic_frame>> frames;\r
+			try\r
+			{\r
+				std::map<int, safe_ptr<basic_frame>> frames;\r
+				{\r
+					critical_section::scoped_lock lock(mutex_);\r
		\r
-			BOOST_FOREACH(auto& layer, layers_)			\r
-				frames[layer.first] = basic_frame::empty();	\r
-\r
-			Concurrency::parallel_for_each(layers_.begin(), layers_.end(), [&](std::map<int, layer>::value_type& layer) \r
+					BOOST_FOREACH(auto& layer, layers_)			\r
+						frames[layer.first] = basic_frame::empty();	\r
+\r
+					// Keys are pre-inserted above, so each task writes a\r
+					// distinct existing map node — presumed safe since no\r
+					// insert/rehash happens here; confirm under review.\r
+					Concurrency::parallel_for_each(layers_.begin(), layers_.end(), [&](std::map<int, layer>::value_type& layer) \r
+					{\r
+						frames[layer.first] = layer.second.receive();	\r
+					});\r
+				}\r
+				\r
+				// token() blocks until a downstream slot frees (backpressure).\r
+				send(target_, make_safe<message<decltype(frames)>>(frames, token(semaphore_)));\r
+			}\r
+			catch(...)\r
			{\r
-				frames[layer.first] = layer.second.receive();	\r
-			});\r
-			\r
-			return frames;\r
+				CASPAR_LOG_CURRENT_EXCEPTION();\r
+			}	\r
		}\r
-		catch(...)\r
-		{\r
-			CASPAR_LOG(error) << L"[stage] Error detected";\r
-			throw;\r
-		}		\r
-	}\r
\r
+		send(is_running_, false);\r
+		done();\r
+	}\r
+ \r
	void load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta)\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			layers_[index].load(create_destroy_producer_proxy(producer), preview, auto_play_delta);\r
-		}, high_priority);\r
+		// Guard the layer map with the stage lock (replaces executor invoke).\r
+		critical_section::scoped_lock lock(mutex_);\r
+		layers_[index].load(create_destroy_producer_proxy(producer), preview, auto_play_delta);\r
	}\r
\r
	void pause(int index)\r
	{		\r
-		channel_.execution().invoke([&]\r
-		{\r
-			layers_[index].pause();\r
-		}, high_priority);\r
+		// Lock-protected access to layers_; creates the layer if missing.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		layers_[index].pause();\r
	}\r
\r
	void play(int index)\r
	{		\r
-		channel_.execution().invoke([&]\r
-		{\r
-			layers_[index].play();\r
-		}, high_priority);\r
+		// Lock-protected access to layers_; creates the layer if missing.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		layers_[index].play();\r
	}\r
\r
	void stop(int index)\r
	{		\r
-		channel_.execution().invoke([&]\r
-		{\r
-			layers_[index].stop();\r
-		}, high_priority);\r
+		// Lock-protected access to layers_; creates the layer if missing.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		layers_[index].stop();\r
	}\r
\r
	void clear(int index)\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			layers_.erase(index);\r
-		}, high_priority);\r
+		// Removes a single layer under the stage lock.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		layers_.erase(index);\r
	}\r
\r
	void clear()\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			layers_.clear();\r
-		}, high_priority);\r
+		// Removes all layers under the stage lock.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		layers_.clear();\r
	}		\r
\r
	void swap_layer(int index, size_t other_index)\r
	{\r
-		channel_.execution().invoke([&]\r
-		{\r
-			std::swap(layers_[index], layers_[other_index]);\r
-		}, high_priority);\r
+		// Swaps two layers of this stage under the stage lock.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		std::swap(layers_[index], layers_[other_index]);\r
	}\r
\r
void swap_layer(int index, size_t other_index, stage& other)\r
if(other.impl_.get() == this)\r
swap_layer(index, other_index);\r
else\r
- {\r
- auto func = [&]\r
- {\r
- std::swap(layers_[index], other.impl_->layers_[other_index]);\r
- }; \r
- channel_.execution().invoke([&]{other.impl_->channel_.execution().invoke(func, high_priority);}, high_priority);\r
+ { \r
+ critical_section::scoped_lock lock1(mutex_);\r
+ critical_section::scoped_lock lock2(other.impl_->mutex_);\r
+ std::swap(layers_[index], other.impl_->layers_[other_index]);\r
}\r
}\r
\r
if(other.impl_.get() == this)\r
return;\r
\r
- auto func = [&]\r
- {\r
- std::swap(layers_, other.impl_->layers_);\r
- }; \r
- channel_.execution().invoke([&]{other.impl_->channel_.execution().invoke(func, high_priority);}, high_priority);\r
+ critical_section::scoped_lock lock1(mutex_);\r
+ critical_section::scoped_lock lock2(other.impl_->mutex_);\r
+ std::swap(layers_, other.impl_->layers_);\r
}\r
\r
	layer_status get_status(int index)\r
	{		\r
-		return channel_.execution().invoke([&]\r
-		{\r
-			return layers_[index].status();\r
-		}, high_priority );\r
+		// Note: operator[] creates an empty layer if 'index' does not exist.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		return layers_[index].status();\r
	}\r
\r
	safe_ptr<frame_producer> foreground(int index)\r
	{\r
-		return channel_.execution().invoke([=]{return layers_[index].foreground();}, high_priority);\r
+		// Lock-protected access to layers_.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		return layers_[index].foreground();\r
	}\r
\r
	safe_ptr<frame_producer> background(int index)\r
	{\r
-		return channel_.execution().invoke([=]{return layers_[index].background();}, high_priority);\r
+		// Lock-protected access to layers_.\r
+		critical_section::scoped_lock lock(mutex_);\r
+		return layers_[index].background();\r
	}\r
\r
	std::wstring print() const\r
	{\r
-		return L"stage [" + boost::lexical_cast<std::wstring>(channel_.index()) + L"]";\r
+		// TODO(review): the channel index was lost when video_channel_context\r
+		// was removed; plumb it through the ctor to restore "stage [N]".\r
+		return L"stage []";// + boost::lexical_cast<std::wstring>(channel_.index()) + L"]";\r
	}\r
\r
};\r
\r
+// Pimpl forwarders; execute() removed — the stage agent pushes frame maps to\r
+// its target on its own.\r
-stage::stage(video_channel_context& video_channel) : impl_(new implementation(video_channel)){}\r
+stage::stage(target_t& target) : impl_(new implementation(target)){}\r
void stage::swap(stage& other){impl_->swap(other);}\r
void stage::load(int index, const safe_ptr<frame_producer>& producer, bool preview, int auto_play_delta){impl_->load(index, producer, preview, auto_play_delta);}\r
void stage::pause(int index){impl_->pause(index);}\r
layer_status stage::get_status(int index){return impl_->get_status(index);}\r
safe_ptr<frame_producer> stage::foreground(size_t index) {return impl_->foreground(index);}\r
safe_ptr<frame_producer> stage::background(size_t index) {return impl_->background(index);}\r
-std::map<int, safe_ptr<basic_frame>> stage::execute(){return impl_->execute();}\r
}}
\ No newline at end of file
#include "frame_producer.h"\r
\r
#include <common/memory/safe_ptr.h>\r
+#include <common/concurrency/message.h>\r
\r
#include <boost/noncopyable.hpp>\r
\r
+#include <agents.h>\r
+\r
namespace caspar { namespace core {\r
\r
struct video_format_desc;\r
class stage : boost::noncopyable\r
{\r
public:\r
- explicit stage(video_channel_context& video_channel);\r
+ typedef Concurrency::ITarget<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> target_t;\r
\r
- void swap(stage& other);\r
+ explicit stage(target_t& target);\r
\r
- std::map<int, safe_ptr<basic_frame>> execute();\r
- \r
+ void swap(stage& other);\r
+ \r
void load(int index, const safe_ptr<frame_producer>& producer, bool preview = false, int auto_play_delta = -1);\r
void pause(int index);\r
void play(int index);\r
\r
#include "video_channel.h"\r
\r
-#include "video_channel_context.h"\r
#include "video_format.h"\r
\r
#include "consumer/output.h"\r
\r
#include "mixer/gpu/ogl_device.h"\r
\r
+#include <agents_extras.h>\r
+\r
#include <boost/timer.hpp>\r
\r
#ifdef _MSC_VER\r
\r
struct video_channel::implementation : boost::noncopyable\r
{\r
- video_channel_context context_;\r
+ Concurrency::unbounded_buffer<safe_ptr<message<std::map<int, safe_ptr<basic_frame>>>>> stage_frames_;\r
+ Concurrency::unbounded_buffer<safe_ptr<message<safe_ptr<read_frame>>>> mixer_frames_;\r
+ \r
+ const video_format_desc format_desc_;\r
\r
safe_ptr<caspar::core::output> output_;\r
std::shared_ptr<caspar::core::mixer> mixer_;\r
safe_ptr<caspar::core::stage> stage_;\r
\r
- safe_ptr<diagnostics::graph> diag_;\r
+ safe_ptr<diagnostics::graph> graph_;\r
boost::timer frame_timer_;\r
boost::timer tick_timer_;\r
boost::timer output_timer_;\r
\r
public:\r
implementation(int index, const video_format_desc& format_desc, ogl_device& ogl) \r
- : context_(index, ogl, format_desc)\r
- , diag_(diagnostics::create_graph(narrow(print())))\r
- , output_(new caspar::core::output(context_, [this]{restart();}))\r
- , mixer_(new caspar::core::mixer(context_))\r
- , stage_(new caspar::core::stage(context_)) \r
+ : graph_(diagnostics::create_graph(narrow(print()), false))\r
+ , format_desc_(format_desc)\r
+ , output_(new caspar::core::output(mixer_frames_, format_desc))\r
+ , mixer_(new caspar::core::mixer(stage_frames_, mixer_frames_, format_desc, ogl))\r
+ , stage_(new caspar::core::stage(stage_frames_)) \r
{\r
- diag_->add_guide("produce-time", 0.5f); \r
- diag_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
- diag_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); \r
- diag_->set_color("output-time", diagnostics::color(1.0f, 0.5f, 0.0f));\r
- diag_->set_color("mix-time", diagnostics::color(1.0f, 1.0f, 0.9f));\r
+ graph_->add_guide("produce-time", 0.5f); \r
+ graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f));\r
+ graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); \r
+ graph_->set_color("output-time", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+ graph_->set_color("mix-time", diagnostics::color(1.0f, 1.0f, 0.9f));\r
+ graph_->start();\r
\r
CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
- context_.execution().begin_invoke([this]{tick();});\r
- }\r
-\r
- ~implementation()\r
- {\r
- // Stop context before destroying devices.\r
- context_.execution().stop();\r
- context_.execution().join();\r
- }\r
-\r
- void tick()\r
- {\r
- try\r
- {\r
- // Produce\r
-\r
- frame_timer_.restart();\r
-\r
- auto simple_frames = stage_->execute();\r
-\r
- diag_->update_value("produce-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
- \r
- // Mix\r
-\r
- frame_timer_.restart();\r
-\r
- auto finished_frame = mixer_->execute(simple_frames);\r
- \r
- diag_->update_value("mix-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
- \r
- // Consume\r
- \r
- output_timer_.restart();\r
-\r
- output_->execute(finished_frame);\r
- \r
- diag_->update_value("output-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
-\r
- \r
- diag_->update_value("tick-time", tick_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
- tick_timer_.restart();\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(error) << context_.print() << L" Unexpected exception. Clearing stage and freeing memory";\r
- restart();\r
- }\r
-\r
- context_.execution().begin_invoke([this]{tick();});\r
- }\r
-\r
- void restart()\r
- {\r
- stage_->clear();\r
- context_.ogl().gc().wait();\r
-\r
- mixer_ = nullptr;\r
- mixer_.reset(new caspar::core::mixer(context_));\r
}\r
+ \r
+ //void tick()\r
+ //{\r
+ // try\r
+ // {\r
+ // // Produce\r
+\r
+ // frame_timer_.restart();\r
+\r
+ // auto simple_frames = stage_->execute();\r
+\r
+ // graph_->update_value("produce-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+ // \r
+ // // Mix\r
+\r
+ // frame_timer_.restart();\r
+\r
+ // auto finished_frame = mixer_->execute(simple_frames);\r
+ // \r
+ // graph_->update_value("mix-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+ // \r
+ // // Consume\r
+ // \r
+ // output_timer_.restart();\r
+\r
+ // output_->execute(finished_frame);\r
+ // \r
+ // graph_->update_value("output-time", frame_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+\r
+ // \r
+ // graph_->update_value("tick-time", tick_timer_.elapsed()*context_.get_format_desc().fps*0.5);\r
+ // tick_timer_.restart();\r
+ // }\r
+ // catch(...)\r
+ // {\r
+ // CASPAR_LOG_CURRENT_EXCEPTION();\r
+ // CASPAR_LOG(error) << context_.print() << L" Unexpected exception. Clearing stage and freeing memory";\r
+ // restart();\r
+ // }\r
+\r
+ // context_.execution().begin_invoke([this]{tick();});\r
+ //}\r
+\r
+ //void restart()\r
+ //{\r
+ // stage_->clear();\r
+ // context_.ogl().gc().wait();\r
+\r
+ // mixer_ = nullptr;\r
+ // mixer_.reset(new caspar::core::mixer(context_));\r
+ //}\r
\r
std::wstring print() const\r
{\r
- return context_.print();\r
+ return L"video_channel";\r
}\r
\r
- void set_video_format_desc(const video_format_desc& format_desc)\r
- {\r
- context_.execution().begin_invoke([=]\r
- {\r
- stage_->clear();\r
- context_.ogl().gc().wait();\r
- context_.set_format_desc(format_desc);\r
- });\r
- }\r
+ //void set_video_format_desc(const video_format_desc& format_desc)\r
+ //{\r
+ // context_.execution().begin_invoke([=]\r
+ // {\r
+ // stage_->clear();\r
+ // context_.ogl().gc().wait();\r
+ // context_.set_format_desc(format_desc);\r
+ // });\r
+ //}\r
};\r
\r
video_channel::video_channel(int index, const video_format_desc& format_desc, ogl_device& ogl) : impl_(new implementation(index, format_desc, ogl)){}\r
safe_ptr<stage> video_channel::stage() { return impl_->stage_;} \r
safe_ptr<mixer> video_channel::mixer() { return make_safe_ptr(impl_->mixer_);} \r
safe_ptr<output> video_channel::output() { return impl_->output_;} \r
-video_format_desc video_channel::get_video_format_desc() const{return impl_->context_.get_format_desc();}\r
-void video_channel::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
+video_format_desc video_channel::get_video_format_desc() const{return impl_->format_desc_;}\r
std::wstring video_channel::print() const { return impl_->print();}\r
-video_channel_context& video_channel::context(){return impl_->context_;}\r
\r
}}
\ No newline at end of file
safe_ptr<stage> stage();\r
safe_ptr<mixer> mixer();\r
safe_ptr<output> output();\r
-\r
- video_channel_context& context();\r
-\r
+ \r
+ // TODO: re-enable set_video_format_desc once runtime format changes are reimplemented for the messaging pipeline
video_format_desc get_video_format_desc() const;\r
- void set_video_format_desc(const video_format_desc& format_desc);\r
+ //void set_video_format_desc(const video_format_desc& format_desc);\r
\r
std::wstring print() const;\r
\r
+++ /dev/null
-#include "stdAfx.h"\r
-\r
-#include "video_channel_context.h"\r
-\r
-namespace caspar { namespace core {\r
-\r
-struct video_channel_context::implementation\r
-{ \r
- mutable tbb::spin_rw_mutex mutex_;\r
- const int index_;\r
- video_format_desc format_desc_;\r
- executor execution_;\r
- ogl_device& ogl_;\r
-\r
- implementation(int index, ogl_device& ogl, const video_format_desc& format_desc)\r
- : index_(index)\r
- , format_desc_(format_desc)\r
- , execution_(print() + L"/execution")\r
- , ogl_(ogl)\r
- {\r
- execution_.set_priority_class(above_normal_priority_class);\r
- }\r
-\r
- std::wstring print() const\r
- {\r
- return L"video_channel[" + boost::lexical_cast<std::wstring>(index_+1) + L"|" + format_desc_.name + L"]";\r
- }\r
-};\r
-\r
-video_channel_context::video_channel_context(int index, ogl_device& ogl, const video_format_desc& format_desc) \r
- : impl_(new implementation(index, ogl, format_desc))\r
-{\r
-}\r
-\r
-const int video_channel_context::index() const {return impl_->index_;}\r
-\r
-video_format_desc video_channel_context::get_format_desc()\r
-{\r
- tbb::spin_rw_mutex::scoped_lock lock(impl_->mutex_, false);\r
- return impl_->format_desc_;\r
-}\r
-\r
-void video_channel_context::set_format_desc(const video_format_desc& format_desc)\r
-{\r
- tbb::spin_rw_mutex::scoped_lock lock(impl_->mutex_, true);\r
- impl_->format_desc_ = format_desc;\r
-}\r
-\r
-executor& video_channel_context::execution()\r
-{\r
- return impl_->execution_;\r
-}\r
-\r
-ogl_device& video_channel_context::ogl()\r
-{\r
- return impl_->ogl_;\r
-}\r
-\r
-std::wstring video_channel_context::print() const\r
-{\r
- return impl_->print();\r
-}\r
-\r
-}}
\ No newline at end of file
+++ /dev/null
-#pragma once\r
-\r
-#include <common/concurrency/executor.h>\r
-\r
-#include <core/video_format.h>\r
-\r
-#include <tbb/spin_rw_mutex.h>\r
-\r
-#include <boost/noncopyable.hpp>\r
-#include <boost/lexical_cast.hpp>\r
-\r
-#include <string>\r
-\r
-namespace caspar { \r
-\r
-class executor;\r
-\r
-namespace core {\r
-\r
-class ogl_device;\r
-\r
-class video_channel_context\r
-{\r
-\r
-public:\r
- video_channel_context(int index, ogl_device& ogl, const video_format_desc& format_desc);\r
-\r
- const int index() const;\r
- video_format_desc get_format_desc();\r
- void set_format_desc(const video_format_desc& format_desc);\r
- executor& execution();\r
- ogl_device& ogl();\r
- std::wstring print() const;\r
-private:\r
- struct implementation;\r
- std::shared_ptr<implementation> impl_;\r
-};\r
- \r
-}}
\ No newline at end of file
\r
class decklink_producer_proxy : public Concurrency::agent, public core::frame_producer\r
{ \r
- Concurrency::bounded_buffer<ffmpeg::video_message_t> video_frames_;\r
- Concurrency::bounded_buffer<ffmpeg::audio_message_t> audio_buffers_;\r
- Concurrency::bounded_buffer<ffmpeg::frame_message_t> muxed_frames_;\r
+ Concurrency::bounded_buffer<safe_ptr<AVFrame>> video_frames_;\r
+ Concurrency::bounded_buffer<safe_ptr<core::audio_buffer>> audio_buffers_;\r
+ Concurrency::bounded_buffer<safe_ptr<core::basic_frame>> muxed_frames_;\r
\r
const core::video_format_desc format_desc_;\r
const size_t device_index_;\r
\r
try\r
{\r
- last_frame_ = frame = Concurrency::receive(muxed_frames_)->payload;\r
+ last_frame_ = frame = Concurrency::receive(muxed_frames_);\r
}\r
catch(Concurrency::operation_timed_out&)\r
{ \r
auto frame = filter_.poll();\r
if(!frame)\r
break;\r
- Concurrency::send(video_frames_, ffmpeg::make_message(frame, std::make_shared<ffmpeg::token>(semaphore_)));\r
+ Concurrency::send(video_frames_, make_safe_ptr(frame));\r
}\r
},\r
[&]\r
{\r
auto sample_frame_count = audio->GetSampleFrameCount();\r
auto audio_data = reinterpret_cast<int32_t*>(bytes);\r
- Concurrency::send(audio_buffers_, ffmpeg::make_message(std::make_shared<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels), std::make_shared<ffmpeg::token>(semaphore_)));\r
+ Concurrency::send(audio_buffers_, make_safe<core::audio_buffer>(audio_data, audio_data + sample_frame_count*format_desc_.audio_channels));\r
}\r
else\r
- Concurrency::send(audio_buffers_, ffmpeg::make_message(ffmpeg::empty_audio(), std::make_shared<ffmpeg::token>(semaphore_))); \r
+ Concurrency::send(audio_buffers_, ffmpeg::empty_audio()); \r
});\r
}\r
\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- overwrite_buffer<bool> is_running_;\r
- unbounded_buffer<packet_message_t> source_;\r
- ITarget<audio_message_t>& target_;\r
+ overwrite_buffer<bool> is_running_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> source_;\r
+ ITarget<safe_ptr<core::audio_buffer>>& target_;\r
\r
public:\r
explicit implementation(audio_decoder::source_t& source, audio_decoder::target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc) \r
format_desc.audio_sample_rate, codec_context_->sample_rate,\r
AV_SAMPLE_FMT_S32, codec_context_->sample_fmt)\r
, buffer1_(AVCODEC_MAX_AUDIO_FRAME_SIZE*2)\r
- , source_([this](const packet_message_t& message)\r
+ , source_([this](const safe_ptr<AVPacket>& packet)\r
{\r
- return message->payload && message->payload->stream_index == index_;\r
+ return packet->stream_index == index_;\r
})\r
, target_(target)\r
{ \r
send(is_running_, true);\r
while(is_running_.value())\r
{ \r
- auto message = receive(source_);\r
- auto packet = message->payload;\r
+ auto packet = receive(source_);\r
\r
- if(!packet)\r
- continue;\r
-\r
if(packet == loop_packet(index_))\r
{\r
- send(target_, make_message(loop_audio()));\r
- break;\r
+ send(target_, loop_audio());\r
+ continue;\r
}\r
\r
if(packet == eof_packet(index_))\r
const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
\r
- send(target_, make_message(std::make_shared<core::audio_buffer>(samples, samples + n_samples)));\r
+ send(target_, make_safe<core::audio_buffer>(samples, samples + n_samples));\r
}\r
}\r
}\r
}\r
\r
send(is_running_, false);\r
- send(target_, make_message(eof_audio()));\r
+ send(target_, eof_audio());\r
\r
done();\r
}\r
{\r
public:\r
\r
- typedef Concurrency::ISource<packet_message_t>& source_t;\r
- typedef Concurrency::ITarget<audio_message_t>& target_t;\r
+ typedef Concurrency::ISource<safe_ptr<AVPacket>>& source_t;\r
+ typedef Concurrency::ITarget<safe_ptr<core::audio_buffer>>& target_t;\r
\r
explicit audio_decoder(source_t& source, target_t& target, AVFormatContext& context, const core::video_format_desc& format_desc);\r
\r
\r
struct ffmpeg_producer : public core::frame_producer\r
{ \r
- const std::wstring filename_;\r
- const int start_;\r
- const bool loop_;\r
- const size_t length_;\r
+ const std::wstring filename_;\r
+ const int start_;\r
+ const bool loop_;\r
+ const size_t length_;\r
\r
- Concurrency::unbounded_buffer<packet_message_t> packets_;\r
- Concurrency::unbounded_buffer<video_message_t> video_;\r
- Concurrency::unbounded_buffer<audio_message_t> audio_;\r
- Concurrency::bounded_buffer<frame_message_t> frames_;\r
- Concurrency::call<packet_message_t> throw_away_;\r
+ call<safe_ptr<AVPacket>> throw_away_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> packets_;\r
+ unbounded_buffer<safe_ptr<AVFrame>> video_;\r
+ unbounded_buffer<safe_ptr<core::audio_buffer>> audio_;\r
+ bounded_buffer<safe_ptr<core::basic_frame>> frames_;\r
\r
- const safe_ptr<diagnostics::graph> graph_;\r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
- input input_; \r
- std::shared_ptr<video_decoder> video_decoder_;\r
- std::shared_ptr<audio_decoder> audio_decoder_; \r
- std::unique_ptr<frame_muxer2> muxer_;\r
+ input input_; \r
+ std::shared_ptr<video_decoder> video_decoder_;\r
+ std::shared_ptr<audio_decoder> audio_decoder_; \r
+ std::unique_ptr<frame_muxer2> muxer_;\r
\r
- safe_ptr<core::basic_frame> last_frame_;\r
+ safe_ptr<core::basic_frame> last_frame_;\r
\r
public:\r
explicit ffmpeg_producer(const safe_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, const std::wstring& filter, bool loop, int start, size_t length) \r
, start_(start)\r
, loop_(loop)\r
, length_(length)\r
- , throw_away_([](const packet_message_t&){})\r
+ , throw_away_([](const safe_ptr<AVPacket>&){})\r
, frames_(2)\r
, graph_(diagnostics::create_graph("", false))\r
, input_(packets_, graph_, filename_, loop, start, length)\r
~ffmpeg_producer()\r
{\r
input_.stop(); \r
- while(Concurrency::receive(frames_)->payload != core::basic_frame::eof())\r
+ while(Concurrency::receive(frames_) != core::basic_frame::eof())\r
{\r
}\r
}\r
\r
try\r
{ \r
- frame = last_frame_ = Concurrency::receive(frames_, 10)->payload;\r
+ frame = last_frame_ = Concurrency::receive(frames_, 10);\r
graph_->update_text(narrow(print()));\r
}\r
- catch(Concurrency::operation_timed_out&)\r
+ catch(operation_timed_out&)\r
{ \r
graph_->add_tag("underflow"); \r
}\r
\r
struct frame_muxer2::implementation : public Concurrency::agent, boost::noncopyable\r
{ \r
- ITarget<frame_message_t>& target_;\r
- display_mode::type display_mode_;\r
- const double in_fps_;\r
- const video_format_desc format_desc_;\r
- bool auto_transcode_;\r
+ ITarget<safe_ptr<core::basic_frame>>& target_;\r
+ display_mode::type display_mode_;\r
+ const double in_fps_;\r
+ const video_format_desc format_desc_;\r
+ bool auto_transcode_;\r
\r
- filter filter_;\r
- const safe_ptr<core::frame_factory> frame_factory_;\r
+ filter filter_;\r
+ const safe_ptr<core::frame_factory> frame_factory_;\r
\r
- call<video_message_t> push_video_;\r
- call<audio_message_t> push_audio_;\r
+ call<safe_ptr<AVFrame>> push_video_;\r
+ call<safe_ptr<core::audio_buffer>> push_audio_;\r
\r
- transformer<video_message_t, std::shared_ptr<message<std::shared_ptr<write_frame>>>> video_;\r
- unbounded_buffer<audio_message_t> audio_;\r
+ unbounded_buffer<safe_ptr<AVFrame>> video_;\r
+ unbounded_buffer<safe_ptr<core::audio_buffer>> audio_;\r
\r
- core::audio_buffer audio_data_;\r
+ core::audio_buffer audio_data_;\r
\r
- Concurrency::overwrite_buffer<bool> is_running_;\r
+ Concurrency::overwrite_buffer<bool> is_running_;\r
+\r
+ safe_ptr<semaphore> semaphore_;\r
\r
implementation(frame_muxer2::video_source_t* video_source,\r
frame_muxer2::audio_source_t* audio_source,\r
, frame_factory_(make_safe<core::concrt_frame_factory>(frame_factory))\r
, push_video_(std::bind(&implementation::push_video, this, std::placeholders::_1))\r
, push_audio_(std::bind(&implementation::push_audio, this, std::placeholders::_1))\r
- , video_(std::bind(&implementation::make_write_frame, this, std::placeholders::_1))\r
+ , semaphore_(make_safe<semaphore>(8))\r
{\r
if(video_source)\r
video_source->link_target(&push_video_);\r
send(is_running_, false);\r
agent::wait(this);\r
}\r
-\r
- std::shared_ptr<message<std::shared_ptr<core::write_frame>>> make_write_frame(const video_message_t& message)\r
- {\r
- if(message->payload == eof_video())\r
- return make_message<std::shared_ptr<core::write_frame>>(nullptr);\r
-\r
- if(message->payload == empty_video())\r
- return make_message<std::shared_ptr<core::write_frame>>(std::make_shared<core::write_frame>(this));\r
-\r
- return make_message<std::shared_ptr<core::write_frame>>(ffmpeg::make_write_frame(this, make_safe_ptr(message->payload), frame_factory_, 0), message->token);\r
- } \r
-\r
+ \r
virtual void run()\r
{\r
try\r
{\r
auto video = receive(video_);\r
auto audio = receive(audio_); \r
-\r
- if(!audio->payload)\r
+ auto frame = make_safe<core::write_frame>(this);\r
+ \r
+ if(audio == eof_audio())\r
{\r
send(is_running_ , false);\r
break;\r
}\r
\r
- if(!video->payload)\r
+ if(video == eof_video())\r
{\r
send(is_running_ , false);\r
break;\r
}\r
\r
- video->payload->audio_data() = std::move(*audio->payload);\r
+ if(video != empty_video())\r
+ frame = make_write_frame(this, video, frame_factory_, 0);\r
+ if(audio == empty_audio())\r
+ audio = make_safe<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0);\r
+\r
+ frame->audio_data() = std::move(*audio);\r
\r
switch(display_mode_)\r
{\r
case display_mode::deinterlace:\r
case display_mode::deinterlace_bob:\r
{\r
- auto message = make_message(safe_ptr<core::basic_frame>(video->payload), video->token ? video->token : audio->token);\r
- send(target_, message);\r
+ send(target_, safe_ptr<core::basic_frame>(frame));\r
\r
break;\r
}\r
case display_mode::duplicate: \r
{ \r
- auto message = make_message(safe_ptr<core::basic_frame>(video->payload), video->token ? video->token : audio->token);\r
- send(target_, message);\r
- send(target_, message);\r
+ send(target_, safe_ptr<core::basic_frame>(frame));\r
+ send(target_, safe_ptr<core::basic_frame>(frame));\r
\r
break;\r
}\r
case display_mode::half: \r
{ \r
- receive(video_);\r
-\r
- auto message = make_message(safe_ptr<core::basic_frame>(video->payload), video->token ? video->token : audio->token);\r
- send(target_, message);\r
+ receive(video_); // throw away \r
+ send(target_, safe_ptr<core::basic_frame>(frame));\r
\r
break;\r
}\r
case display_mode::deinterlace_bob_reinterlace:\r
case display_mode::interlace: \r
{ \r
- auto frame = safe_ptr<core::basic_frame>(video->payload);\r
+ /*auto frame = safe_ptr<core::basic_frame>(frame);\r
auto video2 = receive(video_); \r
- if(video->payload)\r
- frame = core::basic_frame::interlace(make_safe_ptr(video->payload), make_safe_ptr(video2->payload), format_desc_.field_mode);\r
+ if(video2 != empty_video() && video2 != eof_video())\r
+ frame = core::basic_frame::interlace(frame, safe_ptr<core::basic_frame>(video2), format_desc_.field_mode);\r
else\r
send(is_running_, false);\r
\r
- auto message = make_message<safe_ptr<core::basic_frame>>(frame, video2->token ? video2->token : audio->token);\r
- send(target_, message);\r
+ send(target_, frame);*/\r
\r
break;\r
}\r
}\r
\r
send(is_running_ , false);\r
- send(target_, make_message(core::basic_frame::eof()));\r
+ send(target_, core::basic_frame::eof());\r
\r
done();\r
}\r
\r
- void push_video(const video_message_t& message)\r
- { \r
- auto video_frame = message->payload;\r
-\r
- if(!video_frame)\r
- return;\r
- \r
- if(video_frame == eof_video())\r
+ void push_video(const safe_ptr<AVFrame>& video_frame)\r
+ { \r
+ if(video_frame == eof_video() || video_frame == empty_video())\r
{\r
- send(video_, make_message(eof_video()));\r
+ send(video_, video_frame);\r
return;\r
}\r
-\r
+ \r
if(video_frame == loop_video()) \r
return; \r
- \r
- if(video_frame == empty_video())\r
- {\r
- send(video_, make_message(empty_video()));\r
- return;\r
- }\r
\r
if(display_mode_ == display_mode::invalid)\r
{\r
BOOST_FOREACH(auto av_frame, filter_.poll_all())\r
{ \r
av_frame->format = format; \r
- send(video_, make_message(std::shared_ptr<AVFrame>(av_frame), message->token));\r
+ send(video_, av_frame);\r
}\r
}\r
\r
- void push_audio(const audio_message_t& message)\r
+ void push_audio(const safe_ptr<core::audio_buffer>& audio_samples)\r
{\r
- auto audio_samples = message->payload;\r
-\r
- if(!audio_samples)\r
- return;\r
-\r
- if(audio_samples == eof_audio())\r
+ if(audio_samples == eof_audio() || audio_samples == empty_audio())\r
{\r
- send(audio_, make_message(std::shared_ptr<core::audio_buffer>()));\r
+ send(audio_, audio_samples);\r
return;\r
}\r
\r
if(audio_samples == loop_audio()) \r
return; \r
\r
- if(audio_samples == empty_audio()) \r
- send(audio_, make_message(std::make_shared<core::audio_buffer>(format_desc_.audio_samples_per_frame, 0))); \r
-\r
audio_data_.insert(audio_data_.end(), audio_samples->begin(), audio_samples->end());\r
\r
while(audio_data_.size() >= format_desc_.audio_samples_per_frame)\r
auto begin = audio_data_.begin(); \r
auto end = begin + format_desc_.audio_samples_per_frame;\r
\r
- send(audio_, make_message(std::make_shared<core::audio_buffer>(begin, end), message->token));\r
+ send(audio_, make_safe<core::audio_buffer>(begin, end));\r
\r
audio_data_.erase(begin, end);\r
}\r
{\r
public:\r
\r
- typedef Concurrency::ISource<video_message_t> video_source_t;\r
- typedef Concurrency::ISource<audio_message_t> audio_source_t;\r
- typedef Concurrency::ITarget<frame_message_t> target_t;\r
+ typedef Concurrency::ISource<safe_ptr<AVFrame>> video_source_t;\r
+ typedef Concurrency::ISource<safe_ptr<core::audio_buffer>> audio_source_t;\r
+ typedef Concurrency::ITarget<safe_ptr<core::basic_frame>> target_t;\r
\r
frame_muxer2(video_source_t* video_source,\r
audio_source_t* audio_source, \r
tbb::atomic<size_t> packets_size_;\r
\r
bool stop_;\r
-\r
- safe_ptr<Concurrency::semaphore> semaphore_;\r
- \r
+ \r
public:\r
explicit implementation(input::target_t& target,\r
const safe_ptr<diagnostics::graph>& graph, \r
, length_(length)\r
, frame_number_(0)\r
, stop_(false)\r
- , semaphore_(make_safe<Concurrency::semaphore>(MAX_TOKENS))\r
{ \r
packets_count_ = 0;\r
packets_size_ = 0;\r
void stop()\r
{\r
stop_ = true;\r
- for(size_t n = 0; n < format_context_->nb_streams+1; ++n)\r
- semaphore_->release();\r
agent::wait(this);\r
}\r
\r
if(!packet)\r
break;\r
\r
- Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
- Concurrency::wait(0);\r
-\r
- //std::vector<std::shared_ptr<AVPacket>> buffer;\r
-\r
- //while(buffer.size() < 100 && !stop_)\r
- //{\r
- // Concurrency::scoped_oversubcription_token oversubscribe;\r
- // auto packet = read_next_packet();\r
- // if(!packet)\r
- // stop_ = true;\r
- // else\r
- // buffer.push_back(packet);\r
- //}\r
- // \r
- //std::stable_partition(buffer.begin(), buffer.end(), [this](const std::shared_ptr<AVPacket>& packet)\r
- //{\r
- // return packet->stream_index != default_stream_index_;\r
- //});\r
-\r
- //BOOST_FOREACH(auto packet, buffer)\r
- //{\r
- // Concurrency::asend(target_, make_message(packet, packet->stream_index == default_stream_index_ ? std::make_shared<token>(semaphore_) : nullptr));\r
- // Concurrency::wait(0);\r
- //}\r
+ Concurrency::asend(target_, make_safe_ptr(packet));\r
+ Concurrency::wait(40);\r
}\r
}\r
catch(...)\r
} \r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, make_message(eof_packet(stream->index), std::make_shared<token>(semaphore_))); \r
+ Concurrency::send(target_, eof_packet(stream->index)); \r
\r
done();\r
}\r
auto size = packet->size;\r
auto data = packet->data; \r
\r
- packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+ packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
{\r
packet->size = size;\r
packet->data = data;\r
packet->size = 0;\r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, make_message(loop_packet(stream->index), std::make_shared<token>(semaphore_))); \r
+ Concurrency::asend(target_, loop_packet(stream->index)); \r
\r
graph_->add_tag("seek"); \r
} \r
{\r
public:\r
\r
- typedef Concurrency::ITarget<packet_message_t> target_t;\r
+ typedef Concurrency::ITarget<safe_ptr<AVPacket>> target_t;\r
\r
explicit input(target_t& target, \r
const safe_ptr<diagnostics::graph>& graph, \r
video_context.time_base.den = static_cast<int>(closest_fps*1000000.0);\r
}\r
\r
-std::shared_ptr<AVPacket> create_packet()\r
+safe_ptr<AVPacket> create_packet()\r
{\r
- std::shared_ptr<AVPacket> packet(new AVPacket, [](AVPacket* p)\r
+ safe_ptr<AVPacket> packet(new AVPacket, [](AVPacket* p)\r
{\r
av_free_packet(p);\r
delete p;\r
return packet;\r
}\r
\r
-const std::shared_ptr<AVPacket>& loop_packet(int index)\r
+const safe_ptr<AVPacket>& loop_packet(int index)\r
{\r
- static std::shared_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
+ static safe_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
\r
auto& packet = packets[index];\r
packet->stream_index = index;\r
return packet;\r
}\r
\r
-const std::shared_ptr<AVPacket>& eof_packet(int index)\r
+const safe_ptr<AVPacket>& eof_packet(int index)\r
{\r
- static std::shared_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
+ static safe_ptr<AVPacket> packets[] = {create_packet(), create_packet(), create_packet(), create_packet(), create_packet(), create_packet()};\r
\r
auto& packet = packets[index];\r
packet->stream_index = index;\r
return packet;\r
}\r
\r
-const std::shared_ptr<AVFrame>& loop_video()\r
+const safe_ptr<AVFrame>& loop_video()\r
{\r
- static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+ static auto frame1 = safe_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
return frame1;\r
}\r
\r
-const std::shared_ptr<AVFrame>& empty_video()\r
+const safe_ptr<AVFrame>& empty_video()\r
{\r
- static auto frame1 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+ static auto frame1 = safe_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
return frame1;\r
}\r
\r
-const std::shared_ptr<AVFrame>& eof_video()\r
+const safe_ptr<AVFrame>& eof_video()\r
{\r
- static auto frame2 = std::shared_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
+ static auto frame2 = safe_ptr<AVFrame>(avcodec_alloc_frame(), av_free);\r
return frame2;\r
}\r
\r
-const std::shared_ptr<core::audio_buffer>& loop_audio()\r
+const safe_ptr<core::audio_buffer>& loop_audio()\r
{\r
- static auto audio1 = std::make_shared<core::audio_buffer>();\r
+ static auto audio1 = safe_ptr<core::audio_buffer>();\r
return audio1;\r
}\r
\r
-const std::shared_ptr<core::audio_buffer>& empty_audio()\r
+const safe_ptr<core::audio_buffer>& empty_audio()\r
{\r
- static auto audio1 = std::make_shared<core::audio_buffer>();\r
+ static auto audio1 = safe_ptr<core::audio_buffer>();\r
return audio1;\r
}\r
\r
-const std::shared_ptr<core::audio_buffer>& eof_audio()\r
+const safe_ptr<core::audio_buffer>& eof_audio()\r
{\r
- static auto audio2 = std::make_shared<core::audio_buffer>();\r
+ static auto audio2 = safe_ptr<core::audio_buffer>();\r
return audio2;\r
}\r
\r
namespace ffmpeg {\r
\r
// Dataflow\r
-\r
-class token\r
-{\r
- safe_ptr<Concurrency::semaphore> semaphore_;\r
-public:\r
- token(const safe_ptr<Concurrency::semaphore>& semaphore)\r
- : semaphore_(semaphore)\r
- {\r
- semaphore_->acquire();\r
- }\r
-\r
- ~token()\r
- {\r
- semaphore_->release();\r
- }\r
-};\r
-\r
-template <typename T>\r
-struct message\r
-{\r
- message(const T& payload = T(), const std::shared_ptr<token>& token = nullptr)\r
- : payload(payload)\r
- , token(token)\r
- {\r
- }\r
-\r
- T payload;\r
- std::shared_ptr<token> token;\r
-};\r
-\r
-template<typename T>\r
-safe_ptr<message<T>> make_message(const T& payload, const std::shared_ptr<token>& token = nullptr)\r
-{\r
- return make_safe<message<T>>(payload, token);\r
-}\r
-\r
-typedef safe_ptr<message<std::shared_ptr<AVPacket>>> packet_message_t;\r
-typedef safe_ptr<message<std::shared_ptr<AVFrame>>> video_message_t;\r
-typedef safe_ptr<message<std::shared_ptr<core::audio_buffer>>> audio_message_t;\r
-typedef safe_ptr<message<safe_ptr<core::basic_frame>>> frame_message_t;\r
\r
-const std::shared_ptr<AVPacket>& loop_packet(int index);\r
-const std::shared_ptr<AVPacket>& eof_packet(int index);\r
+const safe_ptr<AVPacket>& loop_packet(int index);\r
+const safe_ptr<AVPacket>& eof_packet(int index);\r
\r
-const std::shared_ptr<AVFrame>& loop_video();\r
-const std::shared_ptr<AVFrame>& empty_video();\r
-const std::shared_ptr<AVFrame>& eof_video();\r
-const std::shared_ptr<core::audio_buffer>& loop_audio();\r
-const std::shared_ptr<core::audio_buffer>& empty_audio();\r
-const std::shared_ptr<core::audio_buffer>& eof_audio();\r
+const safe_ptr<AVFrame>& loop_video();\r
+const safe_ptr<AVFrame>& empty_video();\r
+const safe_ptr<AVFrame>& eof_video();\r
+const safe_ptr<core::audio_buffer>& loop_audio();\r
+const safe_ptr<core::audio_buffer>& empty_audio();\r
+const safe_ptr<core::audio_buffer>& eof_audio();\r
\r
// Utils\r
\r
\r
void fix_meta_data(AVFormatContext& context);\r
\r
-std::shared_ptr<AVPacket> create_packet();\r
+safe_ptr<AVPacket> create_packet();\r
\r
safe_ptr<AVCodecContext> open_codec(AVFormatContext& context, enum AVMediaType type, int& index);\r
safe_ptr<AVFormatContext> open_input(const std::wstring& filename);\r
bool is_progressive_;\r
\r
overwrite_buffer<bool> is_running_;\r
- unbounded_buffer<packet_message_t> source_;\r
- ITarget<video_message_t>& target_;\r
+ unbounded_buffer<safe_ptr<AVPacket>> source_;\r
+ ITarget<safe_ptr<AVFrame>>& target_;\r
\r
- safe_ptr<semaphore> semaphore_;\r
+ safe_ptr<semaphore> semaphore_;\r
\r
public:\r
explicit implementation(video_decoder::source_t& source, video_decoder::target_t& target, AVFormatContext& context) \r
, width_(codec_context_->width)\r
, height_(codec_context_->height)\r
, is_progressive_(true)\r
- , source_([this](const packet_message_t& message)\r
+ , source_([this](const safe_ptr<AVPacket>& packet)\r
{\r
- return message->payload && message->payload->stream_index == index_;\r
+ return packet->stream_index == index_;\r
})\r
, target_(target)\r
, semaphore_(make_safe<Concurrency::semaphore>(1))\r
send(is_running_, true);\r
while(is_running_.value())\r
{\r
- auto message = receive(source_);\r
- auto packet = message->payload;\r
+ auto packet = receive(source_);\r
\r
- if(!packet)\r
- continue;\r
-\r
if(packet == loop_packet(index_))\r
{\r
- send(target_, make_message(loop_video()));\r
+ send(target_, loop_video());\r
continue;\r
}\r
\r
if(packet == eof_packet(index_))\r
break;\r
\r
- token token(semaphore_);\r
- std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this, token](AVFrame* frame)\r
+ std::shared_ptr<AVFrame> decoded_frame(avcodec_alloc_frame(), [this](AVFrame* frame)\r
{\r
av_free(frame);\r
+ semaphore_->release();\r
});\r
+ semaphore_->acquire();\r
\r
int frame_finished = 0;\r
THROW_ON_ERROR2(avcodec_decode_video2(codec_context_.get(), decoded_frame.get(), &frame_finished, packet.get()), "[video_decocer]");\r
\r
is_progressive_ = decoded_frame->interlaced_frame == 0;\r
\r
- send(target_, make_message(decoded_frame, message->token));\r
+ send(target_, make_safe_ptr(decoded_frame));\r
Concurrency::wait(10);\r
}\r
}\r
}\r
\r
send(is_running_, false),\r
- send(target_, make_message(eof_video()));\r
+ send(target_, eof_video());\r
\r
done();\r
}\r
{\r
public:\r
\r
- typedef Concurrency::ISource<packet_message_t> source_t;\r
- typedef Concurrency::ITarget<video_message_t> target_t;\r
+ typedef Concurrency::ISource<safe_ptr<AVPacket>> source_t;\r
+ typedef Concurrency::ITarget<safe_ptr<AVFrame>> target_t;\r
\r
explicit video_decoder(source_t& source, target_t& target, AVFormatContext& context); \r
\r
auto av_frame = get_av_frame();\r
av_frame->data[0] = const_cast<uint8_t*>(frame->image_data().begin());\r
\r
+ filter_.push(av_frame);\r
auto frames = filter_.poll_all();\r
\r
if(frames.empty())\r
\r
#include <core/producer/frame_producer.h>\r
#include <core/video_format.h>\r
-#include <core/video_channel_context.h>\r
#include <core/producer/transition/transition_producer.h>\r
#include <core/producer/frame/frame_transform.h>\r
#include <core/producer/stage.h>\r
\r
if(name == TEXT("MODE"))\r
{\r
- auto format_desc = core::video_format_desc::get(value);\r
- if(format_desc.format != core::video_format::invalid)\r
- {\r
- GetChannel()->set_video_format_desc(format_desc);\r
- SetReplyString(TEXT("202 SET MODE OK\r\n"));\r
- }\r
- else\r
+ // TODO: re-enable runtime format switching (set_video_format_desc) once the video_channel_context removal/refactor is complete; until then SET MODE always replies 501.\r
+ //auto format_desc = core::video_format_desc::get(value);\r
+ //if(format_desc.format != core::video_format::invalid)\r
+ //{\r
+ // GetChannel()->set_video_format_desc(format_desc);\r
+ // SetReplyString(TEXT("202 SET MODE OK\r\n"));\r
+ //}\r
+ //else\r
SetReplyString(TEXT("501 SET MODE FAILED\r\n"));\r
}\r
else\r
</producers>\r
<channels>\r
<channel>\r
- <video-mode>1080i5000</video-mode>\r
+ <video-mode>PAL</video-mode>\r
<consumers>\r
<decklink>\r
<device>1</device>\r
<embedded-audio>true</embedded-audio>\r
</decklink>\r
+ <screen>\r
+ <device>1</device>\r
+ </screen>\r
<audio></audio>\r
</consumers>\r
</channel>\r