root/ompi/patterns/comm/allreduce.c


DEFINITIONS

This source file includes the following definitions:
  1. ompi_comm_allreduce_pml

/*
 * Copyright (c) 2009-2012 Mellanox Technologies.  All rights reserved.
 * Copyright (c) 2009-2012 Oak Ridge National Laboratory.  All rights reserved.
 * Copyright (c) 2012      Los Alamos National Security, LLC.
 *                         All rights reserved.
 * Copyright (c) 2014-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/** @file */

#include "ompi_config.h"

#include "ompi/constants.h"
#include "ompi/op/op.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "opal/include/opal/sys/atomic.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/patterns/net/netpatterns.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "coll_ops.h"
#include "commpatterns.h"

/**
 * All-reduce for contiguous primitive types
 */
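/*
 * Implementation notes:
 *
 * The reduction is a recursive-doubling exchange over the largest
 * power-of-two subset of the n_peers ranks, as described by the
 * netpatterns_pair_exchange_node_t filled in by
 * ompi_netpatterns_setup_recursive_doubling_tree_node().  Ranks outside
 * that subset ("extra" ranks) first fold their contribution into a partner
 * inside the subset and receive the final result from that partner at the
 * end.  The reduction is staged through a pair of fixed-size stack buffers
 * of MAX_TMP_BUFFER bytes each, so the user data is processed in stripes
 * that fit into one buffer.
 */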
OMPI_DECLSPEC int ompi_comm_allreduce_pml(void *sbuf, void *rbuf, int count,
        ompi_datatype_t *dtype, int my_rank_in_group,
        struct ompi_op_t *op, int n_peers,int *ranks_in_comm,
        ompi_communicator_t *comm)
{
    /* local variables */
    int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
    int pair_rank,exchange,extra_rank;
    netpatterns_pair_exchange_node_t my_exchange_node;
    int count_processed,count_this_stripe;
    size_t dt_size,dt_extent;
    char scratch_bufers[2][MAX_TMP_BUFFER];
    int send_buffer=0,recv_buffer=1;
    char *sbuf_current, *rbuf_current;

    /* get size of data needed - same layout as user data, so that
     *   we can apply the reduction routines directly on these buffers
     */
    rc = opal_datatype_type_size((opal_datatype_t *)dtype, &dt_size);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }
    rc = ompi_datatype_type_extent(dtype, (ptrdiff_t *)&dt_extent);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }

    /* 1 process special case */
    if(1 == n_peers) {
        /* place my data in the correct destination buffer */
        rc=ompi_datatype_copy_content_same_ddt(dtype,count,
                (char *)rbuf, (char *)sbuf);
        if( OMPI_SUCCESS != rc ) {
            goto Error;
        }
        return OMPI_SUCCESS;
    }

    /* number of datatype copies that a scratch buffer can hold */
    n_dts_per_buffer=((int) MAX_TMP_BUFFER)/dt_extent;
    if ( 0 == n_dts_per_buffer ) {
        rc=OMPI_ERROR;
        goto Error;
    }

    /* compute number of stripes needed to process this collective */
    n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
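    /* This is a ceiling division.  For example (hypothetical numbers): if a
     * scratch buffer held 1024 elements and count were 2500, the stripe loop
     * below would run 3 times, processing 1024, 1024 and 452 elements. */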

    /* get my reduction communication pattern */
    memset(&my_exchange_node, 0, sizeof(netpatterns_pair_exchange_node_t));
    rc = ompi_netpatterns_setup_recursive_doubling_tree_node(n_peers,
            my_rank_in_group, &my_exchange_node);
    if(OMPI_SUCCESS != rc){
        return rc;
    }
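    /* my_exchange_node now describes this rank's role in the pattern:
     * node_type (EXCHANGE_NODE inside the power-of-two set, EXTRA_NODE
     * outside it), n_extra_sources and rank_extra_source for the pairing
     * between the two groups, and n_exchanges/rank_exchanges[] for the
     * recursive-doubling rounds. */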

    count_processed=0;

    /* process the data in stripes that fit the local scratch buffers */
    /* NOTE: starting with a rather synchronous approach */
    for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {

        /* get number of elements to process in this stripe */
        count_this_stripe=n_dts_per_buffer;
        if( count_processed + count_this_stripe > count )
            count_this_stripe=count-count_processed;

        /* copy data from the input buffer into the temp buffer */
        sbuf_current=(char *)sbuf+count_processed*dt_extent;
        rc=ompi_datatype_copy_content_same_ddt(dtype,count_this_stripe,
                scratch_bufers[send_buffer], sbuf_current);
        if( OMPI_SUCCESS != rc ) {
            goto Error;
        }

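        /* Pre-phase for non-power-of-two group sizes: an EXCHANGE_NODE with an
         * "extra" partner receives that partner's data and folds it into its
         * own contribution, while an EXTRA_NODE just sends its data and picks
         * up the final result after the exchange loop. */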
        /* copy data in from the "extra" source, if need be */
        if(0 < my_exchange_node.n_extra_sources)  {

            if ( EXCHANGE_NODE == my_exchange_node.node_type ) {

                /*
                ** Receive data from extra node
                */
                extra_rank=my_exchange_node.rank_extra_source;
                rc=MCA_PML_CALL(recv(scratch_bufers[recv_buffer],
                            count_this_stripe,dtype,ranks_in_comm[extra_rank],
                            -OMPI_COMMON_TAG_ALLREDUCE, comm,
                            MPI_STATUS_IGNORE));
                if( 0 > rc ) {
                    fprintf(stderr,"  first recv failed in ompi_comm_allreduce_pml \n");
                    fflush(stderr);
                    goto  Error;
                }


                /* combine the received data with this rank's contribution */
                if( 0 < count_this_stripe ) {
                    ompi_op_reduce(op,
                            (void *)scratch_bufers[send_buffer],
                            (void *)scratch_bufers[recv_buffer],
                            count_this_stripe,dtype);
                }


            } else {

                /*
                ** Send data to "partner" node
                */
                extra_rank=my_exchange_node.rank_extra_source;
                rc=MCA_PML_CALL(send(scratch_bufers[send_buffer],
                            count_this_stripe,dtype,ranks_in_comm[extra_rank],
                            -OMPI_COMMON_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD,
                            comm));
                if( 0 > rc ) {
                    fprintf(stderr,"  first send failed in ompi_comm_allreduce_pml \n");
                    fflush(stderr);
                    goto  Error;
                }
            }

            /* swap the scratch buffers - this way we can send the data
            ** that we have summed w/o a memory copy, and receive data into the
            ** other buffer, w/o fear of overwriting data that has not yet
            ** finished being sent
            */
            recv_buffer^=1;
            send_buffer^=1;
        }

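        /* Recursive-doubling phase: in each round this rank exchanges its
         * current partial result with one partner (a blocking sendrecv),
         * reduces the received data into the other scratch buffer, and then
         * the two buffers swap roles for the next round. */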
        /* loop over data exchanges */
        for(exchange=0 ; exchange < my_exchange_node.n_exchanges ; exchange++) {

            /* rank of the exchange partner for this round */
            pair_rank=my_exchange_node.rank_exchanges[exchange];

            rc=ompi_coll_base_sendrecv_actual(scratch_bufers[send_buffer],
                                              count_this_stripe,dtype, ranks_in_comm[pair_rank],
                                              -OMPI_COMMON_TAG_ALLREDUCE,
                                              scratch_bufers[recv_buffer],
                                              count_this_stripe,dtype,ranks_in_comm[pair_rank],
                                              -OMPI_COMMON_TAG_ALLREDUCE,
                                              comm, MPI_STATUS_IGNORE);
            if( 0 > rc ) {
                fprintf(stderr,"  sendrecv failed in ompi_comm_allreduce_pml at iteration %d \n",
                        exchange);
                fflush(stderr);
                goto Error;
            }

            /* reduce the data */
            if( 0 < count_this_stripe ) {
                ompi_op_reduce(op,
                        (void *)scratch_bufers[send_buffer],
                        (void *)scratch_bufers[recv_buffer],
                        count_this_stripe,dtype);
            }
            /* get ready for next step */
            recv_buffer^=1;
            send_buffer^=1;

        }

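        /* On EXCHANGE_NODE ranks, scratch_bufers[send_buffer] now holds the
         * fully reduced stripe (the buffers swap after every reduction).
         * Ranks outside the power-of-two set still need a copy of it. */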
        /* hand the reduced result back to the "extra" rank, if need be */
        if(0 < my_exchange_node.n_extra_sources)  {

            if ( EXTRA_NODE == my_exchange_node.node_type ) {
                /*
                ** receive the data
                */
                extra_rank=my_exchange_node.rank_extra_source;
                rc=MCA_PML_CALL(recv(scratch_bufers[recv_buffer],
                            count_this_stripe,dtype,ranks_in_comm[extra_rank],
                            -OMPI_COMMON_TAG_ALLREDUCE, comm,
                            MPI_STATUS_IGNORE));
                if( 0 > rc ) {
                    fprintf(stderr,"  last recv failed in ompi_comm_allreduce_pml \n");
                    fflush(stderr);
                    goto  Error;
                }

                recv_buffer^=1;
                send_buffer^=1;
            } else {
                /* send the data to the pair-rank outside of the power of 2 set
                ** of ranks
                */

                extra_rank=my_exchange_node.rank_extra_source;
                rc=MCA_PML_CALL(send((char *)scratch_bufers[send_buffer],
                            count_this_stripe,dtype,ranks_in_comm[extra_rank],
                            -OMPI_COMMON_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD,
                            comm));
                if( 0 > rc ) {
                    fprintf(stderr,"  last send failed in ompi_comm_allreduce_pml \n");
                    fflush(stderr);
                    goto  Error;
                }
            }
        }

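        /* Note: the destination offset below uses dt_size while the source
         * offset earlier in this loop used dt_extent; for the contiguous
         * primitive types this routine targets, the two coincide. */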
        /* copy data from the temp buffer into the output buffer */
        rbuf_current = (char *) rbuf + count_processed * dt_size;
        memcpy(rbuf_current,scratch_bufers[send_buffer], count_this_stripe*dt_size);

        /* update the count of elements processed */
        count_processed += count_this_stripe;
    }

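    /* release the resources associated with the exchange pattern set up above */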
    ompi_netpatterns_cleanup_recursive_doubling_tree_node(&my_exchange_node);

    /* return */
    return OMPI_SUCCESS;

Error:
    return rc;
}
