root/ompi/mca/coll/sm/coll_sm_reduce.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. min
  2. mca_coll_sm_reduce_intra
  3. reduce_inorder
  4. reduce_no_order

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2015 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2009-2013 Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2015      Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 #include "ompi_config.h"
  23 
  24 #include <string.h>
  25 
  26 #include "opal/datatype/opal_convertor.h"
  27 #include "opal/sys/atomic.h"
  28 #include "ompi/constants.h"
  29 #include "ompi/communicator/communicator.h"
  30 #include "ompi/mca/coll/coll.h"
  31 #include "ompi/op/op.h"
  32 #include "coll_sm.h"
  33 
  34 
  35 /*
  36  * Local functions
  37  */
  38 static int reduce_inorder(const void *sbuf, void* rbuf, int count,
  39                           struct ompi_datatype_t *dtype,
  40                           struct ompi_op_t *op,
  41                           int root, struct ompi_communicator_t *comm,
  42                           mca_coll_base_module_t *module);
  43 #define WANT_REDUCE_NO_ORDER 0
  44 #if WANT_REDUCE_NO_ORDER
  45 static int reduce_no_order(const void *sbuf, void* rbuf, int count,
  46                            struct ompi_datatype_t *dtype,
  47                            struct ompi_op_t *op,
  48                            int root, struct ompi_communicator_t *comm,
  49                            mca_coll_base_module_t *module);
  50 #endif
  51 
  52 /*
  53  * Useful utility routine
  54  */
  55 #if !defined(min)
  56 static inline int min(int a, int b)
  57 {
  58     return (a < b) ? a : b;
  59 }
  60 #endif
  61 
  62 /**
  63  * Shared memory reduction.
  64  *
  65  * Simply farms out to the associative or non-associative functions.
  66  */
  67 int mca_coll_sm_reduce_intra(const void *sbuf, void* rbuf, int count,
  68                              struct ompi_datatype_t *dtype,
  69                              struct ompi_op_t *op,
  70                              int root, struct ompi_communicator_t *comm,
  71                              mca_coll_base_module_t *module)
  72 {
  73     size_t size;
  74     mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
  75 
  76     /* There are several possibilities:
  77      *
  78      * 0. If the datatype is larger than a segment, fall back to
  79      *    underlying module
  80      * 1. If the op is user-defined, use the strict order
  81      * 2. If the op is intrinsic:
  82      *    a. If the op is float-associative, use the unordered
  83      *    b. If the op is not float-associative:
  84      *       i. if the data is floating point, use the strict order
  85      *       ii. if the data is not floating point, use the unordered
  86      */
  87 
  88     ompi_datatype_type_size(dtype, &size);
  89     if ((int)size > mca_coll_sm_component.sm_control_size) {
  90         return sm_module->previous_reduce(sbuf, rbuf, count,
  91                                           dtype, op, root, comm,
  92                                           sm_module->previous_reduce_module);
  93     }
  94 #if WANT_REDUCE_NO_ORDER
  95     else {
  96         /* Lazily enable the module the first time we invoke a
  97            collective on it */
  98         if (!sm_module->enabled) {
  99             if (OMPI_SUCCESS !=
 100                 (ret = ompi_coll_sm_lazy_enable(module, comm))) {
 101                 return ret;
 102             }
 103         }
 104 
 105         if (!ompi_op_is_intrinsic(op) ||
 106             (ompi_op_is_intrinsic(op) && !ompi_op_is_float_assoc(op) &&
 107              0 != (dtype->flags & OMPI_DATATYPE_FLAG_DATA_FLOAT))) {
 108             return reduce_inorder(sbuf, rbuf, count, dtype, op,
 109                                   root, comm, module);
 110         } else {
 111             return reduce_no_order(sbuf, rbuf, count, dtype, op,
 112                                    root, comm, module);
 113         }
 114     }
 115 #else
 116     else {
 117         /* Lazily enable the module the first time we invoke a
 118            collective on it */
 119         if (!sm_module->enabled) {
 120             int ret;
 121 
 122             if (OMPI_SUCCESS !=
 123                 (ret = ompi_coll_sm_lazy_enable(module, comm))) {
 124                 return ret;
 125             }
 126         }
 127 
 128         return reduce_inorder(sbuf, rbuf, count, dtype, op, root, comm, module);
 129     }
 130 #endif
 131 }
 132 
 133 
 134 /**
 135  * In-order shared memory reduction.
 136  *
 137  * This function performs the reduction in order -- combining elements
 138  * starting with (0 operation 1), then (result operation 2), then
 139  * (result operation 3), etc.
 140  *
 141  * Root's algorithm:
 142  *
 143  * If our datatype is "friendly" (i.e., the representation of the
 144  * buffer is the same packed as it is unpacked), then the root doesn't
 145  * need a temporary buffer -- we can combine the operands directly
 146  * from the shared memory segments to the root's rbuf.  Otherwise, we
 147  * need a receive convertor and receive each fragment into a temporary
  148  * buffer where we can combine that operand with the root's rbuf.
 149  *
 150  * In general, there are two loops:
 151  *
 152  * 1. loop over all fragments (which must be done in units of an
 153  * integer number of datatypes -- remember that if this function is
  154  * called, we know that the datatype is smaller than the max size of
 155  * a fragment, so this is definitely possible)
 156  *
 157  * 2. loop over all the processes -- 0 to (comm_size-1).
 158  * For process 0:
 159  * - if the root==0, copy the *entire* buffer (i.e., don't copy
 160  *   fragment by fragment -- might as well copy the entire thing) the
 161  *   first time through the algorithm, and no-op every other time
 162  * - else, copy from the shmem fragment to the out buffer
  163  * For all other processes:
 164  * - if root==i, combine the relevant fragment from the sbuf to the
 165  *   relevant fragment on the rbuf
 166  * - else, if the datatype is friendly, combine relevant fragment from
 167  *   the shmem segment to the relevant fragment in the rbuf.  Otherwise,
 168  *   use the convertor to copy the fragment out of shmem into a temp
 169  *   buffer and do the combination from there to the rbuf.
 170  *
 171  * If we don't have a friendly datatype, then free the temporary
 172  * buffer at the end.
 173  */
 174 
 175 
