GCC Code Coverage Report
Directory: .
File:      frame/base/runtime.cpp
Date:      2019-01-14
Lines:     22 of 490 executed (4.5 %)
Branches:  28 of 330 executed (8.5 %)

/**
 *  HMLP (High-Performance Machine Learning Primitives)
 *
 *  Copyright (C) 2014-2017, The University of Texas at Austin
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see the LICENSE file.
 *
 **/

#include <base/runtime.hpp>

#ifdef HMLP_USE_CUDA
#include <hmlp_gpu.hpp>
#endif

#ifdef HMLP_USE_MAGMA
#include <magma_v2.h>
#include <magma_lapack.h>
#endif

#define REPORT_RUNTIME_STATUS 1
// #define DEBUG_RUNTIME 1
// #define DEBUG_SCHEDULER 1

using namespace std;
using namespace hmlp;

/** @brief Compare two timeline events by their timestamps. */
struct
{
  bool operator()( const tuple<bool, double, size_t> &a,
                   const tuple<bool, double, size_t> &b )
  {
    return get<1>( a ) < get<1>( b );
  }
} EventLess;

namespace hmlp
{

/** IMPORTANT: we allocate a static runtime system per (MPI) process. */
static RunTime rt;

/**
 *  class Lock
 */

///** @brief Shared-memory lock that calls either a pthread or an omp mutex. */
//Lock::Lock()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_init( &lock, NULL ) )
//  {
//    printf( "pthread_mutex_init(): cannot initialize locks properly\n" );
//  }
//#else
//  omp_init_lock( &lock );
//#endif
//}; /** end Lock::Lock() */
//
//Lock::~Lock()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_destroy( &lock ) )
//  {
//    printf( "pthread_mutex_destroy(): cannot destroy locks properly\n" );
//  }
//#else
//  omp_destroy_lock( &lock );
//#endif
//}; /** end Lock::~Lock() */
//
//void Lock::Acquire()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_lock( &lock ) )
//  {
//    printf( "pthread_mutex_lock(): cannot acquire locks properly\n" );
//  }
//#else
//  omp_set_lock( &lock );
//#endif
//};
//
//void Lock::Release()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_unlock( &lock ) )
//  {
//    printf( "pthread_mutex_unlock(): cannot release locks properly\n" );
//  }
//#else
//  omp_unset_lock( &lock );
//#endif
//};
//

/**
 *  class Event
 */

/** @brief (Default) Event constructor. */
Event::Event() {};

/** @brief Set the label, flops, and mops. */
void Event::Set( string _label, double _flops, double _mops )
{
  flops = _flops;
  mops  = _mops;
  label = _label;
};

void Event::AddFlopsMops( double _flops, double _mops )
{
  flops += _flops; /** Possible concurrent write. */
  mops  += _mops;
};

void Event::Begin( size_t _tid )
{
  tid = _tid;
  beg = omp_get_wtime();
};

void Event::Normalize( double shift )
{
  beg -= shift;
  end -= shift;
};

void Event::Terminate()
{
  end = omp_get_wtime();
  sec = end - beg;
};

double Event::GetBegin() { return beg; };

double Event::GetEnd() { return end; };

double Event::GetDuration() { return sec; };

double Event::GetFlops() { return flops; };

double Event::GetMops() { return mops; };

double Event::GflopsPerSecond() { return ( flops / sec ) / 1E+9; };

void Event::Print()
{
  printf( "beg %5.3lf end %5.3lf sec %5.3lf flops %E mops %E\n",
      beg, end, sec, flops, mops );
};

void Event::Timeline( bool isbeg, size_t tag )
{
  double gflops_peak = 45.0;
  double flops_efficiency = flops / ( gflops_peak * sec * 1E+9 );
  if ( isbeg )
  {
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 0, beg, (double)tid + 0.0 );
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 1, beg, (double)tid + flops_efficiency );
    printf( "@TIMELINE\n" );
    printf( "worker%lu, %lu, %E, %lf\n", tid, tag, beg, (double)tid + flops_efficiency );
  }
  else
  {
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 0, beg, (double)tid + flops_efficiency );
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 1, beg, (double)tid + 0.0 );
    printf( "@TIMELINE\n" );
    printf( "worker%lu, %lu, %E, %lf\n", tid, tag, end, (double)tid + 0.0 );
  }
};

void Event::MatlabTimeline( FILE *pFile )
{
  /** TODO: this needs to change according to the arch. */
  double gflops_peak = 45.0;
  double flops_efficiency = 0.0;
  if ( sec * 1E+9 > 0.1 )
  {
    flops_efficiency = flops / ( gflops_peak * sec * 1E+9 );
    if ( flops_efficiency > 1.0 ) flops_efficiency = 1.0;
  }
  fprintf( pFile, "rectangle('position',[%lf %lu %lf %d],'facecolor',[1.0,%lf,%lf]);\n",
      beg, tid, ( end - beg ), 1,
      flops_efficiency, flops_efficiency );
  fprintf( pFile, "text( %lf,%lf,'%s');\n", beg, (double)tid + 0.5, label.data() );
};
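
/**
 *  Usage sketch for the Event API above; the label and the 2.0E+9 flop
 *  count are made-up placeholders.
 */
#if 0
Event event;
event.Set( "demo_kernel", 2.0E+9, 1.0E+6 ); /** label, flops, mops */
event.Begin( omp_get_thread_num() );        /** beg = omp_get_wtime() */
/* ... run the kernel being measured ... */
event.Terminate();                          /** end = omp_get_wtime(); sec = end - beg */
printf( "%.3lf GFLOPS\n", event.GflopsPerSecond() );
#endif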

/**
 *  class Task
 */

/** @brief (Default) Task constructor. */
Task::Task()
{
  /** Change the status to allocated. */
  SetStatus( ALLOCATED );
  /** Record whether this is a nested task. */
  is_created_in_epoch_session = rt.IsInEpochSession();
  /** Record which thread created me. */
  created_by = omp_get_thread_num();
  /** Move forward to the next status, "NOTREADY". */
  SetStatus( NOTREADY );
}; /** end Task::Task() */

/** @brief (Default) Task destructor. */
Task::~Task() {};

/** @brief (Default) MessageTask constructor. */
MessageTask::MessageTask( int src, int tar, int key )
{
  this->src = src;
  this->tar = tar;
  this->key = key;
}; /** end MessageTask::MessageTask() */

/** @brief (Default) ListenerTask constructor. */
ListenerTask::ListenerTask( int src, int tar, int key ) : MessageTask( src, tar, key ) {};

/** @brief Status is a private member. */
TaskStatus Task::GetStatus() { return status; };

/** @brief Move forward to the next status. */
void Task::SetStatus( TaskStatus next_status ) { this->status = next_status; };

/** @brief Change the status of all tasks in the batch. */
void Task::SetBatchStatus( TaskStatus next_status )
{
  auto *task = this;
  while ( task )
  {
    task->status = next_status;
    /** Move to the next task in the batch. */
    task = task->next;
  }
};

/** @brief Ask the runtime to record a normal task in the task list. */
void Task::Submit() { rt.scheduler->NewTask( this ); };

/** @brief Ask the runtime to record a message task in the task list. */
void MessageTask::Submit() { rt.scheduler->NewMessageTask( this ); };

/** @brief Ask the runtime to record a listener task in the task list. */
void ListenerTask::Submit() { rt.scheduler->NewListenerTask( this ); };

/** @brief Set the task body with a user-provided function pointer (instead of overriding the virtual). */
void Task::Set( string user_name, void (*user_function)(Task*), void *user_arg )
{
  name = user_name;
  function = user_function;
  arg = user_arg;
  status = NOTREADY;
}; /** end Task::Set() */
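
/**
 *  Sketch of building a task with Task::Set() and Task::Submit(). MyBody
 *  and its argument are hypothetical; the task only runs once an epoch is
 *  launched with hmlp_run().
 */
#if 0
void MyBody( Task *task ) { /* ... user work ... */ }

auto *task = new Task();
task->Set( "my_task", MyBody, NULL ); /** name, function pointer, argument */
task->Submit();                       /** recorded via Scheduler::NewTask() */
hmlp_run();                           /** executed (and freed) in the epoch */
#endif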

/** @brief Update my outgoing and my children's incoming edges. */
void Task::DependenciesUpdate()
{
  /** Loop over each outgoing edge. */
  while ( out.size() )
  {
    Task *child = out.front();
    /** There should be at least one remaining dependency to satisfy. */
    assert( child->n_dependencies_remaining > 0 && child->GetStatus() == NOTREADY );
    /** Acquire the exclusive right to modify the task. */
    //assert( child->task_lock );
    child->Acquire();
    {
      child->n_dependencies_remaining --;
      /** If there is no dependency left, enqueue the task. */
      if ( !child->n_dependencies_remaining )
      {
        /** Nested tasks may not carry the worker pointer. */
        if ( worker ) child->Enqueue( worker->tid );
        else          child->Enqueue();
      }
    }
    child->Release();
    /** Remove this outgoing edge. */
    out.pop_front();
  }
  /** Move forward to the last status, "DONE". */
  SetStatus( DONE );
}; /** end Task::DependenciesUpdate() */

void Task::Acquire()
{
  if ( !task_lock )
  {
    cout << name << " not submitted" << endl;
    assert( task_lock );
  }
  task_lock->Acquire();
};

void Task::Release()
{
  if ( !task_lock )
  {
    cout << name << " not submitted" << endl;
    assert( task_lock );
  }
  task_lock->Release();
};

/** All virtual functions. */
void Task::GetEventRecord() {};
void Task::Prefetch( Worker *user_worker ) {};
void Task::DependencyAnalysis() {};

/** @brief Try to dispatch the task if there is no dependency left. */
bool Task::TryEnqueue()
{
  if ( GetStatus() == NOTREADY && !n_dependencies_remaining )
  {
    Enqueue();
    return true;
  }
  else return false;
};

void Task::Enqueue() { Enqueue( 0 ); };

void Task::ForceEnqueue( size_t tid )
{
  int assignment = tid;

  rt.scheduler->ready_queue_lock[ assignment ].Acquire();
  {
    float cost = rt.workers[ assignment ].EstimateCost( this );
    /** Move forward to the next status, "QUEUED". */
    SetStatus( QUEUED );
    if ( priority )
      rt.scheduler->ready_queue[ assignment ].push_front( this );
    else
      rt.scheduler->ready_queue[ assignment ].push_back( this );

    /** Update the remaining time. */
    rt.scheduler->time_remaining[ assignment ] += cost;
  }
  rt.scheduler->ready_queue_lock[ assignment ].Release();
}; /** end Task::ForceEnqueue() */

/** @brief Dispatch the task to the worker with the earliest estimated finish time (HEFT). */
void Task::Enqueue( size_t tid )
{
  float cost = 0.0;
  float earliest_t = -1.0;
  int assignment = -1;

  /** Dispatch to the nested queue if created in the epoch session. */
  if ( is_created_in_epoch_session )
  {
    assert( created_by < rt.n_worker );
    rt.scheduler->nested_queue_lock[ created_by ].Acquire();
    {
      /** Move forward to the next status, "QUEUED". */
      SetStatus( QUEUED );
      if ( priority )
        rt.scheduler->nested_queue[ created_by ].push_front( this );
      else
        rt.scheduler->nested_queue[ created_by ].push_back( this );
    }
    rt.scheduler->nested_queue_lock[ created_by ].Release();
    /** Finish and return without going further down. */
    return;
  }

  /** Determine which worker the task should go to using the HEFT policy. */
  for ( int p = 0; p < rt.n_worker; p ++ )
  {
    int i = ( tid + p ) % rt.n_worker;
    float cost = rt.workers[ i ].EstimateCost( this );
    float terminate_t = rt.scheduler->time_remaining[ i ];
    if ( earliest_t == -1.0 || terminate_t + cost < earliest_t )
    {
      earliest_t = terminate_t + cost;
      assignment = i;
    }
  }

  /** Dispatch to the normal ready queue. */
  ForceEnqueue( assignment );

}; /** end Task::Enqueue() */
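
/**
 *  A worked micro-example of the HEFT selection above, with made-up
 *  numbers: suppose two workers have time_remaining = { 3.0, 1.0 } and
 *  EstimateCost() returns 0.5 for both. The estimated finish times are
 *  then 3.5 and 1.5, so the task is assigned to worker 1.
 */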

/** @brief This is the callback function for the owner of the nested task. */
void Task::CallBackWhileWaiting()
{
  rt.ExecuteNestedTasksWhileWaiting( this );
}; /** end CallBackWhileWaiting() */

/** @brief Return whether this task was created inside an epoch session. */
bool Task::IsNested() { return is_created_in_epoch_session; };

/** @brief (Default) ReadWrite constructor. */
ReadWrite::ReadWrite() {};

/** Clean up both the read and write sets. */
void ReadWrite::DependencyCleanUp()
{
  read.clear();
  write.clear();
}; /** end DependencyCleanUp() */

/** @brief This is the key function that encodes the dependencies. **/
void ReadWrite::DependencyAnalysis( ReadWriteType type, Task *task )
{
  if ( type == R || type == RW )
  {
    /** Update the read set. */
    read.push_back( task );
    /** Read-After-Write (RAW) data dependencies. */
    for ( auto it : write ) Scheduler::DependencyAdd( it, task );
  }
  if ( type == W || type == RW )
  {
    /** Write-After-Read (WAR) anti-dependencies. */
    for ( auto it : read ) Scheduler::DependencyAdd( it, task );
    /** Clean up both the read and write sets. */
    DependencyCleanUp();
    /** Update the write set. */
    write.push_back( task );
  }
}; /** end ReadWrite::DependencyAnalysis() */
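
/**
 *  Sketch of how the read/write sets above encode dependencies; the tasks
 *  t1, t2, and t3 are hypothetical and all touch the same object.
 */
#if 0
ReadWrite obj;
obj.DependencyAnalysis( W, t1 ); /** t1 enters the write set */
obj.DependencyAnalysis( R, t2 ); /** RAW: DependencyAdd( t1, t2 ) */
obj.DependencyAnalysis( W, t3 ); /** WAR: DependencyAdd( t2, t3 ); both sets are cleared first */
#endif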

/**
 *  class MatrixReadWrite
 */

/** @brief (Default) MatrixReadWrite constructor. */
MatrixReadWrite::MatrixReadWrite() {};

/** @brief Allocate an m-by-n grid of ReadWrite objects. */
void MatrixReadWrite::Setup( size_t m, size_t n )
{
  //printf( "%lu %lu setup\n", m, n  );
  this->has_been_setup = true;
  this->m = m;
  this->n = n;
  Submatrices.resize( m );
  for ( size_t i = 0; i < m; i ++ ) Submatrices[ i ].resize( n );
}; /** end MatrixReadWrite::Setup() */

/** @brief Return whether Setup() has been called. */
bool MatrixReadWrite::HasBeenSetup() { return has_been_setup; };

/** @brief Run the dependency analysis on the (i, j) submatrix. */
void MatrixReadWrite::DependencyAnalysis(
    size_t i, size_t j, ReadWriteType type, Task *task )
{
  //printf( "%lu %lu analysis\n", i, j  ); fflush( stdout );
  assert( i < m && j < n );
  Submatrices[ i ][ j ].DependencyAnalysis( type, task );
}; /** end MatrixReadWrite::DependencyAnalysis() */

/** @brief Clean up the dependencies of all submatrices. */
void MatrixReadWrite::DependencyCleanUp()
{
  for ( size_t i = 0; i < m; i ++ )
    for ( size_t j = 0; j < n; j ++ )
      Submatrices[ i ][ j ].DependencyCleanUp();
}; /** end MatrixReadWrite::DependencyCleanUp() */
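
/**
 *  Sketch of per-block dependency tracking with MatrixReadWrite; the
 *  tasks t1 and t2 are hypothetical.
 */
#if 0
MatrixReadWrite A;
A.Setup( 4, 4 );                      /** a 4-by-4 grid of ReadWrite objects */
A.DependencyAnalysis( 0, 0, RW, t1 ); /** t1 reads and writes block (0,0) */
A.DependencyAnalysis( 0, 0, R,  t2 ); /** RAW edge t1 -> t2 on block (0,0) */
#endif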

/**
 *  class Scheduler
 */

/** @brief (Default) Scheduler constructor. */
Scheduler::Scheduler( mpi::Comm user_comm )
  : mpi::MPIObject( user_comm ), timeline_tag( 500 )
{
#ifdef DEBUG_SCHEDULER
  printf( "Scheduler()\n" );
#endif
  listener_tasklist.resize( this->GetCommSize() );
  /** Set now as the beginning of the time table. */
  timeline_beg = omp_get_wtime();
};

/** @brief (Default) Scheduler destructor. */
Scheduler::~Scheduler()
{
#ifdef DEBUG_SCHEDULER
  printf( "~Scheduler()\n" );
#endif
};

/** @brief Launch the workers and enter the scheduling loop. */
void Scheduler::Init( int user_n_worker )
{
#ifdef DEBUG_SCHEDULER
  printf( "Scheduler::Init()\n" );
#endif

  /** Adjust the number of active workers. */
  n_worker = user_n_worker;
  /** Reset the normal and nested task counters. */
  n_task_completed = 0;
  n_nested_task_completed = 0;
  /** Reset the asynchronous distributed consensus variables. */
  do_terminate = false;
  has_ibarrier = false;
  ibarrier_consensus = 0;

#ifdef USE_PTHREAD_RUNTIME
  for ( int i = 0; i < n_worker; i ++ )
  {
    rt.workers[ i ].tid = i;
    rt.workers[ i ].scheduler = this;
    pthread_create
    (
      &(rt.workers[ i ].pthreadid), NULL,
      EntryPoint, (void*)&(rt.workers[ i ])
    );
  }
  /** Now the master thread will enter the EntryPoint. */
  EntryPoint( (void*)&(rt.workers[ 0 ]) );
#else
  #pragma omp parallel for num_threads( n_worker )
  for ( int i = 0; i < n_worker; i ++ )
  {
    assert( omp_get_thread_num() == i );
    rt.workers[ i ].tid = i;
    rt.workers[ i ].scheduler = this;
    EntryPoint( (void*)&(rt.workers[ i ]) );
  } /** end pragma omp parallel for */
#endif
}; /** end Scheduler::Init() */

/** @brief Track the dependencies of message tasks, keyed by message key and rank. */
void Scheduler::MessageDependencyAnalysis(
    int key, int p, ReadWriteType type, Task *task )
{
  if ( msg_dependencies.find( key ) == msg_dependencies.end() )
  {
    msg_dependencies[ key ] = vector<ReadWrite>( this->GetCommSize() );
  }
  msg_dependencies[ key ][ p ].DependencyAnalysis( type, task );
}; /** end Scheduler::MessageDependencyAnalysis() */
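
/**
 *  Sketch of the call above: each message key owns one ReadWrite object
 *  per rank, so message tasks touching the same (key, rank) pair are
 *  serialized like reads and writes. The key 10, rank p, and task t below
 *  are hypothetical.
 */
#if 0
hmlp_msg_dependency_analysis( 10, p, W, t ); /** t acts as a writer on ( 10, p ) */
#endif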

/** @brief This function is called by Task::Submit() to record a new task. */
void Scheduler::NewTask( Task *task )
{
  if ( !task ) return;
  /** Acquire the exclusive right to access the tasklist. */
  tasklist_lock.Acquire();
  {
    if ( rt.IsInEpochSession() )
    {
      task->task_lock = &(task_lock[ nested_tasklist.size() % ( 2 * MAX_WORKER ) ]);
      nested_tasklist.push_back( task );
    }
    else
    {
      task->task_lock = &(task_lock[ tasklist.size() % ( 2 * MAX_WORKER ) ]);
      tasklist.push_back( task );
    }
  }
  tasklist_lock.Release();
}; /** end Scheduler::NewTask() */

/** @brief This function is called by MessageTask::Submit() to record a new message task. */
void Scheduler::NewMessageTask( MessageTask *task )
{
  tasklist_lock.Acquire();
  {
    /** We use a duplicated (private) communicator to handle message tasks. */
    task->comm = this->GetPrivateComm();
    task->task_lock = &(task_lock[ tasklist.size() % ( 2 * MAX_WORKER ) ]);
    /** Counted toward the termination criteria. */
    tasklist.push_back( task );
    //printf( "NewMessageTask src %d tar %d key %d tasklist.size() %lu\n",
    //    task->src, task->tar, task->key, tasklist.size() );
  }
  tasklist_lock.Release();
};

/** @brief This function is called by ListenerTask::Submit() to record a new listener task. */
void Scheduler::NewListenerTask( ListenerTask *task )
{
  tasklist_lock.Acquire();
  {
    /** We use a duplicated (private) communicator to handle message tasks. */
    task->comm = this->GetPrivateComm();
    task->task_lock = &(task_lock[ tasklist.size() % ( 2 * MAX_WORKER ) ]);
    listener_tasklist[ task->src ][ task->key ] = task;
    /** Counted toward the termination criteria. */
    tasklist.push_back( task );
    //printf( "NewListenerTask src %d tar %d key %d listener_tasklist[].size() %lu\n",
    //    task->src, task->tar, task->key, listener_tasklist[ task->src ].size() );
  }
  tasklist_lock.Release();
}; /** end Scheduler::NewListenerTask() */

/** @brief Finish the epoch: join the workers, report statistics, and free all tasks. */
void Scheduler::Finalize()
{
#ifdef DEBUG_SCHEDULER
  printf( "Scheduler::Finalize()\n" );
#endif
#ifdef USE_PTHREAD_RUNTIME
  for ( int i = 0; i < rt.n_worker; i ++ )
  {
    pthread_join( rt.workers[ i ].pthreadid, NULL );
  }
#endif

  /** Print out the statistics of this epoch. */
  if ( REPORT_RUNTIME_STATUS ) Summary();

  /** Reset the remaining time. */
  for ( int i = 0; i < n_worker; i ++ ) time_remaining[ i ] = 0.0;
  /** Free all normal tasks and reset the tasklist. */
  try
  {
    for ( auto task : tasklist ) delete task;
    tasklist.clear();
  }
  catch ( exception & e ) { cout << e.what() << endl; };
  /** Free all nested tasks and reset nested_tasklist. */
  try
  {
    for ( auto task : nested_tasklist ) delete task;
    nested_tasklist.clear();
  }
  catch ( exception & e ) { cout << e.what() << endl; };
  /** Reset listener_tasklist. */
  try
  {
    for ( auto & plist : listener_tasklist ) plist.clear();
  }
  catch ( exception & e ) { cout << e.what() << endl; };

  /** Clean up all message dependencies. */
  msg_dependencies.clear();

}; /** end Scheduler::Finalize() */
698
699
700
/** @brief  */
701
void Scheduler::ReportRemainingTime()
702
{
703
  printf( "ReportRemainingTime:" ); fflush( stdout );
704
  printf( "--------------------\n" ); fflush( stdout );
705
  for ( int i = 0; i < rt.n_worker; i ++ )
706
  {
707
    printf( "worker %2d --> %7.2lf (%4lu jobs)\n",
708
        i, rt.scheduler->time_remaining[ i ],
709
           rt.scheduler->ready_queue[ i ].size() ); fflush( stdout );
710
  }
711
  printf( "--------------------\n" ); fflush( stdout );
712
}; /** end Scheduler::ReportRemainingTime() */

/** @brief Add a directed edge (dependency) from source to target. */
void Scheduler::DependencyAdd( Task *source, Task *target )
{
  /** Avoid self-loops. */
  if ( source == target ) return;
  /** Update the source's outgoing edges. */
  source->Acquire();
  {
    source->out.push_back( target );
  }
  source->Release();
  /** Update the target's incoming edges. */
  target->Acquire();
  {
    target->in.push_back( source );
    /** Only increase the dependency count for incomplete tasks. */
    if ( source->GetStatus() != DONE ) target->n_dependencies_remaining ++;
  }
  target->Release();
}; /** end Scheduler::DependencyAdd() */

/** @brief Try to steal a task from the back of the target's ready queue. */
Task *Scheduler::StealFromQueue( size_t target )
{
  Task *target_task = NULL;

  /** Get the lock of the target's ready queue. */
  ready_queue_lock[ target ].Acquire();
  {
    if ( ready_queue[ target ].size() )
    {
      target_task = ready_queue[ target ].back();
      assert( target_task );
      if ( target_task->stealable )
      {
        ready_queue[ target ].pop_back();
        time_remaining[ target ] -= target_task->cost;
      }
      else target_task = NULL;
    }
  }
  ready_queue_lock[ target ].Release();

  return target_task;
}; /** end Scheduler::StealFromQueue() */

/** @brief Steal from the busiest worker's normal queue, then from the nested queues. */
vector<Task*> Scheduler::StealFromOther()
{
  int max_remaining_tasks = 0;
  int max_remaining_nested_tasks = 0;
  int target = 0;
  /** Decide which target's normal queue to steal from. */
  for ( int p = 0; p < n_worker; p ++ )
  {
    if ( ready_queue[ p ].size() > max_remaining_tasks )
    {
      max_remaining_tasks = ready_queue[ p ].size();
      target = p;
    }
  }
  /** Try to steal from the target's ready queue. */
  auto batch = DispatchFromNormalQueue( target );
  /** Return if the batch is not empty. */
  if ( batch.size() ) return batch;
  /** Decide which target's nested queue to steal from. */
  for ( int p = 0; p < n_worker; p ++ )
  {
    if ( nested_queue[ p ].size() > max_remaining_nested_tasks )
    {
      max_remaining_nested_tasks = nested_queue[ p ].size();
      target = p;
    }
  }
  /** Try to steal from the target's nested queue. */
  batch = DispatchFromNestedQueue( target );
  /** Return regardless of whether the batch is empty. */
  return batch;
}; /** end Scheduler::StealFromOther() */

/** @brief Dispatch a nested task from tid's nested_queue. */
vector<Task*> Scheduler::DispatchFromNestedQueue( int tid )
{
  vector<Task*> batch;
  /** Dispatch a nested task from tid's nested_queue. */
  nested_queue_lock[ tid ].Acquire();
  {
    if ( nested_queue[ tid ].size() )
    {
      auto *target_task = nested_queue[ tid ].front();
      /** Check whether I can dispatch this task. */
      if ( tid == omp_get_thread_num() || target_task->stealable )
      {
        /** Fetch the first task in the nested queue. */
        batch.push_back( target_task );
        /** Remove the task from the nested queue. */
        nested_queue[ tid ].pop_front();
      }
    }
  }
  nested_queue_lock[ tid ].Release();
  /** Notice that this can be an empty vector. */
  return batch;
}; /** end Scheduler::DispatchFromNestedQueue() */

/** @brief Decide whether it is time for worker tid to exit. */
bool Scheduler::IsTimeToExit( int tid )
{
  /** If do_terminate has already been set, return "true". */
  if ( do_terminate ) return true;
  /** Both normal and nested tasks should all have been executed. */
  if ( n_task_completed >= tasklist.size() )
  {
    if ( n_nested_task_completed >= nested_tasklist.size() )
    {
      /** My ready_queue and nested_queue should both be empty. */
      assert( !ready_queue[ tid ].size() );
      assert( !nested_queue[ tid ].size() );
      /** Set the termination flag to true. */
      do_terminate = true;
      /** Now there should be no tasks left locally. Return "true". */
      return true;
    }
    else printf( "normal %d/%lu nested %d/%lu\n",
        n_task_completed, tasklist.size(),
        n_nested_task_completed, nested_tasklist.size() );
  }
  /** Otherwise, it is not yet time to terminate. */
  return false;
}; /** end Scheduler::IsTimeToExit() */

/** @brief Dispatch up to maximum_batch_size normal tasks from tid's ready queue. */
vector<Task*> Scheduler::DispatchFromNormalQueue( int tid )
{
  size_t maximum_batch_size = 1;
  vector<Task*> batch;
  /** Dispatch normal tasks from tid's ready queue. */
  ready_queue_lock[ tid ].Acquire();
  {
    for ( int it = 0; it < maximum_batch_size; it ++ )
    {
      if ( ready_queue[ tid ].size() )
      {
        auto *target_task = ready_queue[ tid ].front();
        /** The target ready_queue is not my own queue. */
        if ( tid != omp_get_thread_num() )
        {
          maximum_batch_size = 1;
          /** If this task cannot be stolen, then break. */
          if ( !target_task->stealable ) break;
          else time_remaining[ tid ] -= target_task->cost;
        }
        /** Dequeue a task and push it into this batch. */
        batch.push_back( target_task );
        ready_queue[ tid ].pop_front();
      }
      else
      {
        /** Reset my workload counter. */
        time_remaining[ tid ] = 0.0;
      }
    }
  }
  ready_queue_lock[ tid ].Release();
  /** Notice that this can be an empty vector. */
  return batch;
}; /** end Scheduler::DispatchFromNormalQueue() */

/** @brief Execute a batch of tasks and update their dependencies. */
bool Scheduler::ConsumeTasks( Worker *me, vector<Task*> &batch )
{
  /** Early return if there is no task to execute. */
  if ( !batch.size() ) return false;
  /** For each task, move forward to the next status, "RUNNING". */
  for ( auto task : batch ) task->SetStatus( RUNNING );
  /** Now the worker will execute all tasks in the batch. */
  for ( auto task : batch ) me->Execute( task );
  /** For each task, update its dependencies and my remaining time. */
  for ( auto task : batch )
  {
    task->DependenciesUpdate();
    /** Update my remaining time and n_task_completed. */
    if ( !task->IsNested() )
    {
      ready_queue_lock[ me->tid ].Acquire();
      {
        time_remaining[ me->tid ] -= task->cost;
        if ( time_remaining[ me->tid ] < 0.0 )
          time_remaining[ me->tid ] = 0.0;
      }
      ready_queue_lock[ me->tid ].Release();
      n_task_lock.Acquire();
      {
        n_task_completed ++;
      }
      n_task_lock.Release();
    }
    else
    {
      n_task_lock.Acquire();
      {
        n_nested_task_completed ++;
      }
      n_task_lock.Release();
    }
  }
  /** Return "true" if at least one task was executed. */
  return true;
}; /** end Scheduler::ConsumeTasks() */

/** @brief Keep executing nested tasks until the waiting task is DONE. */
void Scheduler::ExecuteNestedTasksWhileWaiting( Worker *me, Task *waiting_task )
{
  assert( me->tid == omp_get_thread_num() );
  while ( waiting_task->GetStatus() != DONE )
  {
    /** Try to get a nested task. */
    auto nested_batch = DispatchFromNestedQueue( me->tid );
    ConsumeTasks( me, nested_batch );
  }
}; /** end Scheduler::ExecuteNestedTasksWhileWaiting() */
943
944
945
/** @brief */
946
bool Scheduler::ConsumeTasks( Worker *me, Task *batch, bool is_nested )
947
{
948
  /** Early return */
949
  if ( !batch ) return false;
950
951
  /** Update status */
952
  batch->SetBatchStatus( RUNNING );
953
954
  if ( me->Execute( batch ) )
955
  {
956
    Task *task = batch;
957
    while ( task )
958
    {
959
      task->DependenciesUpdate();
960
      if ( !is_nested )
961
      {
962
        ready_queue_lock[ me->tid ].Acquire();
963
        {
964
          time_remaining[ me->tid ] -= task->cost;
965
          if ( time_remaining[ me->tid ] < 0.0 )
966
            time_remaining[ me->tid ] = 0.0;
967
        }
968
        ready_queue_lock[ me->tid ].Release();
969
        n_task_lock.Acquire();
970
        {
971
          n_task_completed ++;
972
        }
973
        n_task_lock.Release();
974
      }
975
      else
976
      {
977
        n_task_lock.Acquire();
978
        {
979
          n_nested_task_completed ++;
980
        }
981
        n_task_lock.Release();
982
      }
983
      /**  Move to the next task in te batch */
984
      task = task->next;
985
    }
986
  }
987
  return true;
988
}; /** end Scheduler::ConsumeTasks() */
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
/** @brief This is the main body that each worker goes through. */
void* Scheduler::EntryPoint( void* arg )
{
  /** First, cast arg back to Worker*. */
  Worker *me = reinterpret_cast<Worker*>( arg );
  /** Get the callback point of the scheduler. */
  Scheduler *scheduler = me->scheduler;
  /** This counter measures the idle iterations. */
  size_t idle = 0;

#ifdef DEBUG_SCHEDULER
  printf( "Scheduler::EntryPoint()\n" );
  printf( "pthreadid %d\n", me->tid );
#endif

  /** Prepare listeners (half of the total workers). */
  if ( ( me->tid % 2 ) && true )
  {
    /** Update my termination time to infinity. */
    scheduler->ready_queue_lock[ me->tid ].Acquire();
    {
      scheduler->time_remaining[ me->tid ] = numeric_limits<float>::max();
    }
    scheduler->ready_queue_lock[ me->tid ].Release();
    /** Enter listening mode. */
    scheduler->Listen( me );
  }
  /** Start to consume all tasks in this epoch session. */
  while ( 1 )
  {
    /** Try to get a normal task from my own ready queue. */
    auto normal_batch = scheduler->DispatchFromNormalQueue( me->tid );
    /** If there are jobs to execute, then reset the counter. */
    if ( scheduler->ConsumeTasks( me, normal_batch ) )
    {
      /** Reset the idle counter. */
      idle = 0;
    }
    else /** No task in my ready_queue. Try the nested_queue. */
    {
      /** Increase the idle counter. */
      idle ++;
      /** Try to get a nested task. */
      auto nested_batch = scheduler->DispatchFromNestedQueue( me->tid );
      /** Reset the idle counter if there are executable nested tasks. */
      if ( scheduler->ConsumeTasks( me, nested_batch ) ) idle = 0;
    }
    /** Try to steal from others. */
    if ( idle > 10 )
    {
      /** Try to steal a (normal or nested) task. */
      auto stolen_batch = scheduler->StealFromOther();
      /** Reset the idle counter if there are executable stolen tasks. */
      if ( scheduler->ConsumeTasks( me, stolen_batch ) ) idle = 0;
    } /** end if ( idle > 10 ) */

    /** Check whether it is time to terminate. */
    if ( scheduler->IsTimeToExit( me->tid ) ) break;
  }
  /** Return "NULL". */
  return NULL;
}; /** end Scheduler::EntryPoint() */

/** @brief Listen for asynchronous incoming MPI messages. */
void Scheduler::Listen( Worker *me )
{
  /** We use a duplicated (private) communicator to handle message tasks. */
  auto comm = this->GetPrivateComm();
  auto size = this->GetCommSize();
  auto rank = this->GetCommRank();

  /** Iprobe flag and recv status. */
  int probe_flag = 0;
  mpi::Status status;

  /** Keep probing for incoming messages. */
  while ( 1 )
  {
    ListenerTask *task = NULL;
    /** Only one thread will probe and recv messages at a time. */
    #pragma omp critical
    {
      /** Asynchronously probe for incoming messages. */
      mpi::Iprobe( MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &probe_flag, &status );
      /** If any message was received, then handle it. */
      if ( probe_flag )
      {
        /** Info from mpi::Status. */
        int recv_src = status.MPI_SOURCE;
        int recv_tag = status.MPI_TAG;
        int recv_key = 3 * ( recv_tag / 3 );

        if ( recv_key < 300 )
        {
          printf( "rank %d Iprobe src %d tag %d\n", rank, recv_src, recv_tag );
          fflush( stdout );
        }

        /** Check whether there is a corresponding task. */
        auto it = listener_tasklist[ recv_src ].find( recv_key );
        if ( it != listener_tasklist[ recv_src ].end() )
        {
          task = it->second;
          if ( task->GetStatus() == NOTREADY )
          {
            //printf( "rank %d Find a task src %d tag %d\n", rank, recv_src, recv_tag );
            //fflush( stdout );
            task->SetStatus( QUEUED );
            task->Listen();
          }
          else
          {
            printf( "rank %d Find a QUEUED task src %d tag %d\n", rank, recv_src, recv_tag );
            fflush( stdout );
            task = NULL;
            probe_flag = false;
          }
        }
        else
        {
          //printf( "rank %d not match task src %d tag %d\n", rank, recv_src, recv_tag );
          //fflush( stdout );
          probe_flag = false;
        }
      }
    }

    if ( probe_flag )
    {
      /** Execute the task and update its dependencies. */
      ConsumeTasks( me, task, false );
    }
    else
    {
      /** Steal a (normal or nested) task from others. */
      auto stolen_batch = StealFromOther();
      ConsumeTasks( me, stolen_batch );
    }

    /** Nonblocking consensus for termination. */
    if ( do_terminate )
    {
      /** We use an Ibarrier to ensure global consensus. */
      #pragma omp critical
      {
        /** Only the first worker will issue an Ibarrier. */
        if ( !has_ibarrier )
        {
          mpi::Ibarrier( comm, &ibarrier_request );
          has_ibarrier = true;
        }
        /** Test the global consensus on "ibarrier_request". */
        if ( !ibarrier_consensus )
        {
          mpi::Test( &ibarrier_request, &ibarrier_consensus,
              MPI_STATUS_IGNORE );
        }
      }
      /** If the Ibarrier has completed on every rank, then exit. */
      if ( ibarrier_consensus ) break;
    }
  }
}; /** end Scheduler::Listen() */
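
/**
 *  The termination test above follows the nonblocking-consensus pattern:
 *  a rank that is locally done (do_terminate) joins an MPI_Ibarrier and
 *  keeps serving messages until MPI_Test reports that every rank has
 *  joined. A distilled sketch, with placeholder helper names:
 */
#if 0
MPI_Request request; int done = 0;
MPI_Ibarrier( comm, &request );     /** announce: I have no local work left */
while ( !done )
{
  serve_incoming_messages();        /** stay responsive while waiting */
  MPI_Test( &request, &done, MPI_STATUS_IGNORE );
}
#endif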

/** @brief Print the statistics of this epoch. */
void Scheduler::Summary()
{
  int total_normal_tasks = tasklist.size();
  int total_nested_tasks = nested_tasklist.size();
  int total_listen_tasks = 0;
  for ( auto list : listener_tasklist ) total_listen_tasks += list.size();
  double total_flops = 0.0, total_mops = 0.0;
  time_t rawtime;
  struct tm * timeinfo;
  char buffer[ 80 ];

  time( &rawtime );
  timeinfo = localtime( &rawtime );
  strftime( buffer, 80, "%T.", timeinfo );

  //printf( "%s\n", buffer );

  for ( size_t i = 0; i < tasklist.size(); i ++ )
  {
    total_flops += tasklist[ i ]->event.GetFlops();
    total_mops  += tasklist[ i ]->event.GetMops();
  }

#ifdef HMLP_USE_MPI
  /** In the MPI environment, reduce all flops and mops. */
  int global_normal_tasks = 0, global_listen_tasks = 0, global_nested_tasks = 0;
  double global_flops = 0.0, global_mops = 0.0;
  mpi::Reduce( &total_flops, &global_flops, 1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_mops,  &global_mops,  1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_normal_tasks, &global_normal_tasks, 1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_listen_tasks, &global_listen_tasks, 1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_nested_tasks, &global_nested_tasks, 1, MPI_SUM, 0, this->GetPrivateComm() );
  if ( this->GetCommRank() == 0 )
  {
    printf( "[ RT] %5d [normal] %5d [listen] %5d [nested] %5.3E flops %5.3E mops\n",
        global_normal_tasks, global_listen_tasks, global_nested_tasks, global_flops, global_mops );
  }
#else
  printf( "[ RT] %5d [normal] %5d [nested] %5.3E flops %5.3E mops\n",
      total_normal_tasks, total_nested_tasks, total_flops, total_mops );
#endif

#ifdef DUMP_ANALYSIS_DATA
  deque<tuple<bool, double, size_t>> timeline;

  if ( tasklist.size() )
  {
    string filename = string( "timeline" ) +
        to_string( tasklist.size() ) + string( "_rank" ) +
        to_string( this->GetCommRank() ) + string( ".m" );
    FILE *pFile = fopen( filename.data(), "w" );

    fprintf( pFile, "figure('Position',[100,100,800,800]);" );
    fprintf( pFile, "hold on;" );

    for ( size_t i = 0; i < tasklist.size(); i ++ )
    {
      tasklist[ i ]->event.Normalize( timeline_beg );
    }

    for ( size_t i = 0; i < tasklist.size(); i ++ )
    {
      auto &event = tasklist[ i ]->event;
      timeline.push_back( make_tuple(  true, event.GetBegin(), i ) );
      timeline.push_back( make_tuple( false, event.GetEnd(),   i ) );
    }

    sort( timeline.begin(), timeline.end(), EventLess );

    for ( size_t i = 0; i < timeline.size(); i ++ )
    {
      auto &data = timeline[ i ];
      auto &event = tasklist[ get<2>( data ) ]->event;
      //event.Timeline( get<0>( data ), i + timeline_tag );
      event.MatlabTimeline( pFile );
    }

    fclose( pFile );

    timeline_tag += timeline.size();
  }
#endif

}; /** end Scheduler::Summary() */

/**
 *  class RunTime
 */

/** @brief (Default) RunTime constructor. */
RunTime::RunTime() {};

/** @brief (Default) RunTime destructor. */
RunTime::~RunTime() {};

/** @brief Initialize the runtime system. */
hmlpError_t RunTime::Init( mpi::Comm comm = MPI_COMM_WORLD )
{
  #pragma omp critical (init)
  {
    if ( !is_init )
    {
      n_worker = omp_get_max_threads();
      n_max_worker = n_worker;
      /** Check whether MPI has been initialized. */
      int is_mpi_init = false;
      mpi::Initialized( &is_mpi_init );

      scheduler = new Scheduler( comm );

#ifdef HMLP_USE_CUDA
      /** TODO: detect devices. */
      device[ 0 ] = new hmlp::gpu::Nvidia( 0 );
      if ( n_worker )
      {
        workers[ 0 ].SetDevice( device[ 0 ] );
      }
#endif
#ifdef HMLP_USE_MAGMA
      magma_init();
#endif
      /* Set the flag such that this is only executed once. */
      is_init = true;
    }
  } /* end pragma omp critical */

  /* Return an error if the scheduler failed to allocate. */
  if ( !scheduler ) return HMLP_ERROR_ALLOC_FAILED;
  /* Return without error. */
  return HMLP_ERROR_SUCCESS;
}; /* end RunTime::Init() */

/** @brief Initialize the runtime system with arguments. */
hmlpError_t RunTime::Init( int* argc, char*** argv, mpi::Comm comm = MPI_COMM_WORLD )
{
  #pragma omp critical
  {
    if ( !is_init )
    {
      /** Set the argument count. */
      this->argc = argc;
      /** Set the argument values. */
      this->argv = argv;
      /** Acquire the (maximum) number of workers from OpenMP. */
      n_worker = omp_get_max_threads();
      n_max_worker = n_worker;
      /** Check whether MPI has been initialized. */
      int is_mpi_init = false;
      mpi::Initialized( &is_mpi_init );
      /** Otherwise, initialize MPI inside HMLP. */
      if ( !is_mpi_init )
      {
        int provided;
        mpi::Init_thread( argc, argv, MPI_THREAD_MULTIPLE, &provided );
        if ( provided != MPI_THREAD_MULTIPLE )
          ExitWithError( string( "MPI_THREAD_MULTIPLE is not supported" ) );
        /** Flag that MPI was initialized by HMLP. */
        is_mpi_init_by_hmlp = true;
      }
      /** Initialize the scheduler. */
      scheduler = new Scheduler( comm );
#ifdef HMLP_USE_CUDA
      /** TODO: detect devices. */
      device[ 0 ] = new hmlp::gpu::Nvidia( 0 );
      if ( n_worker )
      {
        workers[ 0 ].SetDevice( device[ 0 ] );
      }
#endif
#ifdef HMLP_USE_MAGMA
      magma_init();
#endif
      /** Set the flag such that this is only executed once. */
      is_init = true;
    }
  } /* end pragma omp critical */
  /* Return an error if the scheduler failed to allocate. */
  if ( !scheduler ) return HMLP_ERROR_ALLOC_FAILED;
  /* Return without error. */
  return HMLP_ERROR_SUCCESS;
}; /* end RunTime::Init() */

/** @brief Consume all submitted tasks in one epoch session. */
void RunTime::Run()
{
  if ( is_in_epoch_session )
  {
    printf( "Fatal Error: more than one concurrent epoch session!\n" );
    exit( 1 );
  }
  if ( !is_init ) Init();
  /** Begin this epoch session. */
  is_in_epoch_session = true;
  /** Schedule the jobs to the n workers. */
  scheduler->Init( n_worker );
  /** Clean up. */
  scheduler->Finalize();
  /** Finish this epoch session. */
  is_in_epoch_session = false;
}; /** end RunTime::Run() */
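
/**
 *  Sketch of the typical epoch lifecycle seen from user code; MyTask is a
 *  hypothetical Task subclass.
 */
#if 0
hmlp_init( &argc, &argv ); /** RunTime::Init() */
auto *t1 = new MyTask();
auto *t2 = new MyTask();
t1->Submit();              /** Scheduler::NewTask() */
t2->Submit();
hmlp_run();                /** RunTime::Run(): one epoch session */
hmlp_finalize();           /** RunTime::Finalize() */
#endif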

/** @brief Finalize the runtime system. */
void RunTime::Finalize()
{
  #pragma omp critical (init)
  {
    if ( is_init )
    {
      /** Finalize the scheduler and delete it. */
      scheduler->Finalize();
      delete scheduler;
      /** Set the initialized flag to false. */
      is_init = false;
      /** Finalize MPI if it was initialized by HMLP. */
      if ( is_mpi_init_by_hmlp ) mpi::Finalize();
    }
  }
}; /** end RunTime::Finalize() */

/** @brief Return whether the runtime is inside an epoch session. */
bool RunTime::IsInEpochSession() { return is_in_epoch_session; };

/** @brief Execute nested tasks while the waiting task is incomplete. */
void RunTime::ExecuteNestedTasksWhileWaiting( Task *waiting_task )
{
  /** Use omp_get_thread_num() to acquire the tid. */
  auto *me = &(workers[ omp_get_thread_num() ]);
  /** If I am not in an epoch session, then return. */
  if ( IsInEpochSession() )
  {
    scheduler->ExecuteNestedTasksWhileWaiting( me, waiting_task );
  }
}; /** end RunTime::ExecuteNestedTasksWhileWaiting() */

void RunTime::Print( string msg )
{
  cout << "[RT ] " << msg << endl; fflush( stdout );
}; /** end RunTime::Print() */

void RunTime::ExitWithError( string msg )
{
  cout << "[RT ] (Error) " << msg << endl; fflush( stdout );
  exit( 1 );
}; /** end RunTime::ExitWithError() */

//void hmlp_runtime::pool_init()
//{
//
//};
//
//void hmlp_runtime::acquire_memory()
//{
//
//};
//
//void hmlp_runtime::release_memory( void *ptr )
//{
//
//};

/** @brief Return the pointer to the host device. */
hmlp::Device *hmlp_get_device_host() { return &(hmlp::rt.host); };

/** @brief C-style interface for message dependency analysis. */
void hmlp_msg_dependency_analysis( int key, int p, ReadWriteType type, Task *task )
{
  hmlp::rt.scheduler->MessageDependencyAnalysis( key, p, type, task );
};

}; /** end namespace hmlp */

/**
 *  \brief Initialize the runtime without MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init()
{
  return hmlp::rt.Init();
};

/**
 *  \brief Initialize the runtime with MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init( mpi::Comm comm )
{
  return hmlp::rt.Init( comm );
};

/**
 *  \brief Initialize the runtime and parse arguments without MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init( int *argc, char ***argv )
{
  return hmlp::rt.Init( argc, argv );
};

/**
 *  \brief Initialize the runtime and parse arguments with MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init( int *argc, char ***argv, mpi::Comm comm )
{
  return hmlp::rt.Init( argc, argv, comm );
};

/**
 *  \brief Set the number of workers for the next epoch.
 */
void hmlp_set_num_workers( int n_worker )
{
  hmlp::rt.n_worker = n_worker;
};

void hmlp_run() { hmlp::rt.Run(); };

void hmlp_finalize() { hmlp::rt.Finalize(); };

hmlp::RunTime *hmlp_get_runtime_handle() { return &hmlp::rt; };

hmlp::Device *hmlp_get_device( int i ) { return hmlp::rt.device[ i ]; };

bool hmlp_is_in_epoch_session() { return hmlp::rt.IsInEpochSession(); };

int hmlp_get_mpi_rank() { return hmlp::rt.scheduler->GetCommRank(); };

int hmlp_get_mpi_size() { return hmlp::rt.scheduler->GetCommSize(); };
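
/**
 *  Sketch of limiting the next epoch to a subset of the OpenMP threads
 *  with the C-style interface above.
 */
#if 0
hmlp_init();
hmlp_set_num_workers( 4 ); /** use 4 of the omp_get_max_threads() workers */
hmlp_run();
hmlp_finalize();
#endif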