This source file includes the following definitions.
- NBC_Reset_times
- NBC_Print_times
- nbc_schedule_constructor
- nbc_schedule_destructor
- nbc_schedule_grow
- nbc_schedule_round_append
- NBC_Sched_send_internal
- NBC_Sched_send
- NBC_Sched_local_send
- NBC_Sched_recv_internal
- NBC_Sched_recv
- NBC_Sched_local_recv
- NBC_Sched_op
- NBC_Sched_copy
- NBC_Sched_unpack
- NBC_Sched_barrier
- NBC_Sched_commit
- NBC_Free
- NBC_Progress
- NBC_Start_round
- NBC_Return_handle
- NBC_Init_comm
- NBC_Start
- NBC_Schedule_request
- NBC_SchedCache_args_delete_key_dummy
- NBC_SchedCache_args_delete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "nbc_internal.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/op/op.h"
#include "ompi/mca/pml/pml.h"

#include <limits.h>
31
32
33 static inline int NBC_Start_round(NBC_Handle *handle);
34
35
36
#ifdef NBC_TIMING
/* Accumulated wall-clock seconds (MPI_Wtime deltas). Isend_time and
 * Irecv_time are updated in NBC_Start_round, Test_time in NBC_Progress. */
static double Isend_time=0, Irecv_time=0, Wait_time=0, Test_time=0;

/* Zero all accumulated timers. Declared with (void) so it has a real
 * prototype rather than an old-style unspecified parameter list. */
void NBC_Reset_times(void) {
  Isend_time = Irecv_time = Wait_time = Test_time = 0;
}

/* Print the accumulated timers in microseconds, each divided by `div`
 * (presumably the number of measured iterations -- confirm with callers). */
