root/ompi/mca/coll/portals4/coll_portals4_scatter.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. setup_scatter_buffers_linear
  2. setup_scatter_handles
  3. setup_sync_handles
  4. cleanup_scatter_handles
  5. cleanup_sync_handles
  6. ompi_coll_portals4_scatter_intra_linear_top
  7. ompi_coll_portals4_scatter_intra_linear_bottom
  8. ompi_coll_portals4_scatter_intra
  9. ompi_coll_portals4_iscatter_intra
  10. ompi_coll_portals4_iscatter_intra_fini

   1 /*
   2  * Copyright (c) 2015      Sandia National Laboratories. All rights reserved.
   3  * $COPYRIGHT$
   4  * 
   5  * Additional copyrights may follow
   6  * 
   7  * $HEADER$
   8  */
   9 
  10 
  11 #include "ompi_config.h"
  12 
  13 #include "mpi.h"
  14 #include "ompi/constants.h"
  15 #include "ompi/datatype/ompi_datatype.h"
  16 #include "opal/util/bit_ops.h"
  17 #include "ompi/mca/pml/pml.h"
  18 #include "ompi/mca/coll/coll.h"
  19 #include "ompi/mca/coll/base/base.h"
  20 
  21 #include "coll_portals4.h"
  22 #include "coll_portals4_request.h"
  23 
  24 
  25 #undef RTR_USES_TRIGGERED_PUT
  26 
  27 
  28 #define VRANK(ra, ro, si) ((ra - ro + si) % si)
  29 
  30 
  31 static int
  32 setup_scatter_buffers_linear(struct ompi_communicator_t   *comm,
  33                              ompi_coll_portals4_request_t *request,
  34                              mca_coll_portals4_module_t   *portals4_module)
  35 {
  36     int ret, line;
  37 
  38     int8_t i_am_root = (request->u.scatter.my_rank == request->u.scatter.root_rank);
  39 
  40     ompi_coll_portals4_create_send_converter (&request->u.scatter.send_converter,
  41                                               request->u.scatter.pack_src_buf,
  42                                               ompi_comm_peer_lookup(comm, request->u.scatter.my_rank),
  43                                               request->u.scatter.pack_src_count,
  44                                               request->u.scatter.pack_src_dtype);
  45     opal_convertor_get_packed_size(&request->u.scatter.send_converter, &request->u.scatter.packed_size);
  46     OBJ_DESTRUCT(&request->u.scatter.send_converter);
  47 
  48     /**********************************/
  49     /* Setup Scatter Buffers           */
  50     /**********************************/
  51     if (i_am_root) {
  52 
  53         /*
  54          * calculate the total size of the packed data
  55          */
  56         request->u.scatter.scatter_bytes=request->u.scatter.packed_size * (ptrdiff_t)request->u.scatter.size;
  57 
  58         /* all transfers done using request->u.scatter.sdtype.
  59          * allocate temp buffer for recv, copy and/or rotate data at the end */
  60         request->u.scatter.scatter_buf = (char *) malloc(request->u.scatter.scatter_bytes);
  61         if (NULL == request->u.scatter.scatter_buf) {
  62             ret = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
  63         }
  64         request->u.scatter.free_after = 1;
  65 
  66         for (int32_t i=0;i<request->u.scatter.size;i++) {
  67             uint32_t iov_count = 1;
  68             struct iovec iov;
  69             size_t max_data;
  70 
  71             uint64_t offset = request->u.scatter.pack_src_extent * request->u.scatter.pack_src_count * i;
  72 
  73             opal_output_verbose(30, ompi_coll_base_framework.framework_output,
  74                                 "%s:%d:rank(%d): offset(%lu)",
  75                                 __FILE__, __LINE__, request->u.scatter.my_rank,
  76                                 offset);
  77 
  78             ompi_coll_portals4_create_send_converter (&request->u.scatter.send_converter,
  79                                                       request->u.scatter.pack_src_buf + offset,
  80                                                       ompi_comm_peer_lookup(comm, request->u.scatter.my_rank),
  81                                                       request->u.scatter.pack_src_count,
  82                                                       request->u.scatter.pack_src_dtype);
  83 
  84             iov.iov_len = request->u.scatter.packed_size;
  85             iov.iov_base = (IOVBASE_TYPE *) ((char *)request->u.scatter.scatter_buf + (request->u.scatter.packed_size*i));
  86             opal_convertor_pack(&request->u.scatter.send_converter, &iov, &iov_count, &max_data);
  87 
  88             OBJ_DESTRUCT(&request->u.scatter.send_converter);
  89         }
  90 
  91         opal_output_verbose(30, ompi_coll_base_framework.framework_output,
  92                             "%s:%d:rank(%d): root - scatter_buf(%p) - scatter_bytes(%lu)=packed_size(%ld) * size(%d)",
  93                             __FILE__, __LINE__, request->u.scatter.my_rank,
  94                             request->u.scatter.scatter_buf, request->u.scatter.scatter_bytes,
  95                             request->u.scatter.packed_size, request->u.scatter.size);
  96     } else {
  97         request->u.scatter.scatter_bytes=request->u.scatter.packed_size;
  98         request->u.scatter.scatter_buf = (char *) malloc(request->u.scatter.scatter_bytes);
  99         if (NULL == request->u.scatter.scatter_buf) {
 100             ret = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
 101         }
 102         request->u.scatter.free_after = 1;
 103 
 104         opal_output_verbose(30, ompi_coll_base_framework.framework_output,
 105                             "%s:%d:rank(%d): leaf - scatter_buf(%p) - scatter_bytes(%lu)=packed_size(%ld)",
 106                             __FILE__, __LINE__, request->u.scatter.my_rank,
 107                             request->u.scatter.scatter_buf, request->u.scatter.scatter_bytes,
 108                             request->u.scatter.packed_size);
 109     }
 110 
 111     return OMPI_SUCCESS;
 112 
 113 err_hdlr:
 114     opal_output(ompi_coll_base_framework.framework_output,
 115                 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 116                 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 117 
 118     return ret;
 119 }
 120 
 121 static int
 122 setup_scatter_handles(struct ompi_communicator_t   *comm,
 123                       ompi_coll_portals4_request_t *request,
 124                       mca_coll_portals4_module_t   *portals4_module)
 125 {
 126     int ret, line;
 127 
 128     ptl_me_t  me;
 129 
 130     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 131                  "coll:portals4:setup_scatter_handles enter rank %d", request->u.scatter.my_rank));
 132 
 133     /**********************************/
 134     /* Setup Scatter Handles           */
 135     /**********************************/
 136     COLL_PORTALS4_SET_BITS(request->u.scatter.scatter_match_bits, ompi_comm_get_cid(comm),
 137             0, 0, COLL_PORTALS4_SCATTER, 0, request->u.scatter.coll_count);
 138 
 139     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 140                  "coll:portals4:setup_scatter_handles rank(%d) scatter_match_bits(0x%016lX)",
 141                  request->u.scatter.my_rank, request->u.scatter.scatter_match_bits));
 142 
 143     ret = PtlCTAlloc(mca_coll_portals4_component.ni_h,
 144                      &request->u.scatter.scatter_cth);
 145     if (PTL_OK != ret) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr; }
 146 
 147     request->u.scatter.scatter_mdh = mca_coll_portals4_component.data_md_h;
 148 
 149     me.start = request->u.scatter.scatter_buf;
 150     me.length = request->u.scatter.scatter_bytes;
 151     me.ct_handle = request->u.scatter.scatter_cth;
 152     me.min_free = 0;
 153     me.uid = mca_coll_portals4_component.uid;
 154     me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 155         PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 156         PTL_ME_EVENT_CT_COMM;
 157     me.match_id.phys.nid = PTL_NID_ANY;
 158     me.match_id.phys.pid = PTL_PID_ANY;
 159     me.match_bits = request->u.scatter.scatter_match_bits;
 160     me.ignore_bits = 0;
 161     ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 162                       mca_coll_portals4_component.pt_idx,
 163                       &me,
 164                       PTL_PRIORITY_LIST,
 165                       NULL,
 166                       &request->u.scatter.scatter_meh);
 167     if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 168 
 169     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 170                  "coll:portals4:setup_scatter_handles exit rank %d", request->u.scatter.my_rank));
 171 
 172     return OMPI_SUCCESS;
 173 
 174 err_hdlr:
 175     opal_output(ompi_coll_base_framework.framework_output,
 176                 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 177                 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 178 
 179     return ret;
 180 }
 181 
 182 static int
 183 setup_sync_handles(struct ompi_communicator_t   *comm,
 184                    ompi_coll_portals4_request_t *request,
 185                    mca_coll_portals4_module_t   *portals4_module)
 186 {
 187     int ret, line;
 188 
 189     ptl_me_t  me;
 190 
 191     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 192                  "coll:portals4:setup_sync_handles enter rank %d", request->u.scatter.my_rank));
 193 
 194     /**********************************/
 195     /* Setup Sync Handles             */
 196     /**********************************/
 197     COLL_PORTALS4_SET_BITS(request->u.scatter.sync_match_bits, ompi_comm_get_cid(comm),
 198             0, 1, COLL_PORTALS4_SCATTER, 0, request->u.scatter.coll_count);
 199 
 200     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 201                  "coll:portals4:setup_sync_handles rank(%d) sync_match_bits(0x%016lX)",
 202                  request->u.scatter.my_rank, request->u.scatter.sync_match_bits));
 203 
 204     ret = PtlCTAlloc(mca_coll_portals4_component.ni_h,
 205                      &request->u.scatter.sync_cth);
 206     if (PTL_OK != ret) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr; }
 207 
 208     request->u.scatter.sync_mdh = mca_coll_portals4_component.zero_md_h;
 209 
 210     me.start = NULL;
 211     me.length = 0;
 212     me.ct_handle = request->u.scatter.sync_cth;
 213     me.min_free = 0;
 214     me.uid = mca_coll_portals4_component.uid;
 215     me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
 216         PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
 217         PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
 218     me.match_id.phys.nid = PTL_NID_ANY;
 219     me.match_id.phys.pid = PTL_PID_ANY;
 220     me.match_bits = request->u.scatter.sync_match_bits;
 221     me.ignore_bits = 0;
 222     ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
 223                       mca_coll_portals4_component.pt_idx,
 224                       &me,
 225                       PTL_PRIORITY_LIST,
 226                       NULL,
 227                       &request->u.scatter.sync_meh);
 228     if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 229 
 230     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 231                  "coll:portals4:setup_sync_handles exit rank %d", request->u.scatter.my_rank));
 232 
 233     return OMPI_SUCCESS;
 234 
 235 err_hdlr:
 236     opal_output(ompi_coll_base_framework.framework_output,
 237                 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 238                 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 239 
 240     return ret;
 241 }
 242 
 243 static int
 244 cleanup_scatter_handles(ompi_coll_portals4_request_t *request)
 245 {
 246     int ret, line;
 247 
 248     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 249                  "coll:portals4:cleanup_scatter_handles enter rank %d", request->u.scatter.my_rank));
 250 
 251     /**********************************/
 252     /* Cleanup Scatter Handles             */
 253     /**********************************/
 254     do {
 255         ret = PtlMEUnlink(request->u.scatter.scatter_meh);
 256     } while (PTL_IN_USE == ret);
 257     if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 258 
 259     ret = PtlCTFree(request->u.scatter.scatter_cth);
 260     if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 261 
 262     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 263                  "coll:portals4:cleanup_scatter_handles exit rank %d", request->u.scatter.my_rank));
 264 
 265     return OMPI_SUCCESS;
 266 
 267 err_hdlr:
 268     opal_output(ompi_coll_base_framework.framework_output,
 269                 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 270                 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 271 
 272     return ret;
 273 }
 274 
 275 static int
 276 cleanup_sync_handles(ompi_coll_portals4_request_t *request)
 277 {
 278     int ret, line;
 279     int ptl_ret;
 280 
 281     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 282                  "coll:portals4:cleanup_sync_handles enter rank %d", request->u.scatter.my_rank));
 283 
 284     /**********************************/
 285     /* Cleanup Sync Handles             */
 286     /**********************************/
 287     do {
 288         ret = PtlMEUnlink(request->u.scatter.sync_meh);
 289     } while (PTL_IN_USE == ret);
 290     if (PTL_OK != ret) { ptl_ret = ret; ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 291 
 292     ret = PtlCTFree(request->u.scatter.sync_cth);
 293     if (PTL_OK != ret) { ptl_ret = ret; ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 294 
 295     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 296                  "coll:portals4:cleanup_sync_handles exit rank %d", request->u.scatter.my_rank));
 297 
 298     return OMPI_SUCCESS;
 299 
 300 err_hdlr:
 301     opal_output(ompi_coll_base_framework.framework_output,
 302                 "%s:%4d:%4d\tError occurred (ptl_ret=%d) ret=%d, rank %2d",
 303                 __FILE__, __LINE__, line, ptl_ret, ret, request->u.scatter.my_rank);
 304 
 305     return ret;
 306 }
 307 
 308 static int
 309 ompi_coll_portals4_scatter_intra_linear_top(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
 310                                             void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
 311                                             int root,
 312                                             struct ompi_communicator_t *comm,
 313                                             ompi_coll_portals4_request_t *request,
 314                                             mca_coll_base_module_t *module)
 315 {
 316     mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
 317     int ret, line;
 318     ptl_ct_event_t ct;
 319 
 320     ptl_ct_event_t sync_incr_event;
 321 
 322     int8_t i_am_root;
 323 
 324     int32_t expected_rtrs = 0;
 325     int32_t expected_puts = 0;
 326     int32_t expected_acks = 0;
 327     int32_t expected_ops  = 0;
 328 
 329     int32_t expected_chained_rtrs = 0;
 330     int32_t expected_chained_acks = 0;
 331 
 332     ptl_size_t number_of_fragment = 1;
 333 
 334     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 335                  "coll:portals4:scatter_intra_linear_top enter rank %d", request->u.scatter.my_rank));
 336 
 337     request->type                   = OMPI_COLL_PORTALS4_TYPE_SCATTER;
 338     request->u.scatter.scatter_buf  = NULL;
 339     request->u.scatter.scatter_mdh  = PTL_INVALID_HANDLE;
 340     request->u.scatter.scatter_cth  = PTL_INVALID_HANDLE;
 341     request->u.scatter.scatter_meh  = PTL_INVALID_HANDLE;
 342     request->u.scatter.sync_mdh     = PTL_INVALID_HANDLE;
 343     request->u.scatter.sync_cth     = PTL_INVALID_HANDLE;
 344     request->u.scatter.sync_meh     = PTL_INVALID_HANDLE;
 345 
 346     request->u.scatter.my_rank   = ompi_comm_rank(comm);
 347     request->u.scatter.size      = ompi_comm_size(comm);
 348     request->u.scatter.root_rank = root;
 349     request->u.scatter.sbuf      = sbuf;
 350     request->u.scatter.rbuf      = rbuf;
 351 
 352     request->u.scatter.pack_src_buf    = sbuf;
 353     request->u.scatter.pack_src_count  = scount;
 354     request->u.scatter.pack_src_dtype  = sdtype;
 355     ompi_datatype_get_extent(request->u.scatter.pack_src_dtype,
 356                              &request->u.scatter.pack_src_lb,
 357                              &request->u.scatter.pack_src_extent);
 358     ompi_datatype_get_true_extent(request->u.scatter.pack_src_dtype,
 359                                   &request->u.scatter.pack_src_true_lb,
 360                                   &request->u.scatter.pack_src_true_extent);
 361 
 362     if ((root == request->u.scatter.my_rank) && (rbuf == MPI_IN_PLACE)) {
 363         request->u.scatter.unpack_dst_buf   = NULL;
 364         request->u.scatter.unpack_dst_count = 0;
 365         request->u.scatter.unpack_dst_dtype = MPI_DATATYPE_NULL;
 366     } else {
 367         request->u.scatter.unpack_dst_buf   = rbuf;
 368         request->u.scatter.unpack_dst_count = rcount;
 369         request->u.scatter.unpack_dst_dtype = rdtype;
 370         request->u.scatter.unpack_dst_offset = 0;
 371         ompi_datatype_get_extent(request->u.scatter.unpack_dst_dtype,
 372                                  &request->u.scatter.unpack_dst_lb,
 373                                  &request->u.scatter.unpack_dst_extent);
 374         ompi_datatype_get_true_extent(request->u.scatter.unpack_dst_dtype,
 375                                       &request->u.scatter.unpack_dst_true_lb,
 376                                       &request->u.scatter.unpack_dst_true_extent);
 377     }
 378 
 379     opal_output_verbose(30, ompi_coll_base_framework.framework_output,
 380                         "%s:%d:rank(%d): request->u.scatter.unpack_dst_offset(%lu)",
 381                         __FILE__, __LINE__, request->u.scatter.my_rank,
 382                         request->u.scatter.unpack_dst_offset);
 383 
 384     /**********************************/
 385     /* Setup Common Parameters        */
 386     /**********************************/
 387 
 388     i_am_root = (request->u.scatter.my_rank == request->u.scatter.root_rank);
 389 
 390     request->u.scatter.coll_count = opal_atomic_add_fetch_size_t(&portals4_module->coll_count, 1);
 391 
 392     ret = setup_scatter_buffers_linear(comm, request, portals4_module);
 393     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 394 
 395     ret = setup_scatter_handles(comm, request, portals4_module);
 396     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 397 
 398     ret = setup_sync_handles(comm, request, portals4_module);
 399     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 400 
 401     number_of_fragment = (request->u.scatter.packed_size > mca_coll_portals4_component.ni_limits.max_msg_size) ?
 402         (request->u.scatter.packed_size + mca_coll_portals4_component.ni_limits.max_msg_size - 1) / mca_coll_portals4_component.ni_limits.max_msg_size :
 403         1;
 404     opal_output_verbose(90, ompi_coll_base_framework.framework_output,
 405         "%s:%d:rank %d:number_of_fragment = %lu",
 406         __FILE__, __LINE__, request->u.scatter.my_rank, number_of_fragment);
 407 
 408     /**********************************/
 409     /* do the scatter                 */
 410     /**********************************/
 411     if (i_am_root) {
 412         /* operations on the sync counter */
 413         expected_rtrs = request->u.scatter.size - 1; /* expect RTRs from non-root ranks */
 414         expected_acks = request->u.scatter.size - 1; /* expect Recv-ACKs from non-root ranks */
 415 
 416         /* operations on the scatter counter */
 417         expected_puts         = 0;
 418         expected_chained_rtrs = 1;
 419         expected_chained_acks = 1;
 420 
 421         /* Chain the RTR and Recv-ACK to the Scatter CT */
 422         sync_incr_event.success=1;
 423         sync_incr_event.failure=0;
 424         ret = PtlTriggeredCTInc(request->u.scatter.scatter_cth,
 425                                 sync_incr_event,
 426                                 request->u.scatter.sync_cth,
 427                                 expected_rtrs);
 428         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 429 
 430         ret = PtlTriggeredCTInc(request->u.scatter.scatter_cth,
 431                                 sync_incr_event,
 432                                 request->u.scatter.sync_cth,
 433                                 expected_rtrs + expected_acks);
 434         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 435 
 436         /* root, so put packed bytes to other ranks */
 437         for (int32_t i=0;i<request->u.scatter.size;i++) {
 438             /* do not put to my scatter_buf.  my data gets unpacked into my out buffer in linear_bottom(). */
 439             if (i == request->u.scatter.my_rank) {
 440                 continue;
 441             }
 442 
 443             ptl_size_t offset = request->u.scatter.packed_size * i;
 444             ptl_size_t size_sent = 0;
 445             ptl_size_t size_left = request->u.scatter.packed_size;
 446 
 447             opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 448                                 "%s:%d:rank(%d): offset(%lu)=rank(%d) * packed_size(%ld)",
 449                                 __FILE__, __LINE__, request->u.scatter.my_rank,
 450                                 offset, i, request->u.scatter.packed_size);
 451 
 452             for (ptl_size_t j=0; j<number_of_fragment; j++) {
 453 
 454                 ptl_size_t frag_size = (size_left >  mca_coll_portals4_component.ni_limits.max_msg_size) ?
 455                     mca_coll_portals4_component.ni_limits.max_msg_size :
 456                     size_left;
 457 
 458                 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 459                                      "%s:%d:rank(%d): frag(%lu),offset_frag (%lu) frag_size(%lu)",
 460                                      __FILE__, __LINE__, request->u.scatter.my_rank,
 461                                      j, size_sent, frag_size));
 462 
 463                 ret = PtlTriggeredPut(request->u.scatter.scatter_mdh,
 464                                       (ptl_size_t)request->u.scatter.scatter_buf + offset + size_sent,
 465                                       frag_size,
 466                                       PTL_NO_ACK_REQ,
 467                                       ompi_coll_portals4_get_peer(comm, i),
 468                                       mca_coll_portals4_component.pt_idx,
 469                                       request->u.scatter.scatter_match_bits,
 470                                       size_sent,
 471                                       NULL,
 472                                       0,
 473                                       request->u.scatter.scatter_cth,
 474                                       expected_chained_rtrs);
 475                 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 476 
 477                 size_left -= frag_size;
 478                 size_sent += frag_size;
 479             }
 480         }
 481     } else {
 482         /* non-root, so do nothing */
 483 
 484         /* operations on the sync counter */
 485         expected_rtrs = 0;
 486         expected_acks = 0;
 487 
 488         /* operations on the scatter counter */
 489         expected_puts         = number_of_fragment;  /* scatter put from root */
 490         expected_chained_rtrs = 0;
 491         expected_chained_acks = 0;
 492     }
 493 
 494     expected_ops = expected_chained_rtrs + expected_puts;
 495 
 496     /**********************************************/
 497     /* only non-root ranks are PUT to, so only    */
 498     /* non-root ranks must PUT a Recv-ACK to root */
 499     /**********************************************/
 500     if (!i_am_root) {
 501         ret = PtlTriggeredPut(request->u.scatter.sync_mdh,
 502                               0,
 503                               0,
 504                               PTL_NO_ACK_REQ,
 505                               ompi_coll_portals4_get_peer(comm, request->u.scatter.root_rank),
 506                               mca_coll_portals4_component.pt_idx,
 507                               request->u.scatter.sync_match_bits,
 508                               0,
 509                               NULL,
 510                               0,
 511                               request->u.scatter.scatter_cth,
 512                               expected_ops);
 513         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 514     }
 515 
 516     expected_ops += expected_chained_acks;
 517 
 518     if (!request->u.scatter.is_sync) {
 519         /******************************************/
 520         /* put to finish pt when all ops complete */
 521         /******************************************/
 522         ret = PtlTriggeredPut(mca_coll_portals4_component.zero_md_h,
 523                 0,
 524                 0,
 525                 PTL_NO_ACK_REQ,
 526                 ompi_coll_portals4_get_peer(comm, request->u.scatter.my_rank),
 527                 mca_coll_portals4_component.finish_pt_idx,
 528                 0,
 529                 0,
 530                 NULL,
 531                 (uintptr_t) request,
 532                 request->u.scatter.scatter_cth,
 533                 expected_ops);
 534         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 535     }
 536 
 537     /**************************************/
 538     /* all non-root ranks put RTR to root */
 539     /**************************************/
 540     if (!i_am_root) {
 541         ret = PtlPut(request->u.scatter.sync_mdh,
 542                      0,
 543                      0,
 544                      PTL_NO_ACK_REQ,
 545                      ompi_coll_portals4_get_peer(comm, request->u.scatter.root_rank),
 546                      mca_coll_portals4_component.pt_idx,
 547                      request->u.scatter.sync_match_bits,
 548                      0,
 549                      NULL,
 550                      0);
 551         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 552     }
 553 
 554     if (request->u.scatter.is_sync) {
 555         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 556                 "calling CTWait(expected_ops=%d)\n", expected_ops);
 557 
 558         /********************************/
 559         /* Wait for all ops to complete */
 560         /********************************/
 561         ret = PtlCTWait(request->u.scatter.scatter_cth, expected_ops, &ct);
 562         if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
 563 
 564         opal_output_verbose(1, ompi_coll_base_framework.framework_output,
 565                 "completed CTWait(expected_ops=%d)\n", expected_ops);
 566     }
 567 
 568     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 569                  "coll:portals4:scatter_intra_linear_top exit rank %d", request->u.scatter.my_rank));
 570 
 571     return OMPI_SUCCESS;
 572 
 573 err_hdlr:
 574     if (NULL != request->u.scatter.scatter_buf)
 575         free(request->u.scatter.scatter_buf);
 576 
 577     opal_output(ompi_coll_base_framework.framework_output,
 578                 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 579                 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 580 
 581     return ret;
 582 }
 583 
 584 static int
 585 ompi_coll_portals4_scatter_intra_linear_bottom(struct ompi_communicator_t *comm,
 586                                                ompi_coll_portals4_request_t *request)
 587 {
 588     int ret, line;
 589 
 590     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 591                  "coll:portals4:scatter_intra_linear_bottom enter rank %d", request->u.scatter.my_rank));
 592 
 593     ret = cleanup_scatter_handles(request);
 594     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 595 
 596     ret = cleanup_sync_handles(request);
 597     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 598 
 599     if (NULL != request->u.scatter.unpack_dst_buf) {
 600         uint32_t iov_count = 1;
 601         struct iovec iov;
 602         size_t max_data;
 603 
 604         ompi_coll_portals4_create_recv_converter (&request->u.scatter.recv_converter,
 605                                                   request->u.scatter.unpack_dst_buf,
 606                                                   ompi_comm_peer_lookup(comm, request->u.scatter.my_rank),
 607                                                   request->u.scatter.unpack_dst_count,
 608                                                   request->u.scatter.unpack_dst_dtype);
 609 
 610         iov.iov_len = request->u.scatter.packed_size;
 611         if (request->u.scatter.my_rank == request->u.scatter.root_rank) {
 612             /* unpack my data from the location in scatter_buf where is was packed */
 613             uint64_t offset = request->u.scatter.pack_src_extent * request->u.scatter.pack_src_count * request->u.scatter.my_rank;
 614             iov.iov_base = (IOVBASE_TYPE *)((char *)request->u.scatter.scatter_buf + offset);
 615         } else {
 616             iov.iov_base = (IOVBASE_TYPE *)request->u.scatter.scatter_buf;
 617         }
 618         opal_convertor_unpack(&request->u.scatter.recv_converter, &iov, &iov_count, &max_data);
 619 
 620         OBJ_DESTRUCT(&request->u.scatter.recv_converter);
 621     }
 622 
 623     if (request->u.scatter.free_after)
 624         free(request->u.scatter.scatter_buf);
 625 
 626     request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
 627 
 628     ompi_request_complete(&request->super, true);
 629 
 630     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 631                  "coll:portals4:scatter_intra_linear_bottom exit rank %d", request->u.scatter.my_rank));
 632 
 633     return OMPI_SUCCESS;
 634 
 635 err_hdlr:
 636     request->super.req_status.MPI_ERROR = ret;
 637 
 638     if (request->u.scatter.free_after)
 639         free(request->u.scatter.scatter_buf);
 640 
 641     opal_output(ompi_coll_base_framework.framework_output,
 642             "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 643             __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 644 
 645     return ret;
 646 }
 647 
 648 int
 649 ompi_coll_portals4_scatter_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
 650                                  void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
 651                                  int root,
 652                                  struct ompi_communicator_t *comm,
 653                                  mca_coll_base_module_t *module)
 654 {
 655     int ret, line;
 656 
 657     ompi_coll_portals4_request_t *request;
 658 
 659     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 660                  "coll:portals4:scatter_intra enter rank %d", ompi_comm_rank(comm)));
 661 
 662     /*
 663      *  allocate a portals4 request
 664      */
 665     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
 666     if (NULL == request) {
 667         ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
 668     }
 669     request->u.scatter.is_sync = 1;
 670 
 671     /*
 672      *  initiate the scatter
 673      *
 674      *  this request is marked synchronous (is_sync==1), so PtlCTWait()
 675      *  will be called to wait for completion.
 676      */
 677     ret = ompi_coll_portals4_scatter_intra_linear_top(sbuf, scount, sdtype,
 678                                                      rbuf, rcount, rdtype,
 679                                                      root,
 680                                                      comm,
 681                                                      request,
 682                                                      module);
 683     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 684 
 685     ret = ompi_coll_portals4_scatter_intra_linear_bottom(comm, request);
 686     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 687 
 688     /*
 689      *  return the portals4 request
 690      */
 691     OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
 692 
 693     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 694                  "coll:portals4:scatter_intra exit rank %d", request->u.scatter.my_rank));
 695 
 696     return OMPI_SUCCESS;
 697 
 698 err_hdlr:
 699     opal_output(ompi_coll_base_framework.framework_output,
 700             "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 701             __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 702 
 703     return ret;
 704 }
 705 
 706 
 707 int
 708 ompi_coll_portals4_iscatter_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
 709                                  void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
 710                                  int root,
 711                                  struct ompi_communicator_t *comm,
 712                                  ompi_request_t **ompi_request,
 713                                  mca_coll_base_module_t *module)
 714 {
 715     int ret, line;
 716 
 717     ompi_coll_portals4_request_t *request;
 718 
 719     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 720                  "coll:portals4:iscatter_intra enter rank %d", ompi_comm_rank(comm)));
 721 
 722     /*
 723      *  allocate a portals4 request
 724      */
 725     OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
 726     if (NULL == request) {
 727         ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
 728     }
 729     *ompi_request = &request->super;
 730     request->u.scatter.is_sync = 0;
 731 
 732     /*
 733      *  initiate the scatter
 734      *
 735      *  this request is marked asynchronous (is_sync==0), so
 736      *  portals4_progress() will handle completion.
 737      */
 738     ret = ompi_coll_portals4_scatter_intra_linear_top(sbuf, scount, sdtype,
 739                                                       rbuf, rcount, rdtype,
 740                                                       root,
 741                                                       comm,
 742                                                       request,
 743                                                       module);
 744     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 745 
 746     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 747                  "coll:portals4:iscatter_intra exit rank %d", request->u.scatter.my_rank));
 748 
 749     return OMPI_SUCCESS;
 750 
 751 err_hdlr:
 752     opal_output(ompi_coll_base_framework.framework_output,
 753             "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 754             __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 755 
 756     return ret;
 757 }
 758 
 759 
 760 int
 761 ompi_coll_portals4_iscatter_intra_fini(ompi_coll_portals4_request_t *request)
 762 {
 763     int ret, line;
 764 
 765     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 766                  "coll:portals4:iscatter_intra_fini enter rank %d", request->u.scatter.my_rank));
 767 
 768     /*
 769      *  cleanup the scatter
 770      */
 771     ret = ompi_coll_portals4_scatter_intra_linear_bottom(request->super.req_mpi_object.comm, request);
 772     if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
 773 
 774     OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
 775                  "coll:portals4:iscatter_intra_fini exit rank %d", request->u.scatter.my_rank));
 776 
 777     return OMPI_SUCCESS;
 778 
 779 err_hdlr:
 780     opal_output(ompi_coll_base_framework.framework_output,
 781             "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
 782             __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
 783 
 784     return ret;
 785 }

/* [<][>][^][v][top][bottom][index][help] */