]> git.sesse.net Git - casparcg/blob - dependencies/boost/boost/property_map/parallel/impl/distributed_property_map.ipp
Manually merged pull request #222
[casparcg] / dependencies / boost / boost / property_map / parallel / impl / distributed_property_map.ipp
1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 //  Authors: Douglas Gregor
8 //           Nick Edmonds
9 //           Andrew Lumsdaine
10 #include <boost/assert.hpp>
11 #include <boost/property_map/parallel/distributed_property_map.hpp>
12 #include <boost/graph/parallel/detail/untracked_pair.hpp>
13 #include <boost/type_traits/is_base_and_derived.hpp>
14 #include <boost/bind.hpp>
15 #include <boost/graph/parallel/simple_trigger.hpp>
16
17 #ifndef BOOST_GRAPH_USE_MPI
18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
19 #endif
20
21 namespace boost { namespace parallel {
22
23 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
24 template<typename Reduce>
25 PBGL_DISTRIB_PMAP
26 ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
27                            const StorageMap& pm, const Reduce& reduce)
28   : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
29 {
30   typedef handle_message<Reduce> Handler;
31
32   data->ghost_cells.reset(new ghost_cells_type());
33   data->reset = &data_t::template do_reset<Reduce>;
34   data->process_group.replace_handler(Handler(data, reduce));
35   data->process_group.template get_receiver<Handler>()
36     ->setup_triggers(data->process_group);
37 }
38
39 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
40 PBGL_DISTRIB_PMAP::~distributed_property_map() { }
41
42 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
43 template<typename Reduce>
44 void 
45 PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
46 {
47   typedef handle_message<Reduce> Handler;
48   data->process_group.replace_handler(Handler(data, reduce));
49   Handler* handler = data->process_group.template get_receiver<Handler>();
50   BOOST_ASSERT(handler);
51   handler->setup_triggers(data->process_group);
52   data->get_default_value = reduce;
53   data->has_default_resolver = Reduce::non_default_resolver;
54   int model = data->model;
55   data->reset = &data_t::template do_reset<Reduce>;
56   set_consistency_model(model);
57 }
58
59 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
60 void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
61 {
62   if (data->max_ghost_cells == 0)
63     return;
64
65   while (data->ghost_cells->size() > data->max_ghost_cells) {
66     // Evict the last ghost cell
67
68     if (data->model & cm_flush) {
69       // We need to flush values when we evict them.
70       boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
71         = data->ghost_cells->back();
72       send(data->process_group, get(data->global, victim.first).first, 
73            property_map_put, victim);
74     }
75
76     // Actually remove the ghost cell
77     data->ghost_cells->pop_back();
78   }
79 }
80
81 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
82 typename PBGL_DISTRIB_PMAP::value_type&
83 PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
84 {
85   // Index by key
86   ghost_cells_key_index_type const& key_index 
87     = data->ghost_cells->template get<1>();
88
89   // Search for the ghost cell by key, and project back to the sequence
90   iterator ghost_cell 
91     = data->ghost_cells->template project<0>(key_index.find(key));
92   if (ghost_cell == data->ghost_cells->end()) {
93     value_type value;
94     if (data->has_default_resolver)
95       // Since we have a default resolver, use it to create a default
96       // value for this ghost cell.
97       value = data->get_default_value(key);
98     else if (request_if_missing)
99       // Request the actual value of this key from its owner
100       send_oob_with_reply(data->process_group, get(data->global, key).first, 
101                           property_map_get, key, value);
102     else
103       value = value_type();
104
105     // Create a ghost cell containing the new value
106     ghost_cell 
107       = data->ghost_cells->push_front(std::make_pair(key, value)).first;
108
109     // If we need to, prune the ghost cells
110     if (data->max_ghost_cells > 0)
111       prune_ghost_cells();
112   } else if (data->max_ghost_cells > 0)
113     // Put this cell at the beginning of the MRU list
114     data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
115
116   return const_cast<value_type&>(ghost_cell->second);
117 }
118
119 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
120 template<typename Reduce>
121 void
122 PBGL_DISTRIB_PMAP
123 ::handle_message<Reduce>::operator()(process_id_type source, int tag)
124 {
125   BOOST_ASSERT(false);
126 }
127
128 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
129 template<typename Reduce>
130 void
131 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
132 handle_put(int /*source*/, int /*tag*/, 
133            const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
134 {
135   using boost::get;
136
137   shared_ptr<data_t> data(data_ptr);
138
139   owner_local_pair p = get(data->global, req.first);
140   BOOST_ASSERT(p.first == process_id(data->process_group));
141
142   detail::maybe_put(data->storage, p.second,
143                     reduce(req.first,
144                            get(data->storage, p.second),
145                            req.second));
146 }
147
148 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
149 template<typename Reduce>
150 typename PBGL_DISTRIB_PMAP::value_type
151 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
152 handle_get(int source, int /*tag*/, const key_type& key, 
153            trigger_receive_context)
154 {
155   using boost::get;
156
157   shared_ptr<data_t> data(data_ptr);
158   BOOST_ASSERT(data);
159
160   owner_local_pair p = get(data->global, key);
161   return get(data->storage, p.second);
162 }
163
164 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
165 template<typename Reduce>
166 void
167 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
168 handle_multiget(int source, int tag, const std::vector<key_type>& keys,
169                 trigger_receive_context)
170 {
171   shared_ptr<data_t> data(data_ptr);
172   BOOST_ASSERT(data);
173
174   typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
175   std::vector<key_value> results;
176   std::size_t n = keys.size();
177   results.reserve(n);
178
179   using boost::get;
180
181   for (std::size_t i = 0; i < n; ++i) {
182     local_key_type local_key = get(data->global, keys[i]).second;
183     results.push_back(key_value(keys[i], get(data->storage, local_key)));
184   }
185   send(data->process_group, source, property_map_multiget_reply, results);
186 }
187
188 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
189 template<typename Reduce>
190 void
191 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
192 handle_multiget_reply
193   (int source, int tag, 
194    const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
195    trigger_receive_context)
196 {
197   shared_ptr<data_t> data(data_ptr);
198   BOOST_ASSERT(data);
199
200   // Index by key
201   ghost_cells_key_index_type const& key_index 
202     = data->ghost_cells->template get<1>();
203
204   std::size_t n = msg.size();
205   for (std::size_t i = 0; i < n; ++i) {
206     // Search for the ghost cell by key, and project back to the sequence
207     iterator position
208       = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
209
210     if (position != data->ghost_cells->end())
211       const_cast<value_type&>(position->second) = msg[i].second;
212   }
213 }
214
215 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
216 template<typename Reduce>
217 void
218 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
219 handle_multiput
220   (int source, int tag, 
221    const std::vector<unsafe_pair<local_key_type, value_type> >& values,
222    trigger_receive_context)
223 {
224   using boost::get;
225
226   shared_ptr<data_t> data(data_ptr);
227   BOOST_ASSERT(data);
228
229   std::size_t n = values.size();
230   for (std::size_t i = 0; i < n; ++i) {
231     local_key_type local_key = values[i].first;
232     value_type local_value = get(data->storage, local_key);
233     detail::maybe_put(data->storage, values[i].first,
234                       reduce(values[i].first,
235                              local_value,
236                              values[i].second));
237   }
238 }
239
240 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
241 template<typename Reduce>
242 void
243 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
244 setup_triggers(process_group_type& pg)
245 {
246   using boost::graph::parallel::simple_trigger;
247
248   simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
249   simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
250   simple_trigger(pg, property_map_multiget, this, 
251                  &handle_message::handle_multiget);
252   simple_trigger(pg, property_map_multiget_reply, this, 
253                  &handle_message::handle_multiget_reply);
254   simple_trigger(pg, property_map_multiput, this, 
255                  &handle_message::handle_multiput);
256 }
257
258 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
259 void
260 PBGL_DISTRIB_PMAP
261 ::on_synchronize::operator()()
262 {
263   int stage=0; // we only get called at the start now
264   shared_ptr<data_t> data(data_ptr);
265   BOOST_ASSERT(data);
266
267   // Determine in which stage backward consistency messages should be sent.
268   int backward_stage = -1;
269   if (data->model & cm_backward) {
270     if (data->model & cm_flush) backward_stage = 1;
271     else backward_stage = 0;
272   }
273
274   // Flush results in first stage
275   if (stage == 0 && data->model & cm_flush)
276     data->flush();
277
278   // Backward consistency
279   if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
280     data->refresh_ghost_cells();
281
282   // Optionally clear results
283   if (data->model & cm_clear)
284     data->clear();
285
286   // Optionally reset results
287   if (data->model & cm_reset) {
288     if (data->reset) ((*data).*data->reset)();
289   }
290 }
291
292
293 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
294 void
295 PBGL_DISTRIB_PMAP::set_consistency_model(int model)
296 {
297   data->model = model;
298
299   int stages = 1;
300   bool need_on_synchronize = (model != cm_forward);
301
302   // Backward consistency is a two-stage process.
303   if (model & cm_backward) {
304     if (model & cm_flush) stages = 3;
305     else stages = 2;
306
307     // For backward consistency to work, we absolutely cannot throw
308     // away any ghost cells.
309     data->max_ghost_cells = 0;
310   }
311
312   // attach the on_synchronize handler.
313   if (need_on_synchronize)
314     data->process_group.replace_on_synchronize_handler(on_synchronize(data));
315 }
316
317 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
318 void
319 PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
320 {
321   if ((data->model & cm_backward) && max_ghost_cells > 0)
322       boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
323                                                 "cannot limit ghost-cell usage with a backward "
324                                                 "consistency model"));
325
326   if (max_ghost_cells == 1)
327     // It is not safe to have only 1 ghost cell; the cell() method
328     // will fail.
329     max_ghost_cells = 2;
330
331   data->max_ghost_cells = max_ghost_cells;
332   prune_ghost_cells();
333 }
334
335 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
336 void PBGL_DISTRIB_PMAP::clear()
337 {
338   data->clear();
339 }
340
341 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
342 void PBGL_DISTRIB_PMAP::data_t::clear()
343 {
344   ghost_cells->clear();
345 }
346
347 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
348 void PBGL_DISTRIB_PMAP::reset()
349 {
350   if (data->reset) ((*data).*data->reset)();
351 }
352
353 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
354 void PBGL_DISTRIB_PMAP::flush()
355 {
356   data->flush();
357 }
358
359 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
360 void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
361 {
362   using boost::get;
363
364   std::vector<std::vector<key_type> > keys;
365   keys.resize(num_processes(process_group));
366
367   // Collect the set of keys for which we will request values
368   for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
369     keys[get(global, i->first).first].push_back(i->first);
370
371   // Send multiget requests to each of the other processors
372   typedef typename ProcessGroup::process_size_type process_size_type;
373   process_size_type n = num_processes(process_group);
374   process_id_type id = process_id(process_group);
375   for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
376     if (!keys[p].empty())
377       send(process_group, p, property_map_multiget, keys[p]);
378   }  
379 }
380
381 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
382 void PBGL_DISTRIB_PMAP::data_t::flush()
383 {
384   using boost::get;
385
386   int n = num_processes(process_group);
387   std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
388   values.resize(n);
389
390   // Collect all of the flushed values
391   for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
392     std::pair<int, local_key_type> g = get(global, i->first);
393     values[g.first].push_back(std::make_pair(g.second, i->second));
394   }
395
396   // Transmit flushed values
397   for (int p = 0; p < n; ++p) {
398     if (!values[p].empty())
399       send(process_group, p, property_map_multiput, values[p]);
400   }
401 }
402
403 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
404 void PBGL_DISTRIB_PMAP::do_synchronize()
405 {
406   if (data->model & cm_backward) {
407     synchronize(data->process_group);
408     return;
409   }
410
411   // Request refreshes of the values of our ghost cells
412   data->refresh_ghost_cells();
413
414   // Allows all of the multigets to get to their destinations
415   synchronize(data->process_group);
416
417   // Allows all of the multiget responses to get to their destinations
418   synchronize(data->process_group);
419 }
420
421 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
422 template<typename Resolver>
423 void PBGL_DISTRIB_PMAP::data_t::do_reset()
424 {
425   Resolver* resolver = get_default_value.template target<Resolver>();
426   BOOST_ASSERT(resolver);
427
428   for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
429     const_cast<value_type&>(i->second) = (*resolver)(i->first);
430 }
431
432 } } // end namespace boost::parallel