1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_RX_OBSERVER_HPP)
6 #define RXCPP_RX_OBSERVER_HPP
8 #include "rx-includes.hpp"
// Aliases tag_observer under the member name observer_tag.
// NOTE(review): the enclosing declaration is not visible in this listing; presumably this is
// the marker that is_observer<> looks for — confirm against the full file.
17 typedef tag_observer observer_tag;
// No-op value handler: accepts a const T& and discards it.
// NOTE(review): the enclosing struct header is not visible here (presumably the default
// on_next functor) — confirm.
24 void operator()(const T&) const {}
// Error handler call operator: receives a std::exception_ptr that it does not name.
// NOTE(review): the body is cut off in this listing; the comment below suggests the
// implementation aborts instead of silently dropping the error — confirm.
28 void operator()(std::exception_ptr) const {
29 // error implicitly ignored, abort
// Completion handler that does nothing. Used as the default OnCompleted template argument
// of static_observer.
33 struct OnCompletedEmpty
// Takes no arguments and has an empty body.
35 void operator()() const {}
// Compile-time probe: can a callable F be invoked with a T, returning void?
// NOTE(review): the struct header is not visible in this listing; the two-parameter shape
// matches the detail::is_on_next_of<T, F> used elsewhere in this file — confirm.
38 template<class T, class F>
// Preferred overload (an int argument ranks above "..."). It is only viable when calling a CF
// with a CT is well-formed, and its return type is the type of that call expression.
// The null-pointer dereferences sit inside decltype and are never evaluated.
42 template<class CT, class CF>
43 static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr));
// Fallback overload selected when the call expression above is ill-formed.
// not_void is declared outside this listing (presumably a type distinct from void).
44 template<class CT, class CF>
45 static not_void check(...);
// Result type of the probe, evaluated on the decayed F so references/cv on F do not matter.
47 typedef decltype(check<T, typename std::decay<F>::type>(0)) detail_result;
// true only when the probe's result type is exactly void.
48 static const bool value = std::is_same<detail_result, void>::value;
// Compile-time probe: can a callable be invoked with a std::exception_ptr, returning void?
// NOTE(review): the struct and template headers are not visible in this listing; the shape
// matches the detail::is_on_error<F> used elsewhere in this file — confirm.
// Preferred overload: viable only when CF(std::exception_ptr) is a well-formed call.
56 static auto check(int) -> decltype((*(CF*)nullptr)(*(std::exception_ptr*)nullptr));
// Fallback overload for callables that do not accept a std::exception_ptr.
58 static not_void check(...);
// true only when the decayed F is callable with std::exception_ptr and that call yields void.
60 static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
// Compile-time probe: can a callable be invoked with no arguments, returning void?
// NOTE(review): the template headers for this struct and its check overloads are not
// visible in this listing.
64 struct is_on_completed
// Preferred overload: viable only when CF() is a well-formed call.
68 static auto check(int) -> decltype((*(CF*)nullptr)());
// Fallback overload for callables that cannot be invoked with zero arguments.
70 static not_void check(...);
// true only when the decayed F is callable with no arguments and that call yields void.
72 static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
// static_observer: an observer assembled from up to three callables (on_next, on_error,
// on_completed). OnError and OnCompleted default to detail::OnErrorEmpty and
// detail::OnCompletedEmpty when not supplied.
// NOTE(review): the class header and the onnext/onerror member declarations are not visible
// in this listing.
77 template<class T, class OnNext, class OnError = detail::OnErrorEmpty, class OnCompleted = detail::OnCompletedEmpty>
81 typedef static_observer<T, OnNext, OnError, OnCompleted> this_type;
// Handlers are stored as their decayed types (references and cv-qualifiers stripped).
82 typedef typename std::decay<OnNext>::type on_next_t;
83 typedef typename std::decay<OnError>::type on_error_t;
84 typedef typename std::decay<OnCompleted>::type on_completed_t;
89 on_completed_t oncompleted;
// Reject handlers with the wrong call signature at compile time, with a readable message.
92 static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);");
93 static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(std::exception_ptr);");
94 static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
// Takes the handlers by value and moves them into the members; the error and completion
// handlers are default-constructed when omitted.
96 explicit static_observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
97 : onnext(std::move(n))
98 , onerror(std::move(e))
99 , oncompleted(std::move(c))
// Copy constructor (only part of its initializer list is visible in this listing).
102 static_observer(const this_type& o)
105 , oncompleted(o.oncompleted)
// Move constructor: moves each handler out of o.
108 static_observer(this_type&& o)
109 : onnext(std::move(o.onnext))
110 , onerror(std::move(o.onerror))
111 , oncompleted(std::move(o.oncompleted))
// Assignment takes its argument by value, so one operator serves both copy and move
// assignment; each handler is then moved out of the by-value parameter.
// NOTE(review): the return statement is not visible in this listing.
114 this_type& operator=(this_type o) {
115 onnext = std::move(o.onnext);
116 onerror = std::move(o.onerror);
117 oncompleted = std::move(o.oncompleted);
121 // use V so that std::move can be used safely
// Receives the value by value and moves it into the on_next handler.
// NOTE(review): the template<class V> header is presumably on a line missing from this listing.
123 void on_next(V v) const {
124 onnext(std::move(v));
// NOTE(review): the bodies of on_error and on_completed are not visible in this listing;
// presumably they invoke onerror / oncompleted — confirm.
126 void on_error(std::exception_ptr e) const {
129 void on_completed() const {
// dynamic_observer: type-erased observer. It holds a shared_ptr to an abstract
// virtual_observer and forwards on_next / on_error / on_completed through it, so observers
// of different concrete types can share one static type.
// NOTE(review): the template header, access specifiers and any default constructor are not
// visible in this listing.
135 class dynamic_observer
138 typedef tag_dynamic_observer dynamic_observer_tag;
141 typedef dynamic_observer<T> this_type;
142 typedef observer_base<T> base_type;
// Abstract interface behind the type erasure.
// NOTE(review): no virtual destructor is visible. Instances are created through
// std::make_shared<specific_observer<...>> below, so the shared_ptr control block destroys
// the concrete type; deleting through a raw virtual_observer* would not be safe.
144 struct virtual_observer : public std::enable_shared_from_this<virtual_observer>
146 virtual void on_next(T) const =0;
147 virtual void on_error(std::exception_ptr e) const =0;
148 virtual void on_completed() const =0;
// Concrete adapter: owns an Observer by value and forwards each virtual call to it.
151 template<class Observer>
152 struct specific_observer : public virtual_observer
154 explicit specific_observer(Observer o)
155 : destination(std::move(o))
159 Observer destination;
// The value arrives by value and is moved on to the wrapped observer.
160 virtual void on_next(T t) const {
161 destination.on_next(std::move(t));
163 virtual void on_error(std::exception_ptr e) const {
164 destination.on_error(e);
166 virtual void on_completed() const {
167 destination.on_completed();
// The erased target; copies of a dynamic_observer share the same target object.
171 std::shared_ptr<virtual_observer> destination;
// Wraps o in a specific_observer<Observer> allocated with make_shared and returns it as a
// pointer to the abstract interface.
173 template<class Observer>
174 static auto make_destination(Observer o)
175 -> std::shared_ptr<virtual_observer> {
176 return std::make_shared<specific_observer<Observer>>(std::move(o));
// Copy: shares the same destination.
183 dynamic_observer(const this_type& o)
184 : destination(o.destination)
// Move: takes over o's destination pointer.
187 dynamic_observer(this_type&& o)
188 : destination(std::move(o.destination))
// Erasing constructor: accepts any Observer type and wraps it via make_destination.
// explicit, so arbitrary types do not convert to dynamic_observer implicitly.
192 template<class Observer>
193 explicit dynamic_observer(Observer o)
194 : destination(make_destination(std::move(o)))
// By-value assignment covers both copy and move assignment.
// NOTE(review): the return statement is not visible in this listing.
198 this_type& operator=(this_type o) {
199 destination = std::move(o.destination);
203 // perfect forwarding delays the copy of the value.
// NOTE(review): the template<class V> header and any null check on destination are on
// lines missing from this listing — confirm before relying on null-safety.
205 void on_next(V&& v) const {
207 destination->on_next(std::forward<V>(v));
210 void on_error(std::exception_ptr e) const {
212 destination->on_error(e);
215 void on_completed() const {
217 destination->on_completed();
// observer<T, I>: thin wrapper that derives from observer_base<T> and forwards every
// notification to an inner observer of (decayed) type I.
// NOTE(review): the inner member declaration and parts of the constructors are not visible
// in this listing.
223 template<class T, class I>
224 class observer : public observer_base<T>
226 typedef observer<T, I> this_type;
// The wrapped observer is stored as the decayed I.
227 typedef typename std::decay<I>::type inner_t;
// Copy constructor (initializer list not visible in this listing).
236 observer(const this_type& o)
// Move constructor: moves the inner observer out of o.
240 observer(this_type&& o)
241 : inner(std::move(o.inner))
// Wraps an existing inner observer. The parameter shares the member's name; inside the
// initializer, inner(...) names the member and std::move(inner) names the parameter.
244 explicit observer(inner_t inner)
245 : inner(std::move(inner))
// By-value assignment covers both copy and move assignment.
// NOTE(review): the return statement is not visible in this listing.
248 this_type& operator=(this_type o) {
249 inner = std::move(o.inner);
// Perfect-forwards the value to the inner observer.
// NOTE(review): the template<class V> header is presumably on a line missing from this listing.
253 void on_next(V&& v) const {
254 inner.on_next(std::forward<V>(v));
// NOTE(review): the body of on_error is not visible; presumably it forwards to inner — confirm.
256 void on_error(std::exception_ptr e) const {
259 void on_completed() const {
260 inner.on_completed();
// Returns a type-erased equivalent: a copy of inner wrapped in dynamic_observer<T>.
// observer<T> relies on a default for the second template argument declared elsewhere.
262 observer<T> as_dynamic() const {
263 return observer<T>(dynamic_observer<T>(inner));
// Specialization of observer for I = void, i.e. an observer with no inner target.
// All three handler parameters are unnamed, so the values they receive cannot be used.
// NOTE(review): the handler bodies are not visible in this listing (presumably empty) — confirm.
267 class observer<T, void> : public observer_base<T>
269 typedef observer this_type;
275 void on_next(V&&) const {
277 void on_error(std::exception_ptr) const {
279 void on_completed() const {
// Tail of a factory returning a default-constructed observer<T, void>.
// NOTE(review): the template header and function name are on lines missing from this
// listing; presumably this is the zero-argument make_observer<T>() — confirm.
285 -> observer<T, void> {
286 return observer<T, void>();
// make_observer overload for an argument that is already an observer<U, I>: constructs an
// observer<T, I> from the moved argument.
// NOTE(review): the trailing return type is on a line missing from this listing.
// observer<T, I>'s visible constructors take this_type or inner_t, so it is unclear from
// here how the T != U case is meant to compile — confirm.
289 template<class T, class U, class I>
290 auto make_observer(observer<U, I> o)
292 return observer<T, I>(std::move(o));
// make_observer overload for an observer-like object that is not yet an rx observer.
// Enabled only when Observer is neither a valid on_next callable for T nor a type that
// is_observer<> recognizes; the object is wrapped, by move, in observer<T, Observer>.
294 template<class T, class Observer>
295 auto make_observer(Observer ob)
296 -> typename std::enable_if<
297 !detail::is_on_next_of<T, Observer>::value &&
298 !is_observer<Observer>::value,
299 observer<T, Observer>>::type {
300 return observer<T, Observer>(std::move(ob));
// make_observer overload for a single on_next callable.
// Enabled only when detail::is_on_next_of<T, OnNext> holds. The callable is moved into a
// static_observer<T, OnNext> (error and completion handlers take their defaults), which is
// then wrapped in observer<T, ...>.
302 template<class T, class OnNext>
303 auto make_observer(OnNext on)
304 -> typename std::enable_if<
305 detail::is_on_next_of<T, OnNext>::value,
306 observer<T, static_observer<T, OnNext>>>::type {
307 return observer<T, static_observer<T, OnNext>>(
308 static_observer<T, OnNext>(std::move(on)));
// make_observer overload for an (on_next, on_error) pair.
// Enabled only when the first argument is a valid on_next callable for T and the second
// satisfies detail::is_on_error; the completion handler takes its default.
// NOTE(review): this has the same parameter shape as the (on_next, on_completed) overload,
// so a second argument that satisfies both traits would presumably be ambiguous.
310 template<class T, class OnNext, class OnError>
311 auto make_observer(OnNext on, OnError oe)
312 -> typename std::enable_if<
313 detail::is_on_next_of<T, OnNext>::value &&
314 detail::is_on_error<OnError>::value,
315 observer<T, static_observer<T, OnNext, OnError>>>::type {
316 return observer<T, static_observer<T, OnNext, OnError>>(
317 static_observer<T, OnNext, OnError>(std::move(on), std::move(oe)));
// make_observer overload for an (on_next, on_completed) pair.
// Enabled only when the first argument is a valid on_next callable for T and the second
// satisfies detail::is_on_completed. A detail::OnErrorEmpty is passed explicitly as the
// error handler so the completion callable lands in the third constructor slot.
319 template<class T, class OnNext, class OnCompleted>
320 auto make_observer(OnNext on, OnCompleted oc)
321 -> typename std::enable_if<
322 detail::is_on_next_of<T, OnNext>::value &&
323 detail::is_on_completed<OnCompleted>::value,
324 observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
325 return observer<T, static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>>(
326 static_observer<T, OnNext, detail::OnErrorEmpty, OnCompleted>(std::move(on), detail::OnErrorEmpty(), std::move(oc)));
// make_observer overload for the full (on_next, on_error, on_completed) triple.
// Enabled only when each argument satisfies its corresponding detail:: trait; all three
// callables are moved into a static_observer that is wrapped in observer<T, ...>.
328 template<class T, class OnNext, class OnError, class OnCompleted>
329 auto make_observer(OnNext on, OnError oe, OnCompleted oc)
330 -> typename std::enable_if<
331 detail::is_on_next_of<T, OnNext>::value &&
332 detail::is_on_error<OnError>::value &&
333 detail::is_on_completed<OnCompleted>::value,
334 observer<T, static_observer<T, OnNext, OnError, OnCompleted>>>::type {
335 return observer<T, static_observer<T, OnNext, OnError, OnCompleted>>(
336 static_observer<T, OnNext, OnError, OnCompleted>(std::move(on), std::move(oe), std::move(oc)));
// make_observer_dynamic overload for an existing observer: enabled only when
// is_observer<Observer> holds. The observer is moved into a dynamic_observer<T> (type
// erasure) and wrapped in observer<T>.
// NOTE(review): the line carrying the enable_if result type is missing from this listing.
340 template<class T, class Observer>
341 auto make_observer_dynamic(Observer o)
342 -> typename std::enable_if<
343 is_observer<Observer>::value,
345 return observer<T>(dynamic_observer<T>(std::move(o)));
// make_observer_dynamic overload for a single on_next callable.
// Takes a forwarding reference and perfect-forwards it to make_observer<T>, then type-erases
// the resulting static observer inside dynamic_observer<T>.
// OnNext may deduce to a reference type here; the trait decays its argument before probing.
347 template<class T, class OnNext>
348 auto make_observer_dynamic(OnNext&& on)
349 -> typename std::enable_if<
350 detail::is_on_next_of<T, OnNext>::value,
351 observer<T, dynamic_observer<T>>>::type {
352 return observer<T, dynamic_observer<T>>(
353 dynamic_observer<T>(make_observer<T>(std::forward<OnNext>(on))));
// make_observer_dynamic overload for an (on_next, on_error) pair.
// Enabled when the arguments satisfy is_on_next_of and is_on_error respectively; both are
// perfect-forwarded to make_observer<T> and the result is type-erased in dynamic_observer<T>.
355 template<class T, class OnNext, class OnError>
356 auto make_observer_dynamic(OnNext&& on, OnError&& oe)
357 -> typename std::enable_if<
358 detail::is_on_next_of<T, OnNext>::value &&
359 detail::is_on_error<OnError>::value,
360 observer<T, dynamic_observer<T>>>::type {
361 return observer<T, dynamic_observer<T>>(
362 dynamic_observer<T>(make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe))));
// make_observer_dynamic overload for an (on_next, on_completed) pair.
// Enabled when the arguments satisfy is_on_next_of and is_on_completed respectively; both are
// perfect-forwarded to make_observer<T> and the result is type-erased in dynamic_observer<T>.
364 template<class T, class OnNext, class OnCompleted>
365 auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
366 -> typename std::enable_if<
367 detail::is_on_next_of<T, OnNext>::value &&
368 detail::is_on_completed<OnCompleted>::value,
369 observer<T, dynamic_observer<T>>>::type {
370 return observer<T, dynamic_observer<T>>(
371 dynamic_observer<T>(make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc))));
// make_observer_dynamic overload for the full (on_next, on_error, on_completed) triple.
// Enabled when each argument satisfies its corresponding detail:: trait; all three are
// perfect-forwarded to make_observer<T> and the result is type-erased in dynamic_observer<T>.
373 template<class T, class OnNext, class OnError, class OnCompleted>
374 auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
375 -> typename std::enable_if<
376 detail::is_on_next_of<T, OnNext>::value &&
377 detail::is_on_error<OnError>::value &&
378 detail::is_on_completed<OnCompleted>::value,
379 observer<T, dynamic_observer<T>>>::type {
380 return observer<T, dynamic_observer<T>>(
381 dynamic_observer<T>(make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc))));
// Maps a nullary callable type F to the rxu::maybe<> type able to hold its result.
// NOTE(review): the template header introducing F is on a line missing from this listing.
387 struct maybe_from_result
// Type produced by calling an F with no arguments (unevaluated; the null pointer is never
// dereferenced at run time).
389 typedef decltype((*(F*)nullptr)()) decl_result_type;
// References and cv-qualifiers are stripped so the result can be stored by value.
390 typedef typename std::decay<decl_result_type>::type result_type;
391 typedef rxu::maybe<result_type> type;
// on_exception overload taking an error callback.
// Enabled only when OnError satisfies detail::is_on_error. Returns the maybe<> type derived
// from F's result; the visible handler path passes the in-flight exception to c.
// NOTE(review): the try/catch structure, the call to f and the return statement are on lines
// missing from this listing. Presumably r is filled on success and left empty when f
// throws — confirm.
396 template<class F, class OnError>
397 auto on_exception(const F& f, const OnError& c)
398 -> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
399 typename detail::maybe_from_result<F>::type r;
403 c(std::current_exception());
// on_exception overload taking a subscriber.
// Enabled only when is_subscriber<Subscriber> holds. Returns the maybe<> type derived from
// F's result; the visible handler path reports the in-flight exception via s.on_error.
// NOTE(review): the try/catch structure, the call to f and the return statement are not
// visible, and the definition continues past the end of this listing.
408 template<class F, class Subscriber>
409 auto on_exception(const F& f, const Subscriber& s)
410 -> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
411 typename detail::maybe_from_result<F>::type r;
415 s.on_error(std::current_exception());