This source file includes the following definitions.
- mca_coll_sm_module_construct
- mca_coll_sm_module_destruct
- mca_coll_sm_module_disable
- mca_coll_sm_init_query
- mca_coll_sm_comm_query
- sm_module_enable
- ompi_coll_sm_lazy_enable
- bootstrap_comm
- mca_coll_sm_ft_event
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 
  32 
  33 
  34 
  35 
  36 #include "ompi_config.h"
  37 
  38 #include <stdio.h>
  39 #include <string.h>
  40 #ifdef HAVE_SCHED_H
  41 #include <sched.h>
  42 #endif
  43 #include <sys/types.h>
  44 #ifdef HAVE_SYS_MMAN_H
  45 #include <sys/mman.h>
  46 #endif  
  47 #ifdef HAVE_UNISTD_H
  48 #include <unistd.h>
  49 #endif  
  50 
  51 #include "mpi.h"
  52 #include "opal_stdint.h"
  53 #include "opal/mca/hwloc/base/base.h"
  54 #include "opal/util/os_path.h"
  55 #include "opal/util/printf.h"
  56 
  57 #include "ompi/communicator/communicator.h"
  58 #include "ompi/group/group.h"
  59 #include "ompi/mca/coll/coll.h"
  60 #include "ompi/mca/coll/base/base.h"
  61 #include "ompi/mca/rte/rte.h"
  62 #include "ompi/proc/proc.h"
  63 #include "coll_sm.h"
  64 
  65 #include "ompi/mca/coll/base/coll_tags.h"
  66 #include "ompi/mca/pml/pml.h"
  67 
  68 
  69 
  70 
/*
 * Globally visible 32-bit word initialised to 1.
 * NOTE(review): nothing in this file reads it; presumably the other
 * coll/sm source files use it as a constant operand -- confirm.
 */
uint32_t mca_coll_sm_one = 1;


/*
 * Forward declarations of the file-local functions that are referenced
 * before their definitions (as module callbacks or as helpers).
 */
static int sm_module_enable(mca_coll_base_module_t *module,
                          struct ompi_communicator_t *comm);
static int bootstrap_comm(ompi_communicator_t *comm,
                          mca_coll_sm_module_t *module);
static int mca_coll_sm_module_disable(mca_coll_base_module_t *module,
                          struct ompi_communicator_t *comm);
  83 
  84 
  85 
  86 
  87 static void mca_coll_sm_module_construct(mca_coll_sm_module_t *module)
  88 {
  89     module->enabled = false;
  90     module->sm_comm_data = NULL;
  91     module->previous_reduce = NULL;
  92     module->previous_reduce_module = NULL;
  93     module->super.coll_module_disable = mca_coll_sm_module_disable;
  94 }
  95 
  96 
  97 
  98 
  99 static void mca_coll_sm_module_destruct(mca_coll_sm_module_t *module)
 100 {
 101     mca_coll_sm_comm_t *c = module->sm_comm_data;
 102 
 103     if (NULL != c) {
 104         
 105         if (NULL != c->sm_bootstrap_meta) {
 106             
 107 
 108             mca_common_sm_fini(c->sm_bootstrap_meta);
 109             OBJ_RELEASE(c->sm_bootstrap_meta);
 110         }
 111         free(c);
 112     }
 113 
 114     
 115     if (NULL != module->previous_reduce_module) {
 116         OBJ_RELEASE(module->previous_reduce_module);
 117     }
 118 
 119     module->enabled = false;
 120 }
 121 
 122 
 123 
 124 
 125 static int mca_coll_sm_module_disable(mca_coll_base_module_t *module, struct ompi_communicator_t *comm)
 126 {
 127     mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
 128     if (NULL != sm_module->previous_reduce_module) {
 129         sm_module->previous_reduce = NULL;
 130         OBJ_RELEASE(sm_module->previous_reduce_module);
 131         sm_module->previous_reduce_module = NULL;
 132     }
 133     return OMPI_SUCCESS;
 134 }
 135 
 136 
/*
 * Class instantiation for mca_coll_sm_module_t: names
 * mca_coll_base_module_t as the parent class and registers the
 * constructor/destructor pair defined above.
 */
