/**
 *  HMLP (High-Performance Machine Learning Primitives)
 *
 *  Copyright (C) 2014-2017, The University of Texas at Austin
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see the LICENSE file.
 *
 **/

#include <base/runtime.hpp>

#ifdef HMLP_USE_CUDA
#include <hmlp_gpu.hpp>
#endif

#ifdef HMLP_USE_MAGMA
#include <magma_v2.h>
#include <magma_lapack.h>
#endif

#define REPORT_RUNTIME_STATUS 1
// #define DEBUG_RUNTIME 1
// #define DEBUG_SCHEDULER 1

using namespace std;
using namespace hmlp;


struct
{
  bool operator()( const tuple<bool, double, size_t> &a,
                   const tuple<bool, double, size_t> &b )
  {
    return get<1>( a ) < get<1>( b );
  }
} EventLess;
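/** Note: EventLess (defined above) orders (isbeg, time, task index) tuples
 *  by their time stamp; Scheduler::Summary() uses it with std::sort to
 *  arrange begin/end events into a timeline. */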


namespace hmlp
{

/** IMPORTANT: we allocate a static runtime system per (MPI) process */
static RunTime rt;

/**
 *  class Lock
 */

///** @brief Shared-memory lock that calls either the pthread or omp mutex. */
//Lock::Lock()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_init( &lock, NULL ) )
//  {
//    printf( "pthread_mutex_init(): cannot initialize locks properly\n" );
//  }
//#else
//  omp_init_lock( &lock );
//#endif
//}; /** end Lock::Lock() */
//
//Lock::~Lock()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_destroy( &lock ) )
//  {
//    printf( "pthread_mutex_destroy(): cannot destroy locks properly\n" );
//  }
//#else
//  omp_destroy_lock( &lock );
//#endif
//}; /** end Lock::~Lock() */
//
//void Lock::Acquire()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_lock( &lock ) )
//  {
//    printf( "pthread_mutex_lock(): cannot acquire locks properly\n" );
//  }
//#else
//  omp_set_lock( &lock );
//#endif
//};
//
//void Lock::Release()
//{
//#ifdef USE_PTHREAD_RUNTIME
//  if ( pthread_mutex_unlock( &lock ) )
//  {
//    printf( "pthread_mutex_unlock(): cannot release locks properly\n" );
//  }
//#else
//  omp_unset_lock( &lock );
//#endif
//};
//

/**
 *  class Event
 */

/** @brief (Default) Event constructor. */
Event::Event() {};

/** @brief Set the label, flops, and mops. */
void Event::Set( string _label, double _flops, double _mops )
{
  flops = _flops;
  mops  = _mops;
  label = _label;
};

void Event::AddFlopsMops( double _flops, double _mops )
{
  flops += _flops; /** Possible concurrent write. */
  mops  += _mops;
};

void Event::Begin( size_t _tid )
{
  tid = _tid;
  beg = omp_get_wtime();
};

void Event::Normalize( double shift )
{
  beg -= shift;
  end -= shift;
};

void Event::Terminate()
{
  end = omp_get_wtime();
  sec = end - beg;
};

double Event::GetBegin() { return beg; };

double Event::GetEnd() { return end; };

double Event::GetDuration() { return sec; };

double Event::GetFlops() { return flops; };

double Event::GetMops() { return mops; };

double Event::GflopsPerSecond() { return ( flops / sec ) / 1E+9; };

void Event::Print()
{
  printf( "beg %5.3lf end %5.3lf sec %5.3lf flops %E mops %E\n",
      beg, end, sec, flops, mops );
};

void Event::Timeline( bool isbeg, size_t tag )
{
  double gflops_peak = 45.0;
  double flops_efficiency = flops / ( gflops_peak * sec * 1E+9 );
  if ( isbeg )
  {
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 0, beg, (double)tid + 0.0 );
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 1, beg, (double)tid + flops_efficiency );
    printf( "@TIMELINE\n" );
    printf( "worker%lu, %lu, %E, %lf\n", tid, tag, beg, (double)tid + flops_efficiency );
  }
  else
  {
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 0, beg, (double)tid + flops_efficiency );
    //printf( "@TIMELINE\n" );
    //printf( "worker%lu, %lu, %E, %lf\n", tid, 2 * tag + 1, beg, (double)tid + 0.0 );
    printf( "@TIMELINE\n" );
    printf( "worker%lu, %lu, %E, %lf\n", tid, tag, end, (double)tid + 0.0 );
  }
};

void Event::MatlabTimeline( FILE *pFile )
{
  /** TODO: this needs to change according to the arch */
  double gflops_peak = 45.0;
  double flops_efficiency = 0.0;
  if ( sec * 1E+9 > 0.1 )
  {
    flops_efficiency = flops / ( gflops_peak * sec * 1E+9 );
    if ( flops_efficiency > 1.0 ) flops_efficiency = 1.0;
  }
  fprintf( pFile, "rectangle('position',[%lf %lu %lf %d],'facecolor',[1.0,%lf,%lf]);\n",
      beg, tid, ( end - beg ), 1,
      flops_efficiency, flops_efficiency );
  fprintf( pFile, "text( %lf,%lf,'%s');\n", beg, (double)tid + 0.5, label.data() );
};
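//
// For reference, each event renders as one rectangle plus one label in the
// generated MATLAB script; a sketch of the output (values illustrative only):
//
//   rectangle('position',[0.125000 3 0.050000 1],'facecolor',[1.0,0.420000,0.420000]);
//   text( 0.125000,3.500000,'gemm');
//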



/**
 *  class Task
 */

/** @brief (Default) Task constructor. */
Task::Task()
{
  /** Change status to "ALLOCATED". */
  SetStatus( ALLOCATED );
  /** Is this a nested task (created inside an epoch session)? */
  is_created_in_epoch_session = rt.IsInEpochSession();
  /** Which thread created me? */
  created_by = omp_get_thread_num();
  /** Move forward to the next status "NOTREADY". */
  SetStatus( NOTREADY );
}; /** end Task::Task() */

/** @brief (Default) Task destructor. */
Task::~Task() {};

/** @brief (Default) MessageTask constructor. */
MessageTask::MessageTask( int src, int tar, int key )
{
  this->src = src;
  this->tar = tar;
  this->key = key;
}; /** end MessageTask::MessageTask() */

/** @brief (Default) ListenerTask constructor. */
ListenerTask::ListenerTask( int src, int tar, int key ) : MessageTask( src, tar, key ) {};

/** @brief Status is a private member. */
TaskStatus Task::GetStatus() { return status; };

/** @brief Move forward to the next status. */
void Task::SetStatus( TaskStatus next_status ) { this->status = next_status; };

/** Change the status of all tasks in the batch. */
void Task::SetBatchStatus( TaskStatus next_status )
{
  auto *task = this;
  while ( task )
  {
    task->status = next_status;
    /** Move to the next task in the batch. */
    task = task->next;
  }
};

/** @brief Ask the runtime to register a normal task. */
void Task::Submit() { rt.scheduler->NewTask( this ); };

/** @brief Ask the runtime to register a message task. */
void MessageTask::Submit() { rt.scheduler->NewMessageTask( this ); };

/** @brief Ask the runtime to register a listener task. */
void ListenerTask::Submit() { rt.scheduler->NewListenerTask( this ); };

/** @brief This is only for the virtual function pointer. */
void Task::Set( string user_name, void (*user_function)(Task*), void *user_arg )
{
  name = user_name;
  function = user_function;
  arg = user_arg;
  status = NOTREADY;
}; /** end Task::Set() */
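//
// Minimal usage sketch (illustrative; "my_function" and "my_arg" are
// placeholders, not part of this file):
//
//   auto *task = new Task();
//   task->Set( "MyTask", my_function, &my_arg );
//   task->Submit();   // recorded by rt.scheduler; executed during hmlp_run()
//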

/** @brief Update my outgoing edges and my children's incoming edges. */
void Task::DependenciesUpdate()
{
  /** Loop over each outgoing edge. */
  while ( out.size() )
  {
    Task *child = out.front();
    /** There should be at least "one" remaining dependency to satisfy. */
    assert( child->n_dependencies_remaining > 0 && child->GetStatus() == NOTREADY );
    /** Acquire the exclusive right to modify the task. */
    //assert( child->task_lock );
    child->Acquire();
    {
      child->n_dependencies_remaining --;
      /** If there is no dependency left, enqueue the task. */
      if ( !child->n_dependencies_remaining )
      {
        /** Nested tasks may not carry the worker pointer. */
        if ( worker ) child->Enqueue( worker->tid );
        else child->Enqueue();
      }
    }
    child->Release();
    /** Remove this outgoing edge. */
    out.pop_front();
  }
  /** Move forward to the last status "DONE". */
  SetStatus( DONE );
}; /** end Task::DependenciesUpdate() */

void Task::Acquire()
{
  if ( !task_lock )
  {
    cout << name << " not submitted" << endl;
    assert( task_lock );
  }
  task_lock->Acquire();
};

void Task::Release()
{
  if ( !task_lock )
  {
    cout << name << " not submitted" << endl;
    assert( task_lock );
  }
  task_lock->Release();
};


/** All virtual functions. */
void Task::GetEventRecord() {};
void Task::Prefetch( Worker *user_worker ) {};
void Task::DependencyAnalysis() {};

/** @brief Try to dispatch the task if there is no dependency left. */
bool Task::TryEnqueue()
{
  if ( GetStatus() == NOTREADY && !n_dependencies_remaining )
  {
    Enqueue();
    return true;
  }
  else return false;
};

void Task::Enqueue() { Enqueue( 0 ); };

void Task::ForceEnqueue( size_t tid )
{
  int assignment = tid;

  rt.scheduler->ready_queue_lock[ assignment ].Acquire();
  {
    float cost = rt.workers[ assignment ].EstimateCost( this );
    /** Move forward to next status "QUEUED". */
    SetStatus( QUEUED );
    if ( priority )
      rt.scheduler->ready_queue[ assignment ].push_front( this );
    else
      rt.scheduler->ready_queue[ assignment ].push_back( this );

    /** Update the remaining time. */
    rt.scheduler->time_remaining[ assignment ] += cost;
  }
  rt.scheduler->ready_queue_lock[ assignment ].Release();
}; /** end Task::ForceEnqueue() */

/** @brief Dispatch the task to a worker using a HEFT-style policy. */
void Task::Enqueue( size_t tid )
{
  float earliest_t = -1.0;
  int assignment = -1;

  /** Dispatch to the nested queue if created in the epoch session. */
  if ( is_created_in_epoch_session )
  {
    assert( created_by < rt.n_worker );
    rt.scheduler->nested_queue_lock[ created_by ].Acquire();
    {
      /** Move forward to next status "QUEUED". */
      SetStatus( QUEUED );
      if ( priority )
        rt.scheduler->nested_queue[ created_by ].push_front( this );
      else
        rt.scheduler->nested_queue[ created_by ].push_back( this );
    }
    rt.scheduler->nested_queue_lock[ created_by ].Release();
    /** Finish and return without going further down. */
    return;
  }

  /** Determine which worker the task should go to using the HEFT policy:
   *  pick the worker whose estimated finish time (current remaining work
   *  plus this task's cost) is the earliest. */
  for ( int p = 0; p < rt.n_worker; p ++ )
  {
    int i = ( tid + p ) % rt.n_worker;
    float cost = rt.workers[ i ].EstimateCost( this );
    float terminate_t = rt.scheduler->time_remaining[ i ];
    if ( earliest_t == -1.0 || terminate_t + cost < earliest_t )
    {
      earliest_t = terminate_t + cost;
      assignment = i;
    }
  }

  /** Dispatch to the normal ready queue. */
  ForceEnqueue( assignment );

}; /** end Task::Enqueue() */
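//
// A worked example of the HEFT pick above (numbers illustrative): with
// three workers whose time_remaining are { 4.0, 1.0, 2.5 } and a uniform
// EstimateCost of 1.0, the estimated finish times are { 5.0, 2.0, 3.5 },
// so the task is assigned to worker 1.
//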

/** @brief This is the callback function for the owner of the nested task. */
void Task::CallBackWhileWaiting()
{
  rt.ExecuteNestedTasksWhileWaiting( this );
}; /** end Task::CallBackWhileWaiting() */

/** @brief Return whether this task was created inside an epoch session. */
bool Task::IsNested() { return is_created_in_epoch_session; };




/** @brief (Default) ReadWrite constructor. */
ReadWrite::ReadWrite() {};

/** Clean both the read and write sets. */
void ReadWrite::DependencyCleanUp()
{
  read.clear();
  write.clear();
}; /** end ReadWrite::DependencyCleanUp() */

/** @brief This is the key function that encodes the dependencies. **/
void ReadWrite::DependencyAnalysis( ReadWriteType type, Task *task )
{
  if ( type == R || type == RW )
  {
    /** Update the read set. */
    read.push_back( task );
    /** Read-After-Write (RAW) data dependencies. */
    for ( auto it : write ) Scheduler::DependencyAdd( it, task );
  }
  if ( type == W || type == RW )
  {
    /** Write-After-Read (WAR) anti-dependencies. */
    for ( auto it : read ) Scheduler::DependencyAdd( it, task );
    /** Clean up both the read and write sets. */
    DependencyCleanUp();
    /** Update the write set. */
    write.push_back( task );
  }
}; /** end ReadWrite::DependencyAnalysis() */
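//
// Usage sketch (illustrative): registering accesses on a shared object
// automatically inserts RAW/WAR edges between the registered tasks:
//
//   ReadWrite obj;
//   obj.DependencyAnalysis( W, producer );  // producer writes obj
//   obj.DependencyAnalysis( R, consumer );  // RAW: consumer now waits on producer
//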




/**
 *  class MatrixReadWrite
 */

/** @brief (Default) MatrixReadWrite constructor. */
MatrixReadWrite::MatrixReadWrite() {};

/** @brief Set up the m-by-n grid of submatrix dependency trackers. */
void MatrixReadWrite::Setup( size_t m, size_t n )
{
  //printf( "%lu %lu setup\n", m, n );
  this->has_been_setup = true;
  this->m = m;
  this->n = n;
  Submatrices.resize( m );
  for ( size_t i = 0; i < m; i ++ ) Submatrices[ i ].resize( n );
}; /** end MatrixReadWrite::Setup() */


/** @brief Return whether Setup() has been called. */
bool MatrixReadWrite::HasBeenSetup() { return has_been_setup; };

/** @brief Perform dependency analysis on submatrix ( i, j ). */
void MatrixReadWrite::DependencyAnalysis(
    size_t i, size_t j, ReadWriteType type, Task *task )
{
  //printf( "%lu %lu analysis\n", i, j ); fflush( stdout );
  assert( i < m && j < n );
  Submatrices[ i ][ j ].DependencyAnalysis( type, task );
}; /** end MatrixReadWrite::DependencyAnalysis() */

/** @brief Clean up the dependencies of all submatrices. */
void MatrixReadWrite::DependencyCleanUp()
{
  for ( size_t i = 0; i < m; i ++ )
    for ( size_t j = 0; j < n; j ++ )
      Submatrices[ i ][ j ].DependencyCleanUp();
}; /** end MatrixReadWrite::DependencyCleanUp() */
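//
// Usage sketch (illustrative): a 2-by-2 blocked matrix where one task
// writes block (0,1) and a later task reads it:
//
//   MatrixReadWrite A;
//   A.Setup( 2, 2 );
//   A.DependencyAnalysis( 0, 1, W, writer );
//   A.DependencyAnalysis( 0, 1, R, reader );  // RAW edge: writer -> reader
//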




/**
 *  class Scheduler
 */

/** @brief (Default) Scheduler constructor. */
Scheduler::Scheduler( mpi::Comm user_comm )
  : mpi::MPIObject( user_comm ), timeline_tag( 500 )
{
#ifdef DEBUG_SCHEDULER
  printf( "Scheduler()\n" );
#endif
  listener_tasklist.resize( this->GetCommSize() );
  /** Set now as the beginning of the time table. */
  timeline_beg = omp_get_wtime();
};


/** @brief (Default) Scheduler destructor. */
Scheduler::~Scheduler()
{
#ifdef DEBUG_SCHEDULER
  printf( "~Scheduler()\n" );
#endif
};


/** @brief Initialize all workers and enter the epoch session. */
void Scheduler::Init( int user_n_worker )
{
#ifdef DEBUG_SCHEDULER
  printf( "Scheduler::Init()\n" );
#endif

  /** Adjust the number of active workers. */
  n_worker = user_n_worker;
  /** Reset the normal and nested task counters. */
  n_task_completed = 0;
  n_nested_task_completed = 0;
  /** Reset the asynchronous distributed consensus variables. */
  do_terminate = false;
  has_ibarrier = false;
  ibarrier_consensus = 0;

#ifdef USE_PTHREAD_RUNTIME
  for ( int i = 0; i < n_worker; i ++ )
  {
    rt.workers[ i ].tid = i;
    rt.workers[ i ].scheduler = this;
    pthread_create
    (
      &(rt.workers[ i ].pthreadid), NULL,
      EntryPoint, (void*)&(rt.workers[ i ])
    );
  }
  /** Now the master thread will enter the EntryPoint. */
  EntryPoint( (void*)&(rt.workers[ 0 ]) );
#else
  #pragma omp parallel for num_threads( n_worker )
  for ( int i = 0; i < n_worker; i ++ )
  {
    assert( omp_get_thread_num() == i );
    rt.workers[ i ].tid = i;
    rt.workers[ i ].scheduler = this;
    EntryPoint( (void*)&(rt.workers[ i ]) );
  } /** end pragma omp parallel for */
#endif
}; /** end Scheduler::Init() */

/** @brief Register a dependency on the message (key, rank p). */
void Scheduler::MessageDependencyAnalysis(
    int key, int p, ReadWriteType type, Task *task )
{
  if ( msg_dependencies.find( key ) == msg_dependencies.end() )
  {
    msg_dependencies[ key ] = vector<ReadWrite>( this->GetCommSize() );
  }
  msg_dependencies[ key ][ p ].DependencyAnalysis( type, task );
}; /** end Scheduler::MessageDependencyAnalysis() */
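//
// Illustrative: a task that will communicate with rank p under message key
// "key" registers its access, so message tasks touching the same (key, rank)
// pair are ordered the same way as reads/writes on shared data:
//
//   scheduler->MessageDependencyAnalysis( key, p, RW, send_or_recv_task );
//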

/** @brief This function is called by Task::Submit() to record a new task. */
void Scheduler::NewTask( Task *task )
{
  if ( !task ) return;
  /** Acquire the exclusive right to access the tasklist. */
  tasklist_lock.Acquire();
  {
    if ( rt.IsInEpochSession() )
    {
      task->task_lock = &(task_lock[ nested_tasklist.size() % ( 2 * MAX_WORKER ) ]);
      nested_tasklist.push_back( task );
    }
    else
    {
      task->task_lock = &(task_lock[ tasklist.size() % ( 2 * MAX_WORKER ) ]);
      tasklist.push_back( task );
    }
  }
  tasklist_lock.Release();
}; /** end Scheduler::NewTask() */


/** @brief Record a new message task. */
void Scheduler::NewMessageTask( MessageTask *task )
{
  tasklist_lock.Acquire();
  {
    /** We use a duplicated (private) communicator to handle message tasks. */
    task->comm = this->GetPrivateComm();
    task->task_lock = &(task_lock[ tasklist.size() % ( 2 * MAX_WORKER ) ]);
    /** Counted toward the termination criteria. */
    tasklist.push_back( task );
    //printf( "NewMessageTask src %d tar %d key %d tasklist.size() %lu\n",
    //    task->src, task->tar, task->key, tasklist.size() );
  }
  tasklist_lock.Release();
}; /** end Scheduler::NewMessageTask() */


/** @brief Record a new listener task. */
void Scheduler::NewListenerTask( ListenerTask *task )
{
  tasklist_lock.Acquire();
  {
    /** We use a duplicated (private) communicator to handle message tasks. */
    task->comm = this->GetPrivateComm();
    task->task_lock = &(task_lock[ tasklist.size() % ( 2 * MAX_WORKER ) ]);
    listener_tasklist[ task->src ][ task->key ] = task;
    /** Counted toward the termination criteria. */
    tasklist.push_back( task );
    //printf( "NewListenerTask src %d tar %d key %d listener_tasklist[].size() %lu\n",
    //    task->src, task->tar, task->key, listener_tasklist[ task->src ].size() );
  }
  tasklist_lock.Release();
}; /** end Scheduler::NewListenerTask() */



/** @brief Finalize the scheduler at the end of an epoch session. */
void Scheduler::Finalize()
{
#ifdef DEBUG_SCHEDULER
  printf( "Scheduler::Finalize()\n" );
#endif
#ifdef USE_PTHREAD_RUNTIME
  for ( int i = 0; i < rt.n_worker; i ++ )
  {
    pthread_join( rt.workers[ i ].pthreadid, NULL );
  }
#else
#endif

  /** Print out the statistics of this epoch. */
  if ( REPORT_RUNTIME_STATUS ) Summary();

  /** Reset the remaining time. */
  for ( int i = 0; i < n_worker; i ++ ) time_remaining[ i ] = 0.0;
  /** Free all normal tasks and reset the tasklist. */
  try
  {
    for ( auto task : tasklist ) delete task;
    tasklist.clear();
  }
  catch ( exception & e ) { cout << e.what() << endl; };
  /** Free all nested tasks and reset nested_tasklist. */
  try
  {
    for ( auto task : nested_tasklist ) delete task;
    nested_tasklist.clear();
  }
  catch ( exception & e ) { cout << e.what() << endl; };
  //printf( "Begin Scheduler::Finalize() [cleanup listener_tasklist]\n" );
  /** Reset listener_tasklist. */
  try
  {
    for ( auto & plist : listener_tasklist ) plist.clear();
  }
  catch ( exception & e ) { cout << e.what() << endl; };
  //printf( "End Scheduler::Finalize() [cleanup listener_tasklist]\n" );

  /** Clean up all message dependencies. */
  msg_dependencies.clear();

}; /** end Scheduler::Finalize() */


/** @brief Report the remaining workload of each worker. */
void Scheduler::ReportRemainingTime()
{
  printf( "ReportRemainingTime:" ); fflush( stdout );
  printf( "--------------------\n" ); fflush( stdout );
  for ( int i = 0; i < rt.n_worker; i ++ )
  {
    printf( "worker %2d --> %7.2lf (%4lu jobs)\n",
        i, rt.scheduler->time_remaining[ i ],
        rt.scheduler->ready_queue[ i ].size() ); fflush( stdout );
  }
  printf( "--------------------\n" ); fflush( stdout );
}; /** end Scheduler::ReportRemainingTime() */

/** @brief Add a directed edge (dependency) from source to target. */
void Scheduler::DependencyAdd( Task *source, Task *target )
{
  /** Avoid self-loops. */
  if ( source == target ) return;
  /** Update the source's outgoing edges. */
  source->Acquire();
  {
    source->out.push_back( target );
  }
  source->Release();
  /** Update the target's incoming edges. */
  target->Acquire();
  {
    target->in.push_back( source );
    /** Only increase the dependency count for incomplete tasks. */
    if ( source->GetStatus() != DONE ) target->n_dependencies_remaining ++;
  }
  target->Release();
}; /** end Scheduler::DependencyAdd() */
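//
// Illustrative: make "consumer" wait for "producer" explicitly (the same
// edge ReadWrite::DependencyAnalysis() would add for a RAW pair):
//
//   Scheduler::DependencyAdd( producer, consumer );
//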

Task *Scheduler::StealFromQueue( size_t target )
{
  Task *target_task = NULL;

  /** Get the lock of the target's ready queue. */
  ready_queue_lock[ target ].Acquire();
  {
    if ( ready_queue[ target ].size() )
    {
      target_task = ready_queue[ target ].back();
      assert( target_task );
      if ( target_task->stealable )
      {
        ready_queue[ target ].pop_back();
        time_remaining[ target ] -= target_task->cost;
      }
      else target_task = NULL;
    }
  }
  ready_queue_lock[ target ].Release();

  return target_task;
}; /** end Scheduler::StealFromQueue() */


/** @brief Steal a batch of tasks from the busiest worker. */
vector<Task*> Scheduler::StealFromOther()
{
  size_t max_remaining_tasks = 0;
  size_t max_remaining_nested_tasks = 0;
  int target = 0;
  /** Decide which target's normal queue to steal from. */
  for ( int p = 0; p < n_worker; p ++ )
  {
    if ( ready_queue[ p ].size() > max_remaining_tasks )
    {
      max_remaining_tasks = ready_queue[ p ].size();
      target = p;
    }
  }
  /** Try to steal from the target's ready queue. */
  auto batch = DispatchFromNormalQueue( target );
  /** Return if the batch is not empty. */
  if ( batch.size() ) return batch;
  /** Decide which target's nested queue to steal from. */
  for ( int p = 0; p < n_worker; p ++ )
  {
    if ( nested_queue[ p ].size() > max_remaining_nested_tasks )
    {
      max_remaining_nested_tasks = nested_queue[ p ].size();
      target = p;
    }
  }
  /** Try to steal from the target's nested queue. */
  batch = DispatchFromNestedQueue( target );
  /** Return the batch regardless of whether it is empty. */
  return batch;
}; /** end Scheduler::StealFromOther() */


/** @brief Dispatch a nested task from tid's nested_queue. */
vector<Task*> Scheduler::DispatchFromNestedQueue( int tid )
{
  vector<Task*> batch;
  /** Dispatch a nested task from tid's nested_queue. */
  nested_queue_lock[ tid ].Acquire();
  {
    if ( nested_queue[ tid ].size() )
    {
      auto *target_task = nested_queue[ tid ].front();
      /** Check whether I may dispatch this task. */
      if ( tid == omp_get_thread_num() || target_task->stealable )
      {
        /** Fetch the first task in the nested queue. */
        batch.push_back( target_task );
        /** Remove the task from the nested queue. */
        nested_queue[ tid ].pop_front();
      }
    }
  }
  nested_queue_lock[ tid ].Release();
  /** Notice that this can be an empty vector. */
  return batch;
}; /** end Scheduler::DispatchFromNestedQueue() */


/** @brief Check whether all tasks are done and it is time to exit. */
bool Scheduler::IsTimeToExit( int tid )
{
  /** In the case that do_terminate has been set, return "true". */
  if ( do_terminate ) return true;
  /** Both normal and nested tasks should all have been executed. */
  if ( n_task_completed >= tasklist.size() )
  {
    if ( n_nested_task_completed >= nested_tasklist.size() )
    {
      /** My ready_queue and nested_queue should both be empty. */
      assert( !ready_queue[ tid ].size() );
      assert( !nested_queue[ tid ].size() );
      /** Set the termination flag to true. */
      do_terminate = true;
      /** Now there should be no tasks left locally. Return "true". */
      return true;
    }
    else printf( "normal %d/%lu nested %d/%lu\n",
        n_task_completed, tasklist.size(),
        n_nested_task_completed, nested_tasklist.size() );
  }
  /** Otherwise, it is not yet time to terminate. */
  return false;
}; /** end Scheduler::IsTimeToExit() */


/** @brief Dispatch up to a batch of tasks from tid's ready queue. */
vector<Task*> Scheduler::DispatchFromNormalQueue( int tid )
{
  size_t maximum_batch_size = 1;
  vector<Task*> batch;
  /** Dispatch normal tasks from tid's ready queue. */
  ready_queue_lock[ tid ].Acquire();
  {
    for ( size_t it = 0; it < maximum_batch_size; it ++ )
    {
      if ( ready_queue[ tid ].size() )
      {
        auto *target_task = ready_queue[ tid ].front();
        /** The target ready_queue is not my queue. */
        if ( tid != omp_get_thread_num() )
        {
          maximum_batch_size = 1;
          /** If this task cannot be stolen, then break. */
          if ( !target_task->stealable ) break;
          else time_remaining[ tid ] -= target_task->cost;
        }
        /** Dequeue a task and push it into this batch. */
        batch.push_back( target_task );
        ready_queue[ tid ].pop_front();
      }
      else
      {
        /** Reset my workload counter. */
        time_remaining[ tid ] = 0.0;
      }
    }
  }
  ready_queue_lock[ tid ].Release();
  /** Notice that this can be an empty vector. */
  return batch;
}; /** end Scheduler::DispatchFromNormalQueue() */

/** @brief Execute a batch of tasks and update their dependencies. */
bool Scheduler::ConsumeTasks( Worker *me, vector<Task*> &batch )
{
  /** Early return if there is no task to execute. */
  if ( !batch.size() ) return false;
  /** For each task, move forward to the next status "RUNNING". */
  for ( auto task : batch ) task->SetStatus( RUNNING );
  /** Now the worker will execute all tasks in the batch. */
  for ( auto task : batch ) me->Execute( task );
  /** For each task, update dependencies and my remaining time. */
  for ( auto task : batch )
  {
    task->DependenciesUpdate();
    /** Update my remaining time and n_task_completed. */
    if ( !task->IsNested() )
    {
      ready_queue_lock[ me->tid ].Acquire();
      {
        time_remaining[ me->tid ] -= task->cost;
        if ( time_remaining[ me->tid ] < 0.0 )
          time_remaining[ me->tid ] = 0.0;
      }
      ready_queue_lock[ me->tid ].Release();
      n_task_lock.Acquire();
      {
        n_task_completed ++;
      }
      n_task_lock.Release();
    }
    else
    {
      n_task_lock.Acquire();
      {
        n_nested_task_completed ++;
      }
      n_task_lock.Release();
    }
  }
  /** Return "true" since at least one task was executed. */
  return true;
}; /** end Scheduler::ConsumeTasks() */


/** @brief Execute nested tasks while the waiting task is not yet DONE. */
void Scheduler::ExecuteNestedTasksWhileWaiting( Worker *me, Task *waiting_task )
{
  assert( me->tid == omp_get_thread_num() );
  while ( waiting_task->GetStatus() != DONE )
  {
    /** Try to get a nested task. */
    auto nested_batch = DispatchFromNestedQueue( me->tid );
    ConsumeTasks( me, nested_batch );
  }
}; /** end Scheduler::ExecuteNestedTasksWhileWaiting() */


/** @brief Execute a linked batch of tasks and update their dependencies. */
bool Scheduler::ConsumeTasks( Worker *me, Task *batch, bool is_nested )
{
  /** Early return. */
  if ( !batch ) return false;

  /** Update the status. */
  batch->SetBatchStatus( RUNNING );

  if ( me->Execute( batch ) )
  {
    Task *task = batch;
    while ( task )
    {
      task->DependenciesUpdate();
      if ( !is_nested )
      {
        ready_queue_lock[ me->tid ].Acquire();
        {
          time_remaining[ me->tid ] -= task->cost;
          if ( time_remaining[ me->tid ] < 0.0 )
            time_remaining[ me->tid ] = 0.0;
        }
        ready_queue_lock[ me->tid ].Release();
        n_task_lock.Acquire();
        {
          n_task_completed ++;
        }
        n_task_lock.Release();
      }
      else
      {
        n_task_lock.Acquire();
        {
          n_nested_task_completed ++;
        }
        n_task_lock.Release();
      }
      /** Move to the next task in the batch. */
      task = task->next;
    }
  }
  return true;
}; /** end Scheduler::ConsumeTasks() */




/** @brief This is the main body that each worker will go through. */
void* Scheduler::EntryPoint( void* arg )
{
  /** First cast arg back to Worker*. */
  Worker *me = reinterpret_cast<Worker*>( arg );
  /** Get the callback point of the scheduler. */
  Scheduler *scheduler = me->scheduler;
  /** This counter measures the number of idle iterations. */
  size_t idle = 0;

#ifdef DEBUG_SCHEDULER
  printf( "Scheduler::EntryPoint()\n" );
  printf( "pthreadid %d\n", me->tid );
#endif

  /** Prepare listeners (half of the workers). */
  if ( me->tid % 2 )
  {
    /** Update my termination time to infinity. */
    scheduler->ready_queue_lock[ me->tid ].Acquire();
    {
      scheduler->time_remaining[ me->tid ] = numeric_limits<float>::max();
    }
    scheduler->ready_queue_lock[ me->tid ].Release();
    /** Enter listening mode. */
    scheduler->Listen( me );
  }
  /** Start to consume all tasks in this epoch session. */
  while ( 1 )
  {
    /** Try to get a normal task from my own ready queue. */
    auto normal_batch = scheduler->DispatchFromNormalQueue( me->tid );
    /** If there are jobs to execute, then reset the counter. */
    if ( scheduler->ConsumeTasks( me, normal_batch ) )
    {
      /** Reset the idle counter. */
      idle = 0;
    }
    else /** No task in my ready_queue. Try the nested_queue. */
    {
      /** Increase the idle counter. */
      idle ++;
      /** Try to get a nested task. */
      auto nested_batch = scheduler->DispatchFromNestedQueue( me->tid );
      /** Reset the idle counter if there are executable nested tasks. */
      if ( scheduler->ConsumeTasks( me, nested_batch ) ) idle = 0;
    }
    /** Try to steal from others. */
    if ( idle > 10 )
    {
      /** Try to steal a (normal or nested) task. */
      auto stolen_batch = scheduler->StealFromOther();
      /** Reset the idle counter if there are executable stolen tasks. */
      if ( scheduler->ConsumeTasks( me, stolen_batch ) ) idle = 0;
    } /** end if ( idle > 10 ) */

    /** Check whether it is time to terminate. */
    if ( scheduler->IsTimeToExit( me->tid ) ) break;
  }
  /** Return "NULL". */
  return NULL;
}; /** end Scheduler::EntryPoint() */

/** @brief Listen for asynchronous incoming MPI messages. */
void Scheduler::Listen( Worker *me )
{
  /** We use a duplicated (private) communicator to handle message tasks. */
  auto comm = this->GetPrivateComm();
  auto size = this->GetCommSize();
  auto rank = this->GetCommRank();

  /** Iprobe flag and recv status. */
  int probe_flag = 0;
  mpi::Status status;

  /** Keep probing for incoming messages. */
  while ( 1 )
  {
    ListenerTask *task = NULL;
    /** Only one thread will probe and recv messages at a time. */
    #pragma omp critical
    {
      /** Asynchronously probe for incoming messages. */
      mpi::Iprobe( MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &probe_flag, &status );
      /** If any message was received, then handle it. */
      if ( probe_flag )
      {
        /** Info from mpi::Status. */
        int recv_src = status.MPI_SOURCE;
        int recv_tag = status.MPI_TAG;
        int recv_key = 3 * ( recv_tag / 3 );

        if ( recv_key < 300 )
        {
          printf( "rank %d Iprobe src %d tag %d\n", rank, recv_src, recv_tag );
          fflush( stdout );
        }

        /** Check if there is a corresponding task. */
        auto it = listener_tasklist[ recv_src ].find( recv_key );
        if ( it != listener_tasklist[ recv_src ].end() )
        {
          task = it->second;
          if ( task->GetStatus() == NOTREADY )
          {
            //printf( "rank %d Find a task src %d tag %d\n", rank, recv_src, recv_tag );
            //fflush( stdout );
            task->SetStatus( QUEUED );
            task->Listen();
          }
          else
          {
            printf( "rank %d Find a QUEUED task src %d tag %d\n", rank, recv_src, recv_tag );
            fflush( stdout );
            task = NULL;
            probe_flag = false;
          }
        }
        else
        {
          //printf( "rank %d not match task src %d tag %d\n", rank, recv_src, recv_tag );
          //fflush( stdout );
          probe_flag = false;
        }
      }
    }

    if ( probe_flag )
    {
      /** Execute the task and update its dependencies. */
      ConsumeTasks( me, task, false );
    }
    else
    {
      /** Steal a (normal or nested) task from others. */
      auto stolen_batch = StealFromOther();
      ConsumeTasks( me, stolen_batch );
    }

    /** Nonblocking consensus for termination. */
    if ( do_terminate )
    {
      /** We use an Ibarrier to reach global consensus. */
      #pragma omp critical
      {
        /** Only the first worker will issue an Ibarrier. */
        if ( !has_ibarrier )
        {
          mpi::Ibarrier( comm, &ibarrier_request );
          has_ibarrier = true;
        }
        /** Test the global consensus on "ibarrier_request". */
        if ( !ibarrier_consensus )
        {
          mpi::Test( &ibarrier_request, &ibarrier_consensus,
              MPI_STATUS_IGNORE );
        }
      }
      /** If the Ibarrier has completed on all ranks, then exit! */
      if ( ibarrier_consensus ) break;
    }
  }
}; /** end Scheduler::Listen() */

/** @brief Report the statistics of this epoch session. */
void Scheduler::Summary()
{
  int total_normal_tasks = tasklist.size();
  int total_nested_tasks = nested_tasklist.size();
  int total_listen_tasks = 0;
  for ( auto list : listener_tasklist ) total_listen_tasks += list.size();
  double total_flops = 0.0, total_mops = 0.0;
  time_t rawtime;
  struct tm * timeinfo;
  char buffer[ 80 ];

  time( &rawtime );
  timeinfo = localtime( &rawtime );
  strftime( buffer, 80, "%T.", timeinfo );

  //printf( "%s\n", buffer );

  for ( size_t i = 0; i < tasklist.size(); i ++ )
  {
    total_flops += tasklist[ i ]->event.GetFlops();
    total_mops  += tasklist[ i ]->event.GetMops();
  }

#ifdef HMLP_USE_MPI
  /** In the MPI environment, reduce all flops and mops. */
  int global_normal_tasks = 0, global_listen_tasks = 0, global_nested_tasks = 0;
  double global_flops = 0.0, global_mops = 0.0;
  mpi::Reduce( &total_flops, &global_flops, 1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_mops,  &global_mops,  1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_normal_tasks, &global_normal_tasks, 1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_listen_tasks, &global_listen_tasks, 1, MPI_SUM, 0, this->GetPrivateComm() );
  mpi::Reduce( &total_nested_tasks, &global_nested_tasks, 1, MPI_SUM, 0, this->GetPrivateComm() );
  if ( this->GetCommRank() == 0 )
  {
    printf( "[ RT] %5d [normal] %5d [listen] %5d [nested] %5.3E flops %5.3E mops\n",
        global_normal_tasks, global_listen_tasks, global_nested_tasks, global_flops, global_mops );
  }
#else
  printf( "[ RT] %5d [normal] %5d [nested] %5.3E flops %5.3E mops\n",
      total_normal_tasks, total_nested_tasks, total_flops, total_mops );
#endif


#ifdef DUMP_ANALYSIS_DATA
  deque<tuple<bool, double, size_t>> timeline;

  if ( tasklist.size() )
  {
    string filename = string( "timeline" ) +
        to_string( tasklist.size() ) + string( "_rank" ) +
        to_string( this->GetCommRank() ) + string( ".m" );
    FILE *pFile = fopen( filename.data(), "w" );

    fprintf( pFile, "figure('Position',[100,100,800,800]);" );
    fprintf( pFile, "hold on;" );

    for ( size_t i = 0; i < tasklist.size(); i ++ )
    {
      tasklist[ i ]->event.Normalize( timeline_beg );
    }

    for ( size_t i = 0; i < tasklist.size(); i ++ )
    {
      auto &event = tasklist[ i ]->event;
      timeline.push_back( make_tuple( true,  event.GetBegin(), i ) );
      timeline.push_back( make_tuple( false, event.GetEnd(),   i ) );
    }

    sort( timeline.begin(), timeline.end(), EventLess );

    for ( size_t i = 0; i < timeline.size(); i ++ )
    {
      auto &data = timeline[ i ];
      auto &event = tasklist[ get<2>( data ) ]->event;
      //event.Timeline( get<0>( data ), i + timeline_tag );
      event.MatlabTimeline( pFile );
    }

    fclose( pFile );

    timeline_tag += timeline.size();
  }
#endif

}; /** end Scheduler::Summary() */




/**
 *  class RunTime
 */

/** @brief (Default) RunTime constructor. */
RunTime::RunTime() {};

/** @brief (Default) RunTime destructor. */
RunTime::~RunTime() {};

/** @brief Initialize the runtime system with an MPI communicator. */
hmlpError_t RunTime::Init( mpi::Comm comm = MPI_COMM_WORLD )
{
  #pragma omp critical (init)
  {
    if ( !is_init )
    {
      n_worker = omp_get_max_threads();
      n_max_worker = n_worker;
      /** Check whether MPI has been initialized. */
      int is_mpi_init = false;
      mpi::Initialized( &is_mpi_init );

      scheduler = new Scheduler( comm );

#ifdef HMLP_USE_CUDA
      /** TODO: detect devices */
      device[ 0 ] = new hmlp::gpu::Nvidia( 0 );
      if ( n_worker )
      {
        workers[ 0 ].SetDevice( device[ 0 ] );
      }
#endif
#ifdef HMLP_USE_MAGMA
      magma_init();
#endif
      /* Set the flag such that this is only executed once. */
      is_init = true;
    }
  } /* end pragma omp critical */

  /* Return an error if the scheduler failed to allocate. */
  if ( !scheduler ) return HMLP_ERROR_ALLOC_FAILED;
  /* Return without error. */
  return HMLP_ERROR_SUCCESS;
}; /* end RunTime::Init() */

/** @brief Initialize the runtime system and parse the arguments. */
hmlpError_t RunTime::Init( int* argc, char*** argv, mpi::Comm comm = MPI_COMM_WORLD )
{
  #pragma omp critical
  {
    if ( !is_init )
    {
      /** Set the argument count. */
      this->argc = argc;
      /** Set the argument values. */
      this->argv = argv;
      /** Acquire the number of (maximum) workers from OpenMP. */
      n_worker = omp_get_max_threads();
      n_max_worker = n_worker;
      /** Check whether MPI has been initialized. */
      int is_mpi_init = false;
      mpi::Initialized( &is_mpi_init );
      /** Initialize MPI inside HMLP otherwise. */
      if ( !is_mpi_init )
      {
        int provided;
        mpi::Init_thread( argc, argv, MPI_THREAD_MULTIPLE, &provided );
        if ( provided != MPI_THREAD_MULTIPLE )
          ExitWithError( string( "MPI_THREAD_MULTIPLE is not supported" ) );
        /** Flag that MPI was initialized by HMLP. */
        is_mpi_init_by_hmlp = true;
      }
      /** Initialize the scheduler. */
      scheduler = new Scheduler( comm );
#ifdef HMLP_USE_CUDA
      /** TODO: detect devices */
      device[ 0 ] = new hmlp::gpu::Nvidia( 0 );
      if ( n_worker )
      {
        workers[ 0 ].SetDevice( device[ 0 ] );
      }
#endif
#ifdef HMLP_USE_MAGMA
      magma_init();
#endif
      /** Set the flag such that this is only executed once. */
      is_init = true;
    }
  } /* end pragma omp critical */
  /* Return an error if the scheduler failed to allocate. */
  if ( !scheduler ) return HMLP_ERROR_ALLOC_FAILED;
  /* Return without error. */
  return HMLP_ERROR_SUCCESS;
}; /* end RunTime::Init() */




/** @brief Launch an epoch session that executes all submitted tasks. */
void RunTime::Run()
{
  if ( is_in_epoch_session )
  {
    printf( "Fatal Error: more than one concurrent epoch session!\n" );
    exit( 1 );
  }
  if ( !is_init ) Init();
  /** Begin this epoch session. */
  is_in_epoch_session = true;
  /** Schedule jobs to n workers. */
  scheduler->Init( n_worker );
  /** Clean up. */
  scheduler->Finalize();
  /** Finish this epoch session. */
  is_in_epoch_session = false;
}; /** end RunTime::Run() */
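//
// Typical life cycle (illustrative): initialize once, submit any number of
// tasks with dependencies, then run the epoch and finalize:
//
//   hmlp_init();
//   task->Submit();
//   hmlp_run();        // workers consume tasks until all reach DONE
//   hmlp_finalize();
//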

/** @brief Finalize the runtime system. */
void RunTime::Finalize()
{
  #pragma omp critical (init)
  {
    if ( is_init )
    {
      /** Finalize the scheduler and delete it. */
      scheduler->Finalize();
      delete scheduler;
      /** Set the initialized flag to false. */
      is_init = false;
      /** Finalize MPI if it was initialized by HMLP. */
      if ( is_mpi_init_by_hmlp ) mpi::Finalize();
    }
  }
}; /** end RunTime::Finalize() */

/** @brief Return whether the runtime is inside an epoch session. */
bool RunTime::IsInEpochSession() { return is_in_epoch_session; };

/** @brief Execute nested tasks while waiting for the given task. */
void RunTime::ExecuteNestedTasksWhileWaiting( Task *waiting_task )
{
  /** Use omp_get_thread_num() to acquire the tid. */
  auto *me = &(workers[ omp_get_thread_num() ]);
  /** If I am not in an epoch session, then return. */
  if ( IsInEpochSession() )
  {
    scheduler->ExecuteNestedTasksWhileWaiting( me, waiting_task );
  }
}; /** end RunTime::ExecuteNestedTasksWhileWaiting() */


void RunTime::Print( string msg )
{
  cout << "[RT ] " << msg << endl; fflush( stdout );
}; /** end RunTime::Print() */


void RunTime::ExitWithError( string msg )
{
  cout << "[RT ] (Error) " << msg << endl; fflush( stdout );
  exit( 1 );
}; /** end RunTime::ExitWithError() */

//void hmlp_runtime::pool_init()
//{
//
//};
//
//void hmlp_runtime::acquire_memory()
//{
//
//};
//
//void hmlp_runtime::release_memory( void *ptr )
//{
//
//};

/** @brief Return the host device of the static runtime. */
hmlp::Device *hmlp_get_device_host() { return &(hmlp::rt.host); };

void hmlp_msg_dependency_analysis( int key, int p, ReadWriteType type, Task *task )
{
  hmlp::rt.scheduler->MessageDependencyAnalysis( key, p, type, task );
};


}; /** end namespace hmlp */


/**
 *  \brief Initialize the runtime without MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init()
{
  return hmlp::rt.Init();
};

/**
 *  \brief Initialize the runtime with MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init( mpi::Comm comm )
{
  return hmlp::rt.Init( comm );
};

/**
 *  \brief Initialize the runtime and parse arguments without MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init( int *argc, char ***argv )
{
  return hmlp::rt.Init( argc, argv );
};

/**
 *  \brief Initialize the runtime and parse arguments with MPI.
 *  \return the error code.
 */
hmlpError_t hmlp_init( int *argc, char ***argv, mpi::Comm comm )
{
  return hmlp::rt.Init( argc, argv, comm );
};

/**
 *  \brief Set the number of workers for subsequent epoch sessions.
 */
void hmlp_set_num_workers( int n_worker )
{
  hmlp::rt.n_worker = n_worker;
};
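//
// Illustrative: restrict the next epoch to four workers.
//
//   hmlp_set_num_workers( 4 );
//   hmlp_run();
//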


void hmlp_run() { hmlp::rt.Run(); };

void hmlp_finalize() { hmlp::rt.Finalize(); };

hmlp::RunTime *hmlp_get_runtime_handle() { return &hmlp::rt; };

hmlp::Device *hmlp_get_device( int i ) { return hmlp::rt.device[ i ]; };

bool hmlp_is_in_epoch_session() { return hmlp::rt.IsInEpochSession(); };

int hmlp_get_mpi_rank() { return hmlp::rt.scheduler->GetCommRank(); };

int hmlp_get_mpi_size() { return hmlp::rt.scheduler->GetCommSize(); };