mpi_process_group.ipp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020
  1. // -*- C++ -*-
  2. // Copyright (C) 2004-2008 The Trustees of Indiana University.
  3. // Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com>
  4. // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>
  5. // Use, modification and distribution is subject to the Boost Software
  6. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  7. // http://www.boost.org/LICENSE_1_0.txt)
  8. // Authors: Douglas Gregor
  9. // Andrew Lumsdaine
  10. // Matthias Troyer
  11. //#define PBGL_PROCESS_GROUP_DEBUG
  12. #ifndef BOOST_GRAPH_USE_MPI
  13. #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
  14. #endif
  15. #include <boost/assert.hpp>
  16. #include <algorithm>
  17. #include <boost/graph/parallel/detail/untracked_pair.hpp>
  18. #include <numeric>
  19. #include <iterator>
  20. #include <functional>
  21. #include <vector>
  22. #include <queue>
  23. #include <stack>
  24. #include <list>
  25. #include <map>
  26. #include <boost/graph/distributed/detail/tag_allocator.hpp>
  27. #include <stdio.h>
  28. // #define PBGL_PROCESS_GROUP_DEBUG
  29. #ifdef PBGL_PROCESS_GROUP_DEBUG
  30. # include <iostream>
  31. #endif
  32. namespace boost { namespace graph { namespace distributed {
struct mpi_process_group::impl
{
  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * could reduce the cost of "receiving" messages and allow us to
   * deallocate memory earlier, but increases the time spent in the
   * synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    /// Headers of the messages received from this source, in arrival order.
    std::vector<message_header> headers;
    /// Packed payload bytes; each header records its offset into this buffer.
    buffer_type buffer;
    /// Per-block cursors: for each attached block, an iterator into
    /// @c headers pointing at that block's next not-yet-consumed message.
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  /// A nonblocking batch send in flight: the MPI request together with the
  /// buffer it transmits, which must outlive the request.
  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // send once we have a certain number of messages or bytes in the buffer
  // these numbers need to be tuned, we keep them small at first for testing
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The numbers of processors that have entered a synchronization stage
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of a processor
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since last synchronization stage
  std::vector<int> number_sent_batches;

  /// Number of batches received minus number of expected batches
  std::vector<int> number_received_batches;

  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages, keyed by tag.
  std::map<int,buffer_type> buffers;

  /// Queue for message batches received while already processing messages
  std::queue<std::pair<int,outgoing_messages> > new_batches;

  /// Maximum encountered size of the new_batches queue
  std::size_t max_received;

  /// The MPI requests and buffers for batches being sent
  std::list<batch_request> sent_batches;

  /// Maximum encountered size of the sent_batches list
  std::size_t max_sent;

  /// Pre-allocated requests in a pool
  std::vector<batch_request> batch_pool;

  /// A stack controlling which batches are available
  std::stack<std::size_t> free_batches;

  /// Reclaims sent_batches entries — presumably those whose requests have
  /// completed (implementation not in this file; verify there).
  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  /// Sets the batching thresholds declared above.
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
/// Returns the context of the currently-executing trigger, or trc_none
/// when no trigger is running.
inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}
  126. template<typename T>
  127. void
  128. mpi_process_group::send_impl(int dest, int tag, const T& value,
  129. mpl::true_ /*is_mpi_datatype*/) const
  130. {
  131. BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
  132. impl::outgoing_messages& outgoing = impl_->outgoing[dest];
  133. // Start constructing the message header
  134. impl::message_header header;
  135. header.source = process_id(*this);
  136. header.tag = tag;
  137. header.offset = outgoing.buffer.size();
  138. boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
  139. oa << value;
  140. #ifdef PBGL_PROCESS_GROUP_DEBUG
  141. std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
  142. << tag << ", bytes = " << packed_size << std::endl;
  143. #endif
  144. // Store the header
  145. header.bytes = outgoing.buffer.size() - header.offset;
  146. outgoing.headers.push_back(header);
  147. maybe_send_batch(dest);
  148. }
  149. template<typename T>
  150. void
  151. mpi_process_group::send_impl(int dest, int tag, const T& value,
  152. mpl::false_ /*is_mpi_datatype*/) const
  153. {
  154. BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
  155. impl::outgoing_messages& outgoing = impl_->outgoing[dest];
  156. // Start constructing the message header
  157. impl::message_header header;
  158. header.source = process_id(*this);
  159. header.tag = tag;
  160. header.offset = outgoing.buffer.size();
  161. // Serialize into the buffer
  162. boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  163. out << value;
  164. // Store the header
  165. header.bytes = outgoing.buffer.size() - header.offset;
  166. outgoing.headers.push_back(header);
  167. maybe_send_batch(dest);
  168. #ifdef PBGL_PROCESS_GROUP_DEBUG
  169. std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
  170. << tag << ", bytes = " << header.bytes << std::endl;
  171. #endif
  172. }
  173. template<typename T>
  174. inline void
  175. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  176. int tag, const T& value)
  177. {
  178. pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
  179. boost::mpi::is_mpi_datatype<T>());
  180. }
  181. template<typename T>
  182. typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  183. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  184. int tag, const T values[], std::size_t n)
  185. {
  186. pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
  187. boost::serialization::make_array(values,n),
  188. boost::mpl::true_());
  189. }
  190. template<typename T>
  191. typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  192. mpi_process_group::
  193. array_send_impl(int dest, int tag, const T values[], std::size_t n) const
  194. {
  195. BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
  196. impl::outgoing_messages& outgoing = impl_->outgoing[dest];
  197. // Start constructing the message header
  198. impl::message_header header;
  199. header.source = process_id(*this);
  200. header.tag = tag;
  201. header.offset = outgoing.buffer.size();
  202. // Serialize into the buffer
  203. boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  204. out << n;
  205. for (std::size_t i = 0; i < n; ++i)
  206. out << values[i];
  207. // Store the header
  208. header.bytes = outgoing.buffer.size() - header.offset;
  209. outgoing.headers.push_back(header);
  210. maybe_send_batch(dest);
  211. #ifdef PBGL_PROCESS_GROUP_DEBUG
  212. std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
  213. << tag << ", bytes = " << header.bytes << std::endl;
  214. #endif
  215. }
  216. template<typename T>
  217. typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  218. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  219. int tag, const T values[], std::size_t n)
  220. {
  221. pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
  222. values, n);
  223. }
  224. template<typename InputIterator>
  225. void
  226. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  227. int tag, InputIterator first, InputIterator last)
  228. {
  229. typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  230. std::vector<value_type> values(first, last);
  231. if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
  232. else send(pg, dest, tag, &values[0], values.size());
  233. }
