GCC Code Coverage Report

Directory: .
File:      frame/base/runtime.hpp
Date:      2019-01-14

              Exec   Total   Coverage
Lines:           0      49      0.0 %
Branches:        0      93      0.0 %

Source
/**
 *  HMLP (High-Performance Machine Learning Primitives)
 *
 *  Copyright (C) 2014-2017, The University of Texas at Austin
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see the LICENSE file.
 *
 **/

#ifndef HMLP_RUNTIME_HPP
#define HMLP_RUNTIME_HPP

#include <time.h>
#include <tuple>
#include <algorithm>
#include <vector>
#include <deque>
#include <map>
#include <unordered_map>
#include <limits>
#include <cstdint>
#include <cassert>
#include <type_traits>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <cstddef>
#include <omp.h>

//#ifdef USE_INTEL
//#include <mkl.h>
//#endif

#ifdef USE_PTHREAD_RUNTIME
#include <pthread.h>
#endif

#include <base/thread.hpp>
#include <base/tci.hpp>
#include <hmlp_mpi.hpp>

#define MAX_WORKER 68


using namespace std;



namespace hmlp
{

//typedef enum
//{
//  HMLP_SCHEDULE_DEFAULT,
//  HMLP_SCHEDULE_ROUND_ROBIN,
//  HMLP_SCHEDULE_UNIFORM,
//  HMLP_SCHEDULE_HEFT
//} SchedulePolicy;

/** @brief Life-cycle states of a task managed by the runtime. */
typedef enum { ALLOCATED, NOTREADY, QUEUED, RUNNING, EXECUTED, DONE, CANCELLED } TaskStatus;

/** @brief Access modes (read, write, read-write) used in dependency analysis. */
typedef enum { R, W, RW } ReadWriteType;

///**
// *  class Lock
// */
//
///** @brief Wrapper for omp or pthread mutex.  */
//class Lock
//{
//  public:
//
//    Lock();
//
//    ~Lock();
//
//    void Acquire();
//
//    void Release();
//
//  private:
//#ifdef USE_PTHREAD_RUNTIME
//    pthread_mutex_t lock;
//#else
//    omp_lock_t lock;
//#endif
//}; /** end class Lock */
//
//

/**
 *  class Event
 */

/** @brief Events are attached to tasks to record specific activities. */
class Event
{
  public:

    Event();

    void Set( string, double, double );

    void AddFlopsMops( double, double );

    void Begin( size_t );

    double GetBegin();

    double GetEnd();

    double GetDuration();

    double GetFlops();

    double GetMops();

    double GflopsPerSecond();

    void Normalize( double shift );

    void Terminate();

    void Print();

    void Timeline( bool isbeg, size_t tag );

    void MatlabTimeline( FILE *pFile );

  private:

    size_t tid = 0;

    string label;

    double flops = 0.0;

    double mops = 0.0;

    double beg = 0.0;

    double end = 0.0;

    double sec = 0.0;

}; /** end class Event */



/**
 *  class Task
 */

/** @brief The abstract base class of all tasks handled by the runtime scheduler. */
class Task
{
  public:

    Task();

    ~Task();

    class Worker *worker = NULL;

    string name;

    string label;

    int taskid;

    float cost = 0;

    bool priority = false;

    Event event;

    TaskStatus GetStatus();

    void SetStatus( TaskStatus status );
    void SetBatchStatus( TaskStatus status );

    void Submit();


    virtual void Set( string user_name, void (*user_function)(Task*), void *user_arg );

    virtual void Prefetch( Worker* );

    void Enqueue();

    void Enqueue( size_t tid );

    bool TryEnqueue();

    void ForceEnqueue( size_t tid );

    void CallBackWhileWaiting();

    virtual void Execute( Worker* ) = 0;

    virtual void GetEventRecord();

    virtual void DependencyAnalysis();

    /* function ptr */
    void (*function)(Task*);

    /* function context */
    void *arg;

    /* Dependency-related members */
    volatile int n_dependencies_remaining = 0;
    void DependenciesUpdate();

    /** Read/write sets for dependency analysis */
    deque<Task*> in;
    deque<Task*> out;

    /** Task lock */
    void Acquire();
    void Release();

    Lock* task_lock = NULL;

    /** The next task in the batch job */
    Task *next = NULL;

    /** Preserve the current task on the call stack and switch context. */
    //bool ContextSwitchToNextTask( Worker* );

    /** If true, this task can be stolen */
    volatile bool stealable = true;

    volatile int created_by = 0;

    bool IsNested();

  private:

    volatile TaskStatus status;

    /** If true, then this is a nested task */
    volatile bool is_created_in_epoch_session = false;

}; /** end class Task */
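//
// Example (illustrative sketch only, not part of the original header): a
// minimal user-defined task. The names MyTask and MyArgument are hypothetical.
// A concrete task derives from Task, provides a Set()-style initializer for
// its argument, and must implement Execute(); DependencyAnalysis() is
// overridden when the task participates in out-of-order execution.
//
//class MyTask : public Task
//{
//  public:
//
//    MyArgument *myarg = NULL;
//
//    void Set( MyArgument *user_arg )
//    {
//      name = string( "MyTask" );
//      myarg = user_arg;
//      /** Record zero flops/mops for this sketch. */
//      event.Set( name, 0.0, 0.0 );
//    };
//
//    void DependencyAnalysis() { /** declare R/W accesses here */ };
//
//    void Execute( Worker *user_worker ) { /** do the actual work here */ };
//
//}; /** end class MyTask */
//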



void hmlp_msg_dependency_analysis(
    int key, int p, hmlp::ReadWriteType type, hmlp::Task *task );




/** @brief This is a specific type of task that represents a NOP. */
template<typename ARGUMENT>
class NULLTask : public Task
{
  public:

    void Set( ARGUMENT *arg ) {};

    void Execute( Worker* ) {};

}; /** end class NULLTask */




/**
 *  class MessageTask
 */

/** @brief This task is designed to take care of MPI communications. */
class MessageTask : public Task
{
  public:

    /** MPI communicator will be provided during Submit(). */
    mpi::Comm comm;

    /** Provided during construction. */
    int tar = 0;
    int src = 0;
    int key = 0;

    /** (Default) constructor. */
    MessageTask( int src, int tar, int key );

    void Submit();
}; /** end class MessageTask */


/** @brief This task is the abstraction for all tasks handled by Listeners. */
class ListenerTask : public MessageTask
{
  public:

    /** (Default) constructor. */
    ListenerTask( int src, int tar, int key );

    void Submit();

    virtual void Listen() = 0;
}; /** end class ListenerTask */


/** @brief A task that packs local data and posts non-blocking sends. */
template<typename T, typename ARG>
class SendTask : public MessageTask
{
  public:

    ARG *arg = NULL;

    vector<size_t> send_sizes;
    vector<size_t> send_skels;
    vector<T>      send_buffs;

    SendTask( ARG *user_arg, int src, int tar, int key ) : MessageTask( src, tar, key )
    {
      this->arg = user_arg;
    };

    /** Override Set() */
    void Set( ARG *user_arg, int src, int tar, int key )
    {
      name = string( "Send" );
      label = to_string( tar );
      this->arg = user_arg;
      this->src = src;
      this->tar = tar;
      this->key = key;
      /** Compute FLOPS and MOPS */
      double flops = 0, mops = 0;
      /** Setup the event */
      event.Set( label + name, flops, mops );
      /** "HIGH" priority */
      priority = true;
    };

    virtual void Pack() = 0;

    void Execute( Worker *user_worker )
    {
      Pack();
      mpi::Request req1, req2, req3;
      mpi::Isend( this->send_sizes.data(), this->send_sizes.size(),
          this->tar, this->key + 0, this->comm, &req1 );
      //mpi::Isend( this->send_skels.data(), this->send_skels.size(),
      //    this->tar, this->key + 1, this->comm, &req2 );
      mpi::Isend( this->send_buffs.data(), this->send_buffs.size(),
          this->tar, this->key + 2, this->comm, &req3 );
    };
}; /** end class SendTask */


template<typename T, typename ARG>
class RecvTask : public ListenerTask
{
  public:

    ARG *arg = NULL;

    vector<size_t> recv_sizes;
    vector<size_t> recv_skels;
    vector<T>      recv_buffs;

    RecvTask( ARG *user_arg, int src, int tar, int key )
      : ListenerTask( src, tar, key ) { this->arg = user_arg; };

    /** Override Set() */
    void Set( ARG *user_arg, int src, int tar, int key )
    {
      name = string( "Listener" );
      label = to_string( src );
      this->arg = user_arg;
      this->src = src;
      this->tar = tar;
      this->key = key;
      /** Compute FLOPS and MOPS */
      double flops = 0, mops = 0;
      /** Setup the event */
      event.Set( label + name, flops, mops );
    };

    void DependencyAnalysis()
    {
      hmlp_msg_dependency_analysis( this->key, this->src, RW, this );
    };

    void Listen()
    {
      int src = this->src;
      int tar = this->tar;
      int key = this->key;
      mpi::Comm comm = this->comm;
      int cnt = 0;
      mpi::Status status;
      /** Probe the message that contains recv_sizes */
      mpi::Probe( src, key + 0, comm, &status );
      mpi::Get_count( &status, HMLP_MPI_SIZE_T, &cnt );
      recv_sizes.resize( cnt );
      mpi::Recv( recv_sizes.data(), cnt, src, key + 0, comm, &status );
      /** Calculate the total size of recv_buffs */
      cnt = 0;
      for ( auto c : recv_sizes ) cnt += c;
      recv_buffs.resize( cnt );
      /** Receive recv_buffs as well */
      mpi::Recv( recv_buffs.data(), cnt, src, key + 2, comm, &status );
    };

    virtual void Unpack() = 0;

    void Execute( Worker *user_worker ) { Unpack(); };

}; /** end class RecvTask */
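//
// Example (illustrative sketch only, not part of the original header): a
// hypothetical concrete send task. The names VectorSendTask and the "data"
// member of the argument are made up for illustration; a real subclass fills
// send_sizes and send_buffs from its argument inside Pack(), and the
// base-class Execute() then posts the corresponding non-blocking sends.
//
//template<typename T, typename ARG>
//class VectorSendTask : public SendTask<T, ARG>
//{
//  public:
//
//    VectorSendTask( ARG *user_arg, int src, int tar, int key )
//      : SendTask<T, ARG>( user_arg, src, tar, key ) {};
//
//    void Pack()
//    {
//      /** Suppose the argument owns a vector<T> named data (hypothetical). */
//      this->send_sizes.resize( 1 );
//      this->send_sizes[ 0 ] = this->arg->data.size();
//      this->send_buffs = this->arg->data;
//    };
//}; /** end class VectorSendTask */
//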




/** @brief Recursive task submission (base case). */
template<typename ARG>
void RecuTaskSubmit( ARG *arg ) { /** do nothing */ };


/** @brief Recursive task submission. */
template<typename ARG, typename TASK, typename... Args>
void RecuTaskSubmit( ARG *arg, TASK& dummy, Args&... dummyargs )
{
  using NULLTASK = NULLTask<ARG>;
  /** Create the first normal task if it is not a NULLTask. */
  if ( !std::is_same<NULLTASK, TASK>::value )
  {
    auto task = new TASK();
    task->Submit();
    task->Set( arg );
    task->DependencyAnalysis();
  }
  /** Now recurse on Args&... args; types are deduced automatically. */
  RecuTaskSubmit( arg, dummyargs... );
}; /** end RecuTaskSubmit() */


/** @brief Recursive task execution (base case). */
template<typename ARG>
void RecuTaskExecute( ARG *arg ) { /** do nothing */ };


/** @brief Recursive task execution. */
template<typename ARG, typename TASK, typename... Args>
void RecuTaskExecute( ARG *arg, TASK& dummy, Args&... dummyargs )
{
  using NULLTASK = NULLTask<ARG>;
  /** Create the first normal task if it is not a NULLTask. */
  if ( !std::is_same<NULLTASK, TASK>::value )
  {
    auto *task = new TASK();
    task->Set( arg );
    task->Execute( NULL );
    delete task;
  }
  /** Now recurse on Args&... args; types are deduced automatically. */
  RecuTaskExecute( arg, dummyargs... );
}; /** end RecuTaskExecute() */
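//
// Example (illustrative sketch only, not part of the original header): how the
// recursive helpers above are typically invoked. MyArgument, TaskA, and TaskB
// are hypothetical types that provide a Set( MyArgument* ) member; a NULLTask
// placeholder is skipped by the std::is_same test, so it can be used to
// disable one slot in the parameter pack.
//
//void ExampleRecursiveSubmission( MyArgument *arg )
//{
//  TaskA taskA; TaskB taskB; NULLTask<MyArgument> skipped;
//  /** Submit taskA and taskB to the runtime; the NULLTask is ignored. */
//  RecuTaskSubmit( arg, taskA, skipped, taskB );
//  /** Or execute them immediately and sequentially, bypassing the scheduler. */
//  RecuTaskExecute( arg, taskA, skipped, taskB );
//};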




/**
 *  class ReadWrite
 */

/** @brief This class provides the ability to perform dependency analysis. */
class ReadWrite
{
  public:

    ReadWrite();

    /** Tracking the read set of the object. */
    deque<Task*> read;

    /** Tracking the write set of the object. */
    deque<Task*> write;

    void DependencyAnalysis( ReadWriteType type, Task *task );

    void DependencyCleanUp();

  private:

}; /** end class ReadWrite */
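//
// Example (illustrative sketch only, not part of the original header): a data
// object exposes a ReadWrite member, and each task declares how it touches
// that object inside its DependencyAnalysis(); this is how the read and write
// sets above get populated. MyData and MyReaderTask are hypothetical names.
//
//class MyData
//{
//  public:
//    ReadWrite rw;
//};
//
//class MyReaderTask : public Task
//{
//  public:
//    MyData *data = NULL;
//    void Set( MyData *user_data ) { data = user_data; };
//    void DependencyAnalysis() { data->rw.DependencyAnalysis( R, this ); };
//    void Execute( Worker *user_worker ) { /** read from data */ };
//};
//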




/**
 *  class MatrixReadWrite
 */

/** @brief This class creates an m-by-n grid of ReadWrite objects for 2D matrix partitions. */
class MatrixReadWrite
{
  public:

    MatrixReadWrite();

    void Setup( size_t m, size_t n );

    bool HasBeenSetup();

    void DependencyAnalysis( size_t i, size_t j, ReadWriteType type, Task *task );

    void DependencyCleanUp();

  private:

    bool has_been_setup = false;

    size_t m = 0;

    size_t n = 0;

    vector<vector<ReadWrite> > Submatrices;

}; /** end class MatrixReadWrite */
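//
// Example (illustrative sketch only, not part of the original header): using
// MatrixReadWrite to track dependencies on a blocked matrix. The 2-by-2 grid
// and the task pointer are hypothetical; each (i, j) entry is an independent
// ReadWrite object, so tasks that touch different blocks need not conflict.
//
//void ExampleMatrixDependencies( Task *task )
//{
//  MatrixReadWrite grid;
//  grid.Setup( 2, 2 );
//  /** task reads block (0, 0) and overwrites block (1, 1). */
//  grid.DependencyAnalysis( 0, 0, R,  task );
//  grid.DependencyAnalysis( 1, 1, RW, task );
//};
//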




/**
 *  class Scheduler
 */

/** @brief The scheduler that dispatches ready tasks to workers. */
class Scheduler : public mpi::MPIObject
{
  public:

    Scheduler( mpi::Comm comm );

    ~Scheduler();

    void Init( int n_worker );

    void Finalize();

    int n_worker = 0;

    size_t timeline_tag;

    double timeline_beg;

    /** Ready queues for normal tasks. */
    deque<Task*> ready_queue[ MAX_WORKER ];
    /** The tasklist records all tasks created in this epoch. */
    //deque<Task*> tasklist;
    /** Accessing ready_queue requires exclusive access to avoid race conditions. */
    Lock ready_queue_lock[ MAX_WORKER ];

    /** The ready queue for nested tasks. */
    deque<Task*> nested_queue[ MAX_WORKER ];
    /** The tasklist records all nested tasks created in this epoch. */
    //deque<Task*> nested_tasklist;
    /** Accessing nested_queue requires exclusive access to avoid race conditions. */
    Lock nested_queue_lock[ MAX_WORKER ];


    /** The hashmap for asynchronous MPI tasks. */
    vector<unordered_map<int, ListenerTask*>> listener_tasklist;
    Lock listener_queue_lock;

    float time_remaining[ MAX_WORKER ];

    void ReportRemainingTime();

    /** Manually describe the dependencies. */
    static void DependencyAdd( Task *source, Task *target );

    void MessageDependencyAnalysis( int key, int p, ReadWriteType type, Task *task );

    void NewTask( Task *task );

    void NewMessageTask( MessageTask *task );

    void NewListenerTask( ListenerTask *task );

    void ExecuteNestedTasksWhileWaiting( Worker *me, Task *waiting_task );

    void Summary();


  private:

    /** Worker entry point for the main scheduling iteration. */
    static void* EntryPoint( void* );

    /** The tasklist records all tasks created in this epoch. */
    deque<Task*> tasklist;

    /** The tasklist records all nested tasks created in this epoch. */
    deque<Task*> nested_tasklist;

    /** This mutex grants exclusive access to modify the tasklists. */
    Lock tasklist_lock;

    /** Number of tasks and nested tasks that have been completed. */
    int n_task_completed = 0;
    int n_nested_task_completed = 0;

    /** Mutex for updating n_task_completed and n_nested_task_completed. */
    Lock n_task_lock;

    vector<Task*> DispatchFromNormalQueue( int tid );

    vector<Task*> DispatchFromNestedQueue( int tid );

    vector<Task*> StealFromOther();

    Task *StealFromQueue( size_t target );

    bool ConsumeTasks( Worker *me, vector<Task*> &batch );

    bool ConsumeTasks( Worker *me, Task *batch, bool is_nested );

    bool IsTimeToExit( int tid );

    void Listen( Worker* );

    Lock task_lock[ 2 * MAX_WORKER ];

    Lock run_lock[ MAX_WORKER ];

    Lock pci_lock;

    Lock gpu_lock;

    /** This maps a key to a vector of p ReadWrite objects. */
    unordered_map<int, vector<ReadWrite>> msg_dependencies;

    /** If the flag is set, then there is a local consensus. */
    bool do_terminate = false;

    /** If the flag is set, then an Ibarrier has been posted. */
    bool has_ibarrier = false;

    /** MPI request for the Ibarrier. */
    mpi::Request ibarrier_request;

    /** If the flag is set, then there is a global consensus. */
    int ibarrier_consensus = 0;
}; /** end class Scheduler */
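//
// Example (illustrative sketch only, not part of the original header): besides
// the ReadWrite-based analysis, an ordering between two already created tasks
// can be described manually with the static DependencyAdd(). Here taskA and
// taskB are hypothetical Task pointers; the call adds an edge from the source
// (taskA) to the target (taskB).
//
//void ExampleManualDependency( Task *taskA, Task *taskB )
//{
//  Scheduler::DependencyAdd( /* source = */ taskA, /* target = */ taskB );
//};
//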




/**
 *  class RunTime
 */

/** @brief RunTime is statically created in hmlp_runtime.cpp. */
class RunTime
{
  public:

    RunTime();

    ~RunTime();

    hmlpError_t Init( mpi::Comm comm );

    hmlpError_t Init( int *argc, char ***argv, mpi::Comm comm );

    void Run();

    void Finalize();

    /** Whether the runtime is in an epoch session. */
    bool IsInEpochSession();

    /** Consuming nested tasks while in an epoch session. */
    void ExecuteNestedTasksWhileWaiting( Task *waiting_task );

    //void pool_init();

    //void acquire_memory();

    //void release_memory( void* ptr );

    int n_worker = 0;

    int n_max_worker = 0;

    int n_background_worker = 1;

    thread_communicator *mycomm;

    class Worker workers[ MAX_WORKER ];

    class Device host;

    class Device* device[ 1 ];

    Scheduler *scheduler;

  private:

    /** Argument count. */
    int* argc = NULL;
    /** Argument variables. */
    char*** argv = NULL;
    /** Whether the runtime has been initialized on this process. */
    bool is_init = false;
    /** Whether MPI was initialized by the runtime. */
    bool is_mpi_init_by_hmlp = false;
    /** Whether the runtime is in an epoch session. */
    bool is_in_epoch_session = false;
    /** Print progress with prefix information. */
    void Print( string msg );
    /** Print an error message and exit with an error. */
    void ExitWithError( string msg );

}; /** end class RunTime */

}; /** end namespace hmlp */

hmlp::RunTime *hmlp_get_runtime_handle();

hmlp::Device *hmlp_get_device( int i );

bool hmlp_is_in_epoch_session();

//bool hmlp_is_nested_queue_empty();

//void hmlp_set_num_background_worker( int n_background_worker );

int hmlp_get_mpi_rank();

int hmlp_get_mpi_size();
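//
// Example (illustrative sketch only, not part of the original header): a
// possible life cycle of the runtime as seen through this header. The handle
// returned by hmlp_get_runtime_handle() is the statically created RunTime
// object; Init() is expected to bring up the scheduler and workers, Run() to
// consume the submitted tasks in an epoch, and Finalize() to tear things down.
// The hmlp::mpi::Comm qualification is assumed here.
//
//void ExampleRuntimeLifeCycle( hmlp::mpi::Comm comm )
//{
//  hmlp::RunTime *rt = hmlp_get_runtime_handle();
//  rt->Init( comm );
//  /** ... create tasks, Set(), Submit(), DependencyAnalysis() ... */
//  rt->Run();
//  rt->Finalize();
//};
//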

//void hmlp_msg_dependency_analysis(
//    int key, int p, hmlp::ReadWriteType type, hmlp::Task *task );


#endif /** define HMLP_RUNTIME_HPP */