GCC Code Coverage Report
File: frame/base/thread.cpp    Directory: .    Date: 2019-01-14

              Exec    Total    Coverage
Lines:           2      185       1.1 %
Branches:        0       77       0.0 %
/**
 *  HMLP (High-Performance Machine Learning Primitives)
 *
 *  Copyright (C) 2014-2017, The University of Texas at Austin
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see the LICENSE file.
 *
 **/

#include <base/runtime.hpp>
#include <base/thread.hpp>

using namespace std;

namespace hmlp
{

ostream& operator<<( ostream& os, const thread_communicator& obj )
{
  os << obj.name << obj.comm_id;
  return os;
};


range::range( int beg, int end, int inc )
{
  info = std::make_tuple( beg, end, inc );
};

int range::beg() { return std::get<0>( info ); };

int range::end() { return std::get<1>( info ); };

int range::inc() { return std::get<2>( info ); };
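
/** A range packs ( beg, end, inc ): read it as the half-open interval
 *  [ beg, end ) traversed with stride inc by the loops that consume it. */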

range GetRange
(
  SchedulePolicy strategy,
  int beg, int end, int nb, int tid, int nparts
)
{
  switch ( strategy )
  {
    /** The default schedule is currently identical to round robin:
     *  each thread starts at its own nb-block and strides over all
     *  nparts partitions. */
    case HMLP_SCHEDULE_DEFAULT:
      {
        auto tid_beg = beg + tid * nb;
        auto tid_inc = nparts * nb;
        return range( tid_beg, end, tid_inc );
      }
    case HMLP_SCHEDULE_ROUND_ROBIN:
      {
        auto tid_beg = beg + tid * nb;
        auto tid_inc = nparts * nb;
        return range( tid_beg, end, tid_inc );
      }
    case HMLP_SCHEDULE_UNIFORM:
      printf( "GetRange(): HMLP_SCHEDULE_UNIFORM is not implemented yet.\n" );
      exit( 1 );
    /** HEFT statically splits the iteration space 30/20/20/30 among
     *  exactly four parts; the big/sma names suggest it targets
     *  heterogeneous (big/small) cores. */
    case HMLP_SCHEDULE_HEFT:
      {
        assert( nparts == 4 );
        int len = end - beg - 1;
        int big = ( len * 30 ) / 100 + 1;
        int sma = ( len * 20 ) / 100 + 1;

        int tid_beg, tid_end;

        if ( tid == 0 )
        {
          tid_beg = beg;
          tid_end = beg + big;
        }
        beg += big;

        if ( tid == 1 )
        {
          tid_beg = beg;
          tid_end = beg + sma;
        }
        beg += sma;

        if ( tid == 2 )
        {
          tid_beg = beg;
          tid_end = beg + sma;
        }
        beg += sma;

        if ( tid == 3 )
        {
          tid_beg = beg;
          tid_end = beg + big;
        }
        beg += big;

        /** The +1 rounding in big/sma may overshoot; clamp to the real end. */
        if ( tid_end > end ) tid_end = end;
        return range( tid_beg, tid_end, nb );
      }
    default:
      printf( "GetRange(): not a legal scheduling strategy.\n" );
      exit( 1 );
  }
};

range GetRange( int beg, int end, int nb, int tid, int nparts )
{
  return GetRange( HMLP_SCHEDULE_DEFAULT, beg, end, nb, tid, nparts );
};

range GetRange( int beg, int end, int nb )
{
  return GetRange( HMLP_SCHEDULE_DEFAULT, beg, end, nb, 0, 1 );
};
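
/**
 *  Usage sketch (hypothetical values): GetRange( 0, 100, 8, 1, 4 ) uses
 *  the default block-cyclic schedule and returns range( 8, 100, 32 ),
 *  so thread 1 starts at iteration 8 and strides by nparts * nb = 32,
 *  visiting blocks [8,16), [40,48), [72,80).
 */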


/**
 *  @brief Recursively create tree-based communicators.
 */
void thread_communicator::Create( int level, int num_threads, int *config )
{
  if ( level < 1 )
  {
    kids = NULL;
  }
  else
  {
    n_threads = num_threads;
    n_groups = config[ level ];

    if      ( level == 4 ) name = std::string( "jc_comm" );
    else if ( level == 3 ) name = std::string( "pc_comm" );
    else if ( level == 2 ) name = std::string( "ic_comm" );
    else if ( level == 1 ) name = std::string( "jr_comm" );
    else                   name = std::string( "na_comm" );

    //std::cout << name << ", " << n_threads << ", " << n_groups << "\n";

    kids = new thread_communicator[ n_groups ]();
    for ( int i = 0; i < n_groups; i ++ )
    {
      kids[ i ].Create( level - 1, n_threads / n_groups, config );
    }
  }
};

thread_communicator::thread_communicator() :
  sent_object( NULL ),
  comm_id( 0 ),
  n_threads( 1 ),
  n_groups( 1 ),
  barrier_sense( false ),
  barrier_threads_arrived( 0 )
{};

/**
 *  @brief This constructor takes 4 partitioning numbers ( jc, pc, ic,
 *         jr ), one per level of the communicator tree.
 */
thread_communicator::thread_communicator( int jc_nt, int pc_nt, int ic_nt, int jr_nt ) :
  sent_object( NULL ),
  comm_id( 0 ),
  n_threads( 1 ),
  n_groups( 1 ),
  barrier_sense( false ),
  barrier_threads_arrived( 0 )
{
  int config[ 6 ] = { 1, 1, jr_nt, ic_nt, pc_nt, jc_nt };
  n_threads = jc_nt * pc_nt * ic_nt * jr_nt;
  //n_groups  = jc_nt;
  n_groups  = 1;
  name = std::string( "my_comm" );
  kids = new thread_communicator[ n_groups ]();

  for ( int i = 0; i < n_groups; i ++ )
  {
    kids[ i ].Create( 5, n_threads / n_groups, config );
  }
};
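
/**
 *  Illustration (hypothetical sizes): thread_communicator( 2, 1, 2, 2 )
 *  spans 8 threads and recursively builds
 *    my_comm(8) -> na_comm(8, 2 kids) -> jc_comm(4, 1 kid)
 *      -> pc_comm(4, 2 kids) -> ic_comm(2, 2 kids) -> jr_comm(1),
 *  where each node at level L splits its threads into config[ L ] kids.
 */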

int thread_communicator::GetNumThreads()
{
  return n_threads;
};

int thread_communicator::GetNumGroups()
{
  return n_groups;
};

/**
 *  @brief OpenMP thread barrier from BLIS (a sense-reversing barrier).
 */
void thread_communicator::Barrier()
{
  if ( n_threads < 2 ) return;

  /** Capture the current sense before signaling arrival. */
  bool my_sense = barrier_sense;
  int my_threads_arrived;

  #pragma omp atomic capture
  my_threads_arrived = ++ barrier_threads_arrived;

  if ( my_threads_arrived == n_threads )
  {
    /** Last thread to arrive: reset the counter and flip the sense,
     *  which releases every spinning thread at once. */
    barrier_threads_arrived = 0;
    barrier_sense = !barrier_sense;
  }
  else
  {
    /** Spin until the last thread reverses the sense. */
    volatile bool *listener = &barrier_sense;
    while ( *listener == my_sense ) {}
  }
};
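
/**
 *  Why sense reversal: it lets the same barrier object be reused
 *  back-to-back. A thread that races ahead into the next Barrier()
 *  call captures the new sense, so it cannot slip through the previous
 *  release; a bare counter alone would have exactly that race.
 */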

void thread_communicator::Print()
{
  Barrier();
};

void thread_communicator::Send( void** buffer )
{
  sent_object = *buffer;
};

void thread_communicator::Recv( void** buffer )
{
  *buffer = sent_object;
};
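
/** Send()/Recv() form a broadcast when combined with Barrier(); see the
 *  commented-out Worker::Bcast() below: the master Send()s, everyone
 *  Barrier()s, then the other threads Recv() the shared sent_object. */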

/**
 *  @brief Device implementation
 */
//Device::Device()
//{
//  name = std::string( "Host CPU" );
//  devicetype = hmlp::DeviceType::HOST;
//};
//
//void Device::prefetchd2h( void *ptr_h, void *ptr_d, size_t size, int stream_id ) {};
//
//void Device::prefetchh2d( void *ptr_d, void *ptr_h, size_t size, int stream_id ) {};
//
//void Device::wait( int stream_id ) {};
//
//void *Device::malloc( size_t size ) { return NULL; };
//
//void Device::malloc( void *ptr_d, size_t size ) {};
//
//size_t Device::get_memory_left() { return 0; };
//
//void Device::free( void *ptr_d, size_t size ) {};



/**
 *  @brief Worker implementation
 */
Worker::Worker() {}; /* gcov: hit 68 times */

Worker::Worker( thread_communicator *comm ) :
  tid( 0 ),
  jc_id( 0 ),
  pc_id( 0 ),
  ic_id( 0 ),
  jr_id( 0 )
{
  int tmp;

  tid   = omp_get_thread_num();
  tmp   = tid;

  //my_comm = comm;
  my_comm = &(comm->kids[ 0 ]);

  /** Peel off one level of the communicator tree at a time:
   *  the quotient is the group id at this level and the remainder
   *  is the rank inside that group. */
  jc_nt = my_comm->GetNumGroups();
  jc_id = tmp / ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );
  tmp   = tmp % ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );

  jc_comm = &(my_comm->kids[ jc_id ]);

  pc_nt = jc_comm->GetNumGroups();
  pc_id = tmp / ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );
  tmp   = tmp % ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );

  pc_comm = &(jc_comm->kids[ pc_id ]);

  ic_jr = tmp; // for parallel packB
  ic_nt = pc_comm->GetNumGroups();
  ic_id = tmp / ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );
  jr_id = tmp % ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );

  ic_comm = &(pc_comm->kids[ ic_id ]);
  jr_nt = ic_comm->GetNumGroups();

  //printf( "tid %2d jc_id %2d pc_id %2d ic_id %2d jr_id %2d, ic_jr %2d\n",
  //    tid, jc_id, pc_id, ic_id, jr_id, ic_jr );
};

/** Re-derive all communicator ids for this thread (same logic as the
 *  Worker( thread_communicator* ) constructor above). */
void Worker::Communicator( thread_communicator *comm )
{
  int tmp;

  tid   = omp_get_thread_num();
  tmp   = tid;

  //my_comm = comm;
  my_comm = &(comm->kids[ 0 ]);

  jc_nt = my_comm->GetNumGroups();
  jc_id = tmp / ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );
  tmp   = tmp % ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );

  jc_comm = &(my_comm->kids[ jc_id ]);

  pc_nt = jc_comm->GetNumGroups();
  pc_id = tmp / ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );
  tmp   = tmp % ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );

  pc_comm = &(jc_comm->kids[ pc_id ]);

  ic_jr = tmp; // for parallel packB
  ic_nt = pc_comm->GetNumGroups();
  ic_id = tmp / ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );
  jr_id = tmp % ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );

  ic_comm = &(pc_comm->kids[ ic_id ]);
  jr_nt = ic_comm->GetNumGroups();
};
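
/**
 *  Walk-through (hypothetical sizes): with ( jc_nt, pc_nt, ic_nt, jr_nt )
 *  = ( 2, 1, 2, 2 ), i.e. 8 threads, tid = 5 decomposes level by level:
 *    jc_id = 5 / 4 = 1, remainder 1;
 *    pc_id = 1 / 4 = 0, remainder 1;
 *    ic_id = 1 / 2 = 0, jr_id = 1 % 2 = 1,
 *  so thread 5 sits at ( jc, pc, ic, jr ) = ( 1, 0, 0, 1 ).
 */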

bool Worker::Master() { return tid == 0; };

void Worker::Barrier()
{
  assert( comm );
  comm->Barrier();
};


//void Worker::Bcast( void** buffer )
//{
//  if ( Master() ) comm->Send( buffer );
//  Barrier();
//  if ( !Master() ) comm->Recv( buffer );
//};



void Worker::InitWithCommunicator( thread_communicator* comm, size_t tid, size_t gid )
{
  this->comm = comm;
  this->tid = tid;
  this->gid = gid;
};

Worker Worker::Split()
{
  Worker child;

  //printf( "Before Split %s\n", comm->name.data() ); fflush( stdout );

  size_t n_splits  = comm->GetNumGroups();
  size_t n_threads = comm->GetNumThreads();

  //printf( "Split %s n_split %lu n_thread %lu\n",
  //    comm->name.data(), n_splits, n_threads ); fflush( stdout );

  if ( n_splits == 1 )
  {
    child.InitWithCommunicator( &(comm->kids[ 0 ]), tid, 0 );
    return child;
  }

  /**
   *  By default, we split threads evenly using "close" affinity.
   *  Threads with the same color will be in the same subcomm.
   *
   *  example: (n_splits=2)
   *
   *  tid        0  1  2  3  4  5  6  7
   *  color      0  0  0  0  1  1  1  1  (gang id)
   *  first      0  0  0  0  4  4  4  4
   *  last       4  4  4  4  8  8  8  8
   *  child_tid  0  1  2  3  0  1  2  3
   *             4  4  4  4  4  4  4  4  (child_n_threads)
   */
  size_t color = ( n_splits * tid ) / n_threads;
  size_t first = ( ( color + 0 ) * n_threads ) / n_splits;
  size_t last  = ( ( color + 1 ) * n_threads ) / n_splits;
  size_t child_tid = tid - first;
  size_t child_n_threads = last - first;

  //printf( "Split %s n_split %lu n_thread %lu color %lu\n",
  //    comm->name.data(), n_splits, n_threads, color ); fflush( stdout );

  gid = color;

  /** make sure all threads have the new communicator */
  child.InitWithCommunicator( &(comm->kids[ gid ]), child_tid, 0 );

  return child;
};


size_t Worker::BalanceOver1DGangs( size_t n, size_t default_size, size_t nb )
{
  size_t nparts = comm->GetNumGroups();
  /** Round each gang's share of n up to a whole number of nb-blocks;
   *  with a single gang, fall back to default_size. */
  if ( nparts > 1 ) return ( ( n - 1 ) / ( nb * nparts ) + 1 ) * nb;
  return default_size;
};
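
/**
 *  Example (hypothetical sizes): n = 100, nb = 8, nparts = 4 gives
 *  ( ( 100 - 1 ) / 32 + 1 ) * 8 = 4 * 8 = 32, i.e. each gang's share
 *  rounded up to a whole number of nb-sized blocks.
 */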


tuple<size_t, size_t, size_t> Worker::DistributeOver1DGangs(
    size_t beg, size_t end, size_t nb )
{
  size_t nparts = comm->GetNumGroups();

  SchedulePolicy strategy = HMLP_SCHEDULE_DEFAULT;

  switch ( strategy )
  {
    case HMLP_SCHEDULE_DEFAULT:
    {
      size_t gid_beg = beg + gid * nb;
      size_t gid_inc = nparts * nb;
      return make_tuple( gid_beg, end, gid_inc );
    }
    case HMLP_SCHEDULE_ROUND_ROBIN:
    {
      auto gid_beg = beg + gid * nb;
      auto gid_inc = nparts * nb;
      return make_tuple( gid_beg, end, gid_inc );
    }
    case HMLP_SCHEDULE_UNIFORM:
    {
      printf( "DistributeOver1DGangs(): HMLP_SCHEDULE_UNIFORM is not implemented yet.\n" );
      exit( 1 );
    }
    case HMLP_SCHEDULE_HEFT:
    {
      printf( "DistributeOver1DGangs(): HMLP_SCHEDULE_HEFT is not implemented yet.\n" );
      exit( 1 );
    }
    default:
    {
      exit( 1 );
    }
  }
};
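
/**
 *  Illustration (hypothetical sizes): with beg = 0, end = 64, nb = 8 and
 *  gid = 1 out of nparts = 2 gangs, the default schedule returns the
 *  tuple ( 8, 64, 16 ): gang 1 owns blocks [8,16), [24,32), [40,48),
 *  [56,64).
 */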

tuple<size_t, size_t, size_t> Worker::DistributeOver1DThreads(
    size_t beg, size_t end, size_t nb )
{
  size_t nparts = comm->GetNumThreads();

  SchedulePolicy strategy = HMLP_SCHEDULE_DEFAULT;

  switch ( strategy )
  {
    case HMLP_SCHEDULE_DEFAULT:
    {
      size_t tid_beg = beg + tid * nb;
      size_t tid_inc = nparts * nb;
      return make_tuple( tid_beg, end, tid_inc );
    }
    case HMLP_SCHEDULE_ROUND_ROBIN:
    {
      auto tid_beg = beg + tid * nb;
      auto tid_inc = nparts * nb;
      return make_tuple( tid_beg, end, tid_inc );
    }
    case HMLP_SCHEDULE_UNIFORM:
    {
      printf( "DistributeOver1DThreads(): HMLP_SCHEDULE_UNIFORM is not implemented yet.\n" );
      exit( 1 );
    }
    case HMLP_SCHEDULE_HEFT:
    {
      printf( "DistributeOver1DThreads(): HMLP_SCHEDULE_HEFT is not implemented yet.\n" );
      exit( 1 );
    }
    default:
    {
      exit( 1 );
    }
  }
};



/** Assign a device to this worker. */
void Worker::SetDevice( class Device *device ) { this->device = device; };

/** Return the device pointer attached to the worker. */
class Device* Worker::GetDevice() { return device; };


//bool Worker::Execute( vector<Task*> & batch )
//{
//  /** Loop over each task in the batch. */
//  for ( auto task : batch )
//  {
//    assert( task->GetStatus() == RUNNING );
//    task->worker = this;
//    task->event.Begin( this->tid );
//    /** Notice that this may be an asynchronous execution. */
//    task->Execute( this );
//  }
//
//  /** Wait for all tasks in the batch to terminate. */
//  WaitExecute();
//
//  /** Loop over each task in the batch. */
//  for ( auto task : batch )
//  {
//    task->event.Terminate();
//    task->GetEventRecord();
//  }
//  return true;
//}; /** end Worker::Execute() */


/**
 *  @brief The worker executes the task in the runtime system. Some code
 *         is left commented out because there is no GPU support for
 *         now. With GPUs (or other distributed devices), we would need
 *         to gather data before the execution.
 *
 *  @param *batch The head of the current task batch.
 */
bool Worker::Execute( Task *batch )
{
  current_task = batch;
  Task *task = batch;

  while ( task )
  {
    /** Some tasks may be in "EXECUTED" status. */
    if ( task->GetStatus() == RUNNING )
    {
      task->worker = this;
      task->event.Begin( this->tid );
      task->Execute( this );
    }
    /** Move to the next task in the batch. */
    task = task->next;
  }

  /** Wait for all tasks in the batch to terminate. */
  WaitExecute();

  task = batch;
  while ( task )
  {
    task->event.Terminate();
    task->GetEventRecord();
    /** Move to the next task in the batch. */
    task = task->next;
  }

  /** Set my current executing task to NULL. */
  current_task = NULL;

  return true;
}; /** end Worker::Execute() */
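
/**
 *  Note: a batch is an intrusive singly linked list of Task objects
 *  chained through Task::next, so a single Task* also works as a batch
 *  of one; WaitExecute() only blocks when an attached device executes
 *  asynchronously.
 */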


/**
 *  @brief Impose a barrier if the device owned by this worker
 *         is performing asynchronous execution.
 */
void Worker::WaitExecute() { if ( device ) device->waitexecute(); };

float Worker::EstimateCost( class Task *task ) { return task->cost; };

}; /** end namespace hmlp */ /* gcov: hit 2 times */