/**
 * HMLP (High-Performance Machine Learning Primitives)
 *
 * Copyright (C) 2014-2017, The University of Texas at Austin
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see the LICENSE file.
 *
 **/


#include <base/runtime.hpp>
#include <base/thread.hpp>

/** Standard headers for assert(), printf(), exit(), and omp_get_thread_num() used below. */
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <omp.h>

using namespace std;


namespace hmlp
{

ostream& operator<<( ostream& os, const thread_communicator& obj )
{
  os << obj.name << obj.comm_id;
  return os;
};



range::range( int beg, int end, int inc )
{
  info = std::make_tuple( beg, end, inc );
};

int range::beg() { return std::get<0>( info ); };

int range::end() { return std::get<1>( info ); };

int range::inc() { return std::get<2>( info ); };

range GetRange
(
  SchedulePolicy strategy,
  int beg, int end, int nb, int tid, int nparts
)
{
  switch ( strategy )
  {
    case HMLP_SCHEDULE_DEFAULT:
    {
      /** The default policy uses the same block-cyclic split as round-robin. */
      auto tid_beg = beg + tid * nb;
      auto tid_inc = nparts * nb;
      return range( tid_beg, end, tid_inc );
    }
    case HMLP_SCHEDULE_ROUND_ROBIN:
    {
      auto tid_beg = beg + tid * nb;
      auto tid_inc = nparts * nb;
      return range( tid_beg, end, tid_inc );
    }
    case HMLP_SCHEDULE_UNIFORM:
    {
      printf( "GetRange(): HMLP_SCHEDULE_UNIFORM is not implemented yet.\n" );
      exit( 1 );
    }
    case HMLP_SCHEDULE_HEFT:
    {
      /** A static 30/20/20/30 split, hard-coded for exactly four workers. */
      assert( nparts == 4 );

      int len = end - beg - 1;
      int big = ( len * 30 ) / 100 + 1;
      int sma = ( len * 20 ) / 100 + 1;

      int tid_beg, tid_end;

      if ( tid == 0 )
      {
        tid_beg = beg;
        tid_end = beg + big;
      }
      beg += big;

      if ( tid == 1 )
      {
        tid_beg = beg;
        tid_end = beg + sma;
      }
      beg += sma;

      if ( tid == 2 )
      {
        tid_beg = beg;
        tid_end = beg + sma;
      }
      beg += sma;

      if ( tid == 3 )
      {
        tid_beg = beg;
        tid_end = beg + big;
      }
      beg += big;

      if ( tid_end > end ) tid_end = end;
      return range( tid_beg, tid_end, nb );
    }
    default:
    {
      printf( "GetRange(): not a legal scheduling strategy.\n" );
      exit( 1 );
    }
  }
};

range GetRange( int beg, int end, int nb, int tid, int nparts )
{
  return GetRange( HMLP_SCHEDULE_DEFAULT, beg, end, nb, tid, nparts );
};

range GetRange( int beg, int end, int nb )
{
  return GetRange( HMLP_SCHEDULE_DEFAULT, beg, end, nb, 0, 1 );
};
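
/**
 * Usage sketch (illustrative only, not part of the library API): with the
 * default block-cyclic policy, two workers splitting [ 0, 8 ) with block
 * size nb = 2 each walk their own strided range:
 *
 *   for ( int tid = 0; tid < 2; tid ++ )
 *   {
 *     range r = GetRange( 0, 8, 2, tid, 2 );
 *     for ( int i = r.beg(); i < r.end(); i += r.inc() )
 *     {
 *       // worker 0 visits blocks starting at 0, 4; worker 1 at 2, 6
 *     }
 *   }
 */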
/**
 * @brief Recursively create tree-based communicators.
 */
void thread_communicator::Create( int level, int num_threads, int *config )
{
  if ( level < 1 )
  {
    kids = NULL;
  }
  else
  {
    n_threads = num_threads;
    n_groups = config[ level ];

    if      ( level == 4 ) name = std::string( "jc_comm" );
    else if ( level == 3 ) name = std::string( "pc_comm" );
    else if ( level == 2 ) name = std::string( "ic_comm" );
    else if ( level == 1 ) name = std::string( "jr_comm" );
    else                   name = std::string( "na_comm" );

    //std::cout << name << ", " << n_threads << ", " << n_groups << "\n";

    kids = new thread_communicator[ n_groups ]();
    for ( int i = 0; i < n_groups; i ++ )
    {
      kids[ i ].Create( level - 1, n_threads / n_groups, config );
    }
  }
};

thread_communicator::thread_communicator() :
  sent_object( NULL ),
  comm_id( 0 ),
  n_threads( 1 ),
  n_groups( 1 ),
  barrier_sense( false ),
  barrier_threads_arrived( 0 )
{};

/**
 * @brief Default constructor takes 4 partitioning numbers.
 */
thread_communicator::thread_communicator( int jc_nt, int pc_nt, int ic_nt, int jr_nt ) :
  sent_object( NULL ),
  comm_id( 0 ),
  n_threads( 1 ),
  n_groups( 1 ),
  barrier_sense( false ),
  barrier_threads_arrived( 0 )
{
  /** config[ level ] is the fan-out used by Create() at each tree level. */
  int config[ 6 ] = { 1, 1, jr_nt, ic_nt, pc_nt, jc_nt };
  n_threads = jc_nt * pc_nt * ic_nt * jr_nt;
  //n_groups = jc_nt;
  n_groups = 1;
  name = std::string( "my_comm" );
  kids = new thread_communicator[ n_groups ]();

  for ( int i = 0; i < n_groups; i ++ )
  {
    kids[ i ].Create( 5, n_threads / n_groups, config );
  }
};
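
/**
 * Construction sketch (illustrative only): a communicator tree over
 * 2 * 1 * 4 * 1 = 8 threads, fanned out level by level through config[],
 * mirroring the jc/pc/ic/jr loop hierarchy of a BLIS-style GEMM:
 *
 *   thread_communicator comm( 2, 1, 4, 1 );  // jc_nt, pc_nt, ic_nt, jr_nt
 *   assert( comm.GetNumThreads() == 8 );
 */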

int thread_communicator::GetNumThreads()
{
  return n_threads;
};

int thread_communicator::GetNumGroups()
{
  return n_groups;
};

/**
 * @brief OpenMP thread barrier from BLIS (a sense-reversing barrier).
 */
void thread_communicator::Barrier()
{
  if ( n_threads < 2 ) return;

  /** Remember the sense (phase) this thread entered with. */
  bool my_sense = barrier_sense;
  int my_threads_arrived;

  #pragma omp atomic capture
  my_threads_arrived = ++ barrier_threads_arrived;

  if ( my_threads_arrived == n_threads )
  {
    /** The last arrival resets the counter and flips the sense to release everyone. */
    barrier_threads_arrived = 0;
    barrier_sense = !barrier_sense;
  }
  else
  {
    /** All other threads spin until the sense flips. */
    volatile bool *listener = &barrier_sense;
    while ( *listener == my_sense ) {}
  }
};
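
/**
 * Usage sketch (illustrative only): every thread of the communicator must
 * reach Barrier() before any of them may continue, e.g.
 *
 *   #pragma omp parallel num_threads( comm.GetNumThreads() )
 *   {
 *     // ... phase 1: e.g. pack a shared buffer ...
 *     comm.Barrier();
 *     // ... phase 2: all threads may now read the packed buffer ...
 *   }
 */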

void thread_communicator::Print()
{
  Barrier();
};

void thread_communicator::Send( void** buffer )
{
  sent_object = *buffer;
};

void thread_communicator::Recv( void** buffer )
{
  *buffer = sent_object;
};
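
/**
 * Send()/Recv() form a one-slot broadcast through sent_object: the master
 * publishes a pointer, a Barrier() orders the exchange, and the other
 * threads read it back. A sketch (mirroring the commented-out
 * Worker::Bcast() below; my_buffer is a placeholder):
 *
 *   void *ptr = my_buffer;             // master's payload
 *   if ( tid == 0 ) comm.Send( &ptr );
 *   comm.Barrier();
 *   if ( tid != 0 ) comm.Recv( &ptr );
 */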

/**
 * @brief Device implementation
 */
//Device::Device()
//{
//  name = std::string( "Host CPU" );
//  devicetype = hmlp::DeviceType::HOST;
//};
//
//void Device::prefetchd2h( void *ptr_h, void *ptr_d, size_t size, int stream_id ) {};
//
//void Device::prefetchh2d( void *ptr_d, void *ptr_h, size_t size, int stream_id ) {};
//
//void Device::wait( int stream_id ) {};
//
//void *Device::malloc( size_t size ) { return NULL; };
//
//void Device::malloc( void *ptr_d, size_t size ) {};
//
//size_t Device::get_memory_left() { return 0; };
//
//
//void Device::free( void *ptr_d, size_t size ) {};



/**
 * @brief Worker implementation
 */
Worker::Worker() {};

Worker::Worker( thread_communicator *comm ) :
  tid( 0 ),
  jc_id( 0 ),
  pc_id( 0 ),
  ic_id( 0 ),
  jr_id( 0 )
{
  int tmp;

  tid = omp_get_thread_num();
  tmp = tid;

  //my_comm = comm;
  my_comm = &(comm->kids[ 0 ]);

  jc_nt = my_comm->GetNumGroups();
  jc_id = tmp / ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );
  tmp = tmp % ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );

  jc_comm = &(my_comm->kids[ jc_id ]);

  pc_nt = jc_comm->GetNumGroups();
  pc_id = tmp / ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );
  tmp = tmp % ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );

  pc_comm = &(jc_comm->kids[ pc_id ]);

  ic_jr = tmp; // for parallel packB
  ic_nt = pc_comm->GetNumGroups();
  ic_id = tmp / ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );
  jr_id = tmp % ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );

  ic_comm = &(pc_comm->kids[ ic_id ]);
  jr_nt = ic_comm->GetNumGroups();

  //printf( "tid %2d jc_id %2d pc_id %2d ic_id %2d jr_id %2d, ic_jr %2d\n",
  //    tid, jc_id, pc_id, ic_id, jr_id, ic_jr );
};

/** @brief Bind this worker to a communicator tree (same decomposition as the constructor above). */
void Worker::Communicator( thread_communicator *comm )
{
  int tmp;

  tid = omp_get_thread_num();
  tmp = tid;

  //my_comm = comm;
  my_comm = &(comm->kids[ 0 ]);

  jc_nt = my_comm->GetNumGroups();
  jc_id = tmp / ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );
  tmp = tmp % ( my_comm->GetNumThreads() / my_comm->GetNumGroups() );

  jc_comm = &(my_comm->kids[ jc_id ]);

  pc_nt = jc_comm->GetNumGroups();
  pc_id = tmp / ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );
  tmp = tmp % ( jc_comm->GetNumThreads() / jc_comm->GetNumGroups() );

  pc_comm = &(jc_comm->kids[ pc_id ]);

  ic_jr = tmp; // for parallel packB
  ic_nt = pc_comm->GetNumGroups();
  ic_id = tmp / ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );
  jr_id = tmp % ( pc_comm->GetNumThreads() / pc_comm->GetNumGroups() );

  ic_comm = &(pc_comm->kids[ ic_id ]);
  jr_nt = ic_comm->GetNumGroups();
};

bool Worker::Master() { return tid == 0; };

void Worker::Barrier()
{
  assert( comm );
  comm->Barrier();
};


//void Worker::Bcast( void** buffer )
//{
//  if ( Master() ) comm->Send( buffer );
//  Barrier();
//  if ( !Master() ) comm->Recv( buffer );
//};


void Worker::InitWithCommunicator( thread_communicator* comm, size_t tid, size_t gid )
{
  this->comm = comm;
  this->tid = tid;
  this->gid = gid;
};

Worker Worker::Split()
{
  Worker child;

  //printf( "Before Split %s\n", comm->name.data() ); fflush( stdout );

  size_t n_splits = comm->GetNumGroups();
  size_t n_threads = comm->GetNumThreads();

  //printf( "Split %s n_split %lu n_thread %lu\n",
  //    comm->name.data(), n_splits, n_threads ); fflush( stdout );

  if ( n_splits == 1 )
  {
    child.InitWithCommunicator( &(comm->kids[ 0 ]), tid, 0 );
    return child;
  }

  /**
   * By default, we split threads evenly using "close" affinity.
   * Threads with the same color will be in the same subcomm.
   *
   * example: (n_splits=2)
   *
   * tid        0  1  2  3  4  5  6  7
   * color      0  0  0  0  1  1  1  1  (gang id)
   * first      0  0  0  0  4  4  4  4
   * last       4  4  4  4  8  8  8  8
   * child_tid  0  1  2  3  0  1  2  3
   *            4  4  4  4  4  4  4  4  (child_n_threads)
   */
  size_t color = ( n_splits * tid ) / n_threads;
  size_t first = ( ( color + 0 ) * n_threads ) / n_splits;
  size_t last  = ( ( color + 1 ) * n_threads ) / n_splits;
  size_t child_tid = tid - first;
  size_t child_n_threads = last - first;

  //printf( "Split %s n_split %lu n_thread %lu color %lu\n",
  //    comm->name.data(), n_splits, n_threads, color ); fflush( stdout );

  gid = color;

  /** Make sure all threads have the new communicator. */
  child.InitWithCommunicator( &(comm->kids[ gid ]), child_tid, 0 );

  return child;
};
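
/**
 * Usage sketch (illustrative only; "worker" is a placeholder): all threads
 * call Split() together, and threads that receive the same color share the
 * child communicator:
 *
 *   Worker child = worker.Split();
 *   // with 8 threads and 2 gangs: tid 0-3 -> gang 0, tid 4-7 -> gang 1,
 *   // and the child tid runs 0-3 within each gang
 */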

size_t Worker::BalanceOver1DGangs( size_t n, size_t default_size, size_t nb )
{
  size_t nparts = comm->GetNumGroups();
  if ( nparts > 1 ) return ( ( n - 1 ) / ( nb * nparts ) + 1 ) * nb;
  return default_size;
};
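
/**
 * Worked example (illustrative only): with n = 100, nb = 8, and 4 gangs,
 * BalanceOver1DGangs() returns ( ( 100 - 1 ) / ( 8 * 4 ) + 1 ) * 8 = 32,
 * i.e. each gang's share of n, rounded up to a multiple of nb.
 */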

tuple<size_t, size_t, size_t> Worker::DistributeOver1DGangs(
    size_t beg, size_t end, size_t nb )
{
  size_t nparts = comm->GetNumGroups();

  SchedulePolicy strategy = HMLP_SCHEDULE_DEFAULT;

  switch ( strategy )
  {
    case HMLP_SCHEDULE_DEFAULT:
    {
      size_t gid_beg = beg + gid * nb;
      size_t gid_inc = nparts * nb;
      return make_tuple( gid_beg, end, gid_inc );
    }
    case HMLP_SCHEDULE_ROUND_ROBIN:
    {
      auto gid_beg = beg + gid * nb;
      auto gid_inc = nparts * nb;
      return make_tuple( gid_beg, end, gid_inc );
    }
    case HMLP_SCHEDULE_UNIFORM:
    {
      printf( "DistributeOver1DGangs(): HMLP_SCHEDULE_UNIFORM is not implemented yet.\n" );
      exit( 1 );
    }
    case HMLP_SCHEDULE_HEFT:
    {
      printf( "DistributeOver1DGangs(): HMLP_SCHEDULE_HEFT is not implemented yet.\n" );
      exit( 1 );
    }
    default:
    {
      printf( "DistributeOver1DGangs(): not a legal scheduling strategy.\n" );
      exit( 1 );
    }
  }
};


tuple<size_t, size_t, size_t> Worker::DistributeOver1DThreads(
    size_t beg, size_t end, size_t nb )
{
  size_t nparts = comm->GetNumThreads();

  SchedulePolicy strategy = HMLP_SCHEDULE_DEFAULT;

  switch ( strategy )
  {
    case HMLP_SCHEDULE_DEFAULT:
    {
      size_t tid_beg = beg + tid * nb;
      size_t tid_inc = nparts * nb;
      return make_tuple( tid_beg, end, tid_inc );
    }
    case HMLP_SCHEDULE_ROUND_ROBIN:
    {
      auto tid_beg = beg + tid * nb;
      auto tid_inc = nparts * nb;
      return make_tuple( tid_beg, end, tid_inc );
    }
    case HMLP_SCHEDULE_UNIFORM:
    {
      printf( "DistributeOver1DThreads(): HMLP_SCHEDULE_UNIFORM is not implemented yet.\n" );
      exit( 1 );
    }
    case HMLP_SCHEDULE_HEFT:
    {
      printf( "DistributeOver1DThreads(): HMLP_SCHEDULE_HEFT is not implemented yet.\n" );
      exit( 1 );
    }
    default:
    {
      printf( "DistributeOver1DThreads(): not a legal scheduling strategy.\n" );
      exit( 1 );
    }
  }
};
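
/**
 * Usage sketch (illustrative only): both Distribute*() routines return a
 * ( beg, end, inc ) triple that drives a block-cyclic loop:
 *
 *   auto r = worker.DistributeOver1DThreads( 0, n, nb );
 *   for ( size_t i = get<0>( r ); i < get<1>( r ); i += get<2>( r ) )
 *   {
 *     // this worker owns the block [ i, min( i + nb, n ) )
 *   }
 */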
/** Assign a device to this worker. */
void Worker::SetDevice( class Device *device ) { this->device = device; };

/** Return the device pointer attached to the worker. */
class Device* Worker::GetDevice() { return device; };


//bool Worker::Execute( vector<Task*> & batch )
//{
//  /** Loop over each task in the batch. */
//  for ( auto task : batch )
//  {
//    assert( task->GetStatus() == RUNNING );
//    task->worker = this;
//    task->event.Begin( this->tid );
//    /** Notice that this may be an asynchronous execution. */
//    task->Execute( this );
//  }
//
//  /** Wait for all tasks in the batch to terminate. */
//  WaitExecute();
//
//  /** Loop over each task in the batch. */
//  for ( auto task : batch )
//  {
//    task->event.Terminate();
//    task->GetEventRecord();
//  }
//  return true;
//}; /** end Worker::Execute() */


/**
 * @brief The worker executes the task in the runtime system. I left some
 *        code commented out because there is no GPU support now.
 *        With GPUs (or some distributed devices), we need to first
 *        gather data before the execution.
 *
 * @param *batch The first task of the batch, linked through Task::next.
 *
 */
bool Worker::Execute( Task *batch )
{
  current_task = batch;
  Task *task = batch;

  while ( task )
  {
    /** Some tasks may already be in "EXECUTED" status. */
    if ( task->GetStatus() == RUNNING )
    {
      task->worker = this;
      task->event.Begin( this->tid );
      /** Notice that this may be an asynchronous execution. */
      task->Execute( this );
    }
    /** Move to the next task in the batch. */
    task = task->next;
  }

  /** Wait for all tasks in the batch to terminate. */
  WaitExecute();

  task = batch;
  while ( task )
  {
    task->event.Terminate();
    task->GetEventRecord();
    /** Move to the next task in the batch. */
    task = task->next;
  }

  /** Set my current executing task to NULL. */
  current_task = NULL;

  return true;
}; /** end Worker::Execute() */
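
/**
 * Usage sketch (illustrative only): Execute() walks an intrusive linked
 * list, so a batch is formed by chaining tasks through their next pointers:
 *
 *   t0.next = &t1;
 *   t1.next = NULL;
 *   worker.Execute( &t0 );  // runs each RUNNING task, then waits on the device
 */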


/**
 * @brief Pose a barrier if the device owned by this worker
 *        is performing asynchronous execution.
 */
void Worker::WaitExecute() { if ( device ) device->waitexecute(); };

float Worker::EstimateCost( class Task *task ) { return task->cost; };

}; /** end namespace hmlp */