root/ompi/patterns/comm/allgather.c

/*
 * Copyright (c) 2009-2012 Mellanox Technologies.  All rights reserved.
 * Copyright (c) 2009-2012 Oak Ridge National Laboratory.  All rights reserved.
 * Copyright (c) 2012      Los Alamos National Security, LLC.
 *                         All rights reserved.
 * Copyright (c) 2014-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/** @file */

#include "ompi_config.h"

#include "ompi/constants.h"
#include "ompi/op/op.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "opal/include/opal/sys/atomic.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/patterns/net/netpatterns.h"
#include "coll_ops.h"

/**
 * All-gather over a subgroup of a communicator
 */
OMPI_DECLSPEC int ompi_comm_allgather_pml(void *src_buf, void *dest_buf, int count,
        ompi_datatype_t *dtype, int my_rank_in_group,
        int n_peers, int *ranks_in_comm, ompi_communicator_t *comm)
{
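    /*
     * Recursive-doubling allgather: ranks beyond the largest power of
     * two ("extra" ranks) first hand their block to a partner inside
     * the power-of-two set, sit out the exchange rounds, and receive
     * the fully gathered buffer at the end.  Each round exchanges the
     * contiguous block gathered so far, doubling it, with at most two
     * messages per direction (the power-of-two block plus any extra
     * ranks' blocks).
     */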
    /* local variables */
    int rc=OMPI_SUCCESS,msg_cnt;
    int pair_rank,exchange,extra_rank, n_extra_nodes,n_extra;
    int proc_block,extra_start,extra_end,iovec_len;
    int remote_data_start_rank,remote_data_end_rank;
    int local_data_start_rank;
    netpatterns_pair_exchange_node_t my_exchange_node;
    size_t message_extent,current_data_extent,current_data_count;
    size_t dt_size;
    ptrdiff_t dt_extent;
    char *src_buf_current;
    char *dest_buf_current;
    struct iovec send_iov[2] = {{0,0},{0,0}},
                 recv_iov[2] = {{0,0},{0,0}};
    ompi_request_t *requests[4];

    /* get the size of the data - the gathered blocks have the same
     * layout as the user data, so they can be addressed directly in
     * these buffers
     */
    rc = ompi_datatype_type_size(dtype, &dt_size);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }

    rc = ompi_datatype_type_extent(dtype, &dt_extent);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }
    message_extent = dt_extent*count;

    /* place my data in the correct destination buffer */
    rc=ompi_datatype_copy_content_same_ddt(dtype,count,
            (char *)dest_buf+my_rank_in_group*message_extent,
            (char *)src_buf);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }

    /* 1 process special case */
    if(1 == n_peers) {
        return OMPI_SUCCESS;
    }

    /* get my recursive-doubling communication pattern */
    memset(&my_exchange_node, 0, sizeof(netpatterns_pair_exchange_node_t));
    rc = ompi_netpatterns_setup_recursive_doubling_tree_node(n_peers,
            my_rank_in_group, &my_exchange_node);
    if(OMPI_SUCCESS != rc){
        return rc;
    }
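
    /* number of ranks beyond the largest power of two that fits in
     * n_peers; their blocks are carried through the exchanges by
     * partners inside the power-of-two set */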
    n_extra_nodes=n_peers-my_exchange_node.n_largest_pow_2;

    /* get the data from the extra sources */
    if(0 < my_exchange_node.n_extra_sources)  {

        if ( EXCHANGE_NODE == my_exchange_node.node_type ) {

            /*
             ** Receive data from the extra node
             */

            extra_rank=my_exchange_node.rank_extra_source;
            /* receive the data into the correct location - it will be
             * forwarded in up to 2 messages during the recursive-doubling
             * phase */
            dest_buf_current=(char *)dest_buf+message_extent*extra_rank;
            rc=MCA_PML_CALL(recv(dest_buf_current,
                    count,dtype,ranks_in_comm[extra_rank],
                    -OMPI_COMMON_TAG_ALLREDUCE,
                    comm, MPI_STATUS_IGNORE));
            if( 0 > rc ) {
                goto Error;
            }

        } else {

            /*
             ** Send data to the "partner" node
             */
            extra_rank=my_exchange_node.rank_extra_source;
            src_buf_current=(char *)src_buf;
            rc=MCA_PML_CALL(send(src_buf_current,
                    count,dtype,ranks_in_comm[extra_rank],
                    -OMPI_COMMON_TAG_ALLREDUCE,
                    MCA_PML_BASE_SEND_STANDARD,
                    comm));
            if( 0 > rc ) {
                goto Error;
            }
        }
    }
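
    /* the extra ranks' blocks now sit with their partners inside the
     * power-of-two set; extra ranks take no part in the exchange rounds
     * below and simply collect the full result at the end */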
    current_data_extent=message_extent;
    current_data_count=count;
    src_buf_current=(char *)dest_buf+my_rank_in_group*message_extent;
    proc_block=1;
    local_data_start_rank=my_rank_in_group;
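
    /* src_buf_current marks the start of the contiguous region of
     * dest_buf gathered so far, local_data_start_rank its first rank,
     * and proc_block the number of ranks it covers; all double each
     * exchange round */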
    /* loop over data exchanges */
    for(exchange=0 ; exchange < my_exchange_node.n_exchanges ; exchange++) {

        /* rank we pair with for this exchange step */
        pair_rank=my_exchange_node.rank_exchanges[exchange];
        msg_cnt=0;

        /*
         * Power of 2 data segment
         */
        /* post non-blocking receive */
        if(pair_rank > my_rank_in_group ){
            recv_iov[0].iov_base=src_buf_current+current_data_extent;
            recv_iov[0].iov_len=current_data_extent;
            iovec_len=1;
            remote_data_start_rank=local_data_start_rank+proc_block;
            remote_data_end_rank=remote_data_start_rank+proc_block-1;
        } else {
            recv_iov[0].iov_base=src_buf_current-current_data_extent;
            recv_iov[0].iov_len=current_data_extent;
            iovec_len=1;
            remote_data_start_rank=local_data_start_rank-proc_block;
            remote_data_end_rank=remote_data_start_rank+proc_block-1;
        }
        /* the data from the non power of 2 ranks */
        if(remote_data_start_rank<n_extra_nodes) {
            /* figure out how much extra data the remote rank holds */
            extra_start=remote_data_start_rank;
            extra_end=remote_data_end_rank;
            if(extra_end >= n_extra_nodes ) {
                /* if the last rank exceeds the ranks with extra data,
                 * adjust this.
                 */
                extra_end=n_extra_nodes-1;
            }
            /* the number of ranks whose data is to be grabbed */
            n_extra=extra_end-extra_start+1;

            recv_iov[1].iov_base=(char *)dest_buf+
                (extra_start+my_exchange_node.n_largest_pow_2)*message_extent;
            recv_iov[1].iov_len=n_extra*count;
            iovec_len=2;
        }
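        /* note: recv_iov[1].iov_len is an element count (n_extra*count),
         * not a byte count; it is passed straight through as the count
         * argument of the second irecv below */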

        rc=MCA_PML_CALL(irecv(recv_iov[0].iov_base,
                    current_data_count,dtype,ranks_in_comm[pair_rank],
                    -OMPI_COMMON_TAG_ALLREDUCE,
                    comm,&(requests[msg_cnt])));
        if( 0 > rc ) {
            goto Error;
        }
        msg_cnt++;

        if(iovec_len > 1 ) {
            rc=MCA_PML_CALL(irecv(recv_iov[1].iov_base,
                        recv_iov[1].iov_len,dtype,ranks_in_comm[pair_rank],
                        -OMPI_COMMON_TAG_ALLREDUCE,
                        comm,&(requests[msg_cnt])));
            if( 0 > rc ) {
                goto Error;
            }
            msg_cnt++;
        }

        /* post non-blocking send */
        send_iov[0].iov_base=src_buf_current;
        send_iov[0].iov_len=current_data_extent;
        iovec_len=1;
        /* the data from the non power of 2 ranks */
        if(local_data_start_rank<n_extra_nodes) {
            /* figure out how much extra data we hold locally */
            extra_start=local_data_start_rank;
            extra_end=extra_start+proc_block-1;
            if(extra_end >= n_extra_nodes ) {
                /* if the last rank exceeds the ranks with extra data,
                 * adjust this.
                 */
                extra_end=n_extra_nodes-1;
            }
            /* the number of ranks whose data is to be sent */
            n_extra=extra_end-extra_start+1;

            send_iov[1].iov_base=(char *)dest_buf+
                (extra_start+my_exchange_node.n_largest_pow_2)*message_extent;
            send_iov[1].iov_len=n_extra*count;
            iovec_len=2;
        }
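        /* post the matching non-blocking sends; as on the receive side,
         * send_iov[1].iov_len is an element count */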

        rc=MCA_PML_CALL(isend(send_iov[0].iov_base,
                    current_data_count,dtype,ranks_in_comm[pair_rank],
                    -OMPI_COMMON_TAG_ALLREDUCE,MCA_PML_BASE_SEND_STANDARD,
                    comm,&(requests[msg_cnt])));
        if( 0 > rc ) {
            goto Error;
        }
        msg_cnt++;
        if( iovec_len > 1 ) {
            rc=MCA_PML_CALL(isend(send_iov[1].iov_base,
                        send_iov[1].iov_len,dtype,ranks_in_comm[pair_rank],
                        -OMPI_COMMON_TAG_ALLREDUCE,MCA_PML_BASE_SEND_STANDARD,
                        comm,&(requests[msg_cnt])));
            if( 0 > rc ) {
                goto Error;
            }
            msg_cnt++;
        }

        /* prepare the source buffer for the next iteration */
        if(pair_rank < my_rank_in_group ){
            src_buf_current-=current_data_extent;
            local_data_start_rank-=proc_block;
        }
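
        /* the block just exchanged doubles what this rank holds - scale
         * the bookkeeping for the next round */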
        proc_block*=2;
        current_data_extent*=2;
        current_data_count*=2;

        /* wait on send and receive completion */
        ompi_request_wait_all(msg_cnt,requests,MPI_STATUSES_IGNORE);
    }

    /* copy data in from the "extra" source, if need be */
    if(0 < my_exchange_node.n_extra_sources)  {

        if ( EXTRA_NODE == my_exchange_node.node_type ) {
            /*
             ** Receive the data
             */
            extra_rank=my_exchange_node.rank_extra_source;

            rc=MCA_PML_CALL(recv(dest_buf,
                    count*n_peers,dtype,ranks_in_comm[extra_rank],
                    -OMPI_COMMON_TAG_ALLREDUCE,
                    comm,MPI_STATUS_IGNORE));
            if(0 > rc ) {
                goto Error;
            }
        } else {
            /* send the data to the pair-rank outside of the power of 2 set
             ** of ranks
             */

            extra_rank=my_exchange_node.rank_extra_source;
            rc=MCA_PML_CALL(send(dest_buf,
                    count*n_peers,dtype,ranks_in_comm[extra_rank],
                    -OMPI_COMMON_TAG_ALLREDUCE,
                    MCA_PML_BASE_SEND_STANDARD,
                    comm));
            if( 0 > rc ) {
                goto Error;
            }
        }
    }
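
    /* every rank, including the extra ones, now holds the fully
     * gathered buffer */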
    ompi_netpatterns_cleanup_recursive_doubling_tree_node(&my_exchange_node);

    /* return */
    return OMPI_SUCCESS;

Error:
    return rc;
}