OBJ_CLASS_INSTANCE(mca_coll_sm_module_t,
                   mca_coll_base_module_t,
                   mca_coll_sm_module_construct,
                   mca_coll_sm_module_destruct);
 141 
 142 
 143 
 144 
 145 
 146 
 147 
 148 int mca_coll_sm_init_query(bool enable_progress_threads,
 149                            bool enable_mpi_threads)
 150 {
 151     
 152     if (NULL == ompi_process_info.job_session_dir) {
 153         return OMPI_ERR_OUT_OF_RESOURCE;
 154     }
 155     
 156 
 157     opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 158                         "coll:sm:init_query: pick me! pick me!");
 159     return OMPI_SUCCESS;
 160 }
 161 
 162 
 163 
 164 
 165 
 166 
 167 
 168 mca_coll_base_module_t *
 169 mca_coll_sm_comm_query(struct ompi_communicator_t *comm, int *priority)
 170 {
 171     mca_coll_sm_module_t *sm_module;
 172 
 173     
 174 
 175 
 176     if (OMPI_COMM_IS_INTER(comm) || 1 == ompi_comm_size(comm) || ompi_group_have_remote_peers (comm->c_local_group)) {
 177         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 178                             "coll:sm:comm_query (%d/%s): intercomm, comm is too small, or not all peers local; disqualifying myself", comm->c_contextid, comm->c_name);
 179         return NULL;
 180     }
 181 
 182     
 183 
 184     *priority = mca_coll_sm_component.sm_priority;
 185     if (mca_coll_sm_component.sm_priority <= 0) {
 186         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 187                             "coll:sm:comm_query (%d/%s): priority too low; disqualifying myself", comm->c_contextid, comm->c_name);
 188         return NULL;
 189     }
 190 
 191     sm_module = OBJ_NEW(mca_coll_sm_module_t);
 192     if (NULL == sm_module) {
 193         return NULL;
 194     }
 195 
 196     
 197     sm_module->super.coll_module_enable = sm_module_enable;
 198     sm_module->super.ft_event        = mca_coll_sm_ft_event;
 199     sm_module->super.coll_allgather  = NULL;
 200     sm_module->super.coll_allgatherv = NULL;
 201     sm_module->super.coll_allreduce  = mca_coll_sm_allreduce_intra;
 202     sm_module->super.coll_alltoall   = NULL;
 203     sm_module->super.coll_alltoallv  = NULL;
 204     sm_module->super.coll_alltoallw  = NULL;
 205     sm_module->super.coll_barrier    = mca_coll_sm_barrier_intra;
 206     sm_module->super.coll_bcast      = mca_coll_sm_bcast_intra;
 207     sm_module->super.coll_exscan     = NULL;
 208     sm_module->super.coll_gather     = NULL;
 209     sm_module->super.coll_gatherv    = NULL;
 210     sm_module->super.coll_reduce     = mca_coll_sm_reduce_intra;
 211     sm_module->super.coll_reduce_scatter = NULL;
 212     sm_module->super.coll_scan       = NULL;
 213     sm_module->super.coll_scatter    = NULL;
 214     sm_module->super.coll_scatterv   = NULL;
 215 
 216     opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 217                         "coll:sm:comm_query (%d/%s): pick me! pick me!",
 218                         comm->c_contextid, comm->c_name);
 219     return &(sm_module->super);
 220 }
 221 
 222 
 223 
 224 
 225 
 226 static int sm_module_enable(mca_coll_base_module_t *module,
 227                             struct ompi_communicator_t *comm)
 228 {
 229     if (NULL == comm->c_coll->coll_reduce ||
 230         NULL == comm->c_coll->coll_reduce_module) {
 231         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 232                             "coll:sm:enable (%d/%s): no underlying reduce; disqualifying myself",
 233                             comm->c_contextid, comm->c_name);
 234         return OMPI_ERROR;
 235     }
 236 
 237     
 238     return OMPI_SUCCESS;
 239 }
 240 
 241 int ompi_coll_sm_lazy_enable(mca_coll_base_module_t *module,
 242                              struct ompi_communicator_t *comm)
 243 {
 244     int i, j, root, ret;
 245     int rank = ompi_comm_rank(comm);
 246     int size = ompi_comm_size(comm);
 247     mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
 248     mca_coll_sm_comm_t *data = NULL;
 249     size_t control_size, frag_size;
 250     mca_coll_sm_component_t *c = &mca_coll_sm_component;
 251     opal_hwloc_base_memory_segment_t *maffinity;
 252     int parent, min_child, num_children;
 253     unsigned char *base = NULL;
 254     const int num_barrier_buffers = 2;
 255 
 256     
 257     if (sm_module->enabled) {
 258         return OMPI_SUCCESS;
 259     }
 260     sm_module->enabled = true;
 261 
 262     
 263 
 264     maffinity = (opal_hwloc_base_memory_segment_t*)
 265         malloc(sizeof(opal_hwloc_base_memory_segment_t) *
 266                c->sm_comm_num_segments * 3);
 267     if (NULL == maffinity) {
 268         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 269                             "coll:sm:enable (%d/%s): malloc failed (1)",
 270                             comm->c_contextid, comm->c_name);
 271         return OMPI_ERR_OUT_OF_RESOURCE;
 272     }
 273 
 274     
 275 
 276 
 277 
 278 
 279 
 280 
 281 
 282 
 283 
 284 
 285 
 286     sm_module->sm_comm_data = data = (mca_coll_sm_comm_t*)
 287         malloc(sizeof(mca_coll_sm_comm_t) +
 288                (c->sm_comm_num_segments *
 289                 sizeof(mca_coll_sm_data_index_t)) +
 290                (size *
 291                 (sizeof(mca_coll_sm_tree_node_t) +
 292                  (sizeof(mca_coll_sm_tree_node_t*) * c->sm_tree_degree))));
 293     if (NULL == data) {
 294         free(maffinity);
 295         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 296                             "coll:sm:enable (%d/%s): malloc failed (2)",
 297                             comm->c_contextid, comm->c_name);
 298         return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
 299     }
 300     data->mcb_operation_count = 0;
 301 
 302     
 303 
 304     data->mcb_data_index = (mca_coll_sm_data_index_t*) (data + 1);
 305     
 306     data->mcb_tree = (mca_coll_sm_tree_node_t*)
 307         (data->mcb_data_index + c->sm_comm_num_segments);
 308     
 309 
 310     data->mcb_tree[0].mcstn_children = (mca_coll_sm_tree_node_t**)
 311         (data->mcb_tree + size);
 312     for (i = 1; i < size; ++i) {
 313         data->mcb_tree[i].mcstn_children =
 314             data->mcb_tree[i - 1].mcstn_children + c->sm_tree_degree;
 315     }
 316 
 317     
 318 
 319 
 320     for (root = 0; root < size; ++root) {
 321         parent = (root - 1) / mca_coll_sm_component.sm_tree_degree;
 322         num_children = mca_coll_sm_component.sm_tree_degree;
 323 
 324         
 325 
 326         if ((root * num_children) + 1 >= size) {
 327             
 328             min_child = -1;
 329             num_children = 0;
 330         } else {
 331             
 332             int max_child;
 333             min_child = root * num_children + 1;
 334             max_child = root * num_children + num_children;
 335             if (max_child >= size) {
 336                 max_child = size - 1;
 337             }
 338             num_children = max_child - min_child + 1;
 339         }
 340 
 341         
 342         data->mcb_tree[root].mcstn_id = root;
 343         if (root == 0 && parent == 0) {
 344             data->mcb_tree[root].mcstn_parent = NULL;
 345         } else {
 346             data->mcb_tree[root].mcstn_parent = &data->mcb_tree[parent];
 347         }
 348         data->mcb_tree[root].mcstn_num_children = num_children;
 349         for (i = 0; i < c->sm_tree_degree; ++i) {
 350             data->mcb_tree[root].mcstn_children[i] =
 351                 (i < num_children) ?
 352                 &data->mcb_tree[min_child + i] : NULL;
 353         }
 354     }
 355 
 356     
 357     if (OMPI_SUCCESS != (ret = bootstrap_comm(comm, sm_module))) {
 358         free(data);
 359         free(maffinity);
 360         sm_module->sm_comm_data = NULL;
 361         return ret;
 362     }
 363 
 364     
 365 
 366 
 367 
 368 
 369 
 370 
 371 
 372     control_size = c->sm_control_size;
 373     base = data->sm_bootstrap_meta->module_data_addr;
 374     data->mcb_barrier_control_me = (uint32_t*)
 375         (base + (rank * control_size * num_barrier_buffers * 2));
 376     if (data->mcb_tree[rank].mcstn_parent) {
 377         data->mcb_barrier_control_parent = (opal_atomic_uint32_t*)
 378             (base +
 379              (data->mcb_tree[rank].mcstn_parent->mcstn_id * control_size *
 380               num_barrier_buffers * 2));
 381     } else {
 382         data->mcb_barrier_control_parent = NULL;
 383     }
 384     if (data->mcb_tree[rank].mcstn_num_children > 0) {
 385         data->mcb_barrier_control_children = (uint32_t*)
 386             (base +
 387              (data->mcb_tree[rank].mcstn_children[0]->mcstn_id * control_size *
 388               num_barrier_buffers * 2));
 389     } else {
 390         data->mcb_barrier_control_children = NULL;
 391     }
 392     data->mcb_barrier_count = 0;
 393 
 394     
 395 
 396 
 397     base += (c->sm_control_size * size * num_barrier_buffers * 2);
 398     data->mcb_in_use_flags = (mca_coll_sm_in_use_flag_t*) base;
 399 
 400     
 401 
 402 
 403     j = 0;
 404     if (0 == rank) {
 405         maffinity[j].mbs_start_addr = base;
 406         maffinity[j].mbs_len = c->sm_control_size *
 407             c->sm_comm_num_in_use_flags;
 408         
 409 
 410 
 411 
 412 
 413         for (i = 0; i < mca_coll_sm_component.sm_comm_num_in_use_flags; ++i) {
 414             ((mca_coll_sm_in_use_flag_t *)base)[i].mcsiuf_operation_count = 1;
 415             ((mca_coll_sm_in_use_flag_t *)base)[i].mcsiuf_num_procs_using = 0;
 416         }
 417         ++j;
 418     }
 419 
 420     
 421 
 422     base += (c->sm_comm_num_in_use_flags * c->sm_control_size);
 423     control_size = size * c->sm_control_size;
 424     frag_size = size * c->sm_fragment_size;
 425     for (i = 0; i < c->sm_comm_num_segments; ++i) {
 426         data->mcb_data_index[i].mcbmi_control = (uint32_t*)
 427             (base + (i * (control_size + frag_size)));
 428         data->mcb_data_index[i].mcbmi_data =
 429             (((char*) data->mcb_data_index[i].mcbmi_control) +
 430              control_size);
 431 
 432         
 433 
 434         maffinity[j].mbs_len = c->sm_control_size;
 435         maffinity[j].mbs_start_addr = (void *)
 436             (((char*) data->mcb_data_index[i].mcbmi_control) +
 437              (rank * c->sm_control_size));
 438         ++j;
 439 
 440         
 441 
 442         maffinity[j].mbs_len = c->sm_fragment_size;
 443         maffinity[j].mbs_start_addr =
 444             ((char*) data->mcb_data_index[i].mcbmi_data) +
 445             (rank * c->sm_control_size);
 446         ++j;
 447     }
 448 
 449     
 450 
 451     opal_hwloc_base_memory_set(maffinity, j);
 452     free(maffinity);
 453 
 454     
 455     memset(data->mcb_barrier_control_me, 0,
 456            num_barrier_buffers * 2 * c->sm_control_size);
 457     for (i = 0; i < c->sm_comm_num_segments; ++i) {
 458         memset((void *) data->mcb_data_index[i].mcbmi_control, 0,
 459                c->sm_control_size);
 460     }
 461 
 462     
 463     sm_module->previous_reduce = comm->c_coll->coll_reduce;
 464     sm_module->previous_reduce_module = comm->c_coll->coll_reduce_module;
 465     OBJ_RETAIN(sm_module->previous_reduce_module);
 466 
 467     
 468     opal_atomic_add (&(data->sm_bootstrap_meta->module_seg->seg_inited), 1);
 469 
 470     
 471     opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 472                         "coll:sm:enable (%d/%s): waiting for peers to attach",
 473                         comm->c_contextid, comm->c_name);
 474     SPIN_CONDITION(size == data->sm_bootstrap_meta->module_seg->seg_inited, seg_init_exit);
 475 
 476     
 477     if (0 == rank) {
 478         unlink(data->sm_bootstrap_meta->shmem_ds.seg_name);
 479         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 480                             "coll:sm:enable (%d/%s): removed mmap file %s",
 481                             comm->c_contextid, comm->c_name,
 482                             data->sm_bootstrap_meta->shmem_ds.seg_name);
 483     }
 484 
 485     
 486 
 487     opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 488                         "coll:sm:enable (%d/%s): success!",
 489                         comm->c_contextid, comm->c_name);
 490     return OMPI_SUCCESS;
 491 }
 492 
 493 static int bootstrap_comm(ompi_communicator_t *comm,
 494                           mca_coll_sm_module_t *module)
 495 {
 496     int i;
 497     char *shortpath, *fullpath;
 498     mca_coll_sm_component_t *c = &mca_coll_sm_component;
 499     mca_coll_sm_comm_t *data = module->sm_comm_data;
 500     int comm_size = ompi_comm_size(comm);
 501     int num_segments = c->sm_comm_num_segments;
 502     int num_in_use = c->sm_comm_num_in_use_flags;
 503     int frag_size = c->sm_fragment_size;
 504     int control_size = c->sm_control_size;
 505     ompi_process_name_t *lowest_name = NULL;
 506     size_t size;
 507     ompi_proc_t *proc;
 508 
 509     
 510 
 511 
 512 
 513     proc = ompi_group_peer_lookup(comm->c_local_group, 0);
 514     lowest_name = OMPI_CAST_RTE_NAME(&proc->super.proc_name);
 515     for (i = 1; i < comm_size; ++i) {
 516         proc = ompi_group_peer_lookup(comm->c_local_group, i);
 517         if (ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
 518                                           OMPI_CAST_RTE_NAME(&proc->super.proc_name),
 519                                           lowest_name) < 0) {
 520             lowest_name = OMPI_CAST_RTE_NAME(&proc->super.proc_name);
 521         }
 522     }
 523     opal_asprintf(&shortpath, "coll-sm-cid-%d-name-%s.mmap", comm->c_contextid,
 524              OMPI_NAME_PRINT(lowest_name));
 525     if (NULL == shortpath) {
 526         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 527                             "coll:sm:enable:bootstrap comm (%d/%s): asprintf failed",
 528                             comm->c_contextid, comm->c_name);
 529         return OMPI_ERR_OUT_OF_RESOURCE;
 530     }
 531     fullpath = opal_os_path(false, ompi_process_info.job_session_dir,
 532                             shortpath, NULL);
 533     free(shortpath);
 534     if (NULL == fullpath) {
 535         opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 536                             "coll:sm:enable:bootstrap comm (%d/%s): opal_os_path failed",
 537                             comm->c_contextid, comm->c_name);
 538         return OMPI_ERR_OUT_OF_RESOURCE;
 539     }
 540 
 541     
 542 
 543 
 544 
 545 
 546 
 547 
 548 
 549 
 550 
 551 
 552 
 553 
 554 
 555 
 556 
 557 
 558 
 559 
 560 
 561 
 562     size = 4 * control_size +
 563         (num_in_use * control_size) +
 564         (num_segments * (comm_size * control_size * 2)) +
 565         (num_segments * (comm_size * frag_size));
 566     opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 567                         "coll:sm:enable:bootstrap comm (%d/%s): attaching to %" PRIsize_t " byte mmap: %s",
 568                         comm->c_contextid, comm->c_name, size, fullpath);
 569     if (0 == ompi_comm_rank (comm)) {
 570         data->sm_bootstrap_meta = mca_common_sm_module_create_and_attach (size, fullpath, sizeof(mca_common_sm_seg_header_t), 8);
 571         if (NULL == data->sm_bootstrap_meta) {
 572             opal_output_verbose(10, ompi_coll_base_framework.framework_output,
 573                 "coll:sm:enable:bootstrap comm (%d/%s): mca_common_sm_init_group failed",
 574                 comm->c_contextid, comm->c_name);
 575             free(fullpath);
 576             return OMPI_ERR_OUT_OF_RESOURCE;
 577         }
 578 
 579         for (int i = 1 ; i < ompi_comm_size (comm) ; ++i) {
 580             MCA_PML_CALL(send(&data->sm_bootstrap_meta->shmem_ds, sizeof (data->sm_bootstrap_meta->shmem_ds), MPI_BYTE,
 581                          i, MCA_COLL_BASE_TAG_BCAST, MCA_PML_BASE_SEND_STANDARD, comm));
 582         }
 583     } else {
 584         opal_shmem_ds_t shmem_ds;
 585         MCA_PML_CALL(recv(&shmem_ds, sizeof (shmem_ds), MPI_BYTE, 0, MCA_COLL_BASE_TAG_BCAST, comm, MPI_STATUS_IGNORE));
 586         data->sm_bootstrap_meta = mca_common_sm_module_attach (&shmem_ds, sizeof(mca_common_sm_seg_header_t), 8);
 587     }
 588 
 589     
 590     free(fullpath);
 591     return OMPI_SUCCESS;
 592 }
 593 
 594 
 595 int mca_coll_sm_ft_event(int state) {
 596     if(OPAL_CRS_CHECKPOINT == state) {
 597         ;
 598     }
 599     else if(OPAL_CRS_CONTINUE == state) {
 600         ;
 601     }
 602     else if(OPAL_CRS_RESTART == state) {
 603         ;
 604     }
 605     else if(OPAL_CRS_TERM == state ) {
 606         ;
 607     }
 608     else {
 609         ;
 610     }
 611 
 612     return OMPI_SUCCESS;
 613 }