void NBC_Print_times(double div) {
  printf("*** NBC_TIMES: Isend: %lf, Irecv: %lf, Wait: %lf, Test: %lf\n", Isend_time*1e6/div, Irecv_time*1e6/div, Wait_time*1e6/div, Test_time*1e6/div);
}
#endif
46
47 static void nbc_schedule_constructor (NBC_Schedule *schedule) {
48
49 schedule->size = sizeof (int);
50 schedule->current_round_offset = 0;
51 schedule->data = calloc (1, schedule->size);
52 }
53
54 static void nbc_schedule_destructor (NBC_Schedule *schedule) {
55 free (schedule->data);
56 schedule->data = NULL;
57 }
58
59 OBJ_CLASS_INSTANCE(NBC_Schedule, opal_object_t, nbc_schedule_constructor,
60 nbc_schedule_destructor);
61
62 static int nbc_schedule_grow (NBC_Schedule *schedule, int additional) {
63 void *tmp;
64 int size;
65
66
67 size = nbc_schedule_get_size (schedule);
68
69 tmp = realloc (schedule->data, size + additional);
70 if (NULL == tmp) {
71 NBC_Error ("Could not increase the size of NBC schedule");
72 return OMPI_ERR_OUT_OF_RESOURCE;
73 }
74
75 schedule->data = tmp;
76 return OMPI_SUCCESS;
77 }
78
79 static int nbc_schedule_round_append (NBC_Schedule *schedule, void *data, int data_size, bool barrier) {
80 int ret, size = nbc_schedule_get_size (schedule);
81
82 if (barrier) {
83 ret = nbc_schedule_grow (schedule, data_size + 1 + sizeof (int));
84 } else {
85 ret = nbc_schedule_grow (schedule, data_size);
86 }
87 if (OMPI_SUCCESS != ret) {
88 return ret;
89 }
90
91
92 if (data_size) {
93 memcpy (schedule->data + size, data, data_size);
94
95
96 nbc_schedule_inc_round (schedule);
97
98
99 nbc_schedule_inc_size (schedule, data_size);
100 }
101
102 if (barrier) {
103
104 schedule->data[size + data_size] = 1;
105
106 memset (schedule->data + size + data_size + 1, 0, sizeof (int));
107
108 NBC_DEBUG(10, "ended round at byte %i\n", size + data_size + 1);
109
110 schedule->current_round_offset = size + data_size + 1;
111
112
113 nbc_schedule_inc_size (schedule, sizeof (int) + 1);
114 }
115
116 return OMPI_SUCCESS;
117 }
118
119
120 static int NBC_Sched_send_internal (const void* buf, char tmpbuf, int count, MPI_Datatype datatype, int dest, bool local, NBC_Schedule *schedule, bool barrier) {
121 NBC_Args_send send_args;
122 int ret;
123
124
125 send_args.type = SEND;
126 send_args.buf = buf;
127 send_args.tmpbuf = tmpbuf;
128 send_args.count = count;
129 send_args.datatype = datatype;
130 send_args.dest = dest;
131 send_args.local = local;
132
133
134 ret = nbc_schedule_round_append (schedule, &send_args, sizeof (send_args), barrier);
135 if (OMPI_SUCCESS != ret) {
136 return ret;
137 }
138
139 NBC_DEBUG(10, "added send - ends at byte %i\n", nbc_schedule_get_size (schedule));
140
141 return OMPI_SUCCESS;
142 }
143
144 int NBC_Sched_send (const void* buf, char tmpbuf, int count, MPI_Datatype datatype, int dest, NBC_Schedule *schedule, bool barrier) {
145 return NBC_Sched_send_internal (buf, tmpbuf, count, datatype, dest, false, schedule, barrier);
146 }
147
148 int NBC_Sched_local_send (const void* buf, char tmpbuf, int count, MPI_Datatype datatype, int dest, NBC_Schedule *schedule, bool barrier) {
149 return NBC_Sched_send_internal (buf, tmpbuf, count, datatype, dest, true, schedule, barrier);
150 }
151
152
153 static int NBC_Sched_recv_internal (void* buf, char tmpbuf, int count, MPI_Datatype datatype, int source, bool local, NBC_Schedule *schedule, bool barrier) {
154 NBC_Args_recv recv_args;
155 int ret;
156
157
158 recv_args.type = RECV;
159 recv_args.buf = buf;
160 recv_args.tmpbuf = tmpbuf;
161 recv_args.count = count;
162 recv_args.datatype = datatype;
163 recv_args.source = source;
164 recv_args.local = local;
165
166
167 ret = nbc_schedule_round_append (schedule, &recv_args, sizeof (recv_args), barrier);
168 if (OMPI_SUCCESS != ret) {
169 return ret;
170 }
171
172 NBC_DEBUG(10, "added receive - ends at byte %d\n", nbc_schedule_get_size (schedule));
173
174 return OMPI_SUCCESS;
175 }
176
177 int NBC_Sched_recv (void* buf, char tmpbuf, int count, MPI_Datatype datatype, int source, NBC_Schedule *schedule, bool barrier) {
178 return NBC_Sched_recv_internal(buf, tmpbuf, count, datatype, source, false, schedule, barrier);
179 }
180
181 int NBC_Sched_local_recv (void* buf, char tmpbuf, int count, MPI_Datatype datatype, int source, NBC_Schedule *schedule, bool barrier) {
182 return NBC_Sched_recv_internal(buf, tmpbuf, count, datatype, source, true, schedule, barrier);
183 }
184
185
186 int NBC_Sched_op (const void* buf1, char tmpbuf1, void* buf2, char tmpbuf2, int count, MPI_Datatype datatype,
187 MPI_Op op, NBC_Schedule *schedule, bool barrier) {
188 NBC_Args_op op_args;
189 int ret;
190
191
192 op_args.type = OP;
193 op_args.buf1 = buf1;
194 op_args.buf2 = buf2;
195 op_args.tmpbuf1 = tmpbuf1;
196 op_args.tmpbuf2 = tmpbuf2;
197 op_args.count = count;
198 op_args.op = op;
199 op_args.datatype = datatype;
200
201
202 ret = nbc_schedule_round_append (schedule, &op_args, sizeof (op_args), barrier);
203 if (OMPI_SUCCESS != ret) {
204 return ret;
205 }
206
207 NBC_DEBUG(10, "added op2 - ends at byte %i\n", nbc_schedule_get_size (schedule));
208
209 return OMPI_SUCCESS;
210 }
211
212
213 int NBC_Sched_copy (void *src, char tmpsrc, int srccount, MPI_Datatype srctype, void *tgt, char tmptgt, int tgtcount,
214 MPI_Datatype tgttype, NBC_Schedule *schedule, bool barrier) {
215 NBC_Args_copy copy_args;
216 int ret;
217
218
219 copy_args.type = COPY;
220 copy_args.src = src;
221 copy_args.tmpsrc = tmpsrc;
222 copy_args.srccount = srccount;
223 copy_args.srctype = srctype;
224 copy_args.tgt = tgt;
225 copy_args.tmptgt = tmptgt;
226 copy_args.tgtcount = tgtcount;
227 copy_args.tgttype = tgttype;
228
229
230 ret = nbc_schedule_round_append (schedule, ©_args, sizeof (copy_args), barrier);
231 if (OMPI_SUCCESS != ret) {
232 return ret;
233 }
234
235 NBC_DEBUG(10, "added copy - ends at byte %i\n", nbc_schedule_get_size (schedule));
236
237 return OMPI_SUCCESS;
238 }
239
240
241 int NBC_Sched_unpack (void *inbuf, char tmpinbuf, int count, MPI_Datatype datatype, void *outbuf, char tmpoutbuf,
242 NBC_Schedule *schedule, bool barrier) {
243 NBC_Args_unpack unpack_args;
244 int ret;
245
246
247 unpack_args.type = UNPACK;
248 unpack_args.inbuf = inbuf;
249 unpack_args.tmpinbuf = tmpinbuf;
250 unpack_args.count = count;
251 unpack_args.datatype = datatype;
252 unpack_args.outbuf = outbuf;
253 unpack_args.tmpoutbuf = tmpoutbuf;
254
255
256 ret = nbc_schedule_round_append (schedule, &unpack_args, sizeof (unpack_args), barrier);
257 if (OMPI_SUCCESS != ret) {
258 return ret;
259 }
260
261 NBC_DEBUG(10, "added unpack - ends at byte %i\n", nbc_schedule_get_size (schedule));
262
263 return OMPI_SUCCESS;
264 }
265
266
267 int NBC_Sched_barrier (NBC_Schedule *schedule) {
268 return nbc_schedule_round_append (schedule, NULL, 0, true);
269 }
270
271
272 int NBC_Sched_commit(NBC_Schedule *schedule) {
273 int size = nbc_schedule_get_size (schedule);
274 char *ptr;
275 int ret;
276
277 ret = nbc_schedule_grow (schedule, 1);
278 if (OMPI_SUCCESS != ret) {
279 return ret;
280 }
281
282
283 ptr = schedule->data + size;
284 *((char *) ptr) = 0;
285
286
287 nbc_schedule_inc_size (schedule, 1);
288
289 NBC_DEBUG(10, "closed schedule %p at byte %i\n", schedule, (int)(size + 1));
290
291 return OMPI_SUCCESS;
292 }
293
294
295
296
297 static inline void NBC_Free (NBC_Handle* handle) {
298
299 if (NULL != handle->schedule) {
300
301 OBJ_RELEASE (handle->schedule);
302 handle->schedule = NULL;
303 }
304
305
306
307
308 if (NULL != handle->tmpbuf) {
309 free((void*)handle->tmpbuf);
310 handle->tmpbuf = NULL;
311 }
312 }
313
314
315
316
317 int NBC_Progress(NBC_Handle *handle) {
318 int res, ret=NBC_CONTINUE;
319 bool flag;
320 unsigned long size = 0;
321 char *delim;
322
323 if (handle->nbc_complete) {
324 return NBC_OK;
325 }
326
327 flag = true;
328
329 if ((handle->req_count > 0) && (handle->req_array != NULL)) {
330 NBC_DEBUG(50, "NBC_Progress: testing for %i requests\n", handle->req_count);
331 #ifdef NBC_TIMING
332 Test_time -= MPI_Wtime();
333 #endif
334
335 while (handle->req_count) {
336 ompi_request_t *subreq = handle->req_array[handle->req_count - 1];
337 if (REQUEST_COMPLETE(subreq)) {
338 if(OPAL_UNLIKELY( OMPI_SUCCESS != subreq->req_status.MPI_ERROR )) {
339 NBC_Error ("MPI Error in NBC subrequest %p : %d", subreq, subreq->req_status.MPI_ERROR);
340
341
342 handle->super.req_status.MPI_ERROR = subreq->req_status.MPI_ERROR;
343 }
344 handle->req_count--;
345 ompi_request_free(&subreq);
346 } else {
347 flag = false;
348 break;
349 }
350 }
351 #ifdef NBC_TIMING
352 Test_time += MPI_Wtime();
353 #endif
354 }
355
356
357 if (flag) {
358
359 if (NULL != handle->req_array) {
360
361 free (handle->req_array);
362 handle->req_array = NULL;
363 }
364
365 handle->req_count = 0;
366
367
368 if (OPAL_UNLIKELY(OMPI_SUCCESS != handle->super.req_status.MPI_ERROR)) {
369 res = handle->super.req_status.MPI_ERROR;
370 NBC_Error("NBC_Progress: an error %d was found during schedule %p at row-offset %li - aborting the schedule\n", res, handle->schedule, handle->row_offset);
371 handle->nbc_complete = true;
372 if (!handle->super.req_persistent) {
373 NBC_Free(handle);
374 }
375 return res;
376 }
377
378
379 NBC_DEBUG(5, "NBC_Progress: going in schedule %p to row-offset: %li\n", handle->schedule, handle->row_offset);
380 delim = handle->schedule->data + handle->row_offset;
381 NBC_DEBUG(10, "delim: %p\n", delim);
382 nbc_get_round_size(delim, &size);
383 NBC_DEBUG(10, "size: %li\n", size);
384
385 delim = delim + size;
386
387 if (*delim == 0) {
388
389 NBC_DEBUG(5, "NBC_Progress last round finished - we're done\n");
390
391 handle->nbc_complete = true;
392 if (!handle->super.req_persistent) {
393 NBC_Free(handle);
394 }
395
396 return NBC_OK;
397 }
398
399 NBC_DEBUG(5, "NBC_Progress round finished - goto next round\n");
400
401
402 handle->row_offset = (intptr_t) (delim + 1) - (intptr_t) handle->schedule->data;
403
404 res = NBC_Start_round(handle);
405 if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
406 NBC_Error ("Error in NBC_Start_round() (%i)", res);
407 return res;
408 }
409 }
410
411 return ret;
412 }
413
414 static inline int NBC_Start_round(NBC_Handle *handle) {
415 int num;
416 int res;
417 char* ptr;
418 MPI_Request *tmp;
419 NBC_Fn_type type;
420 NBC_Args_send sendargs;
421 NBC_Args_recv recvargs;
422 NBC_Args_op opargs;
423 NBC_Args_copy copyargs;
424 NBC_Args_unpack unpackargs;
425 void *buf1, *buf2;
426
427
428 ptr = handle->schedule->data + handle->row_offset;
429
430 NBC_GET_BYTES(ptr,num);
431 NBC_DEBUG(10, "start_round round at offset %d : posting %i operations\n", handle->row_offset, num);
432
433 for (int i = 0 ; i < num ; ++i) {
434 int offset = (intptr_t)(ptr - handle->schedule->data);
435
436 memcpy (&type, ptr, sizeof (type));
437 switch(type) {
438 case SEND:
439 NBC_DEBUG(5," SEND (offset %li) ", offset);
440 NBC_GET_BYTES(ptr,sendargs);
441 NBC_DEBUG(5,"*buf: %p, count: %i, type: %p, dest: %i, tag: %i)\n", sendargs.buf,
442 sendargs.count, sendargs.datatype, sendargs.dest, handle->tag);
443
444 handle->req_count++;
445
446 if(sendargs.tmpbuf) {
447 buf1=(char*)handle->tmpbuf+(long)sendargs.buf;
448 } else {
449 buf1=(void *)sendargs.buf;
450 }
451 #ifdef NBC_TIMING
452 Isend_time -= MPI_Wtime();
453 #endif
454 tmp = (MPI_Request *) realloc ((void *) handle->req_array, handle->req_count * sizeof (MPI_Request));
455 if (NULL == tmp) {
456 return OMPI_ERR_OUT_OF_RESOURCE;
457 }
458
459 handle->req_array = tmp;
460
461 res = MCA_PML_CALL(isend(buf1, sendargs.count, sendargs.datatype, sendargs.dest, handle->tag,
462 MCA_PML_BASE_SEND_STANDARD, sendargs.local?handle->comm->c_local_comm:handle->comm,
463 handle->req_array+handle->req_count - 1));
464 if (OMPI_SUCCESS != res) {
465 NBC_Error ("Error in MPI_Isend(%lu, %i, %p, %i, %i, %lu) (%i)", (unsigned long)buf1, sendargs.count,
466 sendargs.datatype, sendargs.dest, handle->tag, (unsigned long)handle->comm, res);
467 return res;
468 }
469 #ifdef NBC_TIMING
470 Isend_time += MPI_Wtime();
471 #endif
472 break;
473 case RECV:
474 NBC_DEBUG(5, " RECV (offset %li) ", offset);
475 NBC_GET_BYTES(ptr,recvargs);
476 NBC_DEBUG(5, "*buf: %p, count: %i, type: %p, source: %i, tag: %i)\n", recvargs.buf, recvargs.count,
477 recvargs.datatype, recvargs.source, handle->tag);
478
479 handle->req_count++;
480
481 if(recvargs.tmpbuf) {
482 buf1=(char*)handle->tmpbuf+(long)recvargs.buf;
483 } else {
484 buf1=recvargs.buf;
485 }
486 #ifdef NBC_TIMING
487 Irecv_time -= MPI_Wtime();
488 #endif
489 tmp = (MPI_Request *) realloc ((void *) handle->req_array, handle->req_count * sizeof (MPI_Request));
490 if (NULL == tmp) {
491 return OMPI_ERR_OUT_OF_RESOURCE;
492 }
493
494 handle->req_array = tmp;
495
496 res = MCA_PML_CALL(irecv(buf1, recvargs.count, recvargs.datatype, recvargs.source, handle->tag, recvargs.local?handle->comm->c_local_comm:handle->comm,
497 handle->req_array+handle->req_count-1));
498 if (OMPI_SUCCESS != res) {
499 NBC_Error("Error in MPI_Irecv(%lu, %i, %p, %i, %i, %lu) (%i)", (unsigned long)buf1, recvargs.count,
500 recvargs.datatype, recvargs.source, handle->tag, (unsigned long)handle->comm, res);
501 return res;
502 }
503 #ifdef NBC_TIMING
504 Irecv_time += MPI_Wtime();
505 #endif
506 break;
507 case OP:
508 NBC_DEBUG(5, " OP2 (offset %li) ", offset);
509 NBC_GET_BYTES(ptr,opargs);
510 NBC_DEBUG(5, "*buf1: %p, buf2: %p, count: %i, type: %p)\n", opargs.buf1, opargs.buf2,
511 opargs.count, opargs.datatype);
512
513 if(opargs.tmpbuf1) {
514 buf1=(char*)handle->tmpbuf+(long)opargs.buf1;
515 } else {
516 buf1=(void *)opargs.buf1;
517 }
518 if(opargs.tmpbuf2) {
519 buf2=(char*)handle->tmpbuf+(long)opargs.buf2;
520 } else {
521 buf2=opargs.buf2;
522 }
523 ompi_op_reduce(opargs.op, buf1, buf2, opargs.count, opargs.datatype);
524 break;
525 case COPY:
526 NBC_DEBUG(5, " COPY (offset %li) ", offset);
527 NBC_GET_BYTES(ptr,copyargs);
528 NBC_DEBUG(5, "*src: %lu, srccount: %i, srctype: %p, *tgt: %lu, tgtcount: %i, tgttype: %p)\n",
529 (unsigned long) copyargs.src, copyargs.srccount, copyargs.srctype,
530 (unsigned long) copyargs.tgt, copyargs.tgtcount, copyargs.tgttype);
531
532 if(copyargs.tmpsrc) {
533 buf1=(char*)handle->tmpbuf+(long)copyargs.src;
534 } else {
535 buf1=copyargs.src;
536 }
537 if(copyargs.tmptgt) {
538 buf2=(char*)handle->tmpbuf+(long)copyargs.tgt;
539 } else {
540 buf2=copyargs.tgt;
541 }
542 res = NBC_Copy (buf1, copyargs.srccount, copyargs.srctype, buf2, copyargs.tgtcount, copyargs.tgttype,
543 handle->comm);
544 if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
545 return res;
546 }
547 break;
548 case UNPACK:
549 NBC_DEBUG(5, " UNPACK (offset %li) ", offset);
550 NBC_GET_BYTES(ptr,unpackargs);
551 NBC_DEBUG(5, "*src: %lu, srccount: %i, srctype: %p, *tgt: %lu\n", (unsigned long) unpackargs.inbuf,
552 unpackargs.count, unpackargs.datatype, (unsigned long) unpackargs.outbuf);
553
554 if(unpackargs.tmpinbuf) {
555 buf1=(char*)handle->tmpbuf+(long)unpackargs.inbuf;
556 } else {
557 buf1=unpackargs.inbuf;
558 }
559 if(unpackargs.tmpoutbuf) {
560 buf2=(char*)handle->tmpbuf+(long)unpackargs.outbuf;
561 } else {
562 buf2=unpackargs.outbuf;
563 }
564 res = NBC_Unpack (buf1, unpackargs.count, unpackargs.datatype, buf2, handle->comm);
565 if (OMPI_SUCCESS != res) {
566 NBC_Error ("NBC_Unpack() failed (code: %i)", res);
567 return res;
568 }
569
570 break;
571 default:
572 NBC_Error ("NBC_Start_round: bad type %li at offset %li", (long)type, offset);
573 return OMPI_ERROR;
574 }
575 }
576
577
578
579
580
581
582 if (handle->row_offset) {
583 res = NBC_Progress(handle);
584 if ((NBC_OK != res) && (NBC_CONTINUE != res)) {
585 return OMPI_ERROR;
586 }
587 }
588
589 return OMPI_SUCCESS;
590 }
591
592 void NBC_Return_handle(ompi_coll_libnbc_request_t *request) {
593 NBC_Free (request);
594 OMPI_COLL_LIBNBC_REQUEST_RETURN(request);
595 }
596
597 int NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) {
598 comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
599
600 #ifdef NBC_CACHE_SCHEDULE
601
602 comminfo->NBC_Dict[NBC_ALLTOALL] = hb_tree_new((dict_cmp_func)NBC_Alltoall_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
603 if(comminfo->NBC_Dict[NBC_ALLTOALL] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
604 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_ALLTOALL]);
605 comminfo->NBC_Dict_size[NBC_ALLTOALL] = 0;
606
607 comminfo->NBC_Dict[NBC_ALLGATHER] = hb_tree_new((dict_cmp_func)NBC_Allgather_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
608 if(comminfo->NBC_Dict[NBC_ALLGATHER] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
609 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_ALLGATHER]);
610 comminfo->NBC_Dict_size[NBC_ALLGATHER] = 0;
611
612 comminfo->NBC_Dict[NBC_ALLREDUCE] = hb_tree_new((dict_cmp_func)NBC_Allreduce_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
613 if(comminfo->NBC_Dict[NBC_ALLREDUCE] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
614 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_ALLREDUCE]);
615 comminfo->NBC_Dict_size[NBC_ALLREDUCE] = 0;
616
617
618 comminfo->NBC_Dict_size[NBC_BARRIER] = 0;
619
620 comminfo->NBC_Dict[NBC_BCAST] = hb_tree_new((dict_cmp_func)NBC_Bcast_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
621 if(comminfo->NBC_Dict[NBC_BCAST] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
622 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_BCAST]);
623 comminfo->NBC_Dict_size[NBC_BCAST] = 0;
624
625 comminfo->NBC_Dict[NBC_GATHER] = hb_tree_new((dict_cmp_func)NBC_Gather_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
626 if(comminfo->NBC_Dict[NBC_GATHER] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
627 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_GATHER]);
628 comminfo->NBC_Dict_size[NBC_GATHER] = 0;
629
630 comminfo->NBC_Dict[NBC_REDUCE] = hb_tree_new((dict_cmp_func)NBC_Reduce_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
631 if(comminfo->NBC_Dict[NBC_REDUCE] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
632 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_REDUCE]);
633 comminfo->NBC_Dict_size[NBC_REDUCE] = 0;
634
635 comminfo->NBC_Dict[NBC_SCAN] = hb_tree_new((dict_cmp_func)NBC_Scan_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
636 if(comminfo->NBC_Dict[NBC_SCAN] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
637 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_SCAN]);
638 comminfo->NBC_Dict_size[NBC_SCAN] = 0;
639
640 comminfo->NBC_Dict[NBC_SCATTER] = hb_tree_new((dict_cmp_func)NBC_Scatter_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
641 if(comminfo->NBC_Dict[NBC_SCATTER] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
642 NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_SCATTER]);
643 comminfo->NBC_Dict_size[NBC_SCATTER] = 0;
644 #endif
645
646 return OMPI_SUCCESS;
647 }
648
649 int NBC_Start(NBC_Handle *handle) {
650 int res;
651
652
653 if ((ompi_request_t *)handle == &ompi_request_empty) {
654 return OMPI_SUCCESS;
655 }
656
657
658 handle->super.req_state = OMPI_REQUEST_ACTIVE;
659 handle->super.req_status.MPI_ERROR = OMPI_SUCCESS;
660 res = NBC_Start_round(handle);
661 if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
662 return res;
663 }
664
665 OPAL_THREAD_LOCK(&mca_coll_libnbc_component.lock);
666 opal_list_append(&mca_coll_libnbc_component.active_requests, &(handle->super.super.super));
667 OPAL_THREAD_UNLOCK(&mca_coll_libnbc_component.lock);
668
669 return OMPI_SUCCESS;
670 }
671
672 int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
673 ompi_coll_libnbc_module_t *module, bool persistent,
674 ompi_request_t **request, void *tmpbuf) {
675 int ret, tmp_tag;
676 bool need_register = false;
677 ompi_coll_libnbc_request_t *handle;
678
679
680 if (((int *)schedule->data)[0] == 0 && schedule->data[sizeof(int)] == 0) {
681 ret = nbc_get_noop_request(persistent, request);
682 if (OMPI_SUCCESS != ret) {
683 return OMPI_ERR_OUT_OF_RESOURCE;
684 }
685
686
687
688 OPAL_THREAD_LOCK(&module->mutex);
689 tmp_tag = module->tag--;
690 if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
691 tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
692 NBC_DEBUG(2,"resetting tags ...\n");
693 }
694 OPAL_THREAD_UNLOCK(&module->mutex);
695
696 OBJ_RELEASE(schedule);
697 free(tmpbuf);
698
699 return OMPI_SUCCESS;
700 }
701
702 OMPI_COLL_LIBNBC_REQUEST_ALLOC(comm, persistent, handle);
703 if (NULL == handle) return OMPI_ERR_OUT_OF_RESOURCE;
704
705 handle->tmpbuf = NULL;
706 handle->req_count = 0;
707 handle->req_array = NULL;
708 handle->comm = comm;
709 handle->schedule = NULL;
710 handle->row_offset = 0;
711 handle->nbc_complete = persistent ? true : false;
712
713
714
715 OPAL_THREAD_LOCK(&module->mutex);
716 tmp_tag = module->tag--;
717 if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
718 tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
719 NBC_DEBUG(2,"resetting tags ...\n");
720 }
721
722 if (true != module->comm_registered) {
723 module->comm_registered = true;
724 need_register = true;
725 }
726 OPAL_THREAD_UNLOCK(&module->mutex);
727
728 handle->tag = tmp_tag;
729
730
731 if (need_register) {
732 int32_t tmp =
733 OPAL_THREAD_ADD_FETCH32(&mca_coll_libnbc_component.active_comms, 1);
734 if (tmp == 1) {
735 opal_progress_register(ompi_coll_libnbc_progress);
736 }
737 }
738
739 handle->comm=comm;
740
741
742
743 handle->comminfo = module;
744
745 NBC_DEBUG(3, "got tag %i\n", handle->tag);
746
747 handle->tmpbuf = tmpbuf;
748 handle->schedule = schedule;
749 *request = (ompi_request_t *) handle;
750
751 return OMPI_SUCCESS;
752 }
753
#ifdef NBC_CACHE_SCHEDULE
/* Key-deletion callback passed to hb_tree_new() in NBC_Init_comm.
 * It intentionally releases nothing. */
void NBC_SchedCache_args_delete_key_dummy(void *k) {
  (void) k;
}

/* Data-deletion callback passed to hb_tree_new() in NBC_Init_comm: drops the
 * entry's reference to its cached schedule, then frees the entry itself. */
void NBC_SchedCache_args_delete(void *entry) {
  struct NBC_dummyarg *cached = (struct NBC_dummyarg *) entry;

  OBJ_RELEASE(cached->schedule);
  free(cached);
}
#endif