// Copyright (C) 2004-2006 The Trustees of Indiana University. // Use, modification and distribution is subject to the Boost Software // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // Authors: Douglas Gregor // Nick Edmonds // Andrew Lumsdaine #include #include #include #include #include #include #ifndef BOOST_GRAPH_USE_MPI #error "Parallel BGL files should not be included unless has been included" #endif namespace boost { namespace parallel { template template PBGL_DISTRIB_PMAP ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global, const StorageMap& pm, const Reduce& reduce) : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver)) { typedef handle_message Handler; data->ghost_cells.reset(new ghost_cells_type()); data->reset = &data_t::template do_reset; data->process_group.replace_handler(Handler(data, reduce)); data->process_group.template get_receiver() ->setup_triggers(data->process_group); } template PBGL_DISTRIB_PMAP::~distributed_property_map() { } template template void PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce) { typedef handle_message Handler; data->process_group.replace_handler(Handler(data, reduce)); Handler* handler = data->process_group.template get_receiver(); BOOST_ASSERT(handler); handler->setup_triggers(data->process_group); data->get_default_value = reduce; data->has_default_resolver = Reduce::non_default_resolver; int model = data->model; data->reset = &data_t::template do_reset; set_consistency_model(model); } template void PBGL_DISTRIB_PMAP::prune_ghost_cells() const { if (data->max_ghost_cells == 0) return; while (data->ghost_cells->size() > data->max_ghost_cells) { // Evict the last ghost cell if (data->model & cm_flush) { // We need to flush values when we evict them. boost::parallel::detail::untracked_pair const& victim = data->ghost_cells->back(); send(data->process_group, get(data->global, victim.first).first, property_map_put, victim); } // Actually remove the ghost cell data->ghost_cells->pop_back(); } } template typename PBGL_DISTRIB_PMAP::value_type& PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const { // Index by key ghost_cells_key_index_type const& key_index = data->ghost_cells->template get<1>(); // Search for the ghost cell by key, and project back to the sequence iterator ghost_cell = data->ghost_cells->template project<0>(key_index.find(key)); if (ghost_cell == data->ghost_cells->end()) { value_type value; if (data->has_default_resolver) // Since we have a default resolver, use it to create a default // value for this ghost cell. value = data->get_default_value(key); else if (request_if_missing) // Request the actual value of this key from its owner send_oob_with_reply(data->process_group, get(data->global, key).first, property_map_get, key, value); else value = value_type(); // Create a ghost cell containing the new value ghost_cell = data->ghost_cells->push_front(std::make_pair(key, value)).first; // If we need to, prune the ghost cells if (data->max_ghost_cells > 0) prune_ghost_cells(); } else if (data->max_ghost_cells > 0) // Put this cell at the beginning of the MRU list data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell); return const_cast(ghost_cell->second); } template template void PBGL_DISTRIB_PMAP ::handle_message::operator()(process_id_type source, int tag) { BOOST_ASSERT(false); } template template void PBGL_DISTRIB_PMAP::handle_message:: handle_put(int /*source*/, int /*tag*/, const boost::parallel::detail::untracked_pair& req, trigger_receive_context) { using boost::get; shared_ptr data(data_ptr); owner_local_pair p = get(data->global, req.first); BOOST_ASSERT(p.first == process_id(data->process_group)); detail::maybe_put(data->storage, p.second, reduce(req.first, get(data->storage, p.second), req.second)); } template template typename PBGL_DISTRIB_PMAP::value_type PBGL_DISTRIB_PMAP::handle_message:: handle_get(int source, int /*tag*/, const key_type& key, trigger_receive_context) { using boost::get; shared_ptr data(data_ptr); BOOST_ASSERT(data); owner_local_pair p = get(data->global, key); return get(data->storage, p.second); } template template void PBGL_DISTRIB_PMAP::handle_message:: handle_multiget(int source, int tag, const std::vector& keys, trigger_receive_context) { shared_ptr data(data_ptr); BOOST_ASSERT(data); typedef boost::parallel::detail::untracked_pair key_value; std::vector results; std::size_t n = keys.size(); results.reserve(n); using boost::get; for (std::size_t i = 0; i < n; ++i) { local_key_type local_key = get(data->global, keys[i]).second; results.push_back(key_value(keys[i], get(data->storage, local_key))); } send(data->process_group, source, property_map_multiget_reply, results); } template template void PBGL_DISTRIB_PMAP::handle_message:: handle_multiget_reply (int source, int tag, const std::vector >& msg, trigger_receive_context) { shared_ptr data(data_ptr); BOOST_ASSERT(data); // Index by key ghost_cells_key_index_type const& key_index = data->ghost_cells->template get<1>(); std::size_t n = msg.size(); for (std::size_t i = 0; i < n; ++i) { // Search for the ghost cell by key, and project back to the sequence iterator position = data->ghost_cells->template project<0>(key_index.find(msg[i].first)); if (position != data->ghost_cells->end()) const_cast(position->second) = msg[i].second; } } template template void PBGL_DISTRIB_PMAP::handle_message:: handle_multiput (int source, int tag, const std::vector >& values, trigger_receive_context) { using boost::get; shared_ptr data(data_ptr); BOOST_ASSERT(data); std::size_t n = values.size(); for (std::size_t i = 0; i < n; ++i) { local_key_type local_key = values[i].first; value_type local_value = get(data->storage, local_key); detail::maybe_put(data->storage, values[i].first, reduce(values[i].first, local_value, values[i].second)); } } template template void PBGL_DISTRIB_PMAP::handle_message:: setup_triggers(process_group_type& pg) { using boost::graph::parallel::simple_trigger; simple_trigger(pg, property_map_put, this, &handle_message::handle_put); simple_trigger(pg, property_map_get, this, &handle_message::handle_get); simple_trigger(pg, property_map_multiget, this, &handle_message::handle_multiget); simple_trigger(pg, property_map_multiget_reply, this, &handle_message::handle_multiget_reply); simple_trigger(pg, property_map_multiput, this, &handle_message::handle_multiput); } template void PBGL_DISTRIB_PMAP ::on_synchronize::operator()() { int stage=0; // we only get called at the start now shared_ptr data(data_ptr); BOOST_ASSERT(data); // Determine in which stage backward consistency messages should be sent. int backward_stage = -1; if (data->model & cm_backward) { if (data->model & cm_flush) backward_stage = 1; else backward_stage = 0; } // Flush results in first stage if (stage == 0 && data->model & cm_flush) data->flush(); // Backward consistency if (stage == backward_stage && !(data->model & (cm_clear | cm_reset))) data->refresh_ghost_cells(); // Optionally clear results if (data->model & cm_clear) data->clear(); // Optionally reset results if (data->model & cm_reset) { if (data->reset) ((*data).*data->reset)(); } } template void PBGL_DISTRIB_PMAP::set_consistency_model(int model) { data->model = model; int stages = 1; bool need_on_synchronize = (model != cm_forward); // Backward consistency is a two-stage process. if (model & cm_backward) { if (model & cm_flush) stages = 3; else stages = 2; // For backward consistency to work, we absolutely cannot throw // away any ghost cells. data->max_ghost_cells = 0; } // attach the on_synchronize handler. if (need_on_synchronize) data->process_group.replace_on_synchronize_handler(on_synchronize(data)); } template void PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells) { if ((data->model & cm_backward) && max_ghost_cells > 0) boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: " "cannot limit ghost-cell usage with a backward " "consistency model")); if (max_ghost_cells == 1) // It is not safe to have only 1 ghost cell; the cell() method // will fail. max_ghost_cells = 2; data->max_ghost_cells = max_ghost_cells; prune_ghost_cells(); } template void PBGL_DISTRIB_PMAP::clear() { data->clear(); } template void PBGL_DISTRIB_PMAP::data_t::clear() { ghost_cells->clear(); } template void PBGL_DISTRIB_PMAP::reset() { if (data->reset) ((*data).*data->reset)(); } template void PBGL_DISTRIB_PMAP::flush() { data->flush(); } template void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells() { using boost::get; std::vector > keys; keys.resize(num_processes(process_group)); // Collect the set of keys for which we will request values for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) keys[get(global, i->first).first].push_back(i->first); // Send multiget requests to each of the other processors typedef typename ProcessGroup::process_size_type process_size_type; process_size_type n = num_processes(process_group); process_id_type id = process_id(process_group); for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) { if (!keys[p].empty()) send(process_group, p, property_map_multiget, keys[p]); } } template void PBGL_DISTRIB_PMAP::data_t::flush() { using boost::get; int n = num_processes(process_group); std::vector > > values; values.resize(n); // Collect all of the flushed values for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) { std::pair g = get(global, i->first); values[g.first].push_back(std::make_pair(g.second, i->second)); } // Transmit flushed values for (int p = 0; p < n; ++p) { if (!values[p].empty()) send(process_group, p, property_map_multiput, values[p]); } } template void PBGL_DISTRIB_PMAP::do_synchronize() { if (data->model & cm_backward) { synchronize(data->process_group); return; } // Request refreshes of the values of our ghost cells data->refresh_ghost_cells(); // Allows all of the multigets to get to their destinations synchronize(data->process_group); // Allows all of the multiget responses to get to their destinations synchronize(data->process_group); } template template void PBGL_DISTRIB_PMAP::data_t::do_reset() { Resolver* resolver = get_default_value.template target(); BOOST_ASSERT(resolver); for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) const_cast(i->second) = (*resolver)(i->first); } } } // end namespace boost::parallel