/* In-order shared-memory reduction; see the long algorithm description
   above.  The root pulls fragments from each peer's shmem segment in
   rank order (size-1 down to 0) and applies 'op'; non-roots just pack
   fragments into shmem and notify the root. */
static int reduce_inorder(const void *sbuf, void* rbuf, int count,
                          struct ompi_datatype_t *dtype,
                          struct ompi_op_t *op,
                          int root, struct ompi_communicator_t *comm,
                          mca_coll_base_module_t *module)
{
    struct iovec iov;
    mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
    mca_coll_sm_comm_t *data = sm_module->sm_comm_data;
    int ret, rank, size;
    int flag_num, segment_num, max_segment_num;
    size_t total_size, max_data, bytes;
    mca_coll_sm_in_use_flag_t *flag;
    mca_coll_sm_data_index_t *index;
    size_t ddt_size, segsize;
    size_t segment_ddt_count, segment_ddt_bytes, zero = 0;
    ptrdiff_t extent, gap;

    /* Setup some identities */

    rank = ompi_comm_rank(comm);
    size = ompi_comm_size(comm);

    /* Figure out how much we should have the convertor copy.  We need
       to have it be in units of a datatype -- i.e., we only want to
       copy a whole datatype worth of data or none at all (we've
       already guaranteed above that the datatype is not larger than a
       segment, so we'll at least get 1). */

    /* ddt_size is the packed size (e.g., MPI_SHORT_INT is 6) */
    ompi_datatype_type_size(dtype, &ddt_size);
    /* extent is from lb to ub (e.g., MPI_SHORT_INT is 8) */
    ompi_datatype_type_extent(dtype, &extent);
    segment_ddt_count = mca_coll_sm_component.sm_fragment_size / ddt_size;
    iov.iov_len = segment_ddt_bytes = segment_ddt_count * ddt_size;
    total_size = ddt_size * count;

    bytes = 0;

    /* Only have one top-level decision as to whether I'm the root or
       not.  Do this at the slight expense of repeating a little logic
       -- but it's better than a conditional branch in every loop
       iteration. */

    /*********************************************************************
     * Root
     *********************************************************************/

    if (root == rank) {
        opal_convertor_t rtb_convertor, rbuf_convertor;
        char *reduce_temp_buffer, *free_buffer, *reduce_target;
        char *inplace_temp;
        int peer;
        size_t count_left = (size_t)count;
        int frag_num = 0;
        bool first_operation = true;

        /* If the datatype is the same packed as it is unpacked, we
           can save a memory copy and just do the reduction operation
           directly from the shared memory segment.  However, if the
           representation is not the same, then we need to get a
           receive convertor and a temporary buffer to receive
           into. */

        if (ompi_datatype_is_contiguous_memory_layout(dtype, count)) {
            /* NULL free_buffer doubles as the "contiguous datatype"
               flag throughout the rest of this function. */
            reduce_temp_buffer = free_buffer = NULL;
        } else {
            /* When we have a non-contiguous datatype, we need one or
             * two convertors:
             *
             * rtb_convertor: unpacking from the shmem to the
             * reduce_temp_buffer (where we can then apply the
             * reduction).
             *
             * rbuf_convertor: unpacking from the shmem directly to the
             * rbuf (no need to go to the reduce_temp_buffer first and
             * then apply the reduction -- just copy straight to the
             * target buffer).
             */
            OBJ_CONSTRUCT(&rtb_convertor, opal_convertor_t);
            OBJ_CONSTRUCT(&rbuf_convertor, opal_convertor_t);

            /* See lengthy comment in coll basic reduce about
               explanation for how to malloc the extra buffer.  Note
               that we do not need a buffer big enough to hold "count"
               instances of the datatype (i.e., big enough to hold the
               entire user buffer) -- we only need to be able to hold
               "segment_ddt_count" instances (i.e., the number of
               instances that can be held in a single fragment) */

            segsize = opal_datatype_span(&dtype->super, segment_ddt_count, &gap);

            free_buffer = (char*)malloc(segsize);
            if (NULL == free_buffer) {
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            reduce_temp_buffer = free_buffer - gap;

            /* Trickery here: we use a potentially smaller count than
               the user count -- use the largest count that is <=
               user's count that will fit within a single segment. */

            if (OMPI_SUCCESS !=
                (ret = opal_convertor_copy_and_prepare_for_recv(
                                       ompi_mpi_local_convertor,
                                       &(dtype->super),
                                       segment_ddt_count,
                                       reduce_temp_buffer,
                                       0,
                                       &rtb_convertor))) {
                /* NOTE(review): the two constructed convertors are not
                   OBJ_DESTRUCTed on this error path -- confirm whether
                   that leaks convertor-internal state. */
                free(free_buffer);
                return ret;
            }

            /* See if we need the rbuf_convertor (only when someone
               other than this root, i.e., rank size-1, feeds the
               first fragment). */
            if (size - 1 != rank) {
                if (OMPI_SUCCESS !=
                    (ret = opal_convertor_copy_and_prepare_for_recv(
                                       ompi_mpi_local_convertor,
                                       &(dtype->super),
                                       count,
                                       rbuf,
                                       0,
                                       &rbuf_convertor))) {
                    /* NOTE(review): same missing OBJ_DESTRUCT concern
                       as the error path above -- confirm. */
                    free(free_buffer);
                    return ret;
                }
            }
        }

        /* If we're a) doing MPI_IN_PLACE (which means we're the root
           -- wouldn't have gotten down here with MPI_IN_PLACE if we
           weren't the root), and b) we're not rank (size-1), then we
           need to copy the rbuf into a temporary buffer and use that
           as the sbuf */

        if (MPI_IN_PLACE == sbuf && (size - 1) != rank) {
            segsize = opal_datatype_span(&dtype->super, count, &gap);
            inplace_temp = (char*)malloc(segsize);
            if (NULL == inplace_temp) {
                if (NULL != free_buffer) {
                    free(free_buffer);
                }
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            sbuf = inplace_temp - gap;
            ompi_datatype_copy_content_same_ddt(dtype, count, (char *)sbuf, (char *)rbuf);
        } else {
            inplace_temp = NULL;
        }

        /* Main loop over receiving / reducing fragments */

        do {
            flag_num = (data->mcb_operation_count %
                        mca_coll_sm_component.sm_comm_num_in_use_flags);
            FLAG_SETUP(flag_num, flag, data);
            FLAG_WAIT_FOR_IDLE(flag, reduce_root_flag_label);
            FLAG_RETAIN(flag, size, data->mcb_operation_count);
            ++data->mcb_operation_count;

            /* Loop over all the segments in this set */

            segment_num =
                flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
            max_segment_num =
                (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
            reduce_target = (((char*) rbuf) + (frag_num * extent * segment_ddt_count));
            do {

                /* Note that all the other coll modules reduce from
                   process (size-1) to 0, so that's the order we'll do
                   it here. */
                /* Process (size-1) is the root (special case) */
                if (size - 1 == rank) {
                    /* If we're the root *and* the first process to be
                       combined *and* this is the first segment in the
                       entire algorithm, then just copy the whole sbuf
                       to rbuf.  That way, we never need to copy from
                       my sbuf again (i.e., do the copy all at once
                       since all the data is local, and then don't
                       worry about it for the rest of the
                       algorithm) */
                    if (first_operation) {
                        first_operation = false;
                        if (MPI_IN_PLACE != sbuf) {
                            ompi_datatype_copy_content_same_ddt(dtype, count,
                                               reduce_target, (char*)sbuf);
                        }
                    }
                }

                /* Process (size-1) is not the root */
                else {
                    /* Wait for the data to be copied into shmem, just
                       like any other non-root process */
                    index = &(data->mcb_data_index[segment_num]);
                    PARENT_WAIT_FOR_NOTIFY_SPECIFIC(size - 1, rank, index, max_data, reduce_root_parent_label1);

                    /* If the datatype is contiguous, just copy it
                       straight to the reduce_target */
                    if (NULL == free_buffer) {
                        memcpy(reduce_target, ((char*)index->mcbmi_data) +
                               (size - 1) * mca_coll_sm_component.sm_fragment_size, max_data);
                    }
                    /* If the datatype is noncontiguous, use the
                       rbuf_convertor to unpack it straight to the
                       rbuf */
                    else {
                        max_data = segment_ddt_bytes;
                        COPY_FRAGMENT_OUT(rbuf_convertor, size - 1, index,
                                          iov, max_data);
                    }
                }

                /* Loop over all the remaining processes, receiving
                   and reducing them in order */

                for (peer = size - 2; peer >= 0; --peer) {

                    /* Handle the case where the source is this
                       process (which, by definition, excludes the
                       sbuf_copied_to_rbuf case because that can
                       *only* happen when root==0).  In this case, we
                       don't need to wait for the peer (i.e., me) to
                       copy into shmem -- just reduce directly from my
                       sbuf. */
                    if (rank == peer) {
                        ompi_op_reduce(op,
                                       ((char *) sbuf) +
                                       frag_num * extent * segment_ddt_count,
                                       reduce_target,
                                       min(count_left, segment_ddt_count),
                                       dtype);
                    }

                    /* Now handle the case where the source is not
                       this process.  Wait for the process to copy to
                       the segment into shmem. */
                    else {
                        index = &(data->mcb_data_index[segment_num]);
                        PARENT_WAIT_FOR_NOTIFY_SPECIFIC(peer, rank,
                                                        index, max_data, reduce_root_parent_label2);

                        /* If we don't need an extra buffer, then do the
                           reduction operation on the fragment straight
                           from the shmem. */

                        if (NULL == free_buffer) {
                            ompi_op_reduce(op,
                                           (index->mcbmi_data +
                                            (peer * mca_coll_sm_component.sm_fragment_size)),
                                           reduce_target,
                                           min(count_left, segment_ddt_count),
                                           dtype);
                        }

                        /* Otherwise, unpack the fragment to the temporary
                           buffer and then do the reduction from there */

                        else {
                            /* Unpack the fragment into my temporary
                               buffer */
                            max_data = segment_ddt_bytes;
                            COPY_FRAGMENT_OUT(rtb_convertor, peer, index,
                                              iov, max_data);
                            /* Rewind the convertor so the next fragment
                               unpacks to the start of the temp buffer */
                            opal_convertor_set_position(&rtb_convertor, &zero);

                            /* Do the reduction on this fragment */
                            ompi_op_reduce(op, reduce_temp_buffer,
                                           reduce_target,
                                           min(count_left, segment_ddt_count),
                                           dtype);
                        }
                    } /* whether this process was me or not */
                } /* loop over all processes */

                /* We've iterated through all the processes -- now we
                   move on to the next segment */

                count_left -= segment_ddt_count;
                bytes += segment_ddt_bytes;
                ++segment_num;
                ++frag_num;
                reduce_target += extent * segment_ddt_count;
            } while (bytes < total_size && segment_num < max_segment_num);

            /* Root is now done with this set of segments */
            FLAG_RELEASE(flag);
        } while (bytes < total_size);

        /* Kill the convertor, if we had one */

        if (NULL != free_buffer) {
            OBJ_DESTRUCT(&rtb_convertor);
            OBJ_DESTRUCT(&rbuf_convertor);
            free(free_buffer);
        }
        if (NULL != inplace_temp) {
            free(inplace_temp);
        }
    }

    /*********************************************************************
     * Non-root
     *********************************************************************/

    else {
        /* Here we get a convertor for the full count that the user
           provided (as opposed to the convertor that the root got) */

        opal_convertor_t sbuf_convertor;
        OBJ_CONSTRUCT(&sbuf_convertor, opal_convertor_t);
        if (OMPI_SUCCESS !=
            (ret =
             opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
                                                      &(dtype->super),
                                                      count,
                                                      sbuf,
                                                      0,
                                                      &sbuf_convertor))) {
            /* NOTE(review): sbuf_convertor is not OBJ_DESTRUCTed on
               this error path -- confirm whether that leaks. */
            return ret;
        }

        /* Loop over sending fragments to the root */

        do {
            flag_num = (data->mcb_operation_count %
                        mca_coll_sm_component.sm_comm_num_in_use_flags);

            /* Wait for the root to mark this set of segments as
               ours */
            FLAG_SETUP(flag_num, flag, data);
            FLAG_WAIT_FOR_OP(flag, data->mcb_operation_count, reduce_nonroot_flag_label);
            ++data->mcb_operation_count;

            /* Loop over all the segments in this set */

            segment_num =
                flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
            max_segment_num =
                (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
            do {
                index = &(data->mcb_data_index[segment_num]);

                /* Copy from the user's buffer to my shared mem
                   segment */
                max_data = segment_ddt_bytes;
                COPY_FRAGMENT_IN(sbuf_convertor, index, rank, iov, max_data);
                bytes += max_data;

                /* Wait for the write to absolutely complete */
                opal_atomic_wmb();

                /* Tell my parent (always the reduction root -- we're
                   ignoring the mcb_tree parent/child relationships
                   here) that this fragment is ready */
                CHILD_NOTIFY_PARENT(rank, root, index, max_data);

                ++segment_num;
            } while (bytes < total_size && segment_num < max_segment_num);

            /* We're finished with this set of segments */
            FLAG_RELEASE(flag);
        } while (bytes < total_size);

        /* Kill the convertor */

        OBJ_DESTRUCT(&sbuf_convertor);
    }

    /* All done */

    return OMPI_SUCCESS;
}
 551 
 552 
 553 #if WANT_REDUCE_NO_ORDER
 554 /**
 555  * Unordered shared memory reduction.
 556  *
 557  * This function performs the reduction in whatever order the operands
 558  * arrive.
 559  */
 560 static int reduce_no_order(const void *sbuf, void* rbuf, int count,
 561                            struct ompi_datatype_t *dtype,
 562                            struct ompi_op_t *op,
 563                            int root, struct ompi_communicator_t *comm,
 564                            mca_coll_base_module_t *module)
 565 {
 566     return OMPI_ERR_NOT_IMPLEMENTED;
 567 }
 568 #endif

/* [<][>][^][v][top][bottom][index][help] */