root/ompi/mca/coll/libnbc/nbc.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. NBC_Reset_times
  2. NBC_Print_times
  3. nbc_schedule_constructor
  4. nbc_schedule_destructor
  5. nbc_schedule_grow
  6. nbc_schedule_round_append
  7. NBC_Sched_send_internal
  8. NBC_Sched_send
  9. NBC_Sched_local_send
  10. NBC_Sched_recv_internal
  11. NBC_Sched_recv
  12. NBC_Sched_local_recv
  13. NBC_Sched_op
  14. NBC_Sched_copy
  15. NBC_Sched_unpack
  16. NBC_Sched_barrier
  17. NBC_Sched_commit
  18. NBC_Free
  19. NBC_Progress
  20. NBC_Start_round
  21. NBC_Return_handle
  22. NBC_Init_comm
  23. NBC_Start
  24. NBC_Schedule_request
  25. NBC_SchedCache_args_delete_key_dummy
  26. NBC_SchedCache_args_delete

   1 /* -*- Mode: C; c-basic-offset:2 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2006      The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2013-2018 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2006      The Technical University of Chemnitz. All
  10  *                         rights reserved.
  11  * Copyright (c) 2015      Los Alamos National Security, LLC.  All rights
  12  *                         reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  *
  16  * Author(s): Torsten Hoefler <htor@cs.indiana.edu>
  17  *
  18  * Copyright (c) 2012      Oracle and/or its affiliates.  All rights reserved.
  19  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  20  * Copyright (c) 2017      Ian Bradley Morgan and Anthony Skjellum. All
  21  *                         rights reserved.
  22  * Copyright (c) 2018      FUJITSU LIMITED.  All rights reserved.
  23  * $COPYRIGHT$
  24  *
  25  * Additional copyrights may follow
  26  */
  27 #include "nbc_internal.h"
  28 #include "ompi/mca/coll/base/coll_tags.h"
  29 #include "ompi/op/op.h"
  30 #include "ompi/mca/pml/pml.h"
  31 
  32 /* only used in this file */
  33 static inline int NBC_Start_round(NBC_Handle *handle);
  34 
  35 /* #define NBC_TIMING */
  36 
  37 #ifdef NBC_TIMING
  38 static double Isend_time=0, Irecv_time=0, Wait_time=0, Test_time=0;
  39 void NBC_Reset_times() {
  40   Isend_time=Irecv_time=Wait_time=Test_time=0;
  41 }
  42 void NBC_Print_times(double div) {
  43   printf("*** NBC_TIMES: Isend: %lf, Irecv: %lf, Wait: %lf, Test: %lf\n", Isend_time*1e6/div, Irecv_time*1e6/div, Wait_time*1e6/div, Test_time*1e6/div);
  44 }
  45 #endif
  46 
  47 static void nbc_schedule_constructor (NBC_Schedule *schedule) {
  48   /* initial total size of the schedule */
  49   schedule->size = sizeof (int);
  50   schedule->current_round_offset = 0;
  51   schedule->data = calloc (1, schedule->size);
  52 }
  53 
  54 static void nbc_schedule_destructor (NBC_Schedule *schedule) {
  55   free (schedule->data);
  56   schedule->data = NULL;
  57 }
  58 
  59 OBJ_CLASS_INSTANCE(NBC_Schedule, opal_object_t, nbc_schedule_constructor,
  60                    nbc_schedule_destructor);
  61 
  62 static int nbc_schedule_grow (NBC_Schedule *schedule, int additional) {
  63   void *tmp;
  64   int size;
  65 
  66   /* get current size of schedule */
  67   size = nbc_schedule_get_size (schedule);
  68 
  69   tmp = realloc (schedule->data, size + additional);
  70   if (NULL == tmp) {
  71     NBC_Error ("Could not increase the size of NBC schedule");
  72     return OMPI_ERR_OUT_OF_RESOURCE;
  73   }
  74 
  75   schedule->data = tmp;
  76   return OMPI_SUCCESS;
  77 }
  78 
  79 static int nbc_schedule_round_append (NBC_Schedule *schedule, void *data, int data_size, bool barrier) {
  80   int ret, size = nbc_schedule_get_size (schedule);
  81 
  82   if (barrier) {
  83     ret = nbc_schedule_grow (schedule, data_size + 1 + sizeof (int));
  84   } else {
  85     ret = nbc_schedule_grow (schedule, data_size);
  86   }
  87   if (OMPI_SUCCESS != ret) {
  88     return ret;
  89   }
  90 
  91   /* append to the round-schedule */
  92   if (data_size) {
  93     memcpy (schedule->data + size, data, data_size);
  94 
  95     /* increase number of elements in round-schedule */
  96     nbc_schedule_inc_round (schedule);
  97 
  98     /* increase size of schedule */
  99     nbc_schedule_inc_size (schedule, data_size);
 100   }
 101 
 102   if (barrier) {
 103     /* add the barrier */
 104     schedule->data[size + data_size] = 1;
 105     /* set next round counter to 0 */
 106     memset (schedule->data + size + data_size + 1, 0, sizeof (int));
 107 
 108     NBC_DEBUG(10, "ended round at byte %i\n", size + data_size + 1);
 109 
 110     schedule->current_round_offset = size + data_size + 1;
 111 
 112     /* increase size of schedule */
 113     nbc_schedule_inc_size (schedule, sizeof (int) + 1);
 114   }
 115 
 116   return OMPI_SUCCESS;
 117 }
 118 
 119 /* this function puts a send into the schedule */
 120 static int NBC_Sched_send_internal (const void* buf, char tmpbuf, int count, MPI_Datatype datatype, int dest, bool local, NBC_Schedule *schedule, bool barrier) {
 121   NBC_Args_send send_args;
 122   int ret;
 123 
 124   /* store the passed arguments */
 125   send_args.type = SEND;
 126   send_args.buf = buf;
 127   send_args.tmpbuf = tmpbuf;
 128   send_args.count = count;
 129   send_args.datatype = datatype;
 130   send_args.dest = dest;
 131   send_args.local = local;
 132 
 133   /* append to the round-schedule */
 134   ret = nbc_schedule_round_append (schedule, &send_args, sizeof (send_args), barrier);
 135   if (OMPI_SUCCESS != ret) {
 136     return ret;
 137   }
 138 
 139   NBC_DEBUG(10, "added send - ends at byte %i\n", nbc_schedule_get_size (schedule));
 140 
 141   return OMPI_SUCCESS;
 142 }
 143 
 144 int NBC_Sched_send (const void* buf, char tmpbuf, int count, MPI_Datatype datatype, int dest, NBC_Schedule *schedule, bool barrier) {
 145   return NBC_Sched_send_internal (buf, tmpbuf, count, datatype, dest, false, schedule, barrier);
 146 }
 147 
 148 int NBC_Sched_local_send (const void* buf, char tmpbuf, int count, MPI_Datatype datatype, int dest, NBC_Schedule *schedule, bool barrier) {
 149   return NBC_Sched_send_internal (buf, tmpbuf, count, datatype, dest, true, schedule, barrier);
 150 }
 151 
 152 /* this function puts a receive into the schedule */
 153 static int NBC_Sched_recv_internal (void* buf, char tmpbuf, int count, MPI_Datatype datatype, int source, bool local, NBC_Schedule *schedule, bool barrier) {
 154   NBC_Args_recv recv_args;
 155   int ret;
 156 
 157   /* store the passed arguments */
 158   recv_args.type = RECV;
 159   recv_args.buf = buf;
 160   recv_args.tmpbuf = tmpbuf;
 161   recv_args.count = count;
 162   recv_args.datatype = datatype;
 163   recv_args.source = source;
 164   recv_args.local = local;
 165 
 166   /* append to the round-schedule */
 167   ret = nbc_schedule_round_append (schedule, &recv_args, sizeof (recv_args), barrier);
 168   if (OMPI_SUCCESS != ret) {
 169     return ret;
 170   }
 171 
 172   NBC_DEBUG(10, "added receive - ends at byte %d\n", nbc_schedule_get_size (schedule));
 173 
 174   return OMPI_SUCCESS;
 175 }
 176 
 177 int NBC_Sched_recv (void* buf, char tmpbuf, int count, MPI_Datatype datatype, int source, NBC_Schedule *schedule, bool barrier) {
 178   return NBC_Sched_recv_internal(buf, tmpbuf, count, datatype, source, false, schedule, barrier);
 179 }
 180 
 181 int NBC_Sched_local_recv (void* buf, char tmpbuf, int count, MPI_Datatype datatype, int source, NBC_Schedule *schedule, bool barrier) {
 182   return NBC_Sched_recv_internal(buf, tmpbuf, count, datatype, source, true, schedule, barrier);
 183 }
 184 
 185 /* this function puts an operation into the schedule */
 186 int NBC_Sched_op (const void* buf1, char tmpbuf1, void* buf2, char tmpbuf2, int count, MPI_Datatype datatype,
 187                   MPI_Op op, NBC_Schedule *schedule, bool barrier) {
 188   NBC_Args_op op_args;
 189   int ret;
 190 
 191   /* store the passed arguments */
 192   op_args.type = OP;
 193   op_args.buf1 = buf1;
 194   op_args.buf2 = buf2;
 195   op_args.tmpbuf1 = tmpbuf1;
 196   op_args.tmpbuf2 = tmpbuf2;
 197   op_args.count = count;
 198   op_args.op = op;
 199   op_args.datatype = datatype;
 200 
 201   /* append to the round-schedule */
 202   ret = nbc_schedule_round_append (schedule, &op_args, sizeof (op_args), barrier);
 203   if (OMPI_SUCCESS != ret) {
 204     return ret;
 205   }
 206 
 207   NBC_DEBUG(10, "added op2 - ends at byte %i\n", nbc_schedule_get_size (schedule));
 208 
 209   return OMPI_SUCCESS;
 210 }
 211 
 212 /* this function puts a copy into the schedule */
 213 int NBC_Sched_copy (void *src, char tmpsrc, int srccount, MPI_Datatype srctype, void *tgt, char tmptgt, int tgtcount,
 214                     MPI_Datatype tgttype, NBC_Schedule *schedule, bool barrier) {
 215   NBC_Args_copy copy_args;
 216   int ret;
 217 
 218   /* store the passed arguments */
 219   copy_args.type = COPY;
 220   copy_args.src = src;
 221   copy_args.tmpsrc = tmpsrc;
 222   copy_args.srccount = srccount;
 223   copy_args.srctype = srctype;
 224   copy_args.tgt = tgt;
 225   copy_args.tmptgt = tmptgt;
 226   copy_args.tgtcount = tgtcount;
 227   copy_args.tgttype = tgttype;
 228 
 229   /* append to the round-schedule */
 230   ret = nbc_schedule_round_append (schedule, &copy_args, sizeof (copy_args), barrier);
 231   if (OMPI_SUCCESS != ret) {
 232     return ret;
 233   }
 234 
 235   NBC_DEBUG(10, "added copy - ends at byte %i\n", nbc_schedule_get_size (schedule));
 236 
 237   return OMPI_SUCCESS;
 238 }
 239 
 240 /* this function puts a unpack into the schedule */
 241 int NBC_Sched_unpack (void *inbuf, char tmpinbuf, int count, MPI_Datatype datatype, void *outbuf, char tmpoutbuf,
 242                       NBC_Schedule *schedule, bool barrier) {
 243   NBC_Args_unpack unpack_args;
 244   int ret;
 245 
 246   /* store the passed arguments */
 247   unpack_args.type = UNPACK;
 248   unpack_args.inbuf = inbuf;
 249   unpack_args.tmpinbuf = tmpinbuf;
 250   unpack_args.count = count;
 251   unpack_args.datatype = datatype;
 252   unpack_args.outbuf = outbuf;
 253   unpack_args.tmpoutbuf = tmpoutbuf;
 254 
 255   /* append to the round-schedule */
 256   ret = nbc_schedule_round_append (schedule, &unpack_args, sizeof (unpack_args), barrier);
 257   if (OMPI_SUCCESS != ret) {
 258     return ret;
 259   }
 260 
 261   NBC_DEBUG(10, "added unpack - ends at byte %i\n", nbc_schedule_get_size (schedule));
 262 
 263   return OMPI_SUCCESS;
 264 }
 265 
 266 /* this function ends a round of a schedule */
 267 int NBC_Sched_barrier (NBC_Schedule *schedule) {
 268   return nbc_schedule_round_append (schedule, NULL, 0, true);
 269 }
 270 
 271 /* this function ends a schedule */
 272 int NBC_Sched_commit(NBC_Schedule *schedule) {
 273   int size = nbc_schedule_get_size (schedule);
 274   char *ptr;
 275   int ret;
 276 
 277   ret = nbc_schedule_grow (schedule, 1);
 278   if (OMPI_SUCCESS != ret) {
 279     return ret;
 280   }
 281 
 282   /* add the barrier char (0) because this is the last round */
 283   ptr = schedule->data + size;
 284   *((char *) ptr) = 0;
 285 
 286   /* increase size of schedule */
 287   nbc_schedule_inc_size (schedule, 1);
 288 
 289   NBC_DEBUG(10, "closed schedule %p at byte %i\n", schedule, (int)(size + 1));
 290 
 291   return OMPI_SUCCESS;
 292 }
 293 
 294 /* finishes a request
 295  *
 296  * to be called *only* from the progress thread !!! */
 297 static inline void NBC_Free (NBC_Handle* handle) {
 298 
 299   if (NULL != handle->schedule) {
 300     /* release schedule */
 301     OBJ_RELEASE (handle->schedule);
 302     handle->schedule = NULL;
 303   }
 304 
 305   /* if the nbc_I<collective> attached some data */
 306   /* problems with schedule cache here, see comment (TODO) in
 307    * nbc_internal.h */
 308   if (NULL != handle->tmpbuf) {
 309     free((void*)handle->tmpbuf);
 310     handle->tmpbuf = NULL;
 311   }
 312 }
 313 
 314 /* progresses a request
 315  *
 316  * to be called *only* from the progress thread !!! */
 317 int NBC_Progress(NBC_Handle *handle) {
 318   int res, ret=NBC_CONTINUE;
 319   bool flag;
 320   unsigned long size = 0;
 321   char *delim;
 322 
 323   if (handle->nbc_complete) {
 324     return NBC_OK;
 325   }
 326 
 327   flag = true;
 328 
 329   if ((handle->req_count > 0) && (handle->req_array != NULL)) {
 330     NBC_DEBUG(50, "NBC_Progress: testing for %i requests\n", handle->req_count);
 331 #ifdef NBC_TIMING
 332     Test_time -= MPI_Wtime();
 333 #endif
 334     /* don't call ompi_request_test_all as it causes a recursive call into opal_progress */
 335     while (handle->req_count) {
 336         ompi_request_t *subreq = handle->req_array[handle->req_count - 1];
 337         if (REQUEST_COMPLETE(subreq)) {
 338             if(OPAL_UNLIKELY( OMPI_SUCCESS != subreq->req_status.MPI_ERROR )) {
 339                 NBC_Error ("MPI Error in NBC subrequest %p : %d", subreq, subreq->req_status.MPI_ERROR);
 340                 /* copy the error code from the underlying request and let the
 341                  * round finish */
 342                 handle->super.req_status.MPI_ERROR = subreq->req_status.MPI_ERROR;
 343             }
 344             handle->req_count--;
 345             ompi_request_free(&subreq);
 346         } else {
 347             flag = false;
 348             break;
 349         }
 350     }
 351 #ifdef NBC_TIMING
 352     Test_time += MPI_Wtime();
 353 #endif
 354   }
 355 
 356   /* a round is finished */
 357   if (flag) {
 358     /* reset handle for next round */
 359     if (NULL != handle->req_array) {
 360       /* free request array */
 361       free (handle->req_array);
 362       handle->req_array = NULL;
 363     }
 364 
 365     handle->req_count = 0;
 366 
 367     /* previous round had an error */
 368     if (OPAL_UNLIKELY(OMPI_SUCCESS != handle->super.req_status.MPI_ERROR)) {
 369       res = handle->super.req_status.MPI_ERROR;
 370       NBC_Error("NBC_Progress: an error %d was found during schedule %p at row-offset %li - aborting the schedule\n", res, handle->schedule, handle->row_offset);
 371       handle->nbc_complete = true;
 372       if (!handle->super.req_persistent) {
 373         NBC_Free(handle);
 374       }
 375       return res;
 376     }
 377 
 378     /* adjust delim to start of current round */
 379     NBC_DEBUG(5, "NBC_Progress: going in schedule %p to row-offset: %li\n", handle->schedule, handle->row_offset);
 380     delim = handle->schedule->data + handle->row_offset;
 381     NBC_DEBUG(10, "delim: %p\n", delim);
 382     nbc_get_round_size(delim, &size);
 383     NBC_DEBUG(10, "size: %li\n", size);
 384     /* adjust delim to end of current round -> delimiter */
 385     delim = delim + size;
 386 
 387     if (*delim == 0) {
 388       /* this was the last round - we're done */
 389       NBC_DEBUG(5, "NBC_Progress last round finished - we're done\n");
 390 
 391       handle->nbc_complete = true;
 392       if (!handle->super.req_persistent) {
 393         NBC_Free(handle);
 394       }
 395 
 396       return NBC_OK;
 397     }
 398 
 399     NBC_DEBUG(5, "NBC_Progress round finished - goto next round\n");
 400     /* move delim to start of next round */
 401     /* initializing handle for new virgin round */
 402     handle->row_offset = (intptr_t) (delim + 1) - (intptr_t) handle->schedule->data;
 403     /* kick it off */
 404     res = NBC_Start_round(handle);
 405     if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
 406       NBC_Error ("Error in NBC_Start_round() (%i)", res);
 407       return res;
 408     }
 409   }
 410 
 411   return ret;
 412 }
 413 
 414 static inline int NBC_Start_round(NBC_Handle *handle) {
 415   int num; /* number of operations */
 416   int res;
 417   char* ptr;
 418   MPI_Request *tmp;
 419   NBC_Fn_type type;
 420   NBC_Args_send     sendargs;
 421   NBC_Args_recv     recvargs;
 422   NBC_Args_op         opargs;
 423   NBC_Args_copy     copyargs;
 424   NBC_Args_unpack unpackargs;
 425   void *buf1,  *buf2;
 426 
 427   /* get round-schedule address */
 428   ptr = handle->schedule->data + handle->row_offset;
 429 
 430   NBC_GET_BYTES(ptr,num);
 431   NBC_DEBUG(10, "start_round round at offset %d : posting %i operations\n", handle->row_offset, num);
 432 
 433   for (int i = 0 ; i < num ; ++i) {
 434     int offset = (intptr_t)(ptr - handle->schedule->data);
 435 
 436     memcpy (&type, ptr, sizeof (type));
 437     switch(type) {
 438       case SEND:
 439         NBC_DEBUG(5,"  SEND (offset %li) ", offset);
 440         NBC_GET_BYTES(ptr,sendargs);
 441         NBC_DEBUG(5,"*buf: %p, count: %i, type: %p, dest: %i, tag: %i)\n", sendargs.buf,
 442                   sendargs.count, sendargs.datatype, sendargs.dest, handle->tag);
 443         /* get an additional request */
 444         handle->req_count++;
 445         /* get buffer */
 446         if(sendargs.tmpbuf) {
 447           buf1=(char*)handle->tmpbuf+(long)sendargs.buf;
 448         } else {
 449           buf1=(void *)sendargs.buf;
 450         }
 451 #ifdef NBC_TIMING
 452         Isend_time -= MPI_Wtime();
 453 #endif
 454         tmp = (MPI_Request *) realloc ((void *) handle->req_array, handle->req_count * sizeof (MPI_Request));
 455         if (NULL == tmp) {
 456           return OMPI_ERR_OUT_OF_RESOURCE;
 457         }
 458 
 459         handle->req_array = tmp;
 460 
 461         res = MCA_PML_CALL(isend(buf1, sendargs.count, sendargs.datatype, sendargs.dest, handle->tag,
 462                                  MCA_PML_BASE_SEND_STANDARD, sendargs.local?handle->comm->c_local_comm:handle->comm,
 463                                  handle->req_array+handle->req_count - 1));
 464         if (OMPI_SUCCESS != res) {
 465           NBC_Error ("Error in MPI_Isend(%lu, %i, %p, %i, %i, %lu) (%i)", (unsigned long)buf1, sendargs.count,
 466                      sendargs.datatype, sendargs.dest, handle->tag, (unsigned long)handle->comm, res);
 467           return res;
 468         }
 469 #ifdef NBC_TIMING
 470         Isend_time += MPI_Wtime();
 471 #endif
 472         break;
 473       case RECV:
 474         NBC_DEBUG(5, "  RECV (offset %li) ", offset);
 475         NBC_GET_BYTES(ptr,recvargs);
 476         NBC_DEBUG(5, "*buf: %p, count: %i, type: %p, source: %i, tag: %i)\n", recvargs.buf, recvargs.count,
 477                   recvargs.datatype, recvargs.source, handle->tag);
 478         /* get an additional request - TODO: req_count NOT thread safe */
 479         handle->req_count++;
 480         /* get buffer */
 481         if(recvargs.tmpbuf) {
 482           buf1=(char*)handle->tmpbuf+(long)recvargs.buf;
 483         } else {
 484           buf1=recvargs.buf;
 485         }
 486 #ifdef NBC_TIMING
 487         Irecv_time -= MPI_Wtime();
 488 #endif
 489         tmp = (MPI_Request *) realloc ((void *) handle->req_array, handle->req_count * sizeof (MPI_Request));
 490         if (NULL == tmp) {
 491           return OMPI_ERR_OUT_OF_RESOURCE;
 492         }
 493 
 494         handle->req_array = tmp;
 495 
 496         res = MCA_PML_CALL(irecv(buf1, recvargs.count, recvargs.datatype, recvargs.source, handle->tag, recvargs.local?handle->comm->c_local_comm:handle->comm,
 497                                  handle->req_array+handle->req_count-1));
 498         if (OMPI_SUCCESS != res) {
 499           NBC_Error("Error in MPI_Irecv(%lu, %i, %p, %i, %i, %lu) (%i)", (unsigned long)buf1, recvargs.count,
 500                     recvargs.datatype, recvargs.source, handle->tag, (unsigned long)handle->comm, res);
 501           return res;
 502         }
 503 #ifdef NBC_TIMING
 504         Irecv_time += MPI_Wtime();
 505 #endif
 506         break;
 507       case OP:
 508         NBC_DEBUG(5, "  OP2  (offset %li) ", offset);
 509         NBC_GET_BYTES(ptr,opargs);
 510         NBC_DEBUG(5, "*buf1: %p, buf2: %p, count: %i, type: %p)\n", opargs.buf1, opargs.buf2,
 511                   opargs.count, opargs.datatype);
 512         /* get buffers */
 513         if(opargs.tmpbuf1) {
 514           buf1=(char*)handle->tmpbuf+(long)opargs.buf1;
 515         } else {
 516           buf1=(void *)opargs.buf1;
 517         }
 518         if(opargs.tmpbuf2) {
 519           buf2=(char*)handle->tmpbuf+(long)opargs.buf2;
 520         } else {
 521           buf2=opargs.buf2;
 522         }
 523         ompi_op_reduce(opargs.op, buf1, buf2, opargs.count, opargs.datatype);
 524         break;
 525       case COPY:
 526         NBC_DEBUG(5, "  COPY   (offset %li) ", offset);
 527         NBC_GET_BYTES(ptr,copyargs);
 528         NBC_DEBUG(5, "*src: %lu, srccount: %i, srctype: %p, *tgt: %lu, tgtcount: %i, tgttype: %p)\n",
 529                   (unsigned long) copyargs.src, copyargs.srccount, copyargs.srctype,
 530                   (unsigned long) copyargs.tgt, copyargs.tgtcount, copyargs.tgttype);
 531         /* get buffers */
 532         if(copyargs.tmpsrc) {
 533           buf1=(char*)handle->tmpbuf+(long)copyargs.src;
 534         } else {
 535           buf1=copyargs.src;
 536         }
 537         if(copyargs.tmptgt) {
 538           buf2=(char*)handle->tmpbuf+(long)copyargs.tgt;
 539         } else {
 540           buf2=copyargs.tgt;
 541         }
 542         res = NBC_Copy (buf1, copyargs.srccount, copyargs.srctype, buf2, copyargs.tgtcount, copyargs.tgttype,
 543                         handle->comm);
 544         if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
 545           return res;
 546         }
 547         break;
 548       case UNPACK:
 549         NBC_DEBUG(5, "  UNPACK   (offset %li) ", offset);
 550         NBC_GET_BYTES(ptr,unpackargs);
 551         NBC_DEBUG(5, "*src: %lu, srccount: %i, srctype: %p, *tgt: %lu\n", (unsigned long) unpackargs.inbuf,
 552                   unpackargs.count, unpackargs.datatype, (unsigned long) unpackargs.outbuf);
 553         /* get buffers */
 554         if(unpackargs.tmpinbuf) {
 555           buf1=(char*)handle->tmpbuf+(long)unpackargs.inbuf;
 556         } else {
 557           buf1=unpackargs.inbuf;
 558         }
 559         if(unpackargs.tmpoutbuf) {
 560           buf2=(char*)handle->tmpbuf+(long)unpackargs.outbuf;
 561         } else {
 562           buf2=unpackargs.outbuf;
 563         }
 564         res = NBC_Unpack (buf1, unpackargs.count, unpackargs.datatype, buf2, handle->comm);
 565         if (OMPI_SUCCESS != res) {
 566           NBC_Error ("NBC_Unpack() failed (code: %i)", res);
 567           return res;
 568         }
 569 
 570         break;
 571       default:
 572         NBC_Error ("NBC_Start_round: bad type %li at offset %li", (long)type, offset);
 573         return OMPI_ERROR;
 574     }
 575   }
 576 
 577   /* check if we can make progress - not in the first round, this allows us to leave the
 578    * initialization faster and to reach more overlap
 579    *
 580    * threaded case: calling progress in the first round can lead to a
 581    * deadlock if NBC_Free is called in this round :-( */
 582   if (handle->row_offset) {
 583     res = NBC_Progress(handle);
 584     if ((NBC_OK != res) && (NBC_CONTINUE != res)) {
 585       return OMPI_ERROR;
 586     }
 587   }
 588 
 589   return OMPI_SUCCESS;
 590 }
 591 
 592 void NBC_Return_handle(ompi_coll_libnbc_request_t *request) {
 593   NBC_Free (request);
 594   OMPI_COLL_LIBNBC_REQUEST_RETURN(request);
 595 }
 596 
 597 int  NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) {
 598   comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
 599 
 600 #ifdef NBC_CACHE_SCHEDULE
 601   /* initialize the NBC_ALLTOALL SchedCache tree */
 602   comminfo->NBC_Dict[NBC_ALLTOALL] = hb_tree_new((dict_cmp_func)NBC_Alltoall_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 603   if(comminfo->NBC_Dict[NBC_ALLTOALL] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 604   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_ALLTOALL]);
 605   comminfo->NBC_Dict_size[NBC_ALLTOALL] = 0;
 606   /* initialize the NBC_ALLGATHER SchedCache tree */
 607   comminfo->NBC_Dict[NBC_ALLGATHER] = hb_tree_new((dict_cmp_func)NBC_Allgather_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 608   if(comminfo->NBC_Dict[NBC_ALLGATHER] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 609   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_ALLGATHER]);
 610   comminfo->NBC_Dict_size[NBC_ALLGATHER] = 0;
 611   /* initialize the NBC_ALLREDUCE SchedCache tree */
 612   comminfo->NBC_Dict[NBC_ALLREDUCE] = hb_tree_new((dict_cmp_func)NBC_Allreduce_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 613   if(comminfo->NBC_Dict[NBC_ALLREDUCE] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 614   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_ALLREDUCE]);
 615   comminfo->NBC_Dict_size[NBC_ALLREDUCE] = 0;
 616   /* initialize the NBC_BARRIER SchedCache tree - is not needed -
 617    * schedule is hung off directly */
 618   comminfo->NBC_Dict_size[NBC_BARRIER] = 0;
 619   /* initialize the NBC_BCAST SchedCache tree */
 620   comminfo->NBC_Dict[NBC_BCAST] = hb_tree_new((dict_cmp_func)NBC_Bcast_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 621   if(comminfo->NBC_Dict[NBC_BCAST] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 622   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_BCAST]);
 623   comminfo->NBC_Dict_size[NBC_BCAST] = 0;
 624   /* initialize the NBC_GATHER SchedCache tree */
 625   comminfo->NBC_Dict[NBC_GATHER] = hb_tree_new((dict_cmp_func)NBC_Gather_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 626   if(comminfo->NBC_Dict[NBC_GATHER] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 627   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_GATHER]);
 628   comminfo->NBC_Dict_size[NBC_GATHER] = 0;
 629   /* initialize the NBC_REDUCE SchedCache tree */
 630   comminfo->NBC_Dict[NBC_REDUCE] = hb_tree_new((dict_cmp_func)NBC_Reduce_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 631   if(comminfo->NBC_Dict[NBC_REDUCE] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 632   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_REDUCE]);
 633   comminfo->NBC_Dict_size[NBC_REDUCE] = 0;
 634   /* initialize the NBC_SCAN SchedCache tree */
 635   comminfo->NBC_Dict[NBC_SCAN] = hb_tree_new((dict_cmp_func)NBC_Scan_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 636   if(comminfo->NBC_Dict[NBC_SCAN] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 637   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_SCAN]);
 638   comminfo->NBC_Dict_size[NBC_SCAN] = 0;
 639   /* initialize the NBC_SCATTER SchedCache tree */
 640   comminfo->NBC_Dict[NBC_SCATTER] = hb_tree_new((dict_cmp_func)NBC_Scatter_args_compare, NBC_SchedCache_args_delete_key_dummy, NBC_SchedCache_args_delete);
 641   if(comminfo->NBC_Dict[NBC_SCATTER] == NULL) { printf("Error in hb_tree_new()\n"); return OMPI_ERROR;; }
 642   NBC_DEBUG(1, "added tree at address %lu\n", (unsigned long)comminfo->NBC_Dict[NBC_SCATTER]);
 643   comminfo->NBC_Dict_size[NBC_SCATTER] = 0;
 644 #endif
 645 
 646   return OMPI_SUCCESS;
 647 }
 648 
 649 int NBC_Start(NBC_Handle *handle) {
 650   int res;
 651 
 652   /* bozo case */
 653   if ((ompi_request_t *)handle == &ompi_request_empty) {
 654     return OMPI_SUCCESS;
 655   }
 656 
 657   /* kick off first round */
 658   handle->super.req_state = OMPI_REQUEST_ACTIVE;
 659   handle->super.req_status.MPI_ERROR = OMPI_SUCCESS;
 660   res = NBC_Start_round(handle);
 661   if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
 662     return res;
 663   }
 664 
 665   OPAL_THREAD_LOCK(&mca_coll_libnbc_component.lock);
 666   opal_list_append(&mca_coll_libnbc_component.active_requests, &(handle->super.super.super));
 667   OPAL_THREAD_UNLOCK(&mca_coll_libnbc_component.lock);
 668 
 669   return OMPI_SUCCESS;
 670 }
 671 
 672 int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
 673                          ompi_coll_libnbc_module_t *module, bool persistent,
 674                          ompi_request_t **request, void *tmpbuf) {
 675   int ret, tmp_tag;
 676   bool need_register = false;
 677   ompi_coll_libnbc_request_t *handle;
 678 
 679   /* no operation (e.g. one process barrier)? */
 680   if (((int *)schedule->data)[0] == 0 && schedule->data[sizeof(int)] == 0) {
 681     ret = nbc_get_noop_request(persistent, request);
 682     if (OMPI_SUCCESS != ret) {
 683       return OMPI_ERR_OUT_OF_RESOURCE;
 684     }
 685 
 686     /* update the module->tag here because other processes may have operations
 687      * and they may update the module->tag */
 688     OPAL_THREAD_LOCK(&module->mutex);
 689     tmp_tag = module->tag--;
 690     if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
 691       tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
 692       NBC_DEBUG(2,"resetting tags ...\n");
 693     }
 694     OPAL_THREAD_UNLOCK(&module->mutex);
 695 
 696     OBJ_RELEASE(schedule);
 697     free(tmpbuf);
 698 
 699     return OMPI_SUCCESS;
 700   }
 701 
 702   OMPI_COLL_LIBNBC_REQUEST_ALLOC(comm, persistent, handle);
 703   if (NULL == handle) return OMPI_ERR_OUT_OF_RESOURCE;
 704 
 705   handle->tmpbuf = NULL;
 706   handle->req_count = 0;
 707   handle->req_array = NULL;
 708   handle->comm = comm;
 709   handle->schedule = NULL;
 710   handle->row_offset = 0;
 711   handle->nbc_complete = persistent ? true : false;
 712 
 713   /******************** Do the tag and shadow comm administration ...  ***************/
 714 
 715   OPAL_THREAD_LOCK(&module->mutex);
 716   tmp_tag = module->tag--;
 717   if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
 718       tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
 719       NBC_DEBUG(2,"resetting tags ...\n");
 720   }
 721 
 722   if (true != module->comm_registered) {
 723       module->comm_registered = true;
 724       need_register = true;
 725   }
 726   OPAL_THREAD_UNLOCK(&module->mutex);
 727 
 728   handle->tag = tmp_tag;
 729 
 730   /* register progress */
 731   if (need_register) {
 732       int32_t tmp =
 733           OPAL_THREAD_ADD_FETCH32(&mca_coll_libnbc_component.active_comms, 1);
 734       if (tmp == 1) {
 735           opal_progress_register(ompi_coll_libnbc_progress);
 736       }
 737   }
 738 
 739   handle->comm=comm;
 740   /*printf("got module: %lu tag: %i\n", module, module->tag);*/
 741 
 742   /******************** end of tag and shadow comm administration ...  ***************/
 743   handle->comminfo = module;
 744 
 745   NBC_DEBUG(3, "got tag %i\n", handle->tag);
 746 
 747   handle->tmpbuf = tmpbuf;
 748   handle->schedule = schedule;
 749   *request = (ompi_request_t *) handle;
 750 
 751   return OMPI_SUCCESS;
 752 }
 753 
 754 #ifdef NBC_CACHE_SCHEDULE
 755 void NBC_SchedCache_args_delete_key_dummy(void *k) {
 756     /* do nothing because the key and the data element are identical :-)
 757      * both (the single one :) is freed in NBC_<COLLOP>_args_delete() */
 758 }
 759 
 760 void NBC_SchedCache_args_delete(void *entry) {
 761   struct NBC_dummyarg *tmp = (struct NBC_dummyarg*)entry;
 762   OBJ_RELEASE(tmp->schedule);
 763   free(entry);
 764 }
 765 #endif

/* [<][>][^][v][top][bottom][index][help] */