root/ompi/mca/coll/portals4/coll_portals4_allreduce.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. allreduce_kary_tree_top
  2. allreduce_kary_tree_bottom
  3. ompi_coll_portals4_allreduce_intra
  4. ompi_coll_portals4_iallreduce_intra
  5. ompi_coll_portals4_iallreduce_intra_fini

   1 /*
   2  * Copyright (c) 2015      Sandia National Laboratories. All rights reserved.
   3  * Copyright (c) 2015      Bull SAS.  All rights reserved.
   4  * Copyright (c) 2015      Research Organization for Information Science
   5  *                         and Technology (RIST). All rights reserved.
   6  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include "ompi_config.h"
  15 #include "coll_portals4.h"
  16 #include "coll_portals4_request.h"
  17 
  18 #include <stdio.h>
  19 
  20 #include "mpi.h"
  21 #include "ompi/constants.h"
  22 #include "ompi/datatype/ompi_datatype.h"
  23 #include "ompi/datatype/ompi_datatype_internal.h"
  24 #include "ompi/op/op.h"
  25 #include "opal/util/bit_ops.h"
  26 #include "ompi/mca/pml/pml.h"
  27 #include "ompi/mca/coll/coll.h"
  28 #include "ompi/mca/coll/base/base.h"
  29 
  30 
/* NOTE(review): presumably a segment-count limit for a pipelined variant;
 * it is not referenced in the visible part of this file - confirm it is
 * still used before relying on it. */
#define COLL_PORTALS4_ALLREDUCE_MAX_SEGMENT     128
/* Arity k of the reduction tree passed to get_k_ary_tree(); it also sizes
 * the per-rank child[] array in allreduce_kary_tree_top(). */