/**
 * Attempts to consume one buffered message with the given tag from
 * @p source for the calling block (MPI-datatype path). Returns false when
 * no matching message is buffered; on success unpacks into @p value.
 */
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag, scanning forward from this
  // block's cursor into the header list.
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data (a zero-byte message carries no payload).
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received (tag -1 flags a consumed header).
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  // When every block's cursor has reached the end, all buffered messages
  // from this source are consumed: release the memory by swapping with
  // empty containers (clear() would keep the capacity) and reset cursors.
  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }
    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
/**
 * Attempts to consume one buffered message with the given tag from
 * @p source for the calling block (serialized path). Returns false when no
 * matching message is buffered; on success deserializes into @p value.
 */
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag, scanning forward from this
  // block's cursor into the header list.
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data from the recorded offset in the packed buffer.
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received (tag -1 flags a consumed header).
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  // When every block's cursor has reached the end, all buffered messages
  // from this source are consumed: release the memory by swapping with
  // empty containers and reset all cursors.
  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }
    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
  316. template<typename T>
  317. typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
  318. mpi_process_group::
  319. array_receive_impl(int source, int tag, T* values, std::size_t& n) const
  320. {
  321. impl::incoming_messages& incoming = impl_->incoming[source];
  322. // Find the next header with the right tag
  323. std::vector<impl::message_header>::iterator header =
  324. incoming.next_header[my_block_number()];
  325. while (header != incoming.headers.end() && header->tag != tag) ++header;
  326. // If no header is found, notify the caller
  327. if (header == incoming.headers.end()) return false;
  328. // Deserialize the data
  329. boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
  330. archive::no_header, header->offset);
  331. std::size_t num_sent;
  332. in >> num_sent;
  333. if (num_sent > n)
  334. std::cerr << "ERROR: Have " << num_sent << " items but only space for "
  335. << n << " items\n";
  336. for (std::size_t i = 0; i < num_sent; ++i)
  337. in >> values[i];
  338. n = num_sent;
  339. // Mark this message as received
  340. header->tag = -1;
  341. // Move the "next header" indicator to the next unreceived message
  342. while (incoming.next_header[my_block_number()] != incoming.headers.end()
  343. && incoming.next_header[my_block_number()]->tag == -1)
  344. ++incoming.next_header[my_block_number()];
  345. if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
  346. bool finished = true;
  347. for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
  348. if (incoming.next_header[i] != incoming.headers.end()) finished = false;
  349. }
  350. if (finished) {
  351. std::vector<impl::message_header> no_headers;
  352. incoming.headers.swap(no_headers);
  353. buffer_type empty_buffer;
  354. incoming.buffer.swap(empty_buffer);
  355. for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
  356. incoming.next_header[i] = incoming.headers.end();
  357. }
  358. }
  359. return true;
  360. }
  361. // Construct triggers
  362. template<typename Type, typename Handler>
  363. void mpi_process_group::trigger(int tag, const Handler& handler)
  364. {
  365. BOOST_ASSERT(block_num);
  366. install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
  367. new trigger_launcher<Type, Handler>(*this, tag, handler)));
  368. }
  369. template<typename Type, typename Handler>
  370. void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
  371. {
  372. BOOST_ASSERT(block_num);
  373. install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
  374. new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
  375. }
  376. template<typename Type, typename Handler>
  377. void mpi_process_group::global_trigger(int tag, const Handler& handler,
  378. std::size_t sz)
  379. {
  380. if (sz==0) // normal trigger
  381. install_trigger(tag,0,shared_ptr<trigger_base>(
  382. new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
  383. else // trigger with irecv
  384. install_trigger(tag,0,shared_ptr<trigger_base>(
  385. new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
  386. }
namespace detail {

/// Out-of-band receive for types mapping directly onto an MPI datatype:
/// one blocking MPI_Recv straight into the destination object.
template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);
}

/// Out-of-band receive for serialized types: probe for the packed size,
/// receive the raw bytes, then deserialize into @p data.
template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
{
  //  self.impl_->comm.recv(source,tag,data);
  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  // Pre-1.36 Boost.MPI has no status::count; query MPI directly.
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the data packet itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}

/// Dispatcher: selects the datatype or serialization receive path.
template<typename Type>
void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
{
  do_oob_receive(self, source, tag, data,
                 boost::mpi::is_mpi_datatype<Type>());
}

} // namespace detail
  427. template<typename Type, typename Handler>
  428. void
  429. mpi_process_group::trigger_launcher<Type, Handler>::
  430. receive(mpi_process_group const&, int source, int tag,
  431. trigger_receive_context context, int block) const
  432. {
  433. #ifdef PBGL_PROCESS_GROUP_DEBUG
  434. std::cerr << (out_of_band? "OOB trigger" : "Trigger")
  435. << " receive from source " << source << " and tag " << tag
  436. << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
  437. #endif
  438. Type data;
  439. if (context == trc_out_of_band) {
  440. // Receive the message directly off the wire
  441. int realtag = self.encode_tag(
  442. block == -1 ? self.my_block_number() : block, tag);
  443. detail::do_oob_receive(self,source,realtag,data);
  444. }
  445. else
  446. // Receive the message out of the local buffer
  447. boost::graph::distributed::receive(self, source, tag, data);
  448. // Pass the message off to the handler
  449. handler(source, tag, data, context);
  450. }
  451. template<typename Type, typename Handler>
  452. void
  453. mpi_process_group::reply_trigger_launcher<Type, Handler>::
  454. receive(mpi_process_group const&, int source, int tag,
  455. trigger_receive_context context, int block) const
  456. {
  457. #ifdef PBGL_PROCESS_GROUP_DEBUG
  458. std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
  459. << " receive from source " << source << " and tag " << tag
  460. << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
  461. #endif
  462. BOOST_ASSERT(context == trc_out_of_band);
  463. boost::parallel::detail::untracked_pair<int, Type> data;
  464. // Receive the message directly off the wire
  465. int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
  466. tag);
  467. detail::do_oob_receive(self, source, realtag, data);
  468. // Pass the message off to the handler and send the result back to
  469. // the source.
  470. send_oob(self, source, data.first,
  471. handler(source, tag, data.second, context), -2);
  472. }
  473. template<typename Type, typename Handler>
  474. void
  475. mpi_process_group::global_trigger_launcher<Type, Handler>::
  476. receive(mpi_process_group const& self, int source, int tag,
  477. trigger_receive_context context, int block) const
  478. {
  479. #ifdef PBGL_PROCESS_GROUP_DEBUG
  480. std::cerr << (out_of_band? "OOB trigger" : "Trigger")
  481. << " receive from source " << source << " and tag " << tag
  482. << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
  483. #endif
  484. Type data;
  485. if (context == trc_out_of_band) {
  486. // Receive the message directly off the wire
  487. int realtag = self.encode_tag(
  488. block == -1 ? self.my_block_number() : block, tag);
  489. detail::do_oob_receive(self,source,realtag,data);
  490. }
  491. else
  492. // Receive the message out of the local buffer
  493. boost::graph::distributed::receive(self, source, tag, data);
  494. // Pass the message off to the handler
  495. handler(self, source, tag, data, context);
  496. }
/**
 * Handles completion of the pre-posted MPI_Irecv for this trigger's tag:
 * deserializes the payload from the irecv buffer, re-posts the receive,
 * and invokes the handler. Plain out-of-band deliveries are ignored.
 */
template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif
  Type data;

  // This launcher only consumes messages delivered through its irecv.
  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT (context == trc_irecv_out_of_band);

  // Deserialize out of the buffer that the completed MPI_Irecv filled.
  // force posting of new MPI_Irecv, even though buffer is already allocated
  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
  ia >> data;

  // Start a new receive
  prepare_receive(self,tag,true);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
/**
 * Ensures a nonblocking receive is posted for this trigger's tag. The
 * buffer for the tag is allocated on first use; an MPI_Irecv from any
 * source is (re)posted when the buffer is new or @p force is set.
 */
template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << ("Posting Irecv for trigger")
            << " receive with tag " << tag << std::endl;
#endif
  // First use of this tag: allocate the receive buffer and make sure an
  // irecv actually gets posted below.
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    // Track the request in impl_->requests and post a nonblocking receive
    // for packed data from any source with this tag.
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
              MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
  }
}
  543. template<typename T>
  544. inline mpi_process_group::process_id_type
  545. receive(const mpi_process_group& pg, int tag, T& value)
  546. {
  547. for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
  548. if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
  549. value, boost::mpi::is_mpi_datatype<T>()))
  550. return source;
  551. }
  552. BOOST_ASSERT (false);
  553. }
  554. template<typename T>
  555. typename
  556. enable_if<boost::mpi::is_mpi_datatype<T>,
  557. std::pair<mpi_process_group::process_id_type, std::size_t> >::type
  558. receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
  559. {
  560. for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
  561. bool result =
  562. pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
  563. boost::serialization::make_array(values,n),
  564. boost::mpl::true_());
  565. if (result)
  566. return std::make_pair(source, n);
  567. }
  568. BOOST_ASSERT(false);
  569. }
  570. template<typename T>
  571. typename
  572. disable_if<boost::mpi::is_mpi_datatype<T>,
  573. std::pair<mpi_process_group::process_id_type, std::size_t> >::type
  574. receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
  575. {
  576. for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
  577. if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
  578. values, n))
  579. return std::make_pair(source, n);
  580. }
  581. BOOST_ASSERT(false);
  582. }
  583. template<typename T>
  584. mpi_process_group::process_id_type
  585. receive(const mpi_process_group& pg,
  586. mpi_process_group::process_id_type source, int tag, T& value)
  587. {
  588. if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
  589. value, boost::mpi::is_mpi_datatype<T>()))
  590. return source;
  591. else {
  592. fprintf(stderr,
  593. "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
  594. process_id(pg), source, tag, pg.my_block_number());
  595. BOOST_ASSERT(false);
  596. abort();
  597. }
  598. }
  599. template<typename T>
  600. typename
  601. enable_if<boost::mpi::is_mpi_datatype<T>,
  602. std::pair<mpi_process_group::process_id_type, std::size_t> >::type
  603. receive(const mpi_process_group& pg, int source, int tag, T values[],
  604. std::size_t n)
  605. {
  606. if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
  607. boost::serialization::make_array(values,n),
  608. boost::mpl::true_()))
  609. return std::make_pair(source,n);
  610. else {
  611. fprintf(stderr,
  612. "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
  613. process_id(pg), source, tag, pg.my_block_number());
  614. BOOST_ASSERT(false);
  615. abort();
  616. }
  617. }
  618. template<typename T>
  619. typename
  620. disable_if<boost::mpi::is_mpi_datatype<T>,
  621. std::pair<mpi_process_group::process_id_type, std::size_t> >::type
  622. receive(const mpi_process_group& pg, int source, int tag, T values[],
  623. std::size_t n)
  624. {
  625. pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
  626. values, n);
  627. return std::make_pair(source, n);
  628. }
  629. template<typename T, typename BinaryOperation>
  630. T*
  631. all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
  632. BinaryOperation bin_op)
  633. {
  634. synchronize(pg);
  635. bool inplace = first == out;
  636. if (inplace) out = new T [last-first];
  637. boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
  638. boost::mpi::comm_attach),
  639. first, last-first, out, bin_op);
  640. if (inplace) {
  641. std::copy(out, out + (last-first), first);
  642. delete [] out;
  643. return last;
  644. }
  645. return out;
  646. }
  647. template<typename T>
  648. void
  649. broadcast(const mpi_process_group& pg, T& val,
  650. mpi_process_group::process_id_type root)
  651. {
  652. // broadcast the seed
  653. boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
  654. boost::mpi::broadcast(comm,val,root);
  655. }
  656. template<typename T, typename BinaryOperation>
  657. T*
  658. scan(const mpi_process_group& pg, T* first, T* last, T* out,
  659. BinaryOperation bin_op)
  660. {
  661. synchronize(pg);
  662. bool inplace = first == out;
  663. if (inplace) out = new T [last-first];
  664. boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);
  665. if (inplace) {
  666. std::copy(out, out + (last-first), first);
  667. delete [] out;
  668. return last;
  669. }
  670. return out;
  671. }
