This source file includes the following definitions:
- ompi_comm_allreduce_pml
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 #include "ompi_config.h"
  18 
  19 #include "ompi/constants.h"
  20 #include "ompi/op/op.h"
  21 #include "ompi/datatype/ompi_datatype.h"
  22 #include "ompi/communicator/communicator.h"
  23 #include "opal/include/opal/sys/atomic.h"
  24 #include "ompi/mca/pml/pml.h"
  25 #include "ompi/patterns/net/netpatterns.h"
  26 #include "ompi/mca/coll/base/coll_base_util.h"
  27 #include "coll_ops.h"
  28 #include "commpatterns.h"
  29 
  30 
  31 
  32 
  33 OMPI_DECLSPEC int ompi_comm_allreduce_pml(void *sbuf, void *rbuf, int count,
  34         ompi_datatype_t *dtype, int my_rank_in_group,
  35         struct ompi_op_t *op, int n_peers,int *ranks_in_comm,
  36         ompi_communicator_t *comm)
  37 {
  38     
  39     int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
  40     int pair_rank,exchange,extra_rank;
  41     netpatterns_pair_exchange_node_t my_exchange_node;
  42     int count_processed,count_this_stripe;
  43     size_t dt_size,dt_extent;
  44     char scratch_bufers[2][MAX_TMP_BUFFER];
  45     int send_buffer=0,recv_buffer=1;
  46     char *sbuf_current, *rbuf_current;
  47 
  48     
  49 
  50 
  51     rc = opal_datatype_type_size((opal_datatype_t *)dtype, &dt_size);
  52     if( OMPI_SUCCESS != rc ) {
  53         goto Error;
  54     }
  55     rc = ompi_datatype_type_extent(dtype, (ptrdiff_t *)&dt_extent);
  56     if( OMPI_SUCCESS != rc ) {
  57         goto Error;
  58     }
  59 
  60     
  61     if(1 == n_peers) {
  62         
  63         rc=ompi_datatype_copy_content_same_ddt(dtype,count,
  64                 (char *)rbuf, (char *)sbuf);
  65         if( OMPI_SUCCESS != rc ) {
  66             goto Error;
  67         }
  68         return OMPI_SUCCESS;
  69     }
  70 
  71     
  72     n_dts_per_buffer=((int) MAX_TMP_BUFFER)/dt_extent;
  73     if ( 0 == n_dts_per_buffer ) {
  74         rc=OMPI_ERROR;
  75         goto Error;
  76     }
  77 
  78     
  79     n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
  80 
  81     
  82     memset(&my_exchange_node, 0, sizeof(netpatterns_pair_exchange_node_t));
  83     rc = ompi_netpatterns_setup_recursive_doubling_tree_node(n_peers,
  84             my_rank_in_group, &my_exchange_node);
  85     if(OMPI_SUCCESS != rc){
  86         return rc;
  87     }
  88 
  89     count_processed=0;
  90 
  91     
  92     
  93     for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
  94 
  95         
  96         count_this_stripe=n_dts_per_buffer;
  97         if( count_processed + count_this_stripe > count )
  98             count_this_stripe=count-count_processed;
  99 
 100         
 101         sbuf_current=(char *)sbuf+count_processed*dt_extent;
 102         rc=ompi_datatype_copy_content_same_ddt(dtype,count_this_stripe,
 103                 scratch_bufers[send_buffer], sbuf_current);
 104         if( OMPI_SUCCESS != rc ) {
 105             goto Error;
 106         }
 107 
 108         
 109         if(0 < my_exchange_node.n_extra_sources)  {
 110 
 111             if ( EXCHANGE_NODE == my_exchange_node.node_type ) {
 112 
 113                 
 114 
 115 
 116                 extra_rank=my_exchange_node.rank_extra_source;
 117                 rc=MCA_PML_CALL(recv(scratch_bufers[recv_buffer],
 118                             count_this_stripe,dtype,ranks_in_comm[extra_rank],
 119                             -OMPI_COMMON_TAG_ALLREDUCE, comm,
 120                             MPI_STATUSES_IGNORE));
 121                 if( 0 > rc ) {
 122                     fprintf(stderr,"  first recv failed in ompi_comm_allreduce_pml \n");
 123                     fflush(stderr);
 124                     goto  Error;
 125                 }
 126 
 127 
 128                 
 129                 if( 0 < count_this_stripe ) {
 130                     ompi_op_reduce(op,
 131                             (void *)scratch_bufers[send_buffer],
 132                             (void *)scratch_bufers[recv_buffer],
 133                             count_this_stripe,dtype);
 134                 }
 135 
 136 
 137             } else {
 138 
 139                 
 140 
 141 
 142                 extra_rank=my_exchange_node.rank_extra_source;
 143                 rc=MCA_PML_CALL(send(scratch_bufers[send_buffer],
 144                             count_this_stripe,dtype,ranks_in_comm[extra_rank],
 145                             -OMPI_COMMON_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD,
 146                             comm));
 147                 if( 0 > rc ) {
 148                     fprintf(stderr,"  first send failed in ompi_comm_allreduce_pml \n");
 149                     fflush(stderr);
 150                     goto  Error;
 151                 }
 152             }
 153 
 154             
 155 
 156 
 157 
 158 
 159             recv_buffer^=1;
 160             send_buffer^=1;
 161         }
 162 
 163         
 164         for(exchange=0 ; exchange < my_exchange_node.n_exchanges ; exchange++) {
 165 
 166             
 167             pair_rank=my_exchange_node.rank_exchanges[exchange];
 168 
 169             rc=ompi_coll_base_sendrecv_actual(scratch_bufers[send_buffer],
 170                                               count_this_stripe,dtype, ranks_in_comm[pair_rank],
 171                                               -OMPI_COMMON_TAG_ALLREDUCE,
 172                                               scratch_bufers[recv_buffer],
 173                                               count_this_stripe,dtype,ranks_in_comm[pair_rank],
 174                                               -OMPI_COMMON_TAG_ALLREDUCE,
 175                                               comm, MPI_STATUS_IGNORE);
 176             if( 0 > rc ) {
 177                 fprintf(stderr,"  irecv failed in  ompi_comm_allreduce_pml at iterations %d \n",
 178                         exchange);
 179                 fflush(stderr);
 180                 goto Error;
 181             }
 182 
 183             
 184             if( 0 < count_this_stripe ) {
 185                 ompi_op_reduce(op,
 186                         (void *)scratch_bufers[send_buffer],
 187                         (void *)scratch_bufers[recv_buffer],
 188                         count_this_stripe,dtype);
 189             }
 190             
 191             recv_buffer^=1;
 192             send_buffer^=1;
 193 
 194         }
 195 
 196         
 197         if(0 < my_exchange_node.n_extra_sources)  {
 198 
 199             if ( EXTRA_NODE == my_exchange_node.node_type ) {
 200                 
 201 
 202 
 203                 extra_rank=my_exchange_node.rank_extra_source;
 204                 rc=MCA_PML_CALL(recv(scratch_bufers[recv_buffer],
 205                             count_this_stripe,dtype,ranks_in_comm[extra_rank],
 206                             -OMPI_COMMON_TAG_ALLREDUCE, comm,
 207                             MPI_STATUSES_IGNORE));
 208                 if( 0 > rc ) {
 209                     fprintf(stderr,"  last recv failed in ompi_comm_allreduce_pml \n");
 210                     fflush(stderr);
 211                     goto  Error;
 212                 }
 213 
 214                 recv_buffer^=1;
 215                 send_buffer^=1;
 216             } else {
 217                 
 218 
 219 
 220 
 221                 extra_rank=my_exchange_node.rank_extra_source;
 222                 rc=MCA_PML_CALL(send((char *)scratch_bufers[send_buffer],
 223                             count_this_stripe,dtype,ranks_in_comm[extra_rank],
 224                             -OMPI_COMMON_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD,
 225                             comm));
 226                 if( 0 > rc ) {
 227                     fprintf(stderr,"  last send failed in ompi_comm_allreduce_pml \n");
 228                     fflush(stderr);
 229                     goto  Error;
 230                 }
 231             }
 232         }
 233 
 234         
 235         rbuf_current = (char *) rbuf + count_processed * dt_size;
 236         memcpy(rbuf_current,scratch_bufers[send_buffer], count_this_stripe*dt_size);
 237 
 238         
 239         count_processed += count_this_stripe;
 240     }
 241 
 242     ompi_netpatterns_cleanup_recursive_doubling_tree_node(&my_exchange_node);
 243 
 244     
 245     return OMPI_SUCCESS;
 246 
 247 Error:
 248     return rc;
 249 }