p2pcommunicator_impl.hh
/*
  Copyright 2015 IRIS AS

  This file is part of the Open Porous Media project (OPM).

  OPM is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  OPM is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
#define DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED

#include <iostream>

namespace Dune
{

  template <class MsgBuffer>
  inline void
  Point2PointCommunicator< MsgBuffer > ::
  removeLinkage()
  {
    sendLinkage_.clear();
    recvLinkage_.clear();
    sendDest_.clear();
    recvSource_.clear();

    // clear previously stored buffer sizes
    _recvBufferSizes.clear();
    _recvBufferSizesComputed = false ;
  }

  template <class MsgBuffer>
  inline void
  Point2PointCommunicator< MsgBuffer > ::
  computeDestinations( const linkage_t& linkage, vector_t& dest )
  {
    typedef linkage_t::const_iterator const_iterator ;
    dest.resize( linkage.size() );
    const const_iterator linkageEnd = linkage.end ();
    for( const_iterator i = linkage.begin (); i != linkageEnd; ++i )
    {
      dest[ (*i).second ] = (*i).first;
    }
  }
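
  // Illustration: a linkage maps a peer rank to a consecutive link number,
  // e.g. sendLinkage_ = { {3,0}, {7,1} }; computeDestinations inverts that
  // mapping, so sendDest_ = [ 3, 7 ] (peer rank indexed by link number).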

  template <class MsgBuffer>
  inline void
  Point2PointCommunicator< MsgBuffer > ::
  insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks )
  {
    // remove old linkage
    removeLinkage();

    const int me_rank = rank ();

    {
      typedef std::map< int, int >::iterator iterator ;
      typedef std::set< int >::const_iterator const_iterator;

      const iterator sendEnd = sendLinkage_.end ();
      const iterator recvEnd = recvLinkage_.end ();
      const const_iterator sendLinksEnd = sendLinks.end ();
      int sendLink = 0 ;
      int recvLink = 0 ;
      for (const_iterator i = sendLinks.begin (); i != sendLinksEnd; ++i )
      {
        const int rank = (*i);
        // if rank was not inserted, insert with current link number
        if( rank != me_rank && (sendLinkage_.find ( rank ) == sendEnd ) )
        {
          sendLinkage_.insert( std::make_pair( rank, sendLink++) );
        }
      }

      const const_iterator recvLinksEnd = recvLinks.end ();
      for (const_iterator i = recvLinks.begin (); i != recvLinksEnd; ++i )
      {
        const int rank = (*i);
        // if rank was not inserted, insert with current link number
        if( rank != me_rank && (recvLinkage_.find ( rank ) == recvEnd ) )
        {
          recvLinkage_.insert( std::make_pair( rank, recvLink++) );
        }
      }
    }

    // compute send destinations
    computeDestinations( sendLinkage_, sendDest_ );

    // compute receive sources
    computeDestinations( recvLinkage_, recvSource_ );
  }
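
  // Minimal usage sketch (illustrative only; `comm` is an already constructed
  // communicator): to send to rank 3 and receive from rank 5 one would call
  //   comm.insertRequest( std::set< int >{ 3 }, std::set< int >{ 5 } );
  // afterwards sendDest_[ 0 ] == 3 and recvSource_[ 0 ] == 5.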


  // non-blocking communication object
  // this class is defined here since it contains MPI information
#if HAVE_MPI

#ifndef NDEBUG
// this is simply to avoid warnings about unused variables
#define MY_INT_TEST int test =
#else
#define MY_INT_TEST
#endif

  template < class P2PCommunicator >
  class NonBlockingExchangeImplementation
  {
    typedef P2PCommunicator P2PCommunicatorType ;
    const P2PCommunicatorType& _p2pCommunicator;

    const int _sendLinks;
    const int _recvLinks;
    const int _tag;

    MPI_Request* _sendRequest;
    MPI_Request* _recvRequest;

    const bool _recvBufferSizesKnown;
    bool _needToSend ;

    // no copying
    NonBlockingExchangeImplementation( const NonBlockingExchangeImplementation& );

    // return vector of send requests if the number of send links is positive
    MPI_Request* createSendRequest() const
    {
      return ( _sendLinks > 0 ) ? new MPI_Request [ _sendLinks ] : 0;
    }

    // return vector of recv requests when the number of recv links
    // is positive and the receive buffer sizes are known
    MPI_Request* createRecvRequest( const bool recvBufferSizesKnown ) const
    {
      return ( _recvLinks > 0 && recvBufferSizesKnown ) ? new MPI_Request [ _recvLinks ] : 0;
    }

    // call cast operator on CollectiveCommunication to retrieve MPI_Comm
    MPI_Comm mpiCommunicator() const { return static_cast< MPI_Comm > (_p2pCommunicator); }

  public:
    typedef typename P2PCommunicatorType :: DataHandleInterface DataHandleInterface;
    typedef typename P2PCommunicatorType :: MessageBufferType MessageBufferType;

    NonBlockingExchangeImplementation( const P2PCommunicatorType& p2pComm,
                                       const int tag,
                                       const bool recvBufferSizesKnown = false )
      : _p2pCommunicator( p2pComm ),
        _sendLinks( _p2pCommunicator.sendLinks() ),
        _recvLinks( _p2pCommunicator.recvLinks() ),
        _tag( tag ),
        _sendRequest( createSendRequest() ),
        _recvRequest( createRecvRequest( recvBufferSizesKnown ) ),
        _recvBufferSizesKnown( recvBufferSizesKnown ),
        _needToSend( true )
    {
      // make sure every process has the same tag
#ifndef NDEBUG
      int mytag = tag ;
      assert ( mytag == _p2pCommunicator.max( mytag ) );
#endif
    }

    NonBlockingExchangeImplementation( const P2PCommunicatorType& p2pComm,
                                       const int tag,
                                       const std::vector< MessageBufferType > & sendBuffers )
      : _p2pCommunicator( p2pComm ),
        _sendLinks( _p2pCommunicator.sendLinks() ),
        _recvLinks( _p2pCommunicator.recvLinks() ),
        _tag( tag ),
        _sendRequest( createSendRequest() ),
        _recvRequest( createRecvRequest( false ) ),
        _recvBufferSizesKnown( false ),
        _needToSend( false )
    {
      // make sure every process has the same tag
#ifndef NDEBUG
      int mytag = tag ;
      assert ( mytag == _p2pCommunicator.max( mytag ) );
#endif

      assert ( _sendLinks == int( sendBuffers.size() ) );
      sendImpl( sendBuffers );
    }
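
    // Note: the first constructor defers sending until send()/exchange() is
    // called, while the second one immediately posts the non-blocking sends
    // for the buffers passed in.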

    // interface methods
    ~NonBlockingExchangeImplementation()
    {
      if( _sendRequest )
      {
        delete [] _sendRequest;
        _sendRequest = 0;
      }

      if( _recvRequest )
      {
        delete [] _recvRequest;
        _recvRequest = 0;
      }
    }

    // virtual methods
    void send( const std::vector< MessageBufferType > & sendBuffers ) { sendImpl( sendBuffers ); }
    std::vector< MessageBufferType > receive() { return receiveImpl(); }

    // implementation

    // send data implementation
    void sendImpl( const std::vector< MessageBufferType > & sendBuffers )
    {
      // get mpi communicator
      MPI_Comm comm = mpiCommunicator();

      // get vector with destinations
      const std::vector< int >& sendDest = _p2pCommunicator.sendDest();

      // send data
      for (int link = 0; link < _sendLinks; ++link)
      {
        sendLink( sendDest[ link ], _tag, sendBuffers[ link ], _sendRequest[ link ], comm );
      }

      // set send info
      _needToSend = false ;
    }
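
    // Note: sendLink posts MPI_Isend, so each buffer in sendBuffers has to
    // stay alive and unmodified until the matching MPI_Waitall on _sendRequest
    // (in receiveImpl / unpackRecvBufferSizeKnown) has completed.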

    // receive data without buffers given
    std::vector< MessageBufferType > receiveImpl ()
    {
      // create vector of empty streams
      std::vector< MessageBufferType > recvBuffer( _recvLinks );
      receiveImpl( recvBuffer );
      return recvBuffer;
    }

    // receive data implementation with given buffers
    void receiveImpl ( std::vector< MessageBufferType >& recvBuffers, DataHandleInterface* dataHandle = 0)
    {
      // do nothing if number of links is zero
      if( (_recvLinks + _sendLinks) == 0 ) return;

      // get mpi communicator
      MPI_Comm comm = mpiCommunicator();

      // get vector with receive sources
      const std::vector< int >& recvSource = _p2pCommunicator.recvSource();

      // check whether the output vector contains only one stream
      const bool useFirstStreamOnly = (recvBuffers.size() == 1) ;

      // flag vector holding information about received links
      std::vector< bool > linkNotReceived( _recvLinks, true );

      // count number of received messages
      int numReceived = 0;
      while( numReceived < _recvLinks )
      {
        // check all links for messages
        for (int link = 0; link < _recvLinks; ++link )
        {
          // if message was not received yet, check again
          if( linkNotReceived[ link ] )
          {
            // get appropriate object stream
            MessageBufferType& recvBuffer = useFirstStreamOnly ? recvBuffers[ 0 ] : recvBuffers[ link ];

            // check whether a message was completely received;
            // if so, unpack the data
            if( probeAndReceive( comm, recvSource[ link ], _tag, recvBuffer ) )
            {
              // if a data handle was given, unpack
              if( dataHandle ) dataHandle->unpack( link, recvBuffer );

              // mark link as received
              linkNotReceived[ link ] = false ;

              // increase number of received messages
              ++ numReceived;
            }
          }
        }
      }

      // if the send request exists, i.e. some messages have been sent
      if( _sendRequest )
      {
        // wait until all sends posted by this rank have completed
        MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
        assert (test == MPI_SUCCESS);
      }
    }
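
    // The receive loop above polls all links with MPI_Iprobe (inside
    // probeAndReceive) and picks up messages in whatever order they arrive,
    // so no receive buffer has to be allocated before the message size is
    // known.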

    // unpack received data when the receive buffer sizes are known in advance
    void unpackRecvBufferSizeKnown( std::vector< MessageBufferType >& recvBuffers, DataHandleInterface& dataHandle )
    {
      // do nothing if number of links is zero
      if( _recvLinks == 0 ) return;

      // flag vector holding information about received links
      std::vector< bool > linkNotReceived( _recvLinks, true );

      // count number of received messages
      int numReceived = 0;
      while( numReceived < _recvLinks )
      {
        // check all links for messages
        for (int link = 0; link < _recvLinks; ++link )
        {
          // if message was not received yet, check again
          if( linkNotReceived[ link ] )
          {
            assert( _recvRequest );
            // check whether the message was received and, if so, unpack the data
            if( receivedMessage( _recvRequest[ link ], recvBuffers[ link ] ) )
            {
              // unpack data
              dataHandle.unpack( link, recvBuffers[ link ] );

              // mark link as received
              linkNotReceived[ link ] = false ;
              // increase number of received messages
              ++ numReceived;
            }
          }
        }
      }

      // if the send request exists, i.e. some messages have been sent
      if( _sendRequest )
      {
        // wait until all sends posted by this rank have completed
        MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
        assert (test == MPI_SUCCESS);
      }
    }
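
    // In contrast to receiveImpl, this path relies on receives that were
    // posted in advance (see the send overload below), which is only possible
    // because the receive buffer sizes were cached by a previous exchange.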

    // pack and send data using the given data handle
    void send( std::vector< MessageBufferType >& sendBuffers,
               DataHandleInterface& dataHandle )
    {
      std::vector< MessageBufferType > recvBuffers;
      send( sendBuffers, recvBuffers, dataHandle );
    }

    // pack and send data; also post receives if the buffer sizes are known
    void send( std::vector< MessageBufferType >& sendBuffer,
               std::vector< MessageBufferType >& recvBuffer,
               DataHandleInterface& dataHandle )
    {
      if( _needToSend )
      {
        // get mpi communicator
        MPI_Comm comm = mpiCommunicator();

        // get vector with destinations
        const std::vector< int >& sendDest = _p2pCommunicator.sendDest();

        // send data
        for (int link = 0; link < _sendLinks; ++link)
        {
          // pack data
          dataHandle.pack( link, sendBuffer[ link ] );

          // send data
          sendLink( sendDest[ link ], _tag, sendBuffer[ link ], _sendRequest[ link ], comm );
        }

        // set send info
        _needToSend = false ;
      }

      // if the receive buffer sizes are known, post the receives now
      if( _recvBufferSizesKnown )
      {
        // get mpi communicator
        MPI_Comm comm = mpiCommunicator();

        recvBuffer.resize( _recvLinks );

        // get vector with sources and known buffer sizes
        const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
        const std::vector< int >& recvBufferSizes = _p2pCommunicator.recvBufferSizes();

        // post receives
        for (int link = 0; link < _recvLinks; ++link)
        {
          // size of the message to be received
          const int bufferSize = recvBufferSizes[ link ];

          // post receive
          assert( _recvRequest );
          assert( &_recvRequest[ link ] );
          postReceive( recvSource[ link ], _tag, bufferSize, recvBuffer[ link ], _recvRequest[ link ], comm );
        }
      }
    }

    // receive data and unpack it with the given data handle
    void receive( DataHandleInterface& dataHandle )
    {
      // do work that can be done between send and receive
      dataHandle.localComputation() ;

      // create receive message buffer
      std::vector< MessageBufferType > recvBuffer( 1 );
      // receive data
      receiveImpl( recvBuffer, &dataHandle );
    }

    // exchange data: send, do local work, then receive and unpack
    void exchange( DataHandleInterface& dataHandle )
    {
      const int recvLinks = _p2pCommunicator.recvLinks();
      // do nothing if number of links is zero
      if( (recvLinks + _sendLinks) == 0 ) return;

      // send message buffers; we need one buffer per link because the
      // non-blocking send routines may not have finished
      // when we start receiving
      std::vector< MessageBufferType > sendBuffers ;
      std::vector< MessageBufferType > recvBuffers ;

      // if data was not sent yet, do it now
      if( _needToSend )
      {
        // resize message buffer vector
        sendBuffers.resize( _sendLinks );

        // send data
        send( sendBuffers, recvBuffers, dataHandle );
      }

      // now receive data
      if( _recvBufferSizesKnown )
        unpackRecvBufferSizeKnown( recvBuffers, dataHandle );
      else
        receive( dataHandle );
    }

  protected:
    int sendLink( const int dest, const int tag,
                  const MessageBufferType& msgBuffer, MPI_Request& request, MPI_Comm& comm )
    {
      // buffer = pointer to memory and size
      std::pair< char*, int > buffer = msgBuffer.buffer();

      MY_INT_TEST MPI_Isend ( buffer.first, buffer.second, MPI_BYTE, dest, tag, comm, &request );
      assert (test == MPI_SUCCESS);

      return buffer.second;
    }

    void postReceive( const int source, const int tag, const int bufferSize,
                      MessageBufferType& msgBuffer, MPI_Request& request, MPI_Comm& comm )
    {
      // reserve memory for receive buffer
      msgBuffer.resize( bufferSize );
      // reset read position
      msgBuffer.resetReadPosition();

      // get buffer and size
      std::pair< char*, int > buffer = msgBuffer.buffer();

      // MPI receive (non-blocking)
      {
        MY_INT_TEST MPI_Irecv ( buffer.first, buffer.second, MPI_BYTE, source, tag, comm, & request);
        assert (test == MPI_SUCCESS);
      }
    }
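
    // Because the expected message size is passed in explicitly, postReceive
    // can size the buffer and start MPI_Irecv right away; no MPI_Iprobe /
    // MPI_Get_count round trip (as in probeAndReceive below) is needed.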

    // checks whether the receive posted for one link has completed
    bool receivedMessage( MPI_Request& request, MessageBufferType&
#ifndef NDEBUG
                          buffer
#endif
                        )
    {
#ifndef NDEBUG
      // for checking whether the buffer size is correct
      MPI_Status status ;
#endif
      // msg received, 0 or 1
      int received = 0;

      // test whether the receive for this message has finished
      MPI_Test( &request, &received,
#ifndef NDEBUG
                &status
#else
                MPI_STATUS_IGNORE // ignore status in non-debug mode for performance reasons
#endif
              );

#ifndef NDEBUG
      if( received )
      {
        int checkBufferSize = -1;
        MPI_Get_count ( & status, MPI_BYTE, &checkBufferSize );
        if( checkBufferSize != int(buffer.size()) )
          std::cout << "Buffer sizes don't match: " << checkBufferSize << " " << buffer.size() << std::endl;
        assert( checkBufferSize == int(buffer.size()) );
      }
#endif
      return bool(received);
    }

    // does the receive operation for one link
    bool probeAndReceive( MPI_Comm& comm,
                          const int source,
                          const int tag,
                          MessageBufferType& recvBuffer )
    {
      // corresponding MPI status
      MPI_Status status;

      // msg available, 0 or 1
      // available does not mean already received
      int available = 0;

      // check for any message with tag (nonblocking)
      MPI_Iprobe( source, tag, comm, &available, &status );

      // receive message if available flag is true
      if( available )
      {
        // this should be the same, otherwise we got an error
        assert ( source == status.MPI_SOURCE );

        // length of message
        int bufferSize = -1;

        // get length of message
        {
          MY_INT_TEST MPI_Get_count ( &status, MPI_BYTE, &bufferSize );
          assert (test == MPI_SUCCESS);
        }

        // reserve memory
        recvBuffer.resize( bufferSize );
        // reset read position for unpack
        recvBuffer.resetReadPosition();

        // get buffer
        std::pair< char*, int > buffer = recvBuffer.buffer();

        // MPI receive (blocking)
        {
          MY_INT_TEST MPI_Recv ( buffer.first, buffer.second, MPI_BYTE, status.MPI_SOURCE, tag, comm, & status);
          assert (test == MPI_SUCCESS);
        }

        return true ; // received
      }
      return false ; // not yet received
    }
  }; // end NonBlockingExchangeImplementation
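
  // Illustrative usage of the surrounding Point2PointCommunicator (sketch only;
  // `comm` is an already constructed communicator and MyDataHandle stands for
  // any user-defined DataHandleInterface implementation, neither is defined here):
  //
  //   comm.insertRequest( sendRanks, recvRanks );  // establish the linkage
  //   MyDataHandle handle( /* ... */ );
  //   comm.exchange( handle );                     // pack, send, receive, unpack
  //   comm.exchangeCached( handle );               // later calls reuse cached receive sizes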

#undef MY_INT_TEST
#endif // #if HAVE_MPI

  // --exchange

  template <class MsgBuffer>
  inline void
  Point2PointCommunicator< MsgBuffer > ::
#if HAVE_MPI
  exchange( DataHandleInterface& handle) const
#else
  exchange( DataHandleInterface&) const
#endif
  {
    assert( _recvBufferSizes.empty () );
#if HAVE_MPI
    NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *this, getMessageTag() );
    nonBlockingExchange.exchange( handle );
#endif
  }

  // --exchange
  template <class MsgBuffer>
  inline std::vector< MsgBuffer >
  Point2PointCommunicator< MsgBuffer > ::
  exchange( const std::vector< MessageBufferType > & in ) const
  {
#if HAVE_MPI
    // note: for the non-blocking exchange the message tag
    // should be different each time to avoid MPI problems
    NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *this, getMessageTag(), in );
    return nonBlockingExchange.receiveImpl();
#else
    // do nothing when MPI is not available
    return in;
#endif
  }

  // --exchange
  template <class MsgBuffer>
  inline void
  Point2PointCommunicator< MsgBuffer > ::
#if HAVE_MPI
  exchangeCached( DataHandleInterface& handle ) const
#else
  exchangeCached( DataHandleInterface&) const
#endif
  {
#if HAVE_MPI
    if( ! _recvBufferSizesComputed )
    {
      const int nSendLinks = sendLinks();
      std::vector< MsgBuffer > buffers( nSendLinks );
      // pack all data
      for( int link=0; link<nSendLinks; ++link )
      {
        handle.pack( link, buffers[ link ] );
      }
      // exchange data
      buffers = exchange( buffers );
      const int nRecvLinks = recvLinks();
      // unpack all data
      for( int link=0; link<nRecvLinks; ++link )
      {
        handle.unpack( link, buffers[ link ] );
      }
      // store receive buffer sizes
      _recvBufferSizes.resize( nRecvLinks );
      for( int link=0; link<nRecvLinks; ++link )
      {
        _recvBufferSizes[ link ] = buffers[ link ].size();
      }
      _recvBufferSizesComputed = true ;
    }
    else
    {
      NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *this, getMessageTag(), _recvBufferSizesComputed );
      nonBlockingExchange.exchange( handle );
    }
#endif
  }
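
  // exchangeCached: the first call performs a probe-based exchange and records
  // the size of every received buffer; subsequent calls pass these cached
  // sizes to NonBlockingExchangeImplementation so the receives can be
  // pre-posted with MPI_Irecv instead of being probed for.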

} // namespace Dune
#endif // #ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED