root/ompi/mca/coll/portals4/coll_portals4_reduce.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. reduce_kary_tree_top
  2. reduce_kary_tree_bottom
  3. ompi_coll_portals4_reduce_intra
  4. ompi_coll_portals4_ireduce_intra
  5. ompi_coll_portals4_ireduce_intra_fini

   1 /*
   2  * Copyright (c) 2015      Sandia National Laboratories. All rights reserved.
   3  * Copyright (c) 2015      Bull SAS.  All rights reserved.
   4  * Copyright (c) 2015      Research Organization for Information Science
   5  *                         and Technology (RIST). All rights reserved.
   6  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include "ompi_config.h"
  15 #include "coll_portals4.h"
  16 #include "coll_portals4_request.h"
  17 
  18 #include <stdio.h>
  19 
  20 #include "mpi.h"
  21 #include "ompi/constants.h"
  22 #include "ompi/datatype/ompi_datatype.h"
  23 #include "ompi/datatype/ompi_datatype_internal.h"
  24 #include "ompi/op/op.h"
  25 #include "opal/util/bit_ops.h"
  26 #include "ompi/mca/pml/pml.h"
  27 #include "ompi/mca/coll/coll.h"
  28 #include "ompi/mca/coll/base/base.h"
  29 
  /* Upper bound on the number of children a rank has in the reduction tree:
   * it sizes the local child[] array, bounds the loops over it, and is the
   * first argument handed to get_k_ary_tree() (presumably the tree arity --
   * confirm against that helper). */
  30 #define COLL_PORTALS4_REDUCE_MAX_CHILDREN       2
  31 
  32 
  33 static int
  34 reduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
  35         MPI_Datatype dtype, MPI_Op op,
  36         int root,
  37         struct ompi_communicator_t *comm,
  38         ompi_coll_portals4_request_t *request,
  39         mca_coll_portals4_module_t *module)
  40 {
  41     bool is_sync = request->is_sync;
  42     int ret;
  43     unsigned int i;
  44     int size = ompi_comm_size(comm);
  45     int rank = ompi_comm_rank(comm);
  46     ptl_rank_t parent, child[COLL_PORTALS4_REDUCE_MAX_CHILDREN];
  47     size_t internal_count, length;
  48     ptl_handle_md_t zero_md_h, data_md_h;
  49     ptl_handle_me_t me_h;
  50     ptl_me_t me;
  51     ptl_match_bits_t match_bits_ack, match_bits_rtr, match_bits;
  52     ptl_ct_event_t ct;
  53     ptl_op_t ptl_op;
  54     ptl_datatype_t ptl_dtype;
  55 
  56 
  57     request->type = OMPI_COLL_PORTALS4_TYPE_REDUCE;
  58 
  59     /*
  60      ** Initialization
  61      */
  62 
  63     for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
  64         child[i] = PTL_INVALID_RANK;
  65     }
  66 
  67     parent = PTL_INVALID_RANK;
  68 
  69     zero_md_h = mca_coll_portals4_component.zero_md_h;
  70     data_md_h = mca_coll_portals4_component.data_md_h;
  71 
  72     internal_count = opal_atomic_add_fetch_size_t(&module->coll_count, 1);
  73 
  74     /*
  75      ** DATATYPE and SIZES
  76      */
  77     ret = ompi_datatype_type_size(dtype, &length);
  78     length *= count;
  79 
  80     request->u.reduce.is_optim = is_reduce_optimizable(dtype, length, op, &ptl_dtype, &ptl_op);
  81 
  82     if (request->u.reduce.is_optim) {
  83 
  84         /*
  85          * TOPOLOGY
  86          */
  87 
  88         /* this function is dependent on the number of segments,
  89          * if we use segmentation pipe-line is preferred, and
  90          * binary tree otherwise */
  91 
  92         get_k_ary_tree(COLL_PORTALS4_REDUCE_MAX_CHILDREN,
  93                 rank, size, root, &parent, child, &request->u.reduce.child_nb);
  94 
  95         /*
  96          * PORTALS4 RESOURCE ALLOCATION
  97          */
  98 
  99         /* Compute match bits */
 100         COLL_PORTALS4_SET_BITS(match_bits_ack, ompi_comm_get_cid(comm), 1, 0,
 101                 COLL_PORTALS4_REDUCE, 0, internal_count);
 102 
 103         COLL_PORTALS4_SET_BITS(match_bits_rtr, ompi_comm_get_cid(comm), 0, 1,
 104                 COLL_PORTALS4_REDUCE, 0, internal_count);
 105 
 106         COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
 107                 COLL_PORTALS4_REDUCE, 0, internal_count);
 108 
 109         if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.trig_ct_h)) != 0) {
 110             return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
 111         }
 112 
 113         /* warning : all the operations will be executed on the recvbuf */
 114         if (rank != root) {
 115             request->u.reduce.free_buffer = malloc(length);
 116             if (NULL == request->u.reduce.free_buffer) {
 117                 return OMPI_ERR_OUT_OF_RESOURCE;
 118             }
 119             recvbuf = (void*)request->u.reduce.free_buffer;
 120 
 121             memcpy(recvbuf, sendbuf, length);
 122         }
 123         else {
 124             request->u.reduce.free_buffer = NULL;
 125             if (sendbuf != MPI_IN_PLACE) {
 126                 memcpy(recvbuf, sendbuf, length);
 127             }
 128         }
 129 
 130         if (request->u.reduce.child_nb) {
 131 
 132             /*
 133              ** Prepare Data ME
 134              */
 135             memset(&me, 0, sizeof(ptl_me_t));
 136             me.start = recvbuf;
 137             me.length = length;
 138             me.ct_handle = request->u.reduce.trig_ct_h;
 139             me.uid = mca_coll_portals4_component.uid;
 140             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 141                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 142                     PTL_ME_EVENT_CT_COMM;
 143             me.match_id.phys.nid = PTL_NID_ANY;
 144             me.match_id.phys.pid = PTL_PID_ANY;
 145             me.match_bits = match_bits;
 146             me.ignore_bits = 0;
 147 
 148             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 149                     mca_coll_portals4_component.pt_idx,
 150                     &me, PTL_PRIORITY_LIST, NULL,
 151                     &request->u.reduce.data_me_h)) != 0) {
 152                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
 153             }
 154         }
 155 
 156         if (rank != root) {
 157             request->u.reduce.use_ack_ct_h = true;
 158 
 159             if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.ack_ct_h)) != 0) {
 160                 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
 161             }
 162 
 163             /*
 164              ** Prepare ME for data ACK Put
 165              ** Priority List
 166              */
 167 
 168             memset(&me, 0, sizeof(ptl_me_t));
 169             me.start = NULL;
 170             me.length = 0;
 171             me.min_free = 0;
 172             me.uid = mca_coll_portals4_component.uid;
 173             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 174                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 175                     PTL_ME_USE_ONCE |
 176                     PTL_ME_EVENT_CT_COMM;
 177             me.match_id.phys.nid = PTL_NID_ANY;
 178             me.match_id.phys.pid = PTL_PID_ANY;
 179             me.match_bits = match_bits_ack;
 180             me.ignore_bits = 0;
 181             me.ct_handle = request->u.reduce.ack_ct_h;
 182 
 183             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 184                     mca_coll_portals4_component.pt_idx,
 185                     &me, PTL_PRIORITY_LIST,
 186                     NULL,
 187                     &me_h)) != 0) {
 188                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
 189             }
 190 
 191             /*
 192              ** Prepare ME for sending RTR Put
 193              ** Priority List, match also with "Overflow list Me" in coll_portals4_component
 194              */
 195 
 196             memset(&me, 0, sizeof(ptl_me_t));
 197             me.start = NULL;
 198             me.length = 0;
 199             me.min_free = 0;
 200             me.uid = mca_coll_portals4_component.uid;
 201             me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 202                     PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 203                     PTL_ME_USE_ONCE |
 204                     PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
 205             me.match_id.phys.nid = PTL_NID_ANY;
 206             me.match_id.phys.pid = PTL_PID_ANY;
 207             me.match_bits = match_bits_rtr;
 208             me.ignore_bits = 0;
 209             me.ct_handle = request->u.reduce.trig_ct_h;
 210 
 211             if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 212                     mca_coll_portals4_component.pt_idx,
 213                     &me, PTL_PRIORITY_LIST,
 214                     NULL,
 215                     &me_h)) != 0) {
 216                 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
 217             }
 218 
 219             /* Send Atomic operation to the parent */
 220             if ((ret = PtlTriggeredAtomic(data_md_h,
 221                     (uint64_t)recvbuf,
 222                     length, PTL_NO_ACK_REQ,
 223                     ompi_coll_portals4_get_peer(comm, parent),
 224                     mca_coll_portals4_component.pt_idx,
 225                     match_bits, 0, NULL, 0,
 226                     ptl_op, ptl_dtype, request->u.reduce.trig_ct_h,
 227                     request->u.reduce.child_nb + 1)) != 0) {
 228                 return opal_stderr("PtlTriggeredAtomic failed", __FILE__, __LINE__, ret);
 229             }
 230         }
 231         else {
 232             request->u.reduce.use_ack_ct_h = false;
 233         }
 234 
 235         if (request->u.reduce.child_nb) {
 236             for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
 237                 if (child[i] != PTL_INVALID_RANK) {
 238                     /*
 239                      * Prepare Triggered Put to ACK Data to children
 240                      *
 241                      */
 242 
 243                     if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 244                             ompi_coll_portals4_get_peer(comm, child[i]),
 245                             mca_coll_portals4_component.pt_idx,
 246                             match_bits_ack, 0, NULL, 0,
 247                             request->u.reduce.trig_ct_h,
 248                             (rank != root) ?
 249                                     request->u.reduce.child_nb + 1 :
 250                                     request->u.reduce.child_nb)) != 0) {
 251                         return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 252                     }
 253 
 254                     /*
 255                      * Send RTR to children
 256                      *
 257                      */
 258 
 259                     /* and there, we only send the RTR when all the MEs are ready */
 260                     if ((ret = PtlPut(zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 261                             ompi_coll_portals4_get_peer(comm, child[i]),
 262                             mca_coll_portals4_component.pt_idx,
 263                             match_bits_rtr, 0, NULL, 0)) != PTL_OK) {
 264                         return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
 265                     }
 266                 }
 267             }
 268         }
 269 
 270         if (rank != root) {
 271             if (is_sync) {
 272                 if ((ret = PtlCTWait(request->u.reduce.ack_ct_h, 1, &ct)) != 0) {
 273                     opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
 274                 }
 275             }
 276             else {
 277                 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 278                         ompi_coll_portals4_get_peer(comm, rank),
 279                         mca_coll_portals4_component.finish_pt_idx,
 280                         0, 0, NULL, (uintptr_t) request,
 281                         request->u.reduce.ack_ct_h,
 282                         1)) != 0) {
 283                     return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 284                 }
 285             }
 286         }
 287         else {
 288             if (is_sync) {
 289                 if ((ret = PtlCTWait(request->u.reduce.trig_ct_h,
 290                         request->u.reduce.child_nb, &ct)) != 0) {
 291                     opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
 292                 }
 293             }
 294             else {
 295                 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
 296                         ompi_coll_portals4_get_peer(comm, rank),
 297                         mca_coll_portals4_component.finish_pt_idx,
 298                         0, 0, NULL, (uintptr_t) request,
 299                         request->u.reduce.trig_ct_h,
 300                         request->u.reduce.child_nb)) != 0) {
 301                     return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
 302                 }
 303             }
 304         }
 305     }
 306     else {
 307         opal_output_verbose(100, ompi_coll_base_framework.framework_output,
 308                 "rank %d - optimization not supported, falling back to previous handler\n", rank);
 309 
 310         if (request->is_sync) {
 311             if ((module->previous_reduce) && (module->previous_reduce_module)) {
 312                 ret = module->previous_reduce(sendbuf, recvbuf, count, dtype, op, root,
 313                         comm, module->previous_reduce_module);
 314             }
 315             else {
 316                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 317                         "rank %d - no previous reduce handler is available, aborting\n", rank);
 318                 return (OMPI_ERROR);
 319             }
 320         }
 321         else {
 322             if ((module->previous_ireduce) && (module->previous_ireduce_module)) {
 323                 ret =  module->previous_ireduce(sendbuf, recvbuf, count, dtype, op, root,
 324                         comm, request->fallback_request, module->previous_ireduce_module);
 325             }
 326             else {
 327                 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 328                         "rank %d - no previous ireduce handler is available, aborting\n", rank);
 329                 return (OMPI_ERROR);
 330             }
 331         }
 332         return ret;
 333     }
 334     return (OMPI_SUCCESS);
 335 }
 336 
 337 
 338 
 339 
 340 static int
 341 reduce_kary_tree_bottom(ompi_coll_portals4_request_t *request)
 342 {
 343     int ret, line;
 344 
 345     if (request->u.reduce.is_optim) {
 346         PtlAtomicSync();
 347 
 348         if (request->u.reduce.use_ack_ct_h) {
 349             ret = PtlCTFree(request->u.reduce.ack_ct_h);
 350             if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 351         }
 352 
 353         if (request->u.reduce.child_nb) {
 354             do {
 355                 ret = PtlMEUnlink(request->u.reduce.data_me_h);
 356             } while (PTL_IN_USE == ret);
 357             if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 358         }
 359 
 360         ret = PtlCTFree(request->u.reduce.trig_ct_h);
 361         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 362 
 363         if (request->u.reduce.free_buffer) {
 364             free(request->u.reduce.free_buffer);
 365         }
 366     }
 367     return (OMPI_SUCCESS);
 368 
 369 err_hdlr:
 370     opal_output(ompi_coll_base_framework.framework_output,
 371                 "%s:%4d:%4d\tError occurred ret=%d",
 372                 __FILE__, __LINE__, line, ret);
 373 
 374     return ret;
 375 }
 376 
 377 
 378 int
 379 ompi_coll_portals4_reduce_intra(const void *sendbuf, void *recvbuf, int count,
 380         MPI_Datatype dtype, MPI_Op op,
 381         int root,
 382         struct ompi_communicator_t *comm,
 383         mca_coll_base_module_t *module)
 384 {
 385     int ret;
 386     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
 387     ompi_coll_portals4_request_t *request;
 388 
 389     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
 390     if (NULL == request) {
 391         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 392                 "%s:%d: request alloc failed\n",
 393                 __FILE__, __LINE__);
 394         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
 395     }
 396 
 397     request->is_sync = true;
 398     request->fallback_request = NULL;
 399 
 400     ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
 401             dtype, op, root, comm,  request,  portals4_module);
 402     if (OMPI_SUCCESS != ret)
 403         return ret;
 404     ret = reduce_kary_tree_bottom(request);
 405     if (OMPI_SUCCESS != ret)
 406         return ret;
 407 
 408     OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
 409     return (OMPI_SUCCESS);
 410 }
 411 
 412 
 413 int
 414 ompi_coll_portals4_ireduce_intra(const void* sendbuf, void* recvbuf, int count,
 415         MPI_Datatype dtype, MPI_Op op,
 416         int root,
 417         struct ompi_communicator_t *comm,
 418         ompi_request_t ** ompi_request,
 419         struct mca_coll_base_module_2_3_0_t *module)
 420 {
 421     int ret;
 422     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
 423     ompi_coll_portals4_request_t *request;
 424 
 425     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
 426     if (NULL == request) {
 427         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 428                 "%s:%d: request alloc failed\n",
 429                 __FILE__, __LINE__);
 430         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
 431     }
 432 
 433     *ompi_request = &request->super;
 434     request->fallback_request = ompi_request;
 435     request->is_sync = false;
 436 
 437     ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
 438             dtype, op, root, comm,  request,  portals4_module);
 439     if (OMPI_SUCCESS != ret)
 440         return ret;
 441 
 442     if (!request->u.reduce.is_optim) {
 443         OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
 444     }
 445 
 446     opal_output_verbose(10, ompi_coll_base_framework.framework_output, "ireduce");
 447     return (OMPI_SUCCESS);
 448 }
 449 
 450 int
 451 ompi_coll_portals4_ireduce_intra_fini(ompi_coll_portals4_request_t *request)
 452 {
 453     int ret;
 454 
 455     ret = reduce_kary_tree_bottom(request);
 456     if (OMPI_SUCCESS != ret)
 457         return ret;
 458 
 459     ompi_request_complete(&request->super, true);
 460 
 461     return (OMPI_SUCCESS);
 462 }

/* [<][>][^][v][top][bottom][index][help] */