This source file includes following definitions.
- send_completion
- recv_completion
- op_reduce
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 #include "ompi_config.h"
  16 
  17 #include "ompi/constants.h"
  18 #include "coll_sm2.h"
  19 #include "ompi/op/op.h"
  20 #include "ompi/datatype/ompi_datatype.h"
  21 #include "ompi/communicator/communicator.h"
  22 #include "ompi/mca/rte/rte.h"
  23 
  24 void send_completion(nt status, struct ompi_process_name_t* peer, struct iovec* msg,
  25                      int count, ompi_rml_tag_t tag, void* cbdata)
  26 {
  27     
  28     *(int *)cbdata=1;
  29 }
  30 
  31 
  32 void recv_completion(nt status, struct ompi_process_name_t* peer, struct iovec* msg,
  33                      int count, ompi_rml_tag_t tag, void* cbdata)
  34 {
  35     
  36     MB();
  37     *(int *)cbdata=1;
  38 }
  39 
  40 
  41 static void op_reduce(int op_type,(void *)src_dest_buf,(void *) src_buf, int count,
  42         int data_type)
  43 {
  44     
  45     int ret;
  46 
  47     
  48     switch (op_type) {
  49 
  50         case OP_SUM:
  51 
  52 
  53             switch (data_type) {
  54                 case TYPE_INT4:
  55                     int *int_src_ptr=(int *)src_ptr;
  56                     int *int_src_dst_ptr=(int *)src_dst_ptr;
  57                     int cnt;
  58                     for(cnt=0 ; cnt < count ; ) {
  59                         (*(int_src_dst_ptr))+=(*(int_src_ptr));
  60                     break;
  61                 default:
  62                     ret=OMPI_ERROR;
  63                     goto Error;
  64             }
  65 
  66             break;
  67 
  68         default:
  69         ret=OMPI_ERROR;
  70         goto Error;
  71     }
  72 Error:
  73     return ret;
  74 }
  75 
  76 
  77 
  78 
  79 static
  80 comm_allreduce(void *sbuf, void *rbuf, int count, opal_datatype_t *dtype,
  81         int op_type, opal_list_t *peers)
  82 {
  83     
  84     int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
  85     int pair_rank,exchange,extra_rank;
  86     int index_read,index_write;
  87     netpatterns_pair_exchange_node_t my_exchange_node;
  88     int my_rank,count_processed,count_this_stripe;
  89     size_t n_peers,message_extent,len_data_buffer;
  90     size_t dt_size;
  91     long long tag, base_tag;
  92     sm_work_buffer_t *sm_buffer_desc;
  93     opal_list_item_t *item;
  94     char scratch_bufers[2][MAX_TMP_BUFFER];
  95     int send_buffer=0;recv_buffer=1;
  96     char *sbuf_current,*rbuf_current;
  97     ompi_proc_t **proc_array;
  98     struct iovec send_iov, recv_iov;
  99     volatile int *recv_done, *send_done;
 100     int recv_completion_flag, send_completion_flag;
 101     int data_type;
 102 
 103     
 104 
 105 
 106     rc=opal_datatype_type_size(dtype, &dt_size);
 107     if( OMPI_SUCCESS != rc ) {
 108         goto Error;
 109     }
 110     message_extent=dt_extent*count;
 111 
 112     
 113     len_data_buffer=sm_module->data_memory_per_proc_per_segment;
 114 
 115     
 116     n_dts_per_buffer=((int) MAX_TMP_BUFFER)/dt_size;
 117     if ( 0 == n_dts_per_buffer ) {
 118         rc=OMPI_ERROR;
 119         goto Error;
 120     }
 121 
 122     
 123     n_dts_per_buffer/=2;
 124     len_data_buffer=n_dts_per_buffer*dt_size;
 125 
 126     
 127     n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
 128 
 129     
 130     n_peers=opal_list_get_size(peers);
 131 
 132     
 133     my_rank=0;
 134     for (item = opal_list_get_first(peers) ;
 135             item != opal_list_get_end(peers) ;
 136             item = opal_list_get_next(peers)) {
 137         if(ompi_proc_local()==(ompi_proc_t *)item){
 138             
 139             break;
 140         }
 141         my_rank++;
 142     }
 143     proc_array=(ompi_proc_t **)malloc(sizeof(ompi_proc_t *)*n_peers);
 144     if( NULL == proc_array) {
 145         goto Error;
 146     }
 147     cnt=0;
 148     for (item = opal_list_get_first(peers) ;
 149             item != opal_list_get_end(peers) ;
 150             item = opal_list_get_next(peers)) {
 151         proc_array[cnt]=(ompi_proc_t *)item;
 152         cnt++;
 153     }
 154 
 155     
 156     ret=ompi_netpatterns_setup_recursive_doubling_tree_node(n_peers,my_rank,&my_exchange_node);
 157     if(OMPI_SUCCESS != ret){
 158         return ret;
 159     }
 160 
 161     
 162     recv_done=&recv_completion_flag;
 163     send_done=&send_completion_flag;
 164 
 165     
 166     if(&opal_datatype_int4==dtype) {
 167         data_type=TYPE_INT4;
 168     }
 169 
 170     count_processed=0;
 171 
 172     
 173     
 174     for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
 175 
 176         
 177         count_this_stripe=n_dts_per_buffer;
 178         if( count_processed + count_this_stripe > count )
 179             count_this_stripe=count-count_processed;
 180 
 181         
 182         sbuf_current=(char *)sbuf+count_processed*dt_size;
 183         memcopy(scratch_bufers[send_buffer],sbuf_current,count_this_stripe*dt_size);
 184 
 185         
 186         if(0 < my_exchange_node->n_extra_sources)  {
 187 
 188             if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
 189 
 190                 
 191 
 192 
 193 
 194                 extra_rank=my_exchange_node.rank_extra_source;
 195                 recv_iov.iov_base=scratch_bufers[recv_buffer];
 196                 recv_iov.iov_len=count_this_stripe*dt_size;
 197                 rc = ompi_rte_recv(&(proc_array[extra_rank]->proc_name), &recv_iov, 1,
 198                         OMPI_RML_TAG_ALLREDUCE , 0);
 199                 if(OMPI_SUCCESS != rc ) {
 200                     goto  Error;
 201                 }
 202 
 203                 
 204                 if( 0 < count_this_stripe ) {
 205                     op_reduce(op_type,(void *)scratch_bufers[recv_buffer],
 206                             (void *)scratch_bufers[send_buffer], n_my_count,TYPE_INT4);
 207                 }
 208 
 209 
 210             } else {
 211 
 212                 
 213 
 214 
 215                 extra_rank=my_exchange_node.rank_extra_source;
 216                 send_iov.iov_base=scratch_bufers[send_buffer];
 217                 send_iov.iov_len=count_this_stripe*dt_size;
 218                 rc = ompi_rte_send(&(proc_array[extra_rank]->proc_name), &send_iov, 1,
 219                         OMPI_RML_TAG_ALLREDUCE , 0);
 220                 if(OMPI_SUCCESS != rc ) {
 221                     goto  Error;
 222                 }
 223             }
 224 
 225             
 226 
 227 
 228 
 229 
 230             recv_buffer^=1;
 231             send_buffer^=1;
 232         }
 233 
 234         MB();
 235         
 236 
 237 
 238         tag=base_tag+1;
 239         my_ctl_pointer->flag=tag;
 240 
 241         
 242         for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {
 243 
 244             
 245 
 246 
 247 
 248 
 249             my_write_pointer=my_tmp_data_buffer[index_write];
 250             my_read_pointer=my_tmp_data_buffer[index_read];
 251 
 252             
 253             pair_rank=my_exchange_node->rank_exchanges[exchange];
 254 
 255             *recv_done=0;
 256             *send_done=0;
 257             MB();
 258 
 259             
 260             recv_iov.iov_base=scratch_bufers[send_buffer];
 261             recv_iov.iov_len=count_this_stripe*dt_size;
 262             rc = ompi_rte_recv_nb(&(proc_array[extra_rank]->proc_name), recv_iov, 1,
 263                         OMPI_RML_TAG_ALLREDUCE , 0, recv_completion, recv_done);
 264 
 265             
 266             send_iov.iov_base=scratch_bufers[send_buffer];
 267             send_iov.iov_len=count_this_stripe*dt_size;
 268             rc = ompi_rte_send_nb(&(proc_array[extra_rank]->proc_name), send_iov, 1,
 269                         OMPI_RML_TAG_ALLREDUCE , 0, send_completion, send_done);
 270 
 271             
 272             while(!(*recv_done) ) {
 273                 opal_progress();
 274             }
 275 
 276             
 277             if( 0 < count_this_stripe ) {
 278                 op_reduce(op_type,(void *)scratch_bufers[recv_buffer],
 279                         (void *)scratch_bufers[send_buffer], n_my_count,TYPE_INT4);
 280             }
 281 
 282 
 283             
 284             index_read=(exchange&1);
 285             index_write=((exchange+1)&1);
 286 
 287             
 288             while(!(*send_done) ) {
 289                 opal_progress();
 290             }
 291 
 292         }
 293 
 294         
 295         if(0 < my_exchange_node->n_extra_sources)  {
 296 
 297             if ( EXTRA_NODE == my_exchange_node->node_type ) {
 298                 
 299 
 300 
 301                 extra_rank=my_exchange_node->rank_extra_source;
 302 
 303                 recv_iov.iov_base=scratch_bufers[recv_buffer];
 304                 recv_iov.iov_len=count_this_stripe*dt_size;
 305                 rc = ompi_rte_recv(&(proc_array[extra_rank]->proc_name), &recv_iov, 1,
 306                         OMPI_RML_TAG_ALLREDUCE , 0);
 307                 if(OMPI_SUCCESS != rc ) {
 308                     goto  Error;
 309                 }
 310 
 311             } else {
 312                 
 313 
 314 
 315 
 316                 extra_rank=my_exchange_node->rank_extra_source;
 317                 send_iov.iov_base=scratch_bufers[recv_buffer];
 318                 send_iov.iov_len=count_this_stripe*dt_size;
 319                 rc = ompi_rte_recv(&(proc_array[extra_rank]->proc_name), &send_iov, 1,
 320                         OMPI_RML_TAG_ALLREDUCE , 0);
 321                 if(OMPI_SUCCESS != rc ) {
 322                     goto  Error;
 323                 }
 324             }
 325         }
 326 
 327         
 328         rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
 329                 (char *)((char *)rbuf+dt_extent*count_processed),
 330                 (char *)my_write_pointer);
 331         if( 0 != rc ) {
 332             return OMPI_ERROR;
 333         }
 334 
 335         
 336         rbuf_current=(char *)rbuf+count_processed*dt_size;
 337         memcopy(scratch_bufers[recv_buffer],rbuf_current,count_this_stripe*dt_size);
 338 
 339         
 340         count_processed+=count_this_stripe;
 341     }
 342 
 343     
 344     return rc;
 345 
 346 Error:
 347     return rc;
 348 }