HMLP: High-performance Machine Learning Primitives
hmlp_mpi.hpp
1 #ifndef HMLP_MPI_HPP
2 #define HMLP_MPI_HPP
3 #include <external/mpi_prototypes.h>
4 
5 #include <vector>
6 #include <stdlib.h>
7 #include <stdint.h>
8 #include <limits.h>
9 #include <type_traits>
10 #include <cstdio>
11 #include <cstdint>
12 #include <cassert>
13 #include <vector>
14 
/**
 *  HMLP_MPI_SIZE_T names the predefined unsigned MPI integer datatype whose
 *  range matches size_t. It is chosen by comparing SIZE_MAX against the
 *  limits of each unsigned integer type, from narrowest to widest.
 */
#if SIZE_MAX == UCHAR_MAX
#define HMLP_MPI_SIZE_T MPI_UNSIGNED_CHAR
#elif SIZE_MAX == USHRT_MAX
#define HMLP_MPI_SIZE_T MPI_UNSIGNED_SHORT
#elif SIZE_MAX == UINT_MAX
#define HMLP_MPI_SIZE_T MPI_UNSIGNED
#elif SIZE_MAX == ULONG_MAX
#define HMLP_MPI_SIZE_T MPI_UNSIGNED_LONG
#elif SIZE_MAX == ULLONG_MAX
#define HMLP_MPI_SIZE_T MPI_UNSIGNED_LONG_LONG
#else
#error "hmlp_mpi.hpp: SIZE_MAX matches no unsigned integer type; cannot choose an MPI datatype for size_t"
#endif
28 
29 
31 #ifdef HMLP_MIC_AVX512
32 #include <hbwmalloc.h>
33 #include <hbw_allocator.h>
34 #endif // ifdef HMLP_MIC_AVX512
35 
36 
37 using namespace std;
38 
39 namespace hmlp
40 {
42 namespace mpi
43 {
44 typedef MPI_Status Status;
45 typedef MPI_Request Request;
46 typedef MPI_Comm Comm;
47 typedef MPI_Datatype Datatype;
48 typedef MPI_Op Op;
49 
51 int Init( int *argc, char ***argv );
52 int Initialized( int *flag );
53 int Finalize( void );
54 int Finalized( int *flag );
55 int Send( const void *buf, int count, Datatype datatype, int dest, int tag, Comm comm );
56 int Isend( const void *buf, int count, Datatype datatype, int dest, int tag, Comm comm, Request *request );
57 int Recv( void *buf, int count, Datatype datatype, int source, int tag, Comm comm, Status *status );
58 int Irecv( void *buf, int count, Datatype datatype, int source, int tag, Comm comm, Request *request );
59 int Sendrecv( void *sendbuf, int sendcount, Datatype sendtype, int dest, int sendtag, void *recvbuf, int recvcount, Datatype recvtype, int source, int recvtag, Comm comm, Status *status );
60 int Get_count( Status *status, Datatype datatype, int *count );
61 int Comm_size( Comm comm, int *size );
62 int Comm_rank( Comm comm, int *rank );
63 int Comm_dup( Comm comm, Comm *newcomm );
64 int Comm_split( Comm comm, int color, int key, Comm *newcomm );
65 int Type_contiguous( int count, Datatype oldtype, Datatype *newtype );
66 int Type_commit( Datatype *datatype );
67 int Test( Request *request, int *flag, Status *status );
68 int Barrier( Comm comm );
69 int Ibarrier( Comm comm, Request *request );
70 int Bcast( void *buffer, int count, Datatype datatype, int root, Comm comm );
71 int Reduce( void *sendbuf, void *recvbuf, int count, Datatype datatype, Op op, int root, Comm comm );
72 int Gather( const void *sendbuf, int sendcount, Datatype sendtype, void *recvbuf, int recvcount, Datatype recvtype, int root, Comm comm );
73 int Gatherv( void *sendbuf, int sendcount, Datatype sendtype, void *recvbuf, const int *recvcounts, const int *displs, Datatype recvtype, int root, Comm comm );
74 int Scan( void *sendbuf, void *recvbuf, int count, Datatype datatype, Op op, Comm comm );
75 int Allreduce( void* sendbuf, void* recvbuf, int count, Datatype datatype, Op op, Comm comm );
76 int Allgather( void *sendbuf, int sendcount, Datatype sendtype, void *recvbuf, int recvcount, Datatype recvtype, Comm comm );
77 int Allgatherv( void *sendbuf, int sendcount, Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, Datatype recvtype, Comm comm );
78 int Alltoall( void *sendbuf, int sendcount, Datatype sendtype, void *recvbuf, int recvcount, Datatype recvtype, Comm comm );
79 int Alltoallv( void *sendbuf, int *sendcounts, int *sdispls, Datatype sendtype, void *recvbuf, int *recvcounts, int *rdispls, Datatype recvtype, Comm comm );
80 
82 int Init_thread( int *argc, char ***argv, int required, int *provided );
83 int Probe( int source, int tag, Comm comm, Status *status );
84 int Iprobe( int source, int tag, Comm comm, int *flag, Status *status );
85 
87 void PrintProgress( const char *s, mpi::Comm comm );
88 
89 class MPIObject
90 {
91  public:
92 
93  MPIObject() {};
94 
95  MPIObject( mpi::Comm comm ) { AssignCommunicator( comm ); };
96 
97  void AssignCommunicator( mpi::Comm &comm )
98  {
99  this->comm = comm;
100  mpi::Comm_dup( comm, &private_comm );
101  mpi::Comm_size( comm, &comm_size );
102  mpi::Comm_rank( comm, &comm_rank );
103  };
104 
105  mpi::Comm GetComm() { return comm; };
106 
107  mpi::Comm GetPrivateComm() { return private_comm; };
108 
109  int GetCommSize() { return comm_size; };
110 
111  int GetCommRank() { return comm_rank; };
112 
113  int Comm_size()
114  {
115  int size;
116  mpi::Comm_size( comm, &size );
117  return size;
118  };
119 
120  int Comm_rank()
121  {
122  int rank;
123  mpi::Comm_rank( comm, &rank );
124  return rank;
125  };
126 
127  int Barrier() { return mpi::Barrier( comm ); };
128 
129  int PrivateBarrier() { return mpi::Barrier( private_comm ); };
130 
131  private:
132 
134  mpi::Comm comm = MPI_COMM_WORLD;
136  mpi::Comm private_comm;
138  int comm_size = 1;
140  int comm_rank = 0;
141 
142 };
/**
 *  A number paired with an int. With T = float or double, GetMPIDatatype()
 *  maps this struct to MPI_FLOAT_INT / MPI_DOUBLE_INT respectively, so the
 *  member order (value first, then the int) must stay as is.
 */
template<typename T>
struct NumberIntPair
{
  /** The number. */
  T val;
  /** The integer attached to it. */
  int key;
};
153 
154 
155 template<typename T>
156 Datatype GetMPIDatatype()
157 {
158  Datatype datatype;
159  if ( std::is_same<T, int>::value )
160  datatype = MPI_INT;
161  else if ( std::is_same<T, float>::value )
162  datatype = MPI_FLOAT;
163  else if ( std::is_same<T, double>::value )
164  datatype = MPI_DOUBLE;
165  else if ( std::is_same<T, size_t>::value )
166  datatype = HMLP_MPI_SIZE_T;
167  else if ( std::is_same<T, NumberIntPair<float> >::value )
168  datatype = MPI_FLOAT_INT;
169  else if ( std::is_same<T, NumberIntPair<double> >::value )
170  {
171  datatype = MPI_DOUBLE_INT;
172  }
173  else
174  {
175  Type_contiguous( sizeof( T ), MPI_BYTE, &datatype );
176  Type_commit( &datatype );
177 
178  //printf( "request datatype is not supported in the simplified interface\n" );
179  //exit( 1 );
180  }
181  return datatype;
182 
183 };
187 template<typename TSEND>
188 int Send( TSEND *buf, int count,
189  int dest, int tag, Comm comm )
190 {
191  Datatype datatype = GetMPIDatatype<TSEND>();
192  return Send( buf, count, datatype, dest, tag, comm );
193 };
196 template<typename TSEND>
197 int Isend( TSEND *buf, int count,
198  int dest, int tag, Comm comm, Request *request )
199 {
200  Datatype datatype = GetMPIDatatype<TSEND>();
201  return Isend( buf, count, datatype, dest, tag, comm, request );
202 };
206 template<typename TRECV>
207 int Recv( TRECV *buf, int count,
208  int source, int tag, Comm comm, Status *status )
209 {
210  Datatype datatype = GetMPIDatatype<TRECV>();
211  return Recv( buf, count, datatype, source, tag, comm, status );
212 };
214 template<typename T>
215 int Bcast( T *buffer, int count, int root, Comm comm )
216 {
217  Datatype datatype = GetMPIDatatype<T>();
218  return Bcast( buffer, count, datatype, root, comm );
219 };
222 template<typename TSEND, typename TRECV>
223 int Sendrecv(
224  TSEND *sendbuf, int sendcount, int dest, int sendtag,
225  TRECV *recvbuf, int recvcount, int source, int recvtag,
226  Comm comm, Status *status )
227 {
228  Datatype sendtype = GetMPIDatatype<TSEND>();
229  Datatype recvtype = GetMPIDatatype<TRECV>();
230  return Sendrecv(
231  sendbuf, sendcount, sendtype, dest, sendtag,
232  recvbuf, recvcount, recvtype, source, recvtag,
233  comm, status );
234 
235 };
238 template<typename T>
239 int Reduce( T *sendbuf, T *recvbuf, int count,
240  Op op, int root, Comm comm )
241 {
242  Datatype datatype = GetMPIDatatype<T>();
243  return Reduce( sendbuf, recvbuf, count, datatype, op, root, comm );
244 };
248 template<typename TSEND, typename TRECV>
249 int Gather( const TSEND *sendbuf, int sendcount,
250  TRECV *recvbuf, int recvcount,
251  int root, Comm comm )
252 {
253  Datatype sendtype = GetMPIDatatype<TSEND>();
254  Datatype recvtype = GetMPIDatatype<TRECV>();
255  return Gather( sendbuf, sendcount, sendtype,
256  recvbuf, recvcount, recvtype, root, comm );
257 };
258 
259 
260 template<typename TSEND, typename TRECV>
261 int Gatherv(
262  TSEND *sendbuf, int sendcount,
263  TRECV *recvbuf, const int *recvcounts, const int *displs,
264  int root, Comm comm )
265 {
266  Datatype sendtype = GetMPIDatatype<TSEND>();
267  Datatype recvtype = GetMPIDatatype<TRECV>();
268  return Gatherv( sendbuf, sendcount, sendtype,
269  recvbuf, recvcounts, displs, recvtype, root, comm );
270 };
273 template<typename T>
274 int Allreduce( T* sendbuf, T* recvbuf, int count, Op op, Comm comm )
275 {
276  Datatype datatype = GetMPIDatatype<T>();
277  return Allreduce( sendbuf, recvbuf, count, datatype, op, comm );
278 };
281 template<typename TSEND, typename TRECV>
282 int Allgather(
283  TSEND *sendbuf, int sendcount,
284  TRECV *recvbuf, int recvcount, Comm comm )
285 {
286  Datatype sendtype = GetMPIDatatype<TSEND>();
287  Datatype recvtype = GetMPIDatatype<TRECV>();
288  return Allgather(
289  sendbuf, sendcount, sendtype,
290  recvbuf, recvcount, recvtype, comm );
291 };
294 template<typename TSEND, typename TRECV>
295 int Allgatherv(
296  TSEND *sendbuf, int sendcount,
297  TRECV *recvbuf, int *recvcounts, int *displs, Comm comm )
298 {
299  Datatype sendtype = GetMPIDatatype<TSEND>();
300  Datatype recvtype = GetMPIDatatype<TRECV>();
301  return Allgatherv(
302  sendbuf, sendcount, sendtype,
303  recvbuf, recvcounts, displs, recvtype, comm );
304 };
307 template<typename TSEND, typename TRECV>
308 int Alltoall(
309  TSEND *sendbuf, int sendcount,
310  TRECV *recvbuf, int recvcount, Comm comm )
311 {
312  Datatype sendtype = GetMPIDatatype<TSEND>();
313  Datatype recvtype = GetMPIDatatype<TRECV>();
314  return Alltoall(
315  sendbuf, sendcount, sendtype,
316  recvbuf, recvcount, recvtype, comm );
317 };
320 template<typename TSEND, typename TRECV>
321 int Alltoallv(
322  TSEND *sendbuf, int *sendcounts, int *sdispls,
323  TRECV *recvbuf, int *recvcounts, int *rdispls, Comm comm )
324 {
325  Datatype sendtype = GetMPIDatatype<TSEND>();
326  Datatype recvtype = GetMPIDatatype<TRECV>();
327  return Alltoallv(
328  sendbuf, sendcounts, sdispls, sendtype,
329  recvbuf, recvcounts, rdispls, recvtype, comm );
330 };
337 #ifdef HMLP_MIC_AVX512
338 
339 template<class T, class Allocator = hbw::allocator<T> >
340 #else
341 
342 template<class T, class Allocator = std::allocator<T> >
343 #endif
345  std::vector<T, Allocator> &bufvector, int dest, int tag, Comm comm )
346 {
347  Datatype datatype = GetMPIDatatype<T>();
348  size_t count = bufvector.size();
349 
351  //printf( "beg send count %lu to %d\n", count, dest );
352  Send( &count, 1, dest, tag, comm );
353  //printf( "end send count %lu to %d\n", count, dest );
354 
356  return Send( bufvector.data(), count, dest, tag, comm );
357 
358 };
366 #ifdef HMLP_MIC_AVX512
367 
368 template<class T, class Allocator = hbw::allocator<T> >
369 #else
370 
371 template<class T, class Allocator = std::allocator<T> >
372 #endif
374  std::vector<T, Allocator> &bufvector, int source, int tag, Comm comm, Status *status )
375 {
376  Datatype datatype = GetMPIDatatype<T>();
377  size_t count = 0;
378 
380  //printf( "beg recv count %lu from %d\n", count, source );
381  Recv( &count, 1, source, tag, comm, status );
382  //printf( "end recv count %lu from %d\n", count, source );
383 
385  bufvector.resize( count );
386 
388  return Recv( bufvector.data(), count, source, tag, comm, status );
389 
390 };
394 #ifdef HMLP_MIC_AVX512
395 
396 template<class T, class Allocator = hbw::allocator<T> >
397 #else
398 
399 template<class T, class Allocator = std::allocator<T> >
400 #endif
402  vector<T, Allocator> &sendvector, int dest, int sendtag,
403  vector<T, Allocator> &recvvector, int source, int recvtag,
404  Comm comm, Status *status )
405 {
406  Datatype datatype = GetMPIDatatype<T>();
407  size_t send_size = sendvector.size();
408  size_t recv_size = 0;
409 
411  Sendrecv(
412  &send_size, 1, dest, sendtag,
413  &recv_size, 1, source, recvtag,
414  comm, status );
416  recvvector.resize( recv_size );
418  return Sendrecv(
419  sendvector.data(), send_size, dest, sendtag,
420  recvvector.data(), recv_size, source, recvtag,
421  comm, status );
422 
423 };
426 template<typename T>
427 int GatherVector( vector<T> &sendvector, vector<T> &recvvector, int root, Comm comm )
428 {
429  int size = 0; Comm_size( comm, &size );
430  int rank = 0; Comm_rank( comm, &rank );
431 
432  Datatype datatype = GetMPIDatatype<T>();
433  int send_size = sendvector.size();
434  vector<int> recv_sizes( size, 0 );
435  vector<int> rdispls( size + 1, 0 );
436 
438  Gather( &send_size, 1, recv_sizes.data(), 1, root, comm );
440  for ( int i = 1; i < size + 1; i ++ )
441  rdispls[ i ] = rdispls[ i - 1 ] + recv_sizes[ i - 1 ];
443  recvvector.resize( rdispls[ size ] );
445  return Gatherv( sendvector.data(), send_size,
446  recvvector.data(), recv_sizes.data(), rdispls.data(), root, comm );
447 };
453 #ifdef HMLP_MIC_AVX512
454 
455 template<class T, class Allocator = hbw::allocator<T> >
456 #else
457 
458 template<class T, class Allocator = std::allocator<T> >
459 #endif
461  const vector<vector<T, Allocator>>& sendvector,
462  vector<vector<T, Allocator>>& recvvector, Comm comm )
463 {
464  int size = 0; Comm_size( comm, &size );
465  int rank = 0; Comm_rank( comm, &rank );
466 
467  assert( sendvector.size() == size );
468  assert( recvvector.size() == size );
469 
470  vector<T> sendbuf;
471  vector<T> recvbuf;
472  vector<int> sendcounts( size, 0 );
473  vector<int> recvcounts( size, 0 );
474  vector<int> sdispls( size + 1, 0 );
475  vector<int> rdispls( size + 1, 0 );
476 
477  //printf( "rank %d ", rank );
478  for ( size_t p = 0; p < size; p ++ )
479  {
480  sendcounts[ p ] = sendvector[ p ].size();
481  sdispls[ p + 1 ] = sdispls[ p ] + sendcounts[ p ];
482  sendbuf.insert( sendbuf.end(),
483  sendvector[ p ].begin(),
484  sendvector[ p ].end() );
485  //printf( "%d ", sendcounts[ j ] );
486  }
487  //printf( "\n" );
488 
489 
490  //printf( "before Alltoall (mpi size = %d)\n", size ); fflush( stdout );
491  //Barrier( comm );
492 
494  Alltoall( sendcounts.data(), 1, recvcounts.data(), 1, comm );
495 
496 
497  //printf( "after Alltoall\n" ); fflush( stdout );
498  //Barrier( comm );
499 
500  //printf( "rank %d ", rank );
501  size_t total_recvcount = 0;
502  for ( size_t p = 0; p < size; p ++ )
503  {
504  rdispls[ p + 1 ] = rdispls[ p ] + recvcounts[ p ];
505  total_recvcount += recvcounts[ p ];
506  //printf( "%d ", recvcounts[ j ] );
507  }
508  //printf( "\n" );
509 
511  recvbuf.resize( total_recvcount );
512 
513 
514  //printf( "before Alltoall2 recvbuf.size() %lu\n", recvbuf.size() ); fflush( stdout );
515  //Barrier( comm );
516 
517 
518  Alltoallv(
519  sendbuf.data(), sendcounts.data(), sdispls.data(),
520  recvbuf.data(), recvcounts.data(), rdispls.data(), comm );
521 
522 
523  //printf( "after Alltoall2\n" ); fflush( stdout );
524  //Barrier( comm );
525 
526 
527  recvvector.resize( size );
528  for ( size_t p = 0; p < size; p ++ )
529  {
530  recvvector[ p ].insert( recvvector[ p ].end(),
531  recvbuf.begin() + rdispls[ p ],
532  recvbuf.begin() + rdispls[ p + 1 ] );
533  }
534 
535  return 0;
536 
537 };
546 //template<typename T>
547 //int AlltoallVector(
548 // std::vector<hmlp::Data<T>> &sendvector,
549 // std::vector<hmlp::Data<T>> &recvvector, Comm comm )
550 //{
551 // int size = 0;
552 // int rank = 0;
553 // Comm_size( comm, &size );
554 // Comm_rank( comm, &rank );
555 //
556 // assert( sendvector.size() == size );
557 // assert( recvvector.size() == size );
558 //
559 // std::vector<T> sendbuf;
560 // std::vector<T> recvbuf;
561 // std::vector<int> sendcounts( size, 0 );
562 // std::vector<int> recvcounts( size, 0 );
563 // std::vector<int> sdispls( size + 1, 0 );
564 // std::vector<int> rdispls( size + 1, 0 );
565 //
566 //
567 // for ( size_t p = 0; p < size; p ++ )
568 // {
569 // sendcounts[ p ] = sendvector[ p ].size();
570 // sdispls[ p + 1 ] = sdispls[ p ] + sendcounts[ p ];
571 // sendbuf.insert( sendbuf.end(),
572 // sendvector[ p ].begin(),
573 // sendvector[ p ].end() );
574 // }
575 //
576 // /** exchange sendcount */
577 // Alltoall( sendcounts.data(), 1, recvcounts.data(), 1, comm );
578 //
579 // /** compute total receiving count */
580 // size_t total_recvcount = 0;
581 // for ( size_t p = 0; p < size; p ++ )
582 // {
583 // rdispls[ p + 1 ] = rdispls[ p ] + recvcounts[ p ];
584 // total_recvcount += recvcounts[ p ];
585 // }
586 //
587 // /** resize receving buffer */
588 // recvbuf.resize( total_recvcount );
589 //
590 //
591 //
592 //
593 //
594 //
595 //}; /** end AlltoallVector() */
596 
597 
598 
599 
600 };
601 };
603 #endif
Definition: mpi_prototypes.h:81
Definition: hmlp_mpi.hpp:152
int Comm_rank(Comm comm, int *rank)
Definition: hmlp_mpi.cpp:128
int Init_thread(int *argc, char ***argv, int required, int *provided)
Definition: hmlp_mpi.cpp:351
Definition: hmlp_mpi.hpp:89
int RecvVector(std::vector< T, Allocator > &bufvector, int source, int tag, Comm comm, Status *status)
This is a shorthand for receiving a vector, which involves two MPI_Recv() calls.
Definition: hmlp_mpi.hpp:373
int GatherVector(vector< T > &sendvector, vector< T > &recvvector, int root, Comm comm)
Definition: hmlp_mpi.hpp:427
int ExchangeVector(vector< T, Allocator > &sendvector, int dest, int sendtag, vector< T, Allocator > &recvvector, int source, int recvtag, Comm comm, Status *status)
Definition: hmlp_mpi.hpp:401
int Iprobe(int source, int tag, Comm comm, int *flag, Status *status)
Definition: hmlp_mpi.cpp:373
int SendVector(std::vector< T, Allocator > &bufvector, int dest, int tag, Comm comm)
This is a shorthand for sending a vector, which involves two MPI_Send() calls.
Definition: hmlp_mpi.hpp:344
void PrintProgress(const char *s, mpi::Comm comm)
Definition: hmlp_mpi.cpp:384
int AlltoallVector(const vector< vector< T, Allocator >> &sendvector, vector< vector< T, Allocator >> &recvvector, Comm comm)
Definition: hmlp_mpi.hpp:460
int Test(Request *request, int *flag, Status *status)
Definition: hmlp_mpi.cpp:180
Definition: gofmm.hpp:83
int Init(int *argc, char ***argv)
Definition: hmlp_mpi.cpp:13
int Comm_size(Comm comm, int *size)
Definition: hmlp_mpi.cpp:117