HMLP: High-performance Machine Learning Primitives
runtime.hpp
#ifndef HMLP_RUNTIME_HPP
#define HMLP_RUNTIME_HPP

#include <time.h>
#include <tuple>
#include <algorithm>
#include <vector>
#include <deque>
#include <map>
#include <unordered_map>
#include <limits>
#include <cstdint>
#include <cassert>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <cstddef>
#include <omp.h>

//#ifdef USE_INTEL
//#include <mkl.h>
//#endif

#ifdef USE_PTHREAD_RUNTIME
#include <pthread.h>
#endif

#include <base/thread.hpp>
#include <base/tci.hpp>
#include <hmlp_mpi.hpp>

#define MAX_WORKER 68

using namespace std;
namespace hmlp
{

//typedef enum
//{
//  HMLP_SCHEDULE_DEFAULT,
//  HMLP_SCHEDULE_ROUND_ROBIN,
//  HMLP_SCHEDULE_UNIFORM,
//  HMLP_SCHEDULE_HEFT
//} SchedulePolicy;

/** @brief Status of a task in its life cycle. */
typedef enum { ALLOCATED, NOTREADY, QUEUED, RUNNING, EXECUTED, DONE, CANCELLED } TaskStatus;

/** @brief Read/write access type used in dependency analysis. */
typedef enum { R, W, RW } ReadWriteType;
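//
// Note (an assumption based on the enum order and the member names below, not
// stated in the original header): a task normally moves ALLOCATED -> NOTREADY
// -> QUEUED -> RUNNING -> EXECUTED -> DONE, with CANCELLED as the abnormal exit.
//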

///**
// *  class Lock
// */
//
//class Lock
//{
//  public:
//
//    Lock();
//
//    ~Lock();
//
//    void Acquire();
//
//    void Release();
//
//  private:
//#ifdef USE_PTHREAD_RUNTIME
//    pthread_mutex_t lock;
//#else
//    omp_lock_t lock;
//#endif
//}; /** end class Lock */
//
//
/** @brief Event is a performance record for one task execution: it wraps
 *         the begin/end timestamps, flop/mop counts, and timeline output. */
class Event
{
  public:

    Event();

    void Set( string, double, double );

    void AddFlopsMops( double, double );

    void Begin( size_t );

    double GetBegin();

    double GetEnd();

    double GetDuration();

    double GetFlops();

    double GetMops();

    double GflopsPerSecond();

    void Normalize( double shift );

    void Terminate();

    void Print();

    void Timeline( bool isbeg, size_t tag );

    void MatlabTimeline( FILE *pFile );

  private:

    size_t tid = 0;

    string label;

    double flops = 0.0;

    double mops = 0.0;

    double beg = 0.0;

    double end = 0.0;

    double sec = 0.0;

}; /** end class Event */
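//
// Usage sketch (illustrative, not part of the original header; the exact
// semantics of Begin()/Terminate() are assumptions based on the member names):
//
//   hmlp::Event event;
//   double flops = 2.0 * m * n * k, mops = 0.0;
//   event.Set( "gemm", flops, mops );      // label plus expected flop/mop counts
//   event.Begin( omp_get_thread_num() );   // start timing on this thread
//   /* ... perform the measured work ... */
//   event.Terminate();                     // stop timing
//   printf( "%5.2lf GFLOPS\n", event.GflopsPerSecond() );
//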
/** @brief Task is the abstraction of a unit of work handled by the runtime.
 *         It carries its status, cost, dependencies, and an Event record. */
class Task
{
  public:

    Task();

    ~Task();

    class Worker *worker = NULL;

    string name;

    string label;

    int taskid;

    float cost = 0;

    bool priority = false;

    Event event;

    TaskStatus GetStatus();

    void SetStatus( TaskStatus status );
    void SetBatchStatus( TaskStatus status );

    void Submit();

    virtual void Set( string user_name, void (*user_function)(Task*), void *user_arg );

    virtual void Prefetch( Worker* );

    void Enqueue();

    void Enqueue( size_t tid );

    bool TryEnqueue();

    void ForceEnqueue( size_t tid );

    void CallBackWhileWaiting();

    virtual void Execute( Worker* ) = 0;

    virtual void GetEventRecord();

    virtual void DependencyAnalysis();

    /** Function pointer to the task body. */
    void (*function)(Task*);

    /** Opaque context passed to the task body. */
    void *arg;

    /** Dependency-related members. */
    volatile int n_dependencies_remaining = 0;
    void DependenciesUpdate();

    deque<Task*> in;
    deque<Task*> out;

    void Acquire();
    void Release();

    Lock* task_lock = NULL;

    Task *next = NULL;

    //bool ContextSwitchToNextTask( Worker* );

    volatile bool stealable = true;

    volatile int created_by = 0;

    bool IsNested();

  private:

    volatile TaskStatus status;

    volatile bool is_created_in_epoch_session = false;

}; /** end class Task */
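//
// Usage sketch (illustrative, not part of the original header; GemmTask and
// MyArg are hypothetical names):
//
//   class GemmTask : public hmlp::Task
//   {
//     public:
//       void Set( MyArg *user_arg ) { name = "gemm"; arg = user_arg; };
//       void Execute( Worker *user_worker ) { /* actual work here */ };
//   };
//
// A task is typically created, submitted, configured, and registered for
// dependency analysis, in the same order RecuTaskSubmit() uses below:
//
//   auto *task = new GemmTask();
//   task->Submit();
//   task->Set( &my_arg );
//   task->DependencyAnalysis();
//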
/** This dependency analysis is designed for MPI message tasks. */
void hmlp_msg_dependency_analysis(
    int key, int p, hmlp::ReadWriteType type, hmlp::Task *task );



/** @brief This is a specific type of task that represents NOP. */
template<typename ARGUMENT>
class NULLTask : public Task
{
  public:

    void Set( ARGUMENT *arg ) {};

    void Execute( Worker* ) {};

}; /** end class NULLTask */
/** @brief This task is designed to take care of MPI communications. */
class MessageTask : public Task
{
  public:

    /** MPI communicator for this message. */
    mpi::Comm comm;

    /** Target rank, source rank, and matching key (tag). */
    int tar = 0;
    int src = 0;
    int key = 0;

    MessageTask( int src, int tar, int key );

    void Submit();
}; /** end class MessageTask */
/** @brief This task is the abstraction for all tasks handled by Listeners. */
class ListenerTask : public MessageTask
{
  public:

    ListenerTask( int src, int tar, int key );

    void Submit();

    virtual void Listen() = 0;
}; /** end class ListenerTask */
/** @brief SendTask packs data into contiguous buffers and posts non-blocking
 *         sends. Pack() must be provided by the derived class. */
template<typename T, typename ARG>
class SendTask : public MessageTask
{
  public:

    ARG *arg = NULL;

    vector<size_t> send_sizes;
    vector<size_t> send_skels;
    vector<T> send_buffs;

    SendTask( ARG *user_arg, int src, int tar, int key ) : MessageTask( src, tar, key )
    {
      this->arg = user_arg;
    };

    void Set( ARG *user_arg, int src, int tar, int key )
    {
      name = string( "Send" );
      label = to_string( tar );
      this->arg = user_arg;
      this->src = src;
      this->tar = tar;
      this->key = key;
      double flops = 0, mops = 0;
      event.Set( label + name, flops, mops );
      /** Message tasks are high priority. */
      priority = true;
    };

    virtual void Pack() = 0;

    void Execute( Worker *user_worker )
    {
      Pack();
      mpi::Request req1, req2, req3;
      mpi::Isend( this->send_sizes.data(), this->send_sizes.size(),
          this->tar, this->key + 0, this->comm, &req1 );
      //mpi::Isend( this->send_skels.data(), this->send_skels.size(),
      //    this->tar, this->key + 1, this->comm, &req2 );
      mpi::Isend( this->send_buffs.data(), this->send_buffs.size(),
          this->tar, this->key + 2, this->comm, &req3 );
    };
}; /** end class SendTask */
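//
// Sketch of a concrete Pack() (illustrative; MySendTask and arg->data are
// hypothetical):
//
//   template<typename T, typename ARG>
//   class MySendTask : public SendTask<T, ARG>
//   {
//     public:
//       using SendTask<T, ARG>::SendTask;
//       void Pack()
//       {
//         // Record one block count, then append the payload; Execute() then
//         // ships send_sizes on tag key + 0 and send_buffs on tag key + 2.
//         this->send_sizes.push_back( this->arg->data.size() );
//         this->send_buffs.insert( this->send_buffs.end(),
//             this->arg->data.begin(), this->arg->data.end() );
//       };
//   };
//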
/** @brief RecvTask listens for the matching messages and unpacks them.
 *         Unpack() must be provided by the derived class. */
template<typename T, typename ARG>
class RecvTask : public ListenerTask
{
  public:

    ARG *arg = NULL;

    vector<size_t> recv_sizes;
    vector<size_t> recv_skels;
    vector<T> recv_buffs;

    RecvTask( ARG *user_arg, int src, int tar, int key )
      : ListenerTask( src, tar, key ) { this->arg = user_arg; };

    void Set( ARG *user_arg, int src, int tar, int key )
    {
      name = string( "Listener" );
      label = to_string( src );
      this->arg = user_arg;
      this->src = src;
      this->tar = tar;
      this->key = key;
      double flops = 0, mops = 0;
      event.Set( label + name, flops, mops );
    };

    void DependencyAnalysis()
    {
      hmlp_msg_dependency_analysis( this->key, this->src, RW, this );
    };

    void Listen()
    {
      int src = this->src;
      int tar = this->tar;
      int key = this->key;
      mpi::Comm comm = this->comm;
      int cnt = 0;
      mpi::Status status;
      /** Probe the size message to discover its count, then receive it. */
      mpi::Probe( src, key + 0, comm, &status );
      mpi::Get_count( &status, HMLP_MPI_SIZE_T, &cnt );
      recv_sizes.resize( cnt );
      mpi::Recv( recv_sizes.data(), cnt, src, key + 0, comm, &status );
      /** The total buffer length is the sum of the individual sizes. */
      cnt = 0;
      for ( auto c : recv_sizes ) cnt += c;
      recv_buffs.resize( cnt );
      mpi::Recv( recv_buffs.data(), cnt, src, key + 2, comm, &status );
    };

    virtual void Unpack() = 0;

    void Execute( Worker *user_worker ) { Unpack(); };

}; /** end class RecvTask */
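//
// Sketch of a concrete Unpack() (illustrative; MyRecvTask and arg->data are
// hypothetical). SendTask and RecvTask pair through the message key: sizes
// travel on tag key + 0 and payloads on tag key + 2, so Unpack() only has to
// scatter recv_buffs according to recv_sizes.
//
//   template<typename T, typename ARG>
//   class MyRecvTask : public RecvTask<T, ARG>
//   {
//     public:
//       using RecvTask<T, ARG>::RecvTask;
//       void Unpack()
//       {
//         this->arg->data.assign( this->recv_buffs.begin(),
//             this->recv_buffs.end() );
//       };
//   };
//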
/** @brief Recursive task submission (base case). */
template<typename ARG>
void RecuTaskSubmit( ARG *arg ) { };

/** @brief Recursive task submission: peel off one task type per recursion,
 *         skipping NULLTask placeholders. */
template<typename ARG, typename TASK, typename... Args>
void RecuTaskSubmit( ARG *arg, TASK& dummy, Args&... dummyargs )
{
  using NULLTASK = NULLTask<ARG>;
  /** Skip NULLTask (compile-time check). */
  if ( !std::is_same<NULLTASK, TASK>::value )
  {
    auto task = new TASK();
    task->Submit();
    task->Set( arg );
    task->DependencyAnalysis();
  }
  /** Recurse on the remaining tasks. */
  RecuTaskSubmit( arg, dummyargs... );
}; /** end RecuTaskSubmit() */
/** @brief Recursive task execution (base case). */
template<typename ARG>
void RecuTaskExecute( ARG *arg ) { };

/** @brief Recursive task execution: run each non-NULLTask in place without
 *         involving the scheduler. */
template<typename ARG, typename TASK, typename... Args>
void RecuTaskExecute( ARG *arg, TASK& dummy, Args&... dummyargs )
{
  using NULLTASK = NULLTask<ARG>;
  /** Skip NULLTask (compile-time check). */
  if ( !std::is_same<NULLTASK, TASK>::value )
  {
    auto *task = new TASK();
    task->Set( arg );
    task->Execute( NULL );
    delete task;
  }
  /** Recurse on the remaining tasks. */
  RecuTaskExecute( arg, dummyargs... );
}; /** end RecuTaskExecute() */
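//
// Usage sketch (illustrative; GemmTask is the hypothetical subclass from the
// Task example above, MyArg a hypothetical argument type):
//
//   GemmTask gemm_dummy;
//   NULLTask<MyArg> null_dummy;
//
//   // Submit a GemmTask to the scheduler; the NULLTask slot is a NOP.
//   RecuTaskSubmit( &my_arg, gemm_dummy, null_dummy );
//
//   // Or execute the same task list immediately, bypassing the scheduler.
//   RecuTaskExecute( &my_arg, gemm_dummy, null_dummy );
//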
/** @brief This class provides the ability to perform dependency analysis. */
class ReadWrite
{
  public:

    ReadWrite();

    /** Tasks that have read this object since the last write. */
    deque<Task*> read;

    /** Tasks that have written this object. */
    deque<Task*> write;

    void DependencyAnalysis( ReadWriteType type, Task *task );

    void DependencyCleanUp();

  private:

}; /** end class ReadWrite */
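//
// Usage sketch (illustrative): an object that owns a ReadWrite member
// registers every task that touches it; read accesses gain edges from the
// previous writers, and write accesses from the previous readers and writers.
//
//   hmlp::ReadWrite data_rw;
//   data_rw.DependencyAnalysis( R,  reader_task );  // read-after-write edge
//   data_rw.DependencyAnalysis( RW, writer_task );  // write-after-read edges
//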
/** @brief This class creates 2D grids for 2D matrix partitioning. */
class MatrixReadWrite
{
  public:

    MatrixReadWrite();

    void Setup( size_t m, size_t n );

    bool HasBeenSetup();

    void DependencyAnalysis( size_t i, size_t j, ReadWriteType type, Task *task );

    void DependencyCleanUp();

  private:

    bool has_been_setup = false;

    size_t m = 0;

    size_t n = 0;

    vector<vector<ReadWrite> > Submatrices;

}; /** end class MatrixReadWrite */
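//
// Usage sketch (illustrative): guard an m-by-n grid of submatrices, then
// register tasks against individual ( i, j ) blocks.
//
//   hmlp::MatrixReadWrite matrix_rw;
//   matrix_rw.Setup( 2, 2 );                          // 2x2 grid of blocks
//   matrix_rw.DependencyAnalysis( 0, 0, R, task_a );  // task_a reads block (0,0)
//   matrix_rw.DependencyAnalysis( 0, 0, W, task_b );  // task_b writes it; edge a->b
//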
/** @brief Scheduler distributes tasks to workers, tracks their dependencies,
 *         and handles MPI message tasks and listeners. */
class Scheduler : public mpi::MPIObject
{
  public:

    Scheduler( mpi::Comm comm );

    ~Scheduler();

    void Init( int n_worker );

    void Finalize();

    int n_worker = 0;

    size_t timeline_tag;

    double timeline_beg;

    /** Ready queues (one per worker). */
    deque<Task*> ready_queue[ MAX_WORKER ];
    //deque<Task*> tasklist;
    Lock ready_queue_lock[ MAX_WORKER ];

    /** Nested queues (one per worker) for tasks created inside an epoch. */
    deque<Task*> nested_queue[ MAX_WORKER ];
    //deque<Task*> nested_tasklist;
    Lock nested_queue_lock[ MAX_WORKER ];

    /** Active listener tasks, keyed by message tag. */
    vector<unordered_map<int, ListenerTask*>> listener_tasklist;
    Lock listener_queue_lock;

    float time_remaining[ MAX_WORKER ];

    void ReportRemainingTime();

    /** Manually add a dependency edge between two tasks. */
    static void DependencyAdd( Task *source, Task *target );

    void MessageDependencyAnalysis( int key, int p, ReadWriteType type, Task *task );

    void NewTask( Task *task );

    void NewMessageTask( MessageTask *task );

    void NewListenerTask( ListenerTask *task );

    void ExecuteNestedTasksWhileWaiting( Worker *me, Task *waiting_task );

    void Summary();

  private:

    /** Worker entry point. */
    static void* EntryPoint( void* );

    deque<Task*> tasklist;

    deque<Task*> nested_tasklist;

    Lock tasklist_lock;

    int n_task_completed = 0;
    int n_nested_task_completed = 0;

    Lock n_task_lock;

    vector<Task*> DispatchFromNormalQueue( int tid );

    vector<Task*> DispatchFromNestedQueue( int tid );

    vector<Task*> StealFromOther();

    Task *StealFromQueue( size_t target );

    bool ConsumeTasks( Worker *me, vector<Task*> &batch );

    bool ConsumeTasks( Worker *me, Task *batch, bool is_nested );

    bool IsTimeToExit( int tid );

    void Listen( Worker* );

    Lock task_lock[ 2 * MAX_WORKER ];

    Lock run_lock[ MAX_WORKER ];

    Lock pci_lock;

    Lock gpu_lock;

    /** Message dependencies, keyed by message tag. */
    unordered_map<int, vector<ReadWrite>> msg_dependencies;

    bool do_terminate = false;

    bool has_ibarrier = false;

    mpi::Request ibarrier_request;

    int ibarrier_consensus = 0;
}; /** end class Scheduler */
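//
// Usage sketch (illustrative): besides the ReadWrite-based analysis, an edge
// between two already-created tasks can be declared manually.
//
//   hmlp::Scheduler::DependencyAdd( producer_task, consumer_task );
//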
/** @brief RunTime is statically created in hmlp_runtime.cpp. */
class RunTime
{
  public:

    RunTime();

    ~RunTime();

    hmlpError_t init( mpi::Comm comm );

    hmlpError_t init( int *argc, char ***argv, mpi::Comm comm );

    hmlpError_t run();

    hmlpError_t finalize();

    /** Whether the runtime has been initialized. */
    bool isInit() const noexcept;

    /** Whether the caller is inside an epoch session. */
    bool isInEpochSession() const noexcept;

    void ExecuteNestedTasksWhileWaiting( Task *waiting_task );

    hmlpError_t setNumberOfWorkers( int num_of_workers ) noexcept;

    int getNumberOfWorkers() const noexcept;

    thread_communicator *mycomm;

    class Worker workers[ MAX_WORKER ];

    class Device host;

    class Device* device[ 1 ];

    Scheduler *scheduler;

  private:

    /** Current number of workers and the hard maximum. */
    int num_of_workers_ = 1;
    int num_of_max_workers_ = 1;
    int* argc = NULL;
    char*** argv = NULL;
    /** Whether the runtime has been initialized. */
    bool is_init_ = false;
    /** Whether MPI was initialized by HMLP itself. */
    bool is_mpi_init_by_hmlp_ = false;
    /** Whether the runtime is inside an epoch session. */
    bool is_in_epoch_session_ = false;
    void Print( string msg );
    void ExitWithError( string msg );

}; /** end class RunTime */

}; /** end namespace hmlp */
hmlp::RunTime *hmlp_get_runtime_handle();

hmlp::Device *hmlp_get_device( int i );

int hmlp_get_mpi_rank();

int hmlp_get_mpi_size();
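//
// Usage sketch (illustrative): a typical runtime life cycle as suggested by
// the RunTime interface above, going through the global handle. Passing
// MPI_COMM_WORLD assumes mpi::Comm is compatible with the raw MPI type.
//
//   hmlp::RunTime *rt = hmlp_get_runtime_handle();
//   rt->init( MPI_COMM_WORLD );   // or init( &argc, &argv, comm )
//   /* ... create tasks, Submit(), DependencyAnalysis() ... */
//   rt->run();                    // epoch: workers drain the ready queues
//   rt->finalize();
//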

//void hmlp_msg_dependency_analysis(
//    int key, int p, hmlp::ReadWriteType type, hmlp::Task *task );

#endif