#define COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN    2
  33 
  34 static int
  35 allreduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
  36         MPI_Datatype dtype, MPI_Op op,
  37         struct ompi_communicator_t *comm,
  38         ompi_coll_portals4_request_t *request,
  39         mca_coll_portals4_module_t *module)
  40 {
  41     bool is_sync = request->is_sync;
  42     int ret, i;
  43     int size = ompi_comm_size(comm);
  44     int rank = ompi_comm_rank(comm);
  45     ptl_rank_t parent, child[COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN];
  46     unsigned int child_nb;
  47     size_t internal_count, length;
  48     ptl_handle_md_t zero_md_h, data_md_h;
  49     ptl_handle_me_t me_h;
  50     ptl_me_t me;
  51     ptl_match_bits_t match_bits_ack, match_bits_rtr, match_bits;
  52     ptl_ct_event_t ct;
  53     ptl_op_t ptl_op;
  54     ptl_datatype_t ptl_dtype;
  55 
  56     request->type = OMPI_COLL_PORTALS4_TYPE_ALLREDUCE;
  57 
  58     /*
  59      ** Initialization
  60      */
  61 
  62     for (i = 0 ; i < COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN ; i++) {
  63         child[i] = PTL_INVALID_RANK;
  64     }
  65 
  66     parent = PTL_INVALID_RANK;
  67 
  68     zero_md_h = mca_coll_portals4_component.zero_md_h;
  69     data_md_h = mca_coll_portals4_component.data_md_h;
  70 
  71     internal_count = opal_atomic_add_fetch_size_t(&module->coll_count, 1);
  72 
  73     /*
  74      ** DATATYPE and SIZES
  75      */
  76     ret = ompi_datatype_type_size(dtype, &length);
  77     length *= count;
  78 
  79     request->u.allreduce.is_optim = is_reduce_optimizable(dtype, length, op, &ptl_dtype, &ptl_op);
  80 
  81     if (request->u.allreduce.is_optim) {
  82         /*
  83          * TOPOLOGY
  84          */
  85 
  86         /* this function is dependent on the number of segments,
  87          * if we use segmentation pipe-line is preferred, and
  88          * binary tree otherwise */
  89 
  90         get_k_ary_tree(COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN,
  91                 rank, size, PTL_FIRST_RANK, &parent, child, &child_nb);
  92         request->u.allreduce.child_nb = child_nb;
  93 
  94         /*
  95          * PORTALS4 RESOURCE ALLOCATION
  96          */
  97 
  98         /* Compute match bits */
  99         COLL_PORTALS4_SET_BITS(match_bits_ack, ompi_comm_get_cid(comm), 1, 0,
 100                 COLL_PORTALS4_ALLREDUCE, 0, internal_count);
 101 
 102         COLL_PORTALS4_SET_BITS(match_bits_rtr, ompi_comm_get_cid(comm), 0, 1,
 103                 COLL_PORTALS4_ALLREDUCE, 0, internal_count);
 104 
 105         COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
 106                 COLL_PORTALS4_ALLREDUCE, 0, internal_count);
 107 
 108         if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.allreduce.trig_ct_h)) != 0) {
 109             return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
 110         }
 111 
 112         if (sendbuf != MPI_IN_PLACE) {
 113             /*
 114              * copy the data from sendbuf to recvbuf
 115              */
 116             memcpy(recvbuf, sendbuf, length);
 117         }
 118 
 119         /*
 120          ** Prepare Data ME
 121          */
 122         memset(&me, 0, sizeof(ptl_me_t));
 123         me.start = recvbuf;
 124         me.length = length;
 125         me.ct_handle = request->u.allreduce.trig_ct_h;
 126         me.uid = mca_coll_portals4_component.uid;
 127         me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 128                 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 129                 PTL_ME_EVENT_CT_COMM;
 130         me.match_id.phys.nid = PTL_NID_ANY;
 131         me.match_id.phys.pid = PTL_PID_ANY;
 132         me.match_bits = match_bits;
 133         me.ignore_bits = 0;
 134 
 135         if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 136                 mca_coll_portals4_component.pt_idx,
 137                 &me, PTL_PRIORITY_LIST, NULL,
 138                 &request->u.allreduce.data_me_h)) != 0) {
 139             return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
 140         }
 141 
 142         if (child_nb) {
 143             if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.allreduce.ack_ct_h)) != 0) {
 144                 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
 145             }
 146 
 147             for (i = 0 ; i < COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN ; i++) {
 148                 if (child[i] != PTL_INVALID_RANK) {
 149 
 150                     /*
 151                      ** Prepare ME for data ACK Put
 152                      ** Priority List
 153                      */
 154 
 155                     memset(&me, 0, sizeof(ptl_me_t));
 156                     me.start = NULL;
 157                     me.length = 0;
 158                     me.min_free = 0;
 159                     me.uid = mca_coll_portals4_component.uid;
 160                     me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 161                             PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 162                             PTL_ME_USE_ONCE |
 163                             PTL_ME_EVENT_CT_COMM;
 164                     me.match_id.phys.nid = PTL_NID_ANY;
 165                     me.match_id.phys.pid = PTL_PID_ANY;
 166                     me.match_bits = match_bits_ack;
 167                     me.ignore_bits = 0;
 168                     me.ct_handle = request->u.allreduce.ack_ct_h;
 169 
 170                     if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 171                             mca_coll_portals4_component.pt_idx,
 172                             &me, PTL_PRIORITY_LIST,
 173                             NULL,
 174                             &me_h)) != 0) {
 175                         return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
 176                     }
 177 
 178                     /*
 179                      * Prepare Triggered Put to send the result to children
 180                      *
 181                      */
 182 
 183                     if ((ret = PtlTriggeredPut (data_md_h,
 184                             (uint64_t)recvbuf,
 185                             length, PTL_NO_ACK_REQ,
 186                             ompi_coll_portals4_get_peer(comm, child[i]),
 187                             mca_coll_portals4_component.pt_idx,
 188                             match_bits, 0, NULL, 0,
 189                             request->u.allreduce.trig_ct_h,
 190                             (rank != PTL_FIRST_RANK) ? child_nb + 2 : child_nb)) != 0) {
 191                         return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 192                     }
 193                 }
 194 
 195             }
 196         }
 197 
 198         if (rank != PTL_FIRST_RANK) {
 199 
 200             /* Send Atomic operation to the parent */
 201             if ((ret = PtlTriggeredAtomic(data_md_h,
 202                     (uint64_t)recvbuf,
 203                     length, PTL_NO_ACK_REQ,
 204                     ompi_coll_portals4_get_peer(comm, parent),
 205                     mca_coll_portals4_component.pt_idx,
 206                     match_bits, 0, NULL, 0,
 207                     ptl_op, ptl_dtype, request->u.allreduce.trig_ct_h,
 208                     child_nb + 1)) != 0) {
 209                 return opal_stderr("PtlTriggeredAtomic failed", __FILE__, __LINE__, ret);
 210             }
 211 
 212             if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 213                     ompi_coll_portals4_get_peer(comm, parent),
 214                     mca_coll_portals4_component.pt_idx,
 215                     match_bits_ack, 0, NULL, 0,
 216                     request->u.allreduce.trig_ct_h,
 217                     child_nb + 2)) != 0) {
 218                 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 219             }
 220 
 221             /*
 222              ** Prepare ME for receving RTR
 223              ** Priority List, match also with "Overflow list Me" in coll_portals4_component
 224              */
 225 
 226             memset(&me, 0, sizeof(ptl_me_t));
 227             me.start = NULL;
 228             me.length = 0;
 229             me.min_free = 0;
 230             me.uid = mca_coll_portals4_component.uid;
 231             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 232                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 233                     PTL_ME_USE_ONCE |
 234                     PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
 235             me.match_id.phys.nid = PTL_NID_ANY;
 236             me.match_id.phys.pid = PTL_PID_ANY;
 237             me.match_bits = match_bits_rtr;
 238             me.ignore_bits = 0;
 239             me.ct_handle = request->u.allreduce.trig_ct_h;
 240 
 241             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 242                     mca_coll_portals4_component.pt_idx,
 243                     &me, PTL_PRIORITY_LIST,
 244                     NULL,
 245                     &me_h)) != 0) {
 246                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
 247             }
 248         }
 249 
 250 
 251         /*
 252          * OK, everything is ready for data and acknowledgement reception
 253          *
 254          */
 255 
 256         if (child_nb) {
 257             for (i = 0 ; i < COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN ; i++) {
 258                 if (child[i] != PTL_INVALID_RANK) {
 259                     /*
 260                      * Send RTR to children
 261                      *
 262                      */
 263 
 264                     /* and there, we only send the RTR when all the MEs are ready */
 265                     if ((ret = PtlPut(zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 266                             ompi_coll_portals4_get_peer(comm, child[i]),
 267                             mca_coll_portals4_component.pt_idx,
 268                             match_bits_rtr, 0, NULL, 0)) != PTL_OK)
 269                         return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
 270                 }
 271             }
 272         }
 273 
 274         if (child_nb) {
 275             if (is_sync) {
 276                 if ((ret = PtlCTWait(request->u.allreduce.ack_ct_h,
 277                         child_nb, &ct)) != 0) {
 278                     opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
 279                 }
 280             }
 281             else {
 282                 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 283                         ompi_coll_portals4_get_peer(comm, rank),
 284                         mca_coll_portals4_component.finish_pt_idx,
 285                         0, 0, NULL, (uintptr_t) request,
 286                         request->u.allreduce.ack_ct_h,
 287                         child_nb)) != 0) {
 288                     return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 289                 }
 290             }
 291         }
 292         else {
 293             if (is_sync) {
 294                 if ((ret = PtlCTWait(request->u.allreduce.trig_ct_h,
 295                         (rank != PTL_FIRST_RANK) ? 2 : 0, &ct)) != 0) {
 296                     opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
 297                 }
 298             }
 299             else {
 300                 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 301                         ompi_coll_portals4_get_peer(comm, rank),
 302                         mca_coll_portals4_component.finish_pt_idx,
 303                         0, 0, NULL, (uintptr_t) request,
 304                         request->u.allreduce.trig_ct_h,
 305                         (rank != PTL_FIRST_RANK) ? 2 : 0)) != 0) {
 306                     return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 307                 }
 308             }
 309         }
 310 
 311     }
 312     else {
 313         opal_output_verbose(100, ompi_coll_base_framework.framework_output,
 314                 "rank %d - optimization not supported, falling back to previous handler\n", rank);
 315 
 316         if (request->is_sync) {
 317             if ((module->previous_allreduce) && (module->previous_allreduce_module)) {
 318                 ret = module->previous_allreduce(sendbuf, recvbuf, count, dtype, op,
 319                         comm, module->previous_allreduce_module);
 320             }
 321             else {
 322                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 323                         "rank %d - no previous allreduce handler is available, aborting\n", rank);
 324                 return (OMPI_ERROR);
 325             }
 326         }
 327         else {
 328             if ((module->previous_iallreduce) && (module->previous_iallreduce_module)) {
 329                 ret = module->previous_iallreduce(sendbuf, recvbuf, count, dtype, op,
 330                         comm, request->fallback_request, module->previous_iallreduce_module);
 331             }
 332             else {
 333                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 334                         "rank %d - no previous iallreduce handler is available, aborting\n", rank);
 335                 return (OMPI_ERROR);
 336             }
 337         }
 338         return ret;
 339     }
 340     return (OMPI_SUCCESS);
 341 }
 342 
 343 static int
 344 allreduce_kary_tree_bottom(ompi_coll_portals4_request_t *request)
 345 {
 346     int ret;
 347 
 348     if (request->u.allreduce.is_optim) {
 349         PtlAtomicSync();
 350 
 351         if (request->u.allreduce.child_nb) {
 352             ret = PtlCTFree(request->u.allreduce.ack_ct_h);
 353             if (PTL_OK != ret) {
 354                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 355                         "%s:%d: PtlCTFree failed: %d\n",
 356                         __FILE__, __LINE__, ret);
 357                 return OMPI_ERROR;
 358             }
 359         }
 360 
 361         do {
 362             ret = PtlMEUnlink(request->u.allreduce.data_me_h);
 363         } while (PTL_IN_USE == ret);
 364         if (PTL_OK != ret) {
 365             opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 366                     "%s:%d: PtlMEUnlink failed: %d\n",
 367                     __FILE__, __LINE__, ret);
 368             return OMPI_ERROR;
 369         }
 370 
 371         ret = PtlCTFree(request->u.allreduce.trig_ct_h);
 372         if (PTL_OK != ret) {
 373             opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 374                     "%s:%d: PtlCTFree failed: %d\n",
 375                     __FILE__, __LINE__, ret);
 376             return OMPI_ERROR;
 377         }
 378     }
 379 
 380     return (OMPI_SUCCESS);
 381 }
 382 
 383 int ompi_coll_portals4_allreduce_intra(const void* sendbuf, void* recvbuf, int count,
 384         MPI_Datatype dtype, MPI_Op op,
 385         struct ompi_communicator_t *comm,
 386         struct mca_coll_base_module_2_3_0_t *module)
 387 {
 388     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
 389     ompi_coll_portals4_request_t *request;
 390 
 391     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
 392     if (NULL == request) {
 393         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 394                 "%s:%d: request alloc failed\n",
 395                 __FILE__, __LINE__);
 396         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
 397     }
 398 
 399     request->is_sync = true;
 400     request->fallback_request = NULL;
 401 
 402     allreduce_kary_tree_top(sendbuf, recvbuf, count,
 403             dtype, op, comm, request, portals4_module);
 404 
 405     allreduce_kary_tree_bottom(request);
 406 
 407     OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
 408     return (OMPI_SUCCESS);
 409 }
 410 
 411 
 412 int ompi_coll_portals4_iallreduce_intra(const void* sendbuf, void* recvbuf, int count,
 413         MPI_Datatype dtype, MPI_Op op,
 414         struct ompi_communicator_t *comm,
 415         ompi_request_t ** ompi_request,
 416         struct mca_coll_base_module_2_3_0_t *module)
 417 {
 418     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
 419     ompi_coll_portals4_request_t *request;
 420 
 421     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
 422     if (NULL == request) {
 423         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 424                 "%s:%d: request alloc failed\n",
 425                 __FILE__, __LINE__);
 426         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
 427     }
 428     *ompi_request = &request->super;
 429     request->fallback_request = ompi_request;
 430     request->is_sync = false;
 431 
 432     allreduce_kary_tree_top(sendbuf, recvbuf, count,
 433             dtype, op, comm, request, portals4_module);
 434 
 435     opal_output_verbose(10, ompi_coll_base_framework.framework_output, "iallreduce");
 436     return (OMPI_SUCCESS);
 437 }
 438 
 439 
 440 int
 441 ompi_coll_portals4_iallreduce_intra_fini(struct ompi_coll_portals4_request_t *request)
 442 {
 443     allreduce_kary_tree_bottom(request);
 444     ompi_request_complete(&request->super, true);
 445 
 446     return (OMPI_SUCCESS);
 447 }

/* [<][>][^][v][top][bottom][index][help] */