/**
 * Gathers the range [first, last) from every process into @p out on all
 * processes, concatenated in process-rank order. Element counts may differ
 * across processes; sizes and displacements are exchanged first. The
 * transfer is done bytewise (MPI_BYTE), so T is assumed to be safely
 * byte-copyable — TODO confirm for non-trivial T.
 */
template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of vertices stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  // Adjust sizes based on the number of bytes
  //
  //   std::transform(sizes.begin(), sizes.end(), sizes.begin(),
  //                  std::bind2nd(std::multiplies<int>(), sizeof(T)));
  //
  // std::bind2nd has been removed from C++17
  for( std::size_t i = 0, n = sizes.size(); i < n; ++i )
  {
    sizes[ i ] *= sizeof( T );
  }

  // Compute displacements: running byte offsets of each process's data;
  // the final entry is the total byte count.
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values. When the local contribution is empty we still
  // need some valid pointer for the send buffer, so the vector object's own
  // address is used as a dummy (zero bytes are sent from it).
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
                            /* local results */: (void*)&local_values[0],
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}
  716. template<typename InputIterator>
  717. mpi_process_group
  718. process_subgroup(const mpi_process_group& pg,
  719. InputIterator first, InputIterator last)
  720. {
  721. /*
  722. boost::mpi::group current_group = communicator(pg).group();
  723. boost::mpi::group new_group = current_group.include(first,last);
  724. boost::mpi::communicator new_comm(communicator(pg),new_group);
  725. return mpi_process_group(new_comm);
  726. */
  727. std::vector<int> ranks(first, last);
  728. MPI_Group current_group;
  729. int result = MPI_Comm_group(communicator(pg), &current_group);
  730. BOOST_ASSERT(result == MPI_SUCCESS);
  731. (void)result;
  732. MPI_Group new_group;
  733. result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
  734. BOOST_ASSERT(result == MPI_SUCCESS);
  735. MPI_Comm new_comm;
  736. result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
  737. BOOST_ASSERT(result == MPI_SUCCESS);
  738. result = MPI_Group_free(&new_group);
  739. BOOST_ASSERT(result == MPI_SUCCESS);
  740. result = MPI_Group_free(&current_group);
  741. BOOST_ASSERT(result == MPI_SUCCESS);
  742. if (new_comm != MPI_COMM_NULL) {
  743. mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
  744. result = MPI_Comm_free(&new_comm);
  745. BOOST_ASSERT(result == 0);
  746. return result_pg;
  747. } else {
  748. return mpi_process_group(mpi_process_group::create_empty());
  749. }
  750. }
