]> git.sesse.net Git - casparcg/blob - dependencies/boost/boost/graph/distributed/detail/remote_update_set.hpp
Manually merged pull request #222
[casparcg] / dependencies / boost / boost / graph / distributed / detail / remote_update_set.hpp
1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 //  Authors: Douglas Gregor
8 //           Andrew Lumsdaine
9 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
10 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
11
12 #ifndef BOOST_GRAPH_USE_MPI
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
14 #endif
15
16 #include <boost/graph/parallel/process_group.hpp>
17 #include <boost/type_traits/is_convertible.hpp>
18 #include <vector>
19 #include <boost/assert.hpp>
20 #include <boost/optional.hpp>
21 #include <queue>
22
23 namespace boost { namespace graph { namespace detail {
24
25 template<typename ProcessGroup>
26 void do_synchronize(ProcessGroup& pg)
27
28   using boost::parallel::synchronize;
29   synchronize(pg);
30 }
31
/** Tag type naming the remote_update_set specialization that buffers
 *  updates and transmits them when synchronize() is called.
 */
struct remote_set_queued
{
};

/** Tag type naming the remote_update_set specialization that sends
 *  each remote update as soon as update() is called.
 */
struct remote_set_immediate
{
};
34
35 template<typename ProcessGroup>
36 class remote_set_semantics
37 {
38   BOOST_STATIC_CONSTANT
39     (bool, 
40      queued = (is_convertible<
41                  typename ProcessGroup::communication_category,
42                  parallel::bsp_process_group_tag>::value));
43
44  public:
45   typedef typename mpl::if_c<queued, 
46                              remote_set_queued, 
47                              remote_set_immediate>::type type;
48 };
49
50
51 template<typename Derived, typename ProcessGroup, typename Value,
52          typename OwnerMap,
53          typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
54 class remote_update_set;
55
56 /**********************************************************************
57  * Remote updating set that queues messages until synchronization     *
58  **********************************************************************/
59 template<typename Derived, typename ProcessGroup, typename Value,
60          typename OwnerMap>
61 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
62                         remote_set_queued>
63 {
64   typedef typename property_traits<OwnerMap>::key_type Key;
65   typedef std::vector<std::pair<Key, Value> > Updates;
66   typedef typename Updates::size_type   updates_size_type;
67   typedef typename Updates::value_type  updates_pair_type;
68
69 public:
70
71 private:
72   typedef typename ProcessGroup::process_id_type process_id_type;
73
74   enum message_kind {
75     /** Message containing the number of updates that will be sent in
76      *  a msg_updates message that will immediately follow. This
77      *  message will contain a single value of type
78      *  updates_size_type. 
79      */
80     msg_num_updates,
81
82     /** Contains (key, value) pairs with all of the updates from a
83      *  particular source. The number of updates is variable, but will
84      *  be provided in a msg_num_updates message that immediately
85      *  preceeds this message.
86      *
87      */
88     msg_updates
89   };
90
91   struct handle_messages
92   {
93     explicit 
94     handle_messages(remote_update_set* self, const ProcessGroup& pg)
95       : self(self), update_sizes(num_processes(pg), 0) { }
96
97     void operator()(process_id_type source, int tag) 
98     { 
99       switch(tag) {
100       case msg_num_updates:
101         {
102           // Receive the # of updates
103           updates_size_type num_updates;
104           receive(self->process_group, source, tag, num_updates);
105
106           update_sizes[source] = num_updates;
107         }
108         break;
109
110       case msg_updates:
111         {
112           updates_size_type num_updates = update_sizes[source];
113           BOOST_ASSERT(num_updates);
114
115           // Receive the actual updates
116           std::vector<updates_pair_type> updates(num_updates);
117           receive(self->process_group, source, msg_updates, &updates[0],
118                   num_updates);
119           
120           // Send updates to derived "receive_update" member
121           Derived* derived = static_cast<Derived*>(self);
122           for (updates_size_type u = 0; u < num_updates; ++u)
123             derived->receive_update(source, updates[u].first, updates[u].second);
124
125           update_sizes[source] = 0;
126         }
127         break;
128       };
129     }
130
131   private:
132     remote_update_set* self;
133     std::vector<updates_size_type> update_sizes;
134   };
135   friend struct handle_messages;
136
137  protected:
138   remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
139     : process_group(pg, handle_messages(this, pg)),
140       updates(num_processes(pg)), owner(owner) { 
141     }
142
143
144   void update(const Key& key, const Value& value)
145   { 
146     if (get(owner, key) == process_id(process_group)) {
147       Derived* derived = static_cast<Derived*>(this);
148       derived->receive_update(get(owner, key), key, value);
149     }
150     else {
151       updates[get(owner, key)].push_back(std::make_pair(key, value));
152     }
153   }
154
155   void collect() { }
156
157   void synchronize()
158   {
159     // Emit all updates and then remove them
160     process_id_type num_processes = updates.size();
161     for (process_id_type p = 0; p < num_processes; ++p) {
162       if (!updates[p].empty()) {
163         send(process_group, p, msg_num_updates, updates[p].size());
164         send(process_group, p, msg_updates, 
165              &updates[p].front(), updates[p].size());
166         updates[p].clear();
167       }
168     }
169     
170     do_synchronize(process_group);
171   }
172
173   ProcessGroup process_group;
174
175  private:
176   std::vector<Updates> updates;
177   OwnerMap owner;
178 };
179
180 /**********************************************************************
181  * Remote updating set that sends messages immediately                *
182  **********************************************************************/
183 template<typename Derived, typename ProcessGroup, typename Value,
184          typename OwnerMap>
185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
186                         remote_set_immediate>
187 {
188   typedef typename property_traits<OwnerMap>::key_type Key;
189   typedef std::pair<Key, Value> update_pair_type;
190   typedef typename std::vector<update_pair_type>::size_type updates_size_type;
191
192 public:
193   typedef typename ProcessGroup::process_id_type process_id_type;
194
195 private:
196   enum message_kind {
197     /** Contains a (key, value) pair that will be updated. */
198     msg_update
199   };
200
201   struct handle_messages
202   {
203     explicit handle_messages(remote_update_set* self, const ProcessGroup& pg) 
204       : self(self)
205     { update_sizes.resize(num_processes(pg), 0); }
206
207     void operator()(process_id_type source, int tag) 
208     { 
209       // Receive the # of updates
210       BOOST_ASSERT(tag == msg_update);
211       update_pair_type update;
212       receive(self->process_group, source, tag, update);
213       
214       // Send update to derived "receive_update" member
215       Derived* derived = static_cast<Derived*>(self);
216       derived->receive_update(source, update.first, update.second);
217     }
218
219   private:
220     std::vector<updates_size_type> update_sizes;
221     remote_update_set* self;
222   };
223   friend struct handle_messages;
224
225  protected:
226   remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
227     : process_group(pg, handle_messages(this, pg)), owner(owner) { }
228
229   void update(const Key& key, const Value& value)
230   { 
231     if (get(owner, key) == process_id(process_group)) {
232       Derived* derived = static_cast<Derived*>(this);
233       derived->receive_update(get(owner, key), key, value);
234     }
235     else
236       send(process_group, get(owner, key), msg_update, 
237            update_pair_type(key, value));
238   }
239
240   void collect() 
241   { 
242     typedef std::pair<process_id_type, int> probe_type;
243     handle_messages handler(this, process_group);
244     while (optional<probe_type> stp = probe(process_group))
245       if (stp->second == msg_update) handler(stp->first, stp->second);
246   }
247
248   void synchronize()
249   {
250     do_synchronize(process_group);
251   }
252
253   ProcessGroup process_group;
254   OwnerMap owner;
255 };
256
257 } } } // end namespace boost::graph::detail
258
259 #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP