#ifndef HMLP_RUNTIME_HPP
#define HMLP_RUNTIME_HPP

#include <unordered_map>

#ifdef USE_PTHREAD_RUNTIME
/* ... pthread-specific headers ... */
#endif

#include <base/thread.hpp>
#include <base/tci.hpp>
#include <hmlp_mpi.hpp>

/** Life-cycle states of a task inside the runtime. */
typedef enum { ALLOCATED, NOTREADY, QUEUED, RUNNING, EXECUTED, DONE, CANCELLED } TaskStatus;

/** Access mode declared by a task during dependency analysis: read, write, or read-write. */
typedef enum { R, W, RW } ReadWriteType;
/** Lock: a wrapper over an omp or pthread mutex (declaration omitted in this excerpt). */

/** Event: per-task performance record (flop/mop counts and timing). */
class Event
{
  public:

    void Set( string, double, double );
    void AddFlopsMops( double, double );
    void Begin( size_t );
    double GetDuration();
    double GflopsPerSecond();
    void Normalize( double shift );
    void Timeline( bool isbeg, size_t tag );
    void MatlabTimeline( FILE *pFile );
};
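Tasks report their costs through the virtual GetEventRecord() hook declared on Task below, writing into the task's Event; the Send/Recv tasks later in this header use the same event.Set( label + name, flops, mops ) pattern. A minimal sketch, where MyGemvTask and its cost model are invented for illustration:

/* Sketch only: fill in the per-task Event so the scheduler can report Gflops/s.
 * MyGemvTask and the 2*n*n cost model are hypothetical; name, label, and event
 * are the Task members used elsewhere in this header. */
void MyGemvTask::GetEventRecord()
{
  double n     = 4096.0;           /* hypothetical problem size */
  double flops = 2.0 * n * n;      /* example flop count for a GEMV */
  double mops  = n * n + 2.0 * n;  /* example memory-operation count */
  event.Set( label + name, flops, mops );
}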
/** Task: base class for all tasks handled by the runtime (excerpt). */
class Task
{
  public:

    class Worker *worker = NULL;
    string name;
    string label;
    Event event;
    bool priority = false;

    TaskStatus GetStatus();
    void SetStatus( TaskStatus status );
    void SetBatchStatus( TaskStatus status );

    virtual void Set( string user_name, void (*user_function)( Task* ), void *user_arg );
    virtual void Prefetch( Worker* );

    void Enqueue( size_t tid );
    void ForceEnqueue( size_t tid );
    void CallBackWhileWaiting();

    virtual void Execute( Worker* ) = 0;
    virtual void GetEventRecord();
    virtual void DependencyAnalysis();

    /** The user function invoked by the worker that runs this task. */
    void (*function)( Task* );

    volatile int n_dependencies_remaining = 0;
    void DependenciesUpdate();

    /** Tasks this task depends on. */
    deque<Task*> in;

    Lock* task_lock = NULL;
    volatile bool stealable = true;
    volatile int created_by = 0;
    volatile TaskStatus status;
    volatile bool is_created_in_epoch_session = false;
};
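Concrete work is defined by subclassing Task: override Execute() with the computation and DependencyAnalysis() with the data accesses. A minimal sketch under the declarations above; MyArg, ScaleTask, and the loop body are invented, and ReadWrite is the dependency object declared further down in this header:

/* Hypothetical example: a task that scales a vector in place. */
struct MyArg
{
  vector<double> x;
  double alpha = 2.0;
  ReadWrite x_rw;           /* guards x for dependency analysis */
};

class ScaleTask : public Task
{
  public:

    MyArg *arg = NULL;

    void Set( MyArg *user_arg )
    {
      name = string( "Scale" );
      arg  = user_arg;
    };

    void DependencyAnalysis()
    {
      /* This task both reads and writes x. */
      arg->x_rw.DependencyAnalysis( RW, this );
    };

    void Execute( Worker *user_worker )
    {
      for ( auto &xi : arg->x ) xi *= arg->alpha;
    };
};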
/** Register a message (MPI) dependency with the runtime; defined in runtime.cpp. */
void hmlp_msg_dependency_analysis( int key, int p, hmlp::ReadWriteType type, hmlp::Task *task );
/** NULLTask: a task type that represents a NOP; the recursive helpers below skip it. */
template<typename ARGUMENT>
class NULLTask : public Task
{
  public:

    void Set( ARGUMENT *arg ) {};
    void Execute( Worker* ) {};
};
/** MessageTask: a task that performs MPI communication; it carries the
 *  communicator (mpi::Comm comm) and the src, tar, and key fields used by the
 *  send and receive tasks below (full declaration omitted in this excerpt). */

/** ListenerTask: abstraction for all tasks handled by listeners. */
class ListenerTask : public MessageTask
{
  public:

    virtual void Listen() = 0;
};
/** SendTask: a MessageTask that packs its argument and posts non-blocking sends. */
template<typename T, typename ARG>
class SendTask : public MessageTask
{
  public:

    ARG *arg = NULL;

    vector<size_t> send_sizes;
    vector<size_t> send_skels;
    vector<T> send_buffs;

    SendTask( ARG *user_arg, int src, int tar, int key )
      : MessageTask( src, tar, key ) { this->arg = user_arg; };

    void Set( ARG *user_arg, int src, int tar, int key )
    {
      name = string( "Send" );
      label = to_string( tar );
      this->arg = user_arg;
      /* ... */
      double flops = 0, mops = 0;
      event.Set( label + name, flops, mops );
    };

    /** Pack the user argument into send_sizes / send_skels / send_buffs. */
    virtual void Pack() = 0;

    void Execute( Worker *user_worker )
    {
      /* ... */
      mpi::Request req1, req2, req3;
      mpi::Isend( this->send_sizes.data(), this->send_sizes.size(),
          this->tar, this->key + 0, this->comm, &req1 );
      /* ... */
      mpi::Isend( this->send_buffs.data(), this->send_buffs.size(),
          this->tar, this->key + 2, this->comm, &req3 );
    };
};
/** RecvTask: a ListenerTask that receives what a matching SendTask sent, then unpacks it. */
template<typename T, typename ARG>
class RecvTask : public ListenerTask
{
  public:

    ARG *arg = NULL;

    vector<size_t> recv_sizes;
    vector<size_t> recv_skels;
    vector<T> recv_buffs;

    RecvTask( ARG *user_arg, int src, int tar, int key )
      : ListenerTask( src, tar, key ) { this->arg = user_arg; };

    void Set( ARG *user_arg, int src, int tar, int key )
    {
      name = string( "Listener" );
      label = to_string( src );
      this->arg = user_arg;
      /* ... */
      double flops = 0, mops = 0;
      event.Set( label + name, flops, mops );
    };

    void DependencyAnalysis()
    {
      /* ... */
    };

    /** Invoked by the listener: probe the size message, then receive sizes and buffers. */
    void Listen()
    {
      mpi::Status status;
      int cnt = 0;
      mpi::Comm comm = this->comm;
      /* ... */
      mpi::Probe( src, key + 0, comm, &status );
      mpi::Get_count( &status, HMLP_MPI_SIZE_T, &cnt );
      recv_sizes.resize( cnt );
      mpi::Recv( recv_sizes.data(), cnt, src, key + 0, comm, &status );

      cnt = 0;
      for ( auto c : recv_sizes ) cnt += c;
      recv_buffs.resize( cnt );
      mpi::Recv( recv_buffs.data(), cnt, src, key + 2, comm, &status );
    };

    /** Unpack recv_sizes / recv_skels / recv_buffs back into the user argument. */
    virtual void Unpack() = 0;

    void Execute( Worker *user_worker ) { Unpack(); };
};
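Pack() and Unpack() are left pure so each application decides how its argument maps onto the size/skeleton/buffer vectors. A hedged sketch of a matching pair for an argument that owns a single vector; VecArg and both task names are invented:

/* Hypothetical Pack()/Unpack() pair. Only members declared above are used. */
struct VecArg { vector<double> data; };

class VecSendTask : public SendTask<double, VecArg>
{
  public:

    VecSendTask( VecArg *a, int src, int tar, int key )
      : SendTask<double, VecArg>( a, src, tar, key ) {};

    void Pack()
    {
      send_sizes.resize( 1 );
      send_sizes[ 0 ] = arg->data.size();
      send_buffs.assign( arg->data.begin(), arg->data.end() );
    };
};

class VecRecvTask : public RecvTask<double, VecArg>
{
  public:

    VecRecvTask( VecArg *a, int src, int tar, int key )
      : RecvTask<double, VecArg>( a, src, tar, key ) {};

    void Unpack()
    {
      /* Listen() has already filled recv_sizes and recv_buffs. */
      arg->data.assign( recv_buffs.begin(), recv_buffs.end() );
    };
};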
/** Recursive task submission (base case: no task types left). */
template<typename ARG>
void RecuTaskSubmit( ARG *arg ) { /* end of recursion */ };

/** Recursive task submission: create and analyze one TASK, then recurse on the rest. */
template<typename ARG, typename TASK, typename... Args>
void RecuTaskSubmit( ARG *arg, TASK &dummy, Args&... dummyargs )
{
  using NULLTASK = NULLTask<ARG>;
  /* Only generate a task if TASK is not the NOP placeholder. */
  if ( !std::is_same<NULLTASK, TASK>::value )
  {
    auto task = new TASK();
    /* ... */
    task->DependencyAnalysis();
  }
  /* Recurse on the remaining task types. */
  RecuTaskSubmit( arg, dummyargs... );
};
/** Recursive task execution (base case: no task types left). */
template<typename ARG>
void RecuTaskExecute( ARG *arg ) { /* end of recursion */ };

/** Recursive task execution: run one TASK immediately on the calling thread, then recurse. */
template<typename ARG, typename TASK, typename... Args>
void RecuTaskExecute( ARG *arg, TASK &dummy, Args&... dummyargs )
{
  using NULLTASK = NULLTask<ARG>;
  if ( !std::is_same<NULLTASK, TASK>::value )
  {
    auto *task = new TASK();
    /* ... */
    task->Execute( NULL );
    /* ... */
  }
  RecuTaskExecute( arg, dummyargs... );
};
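Together these helpers run the same list of task types either through the scheduler (submit) or immediately in place (execute). A hedged sketch reusing the hypothetical ScaleTask and MyArg from the earlier example; the dummy instances only drive template deduction, and NULLTask slots generate nothing:

/* Sketch only: choose between out-of-order submission and sequential execution. */
void run_pipeline( MyArg *arg, bool sequential )
{
  ScaleTask       scale_dummy;   /* hypothetical task from the Task example */
  NULLTask<MyArg> skip_dummy;    /* placeholder slot: skipped by both helpers */

  if ( sequential ) RecuTaskExecute( arg, scale_dummy, skip_dummy );
  else              RecuTaskSubmit ( arg, scale_dummy, skip_dummy );
}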
/** ReadWrite: provides the ability to perform dependency analysis. */
class ReadWrite
{
  public:

    deque<Task*> read;
    deque<Task*> write;

    /** Record task as a reader or writer and add the implied dependencies. */
    void DependencyAnalysis( ReadWriteType type, Task *task );

    void DependencyCleanUp();
};
/** MatrixReadWrite: a 2D grid of ReadWrite objects for 2D matrix partitions. */
class MatrixReadWrite
{
  public:

    /** Allocate an m-by-n grid of ReadWrite objects. */
    void Setup( size_t m, size_t n );

    /** Record task as a reader or writer of submatrix ( i, j ). */
    void DependencyAnalysis( size_t i, size_t j, ReadWriteType type, Task *task );

    void DependencyCleanUp();

    bool has_been_setup = false;

    vector<vector<ReadWrite>> Submatrices;
};
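A task declares its data accesses by calling DependencyAnalysis() on the ReadWrite (or MatrixReadWrite block) guarding each object it touches; the runtime then orders writers and readers accordingly. A hedged sketch with the 2-by-2 blocking, BlockedArg, and MyBlockTask invented:

/* Hypothetical: a task that reads block (0,0) and updates block (1,1) of a matrix
 * guarded by a MatrixReadWrite grid set up once with blocks.Setup( 2, 2 ). */
struct BlockedArg
{
  MatrixReadWrite blocks;
};

class MyBlockTask : public Task
{
  public:

    BlockedArg *arg = NULL;

    void DependencyAnalysis()
    {
      arg->blocks.DependencyAnalysis( 0, 0, R,  this );   /* reads   A( 0, 0 ) */
      arg->blocks.DependencyAnalysis( 1, 1, RW, this );   /* updates A( 1, 1 ) */
    };

    void Execute( Worker* ) { /* compute on the two blocks */ };
};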
/** Scheduler: the task scheduler (excerpt). */
class Scheduler
{
  public:

    void Init( int n_worker );

    deque<Task*> ready_queue[ MAX_WORKER ];
    Lock ready_queue_lock[ MAX_WORKER ];

    deque<Task*> nested_queue[ MAX_WORKER ];
    Lock nested_queue_lock[ MAX_WORKER ];

    vector<unordered_map<int, ListenerTask*>> listener_tasklist;
    Lock listener_queue_lock;

    float time_remaining[ MAX_WORKER ];
    void ReportRemainingTime();

    /** Add a dependency edge from source to target. */
    static void DependencyAdd( Task *source, Task *target );

    void MessageDependencyAnalysis( int key, int p, ReadWriteType type, Task *task );

    void NewTask( Task *task );

    void ExecuteNestedTasksWhileWaiting( Worker *me, Task *waiting_task );

    static void* EntryPoint( void* );

    deque<Task*> tasklist;
    deque<Task*> nested_tasklist;

    int n_task_completed = 0;
    int n_nested_task_completed = 0;

    vector<Task*> DispatchFromNormalQueue( int tid );
    vector<Task*> DispatchFromNestedQueue( int tid );
    vector<Task*> StealFromOther();
    Task *StealFromQueue( size_t target );

    bool ConsumeTasks( Worker *me, vector<Task*> &batch );
    bool ConsumeTasks( Worker *me, Task *batch, bool is_nested );

    bool IsTimeToExit( int tid );

    Lock task_lock[ 2 * MAX_WORKER ];
    Lock run_lock[ MAX_WORKER ];

    unordered_map<int, vector<ReadWrite>> msg_dependencies;

    bool do_terminate = false;
    bool has_ibarrier = false;
    mpi::Request ibarrier_request;
    int ibarrier_consensus = 0;
};
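DependencyAdd() is the edge-insertion primitive that DependencyAnalysis() ultimately relies on: every edge makes the target wait for one more unfinished source, and DependenciesUpdate() releases it once a source completes. The following is a conceptual sketch of that bookkeeping, not the library's implementation; the successor list on the source side is an assumed member:

/* Conceptual sketch only (not Scheduler::DependencyAdd itself): a target task
 * may not be queued until its remaining-dependency counter returns to zero. */
void dependency_add_sketch( Task *source, Task *target )
{
  if ( source->GetStatus() != DONE )
  {
    /* source->out.push_back( target );  // successor list, assumed member */
    target->n_dependencies_remaining += 1;
    if ( target->GetStatus() == ALLOCATED ) target->SetStatus( NOTREADY );
  }
}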
/** RunTime: the per-process runtime instance; statically created in hmlp_runtime.cpp. */
class RunTime
{
  public:

    /** Initialize the runtime; the second overload can also initialize MPI. */
    hmlpError_t init( mpi::Comm comm );
    hmlpError_t init( int *argc, char ***argv, mpi::Comm comm );

    hmlpError_t finalize();

    bool isInit() const noexcept;

    bool isInEpochSession() const noexcept;

    void ExecuteNestedTasksWhileWaiting( Task *waiting_task );

    hmlpError_t setNumberOfWorkers( int num_of_workers ) noexcept;

    int getNumberOfWorkers() const noexcept;

    class Worker workers[ MAX_WORKER ];

    /** Devices or accelerators that require a master thread to control. */
    class Device* device[ 1 ];

  private:

    int num_of_workers_ = 1;
    int num_of_max_workers_ = 1;

    bool is_init_ = false;
    bool is_mpi_init_by_hmlp_ = false;
    bool is_in_epoch_session_ = false;

    void Print( string msg );
    void ExitWithError( string msg );
};
int hmlp_get_mpi_rank();
int hmlp_get_mpi_size();
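A hedged sketch of the life cycle implied by the RunTime interface above. How the statically created instance is accessed is not part of this excerpt, so the sketch takes a reference; an MPI build (where mpi::Comm aliases MPI_Comm) and the worker count are assumptions:

/* Sketch only: initialize, query, and shut down the runtime. */
#include <cstdio>

void runtime_lifecycle_sketch( RunTime &rt, int *argc, char ***argv )
{
  rt.init( argc, argv, MPI_COMM_WORLD );   /* assumes an MPI build */
  if ( !rt.isInit() ) return;

  rt.setNumberOfWorkers( 4 );              /* assumed worker count */

  if ( hmlp_get_mpi_rank() == 0 )
    printf( "running with %d workers on %d ranks\n",
        rt.getNumberOfWorkers(), hmlp_get_mpi_size() );

  /* ... submit tasks (e.g., via RecuTaskSubmit) and run epochs here ... */

  rt.finalize();
}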