// Retrieve the receiver installed for the calling block's on_receive slot.
// Uses the wrapper's target<>() accessor, so this presumably returns null
// when the stored callable is not exactly of type Receiver (std/boost
// function semantics -- TODO confirm).
template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  // `.template` disambiguates the member-template call target<Receiver>()
  // inside this function template.
  return impl_->blocks[my_block_number()]->on_receive
    .template target<Receiver>();
}
  757. template<typename T>
  758. typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
  759. receive_oob(const mpi_process_group& pg,
  760. mpi_process_group::process_id_type source, int tag, T& value, int block)
  761. {
  762. using boost::mpi::get_mpi_datatype;
  763. // Determine the actual message we expect to receive, and which
  764. // communicator it will come by.
  765. std::pair<boost::mpi::communicator, int> actual
  766. = pg.actual_communicator_and_tag(tag, block);
  767. // Post a non-blocking receive that waits until we complete this request.
  768. MPI_Request request;
  769. MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
  770. source, actual.second, actual.first, &request);
  771. int done = 0;
  772. do {
  773. MPI_Test(&request, &done, MPI_STATUS_IGNORE);
  774. if (!done)
  775. pg.poll(/*wait=*/false, block);
  776. } while (!done);
  777. }
// Out-of-band receive for types without a native MPI datatype: the value
// arrives as an MPI_PACKED message and is deserialized through
// boost::mpi::packed_iarchive.
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Probe until the message shows up, servicing the process group's event
  // loop in the meantime so other traffic keeps making progress.
  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer to the probed message size.
  boost::mpi::packed_iarchive in(actual.first);
#if BOOST_VERSION >= 103600
  in.resize(status->count<boost::mpi::packed>().get());
#else
  // Older Boost.MPI lacks status::count<>(); query MPI directly.
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data (matching the probed source/tag exactly).
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}
  810. template<typename SendT, typename ReplyT>
  811. typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
  812. send_oob_with_reply(const mpi_process_group& pg,
  813. mpi_process_group::process_id_type dest,
  814. int tag, const SendT& send_value, ReplyT& reply_value,
  815. int block)
  816. {
  817. detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  818. send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
  819. (int)reply_tag, send_value), block);
  820. receive_oob(pg, dest, reply_tag, reply_value);
  821. }
  822. template<typename SendT, typename ReplyT>
  823. typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
  824. send_oob_with_reply(const mpi_process_group& pg,
  825. mpi_process_group::process_id_type dest,
  826. int tag, const SendT& send_value, ReplyT& reply_value,
  827. int block)
  828. {
  829. detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  830. send_oob(pg, dest, tag,
  831. boost::parallel::detail::make_untracked_pair((int)reply_tag,
  832. send_value), block);
  833. receive_oob(pg, dest, reply_tag, reply_value);
  834. }
  835. } } } // end namespace boost::graph::distributed