root/ompi/dpm/dpm.c

DEFINITIONS

This source file includes the following definitions.
  1. ompi_dpm_init
  2. ompi_dpm_connect_accept
  3. construct_peers
  4. ompi_dpm_disconnect
  5. ompi_dpm_spawn
  6. ompi_dpm_open_port
  7. ompi_dpm_close_port
  8. ompi_dpm_dyn_init
  9. ompi_dpm_finalize
  10. ompi_dpm_dyn_finalize
  11. disconnect_init
  12. disconnect_waitall
  13. ompi_dpm_group_is_dyn
  14. ompi_dpm_mark_dyncomm
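
For orientation, here is a minimal caller-side sketch (not part of this file) of how these
routines are typically reached through the public MPI API; the "worker" executable name and
the maxprocs value are illustrative only, and error handling is omitted.

#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Comm children;

    MPI_Init(&argc, &argv);

    /* MPI_Comm_spawn() is serviced by ompi_dpm_spawn(); the spawned
     * "worker" program must itself call MPI_Init() */
    MPI_Comm_spawn("worker", MPI_ARGV_NULL, 2, MPI_INFO_NULL,
                   0, MPI_COMM_SELF, &children, MPI_ERRCODES_IGNORE);

    /* ... exchange messages over the resulting intercommunicator ... */

    /* MPI_Comm_disconnect() is serviced by ompi_dpm_disconnect() */
    MPI_Comm_disconnect(&children);

    MPI_Finalize();
    return 0;
}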

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2007-2018 Cisco Systems, Inc.  All rights reserved
  14  * Copyright (c) 2006-2009 University of Houston.  All rights reserved.
  15  * Copyright (c) 2009      Sun Microsystems, Inc.  All rights reserved.
  16  * Copyright (c) 2011-2015 Los Alamos National Security, LLC.  All rights
  17  *                         reserved.
  18  * Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
  19  * Copyright (c) 2014-2017 Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
  22  * $COPYRIGHT$
  23  *
  24  * Additional copyrights may follow
  25  *
  26  * $HEADER$
  27  */
  28 
  29 #include "ompi_config.h"
  30 #include "ompi/constants.h"
  31 
  32 #include <string.h>
  33 #include <stdio.h>
  34 #include <ctype.h>
  35 #include <time.h>
  36 #if HAVE_SYS_TIME_H
  37 #include <sys/time.h>
  38 #endif
  39 
  40 #include "opal/util/alfg.h"
  41 #include "opal/util/argv.h"
  42 #include "opal/util/opal_getcwd.h"
  43 #include "opal/util/proc.h"
  44 #include "opal/util/show_help.h"
  45 #include "opal/util/printf.h"
  46 #include "opal/dss/dss.h"
  47 #include "opal/mca/hwloc/base/base.h"
  48 #include "opal/mca/pmix/pmix.h"
  49 
  50 #include "ompi/communicator/communicator.h"
  51 #include "ompi/group/group.h"
  52 #include "ompi/proc/proc.h"
  53 #include "ompi/mca/pml/pml.h"
  54 #include "ompi/mca/rte/rte.h"
  55 #include "ompi/info/info.h"
  56 
  57 #include "ompi/dpm/dpm.h"
  58 
  59 static opal_rng_buff_t rnd;
  60 
  61 typedef struct {
  62     ompi_communicator_t       *comm;
  63     int                       size;
  64     struct ompi_request_t     **reqs;
  65     int                       buf;
  66 } ompi_dpm_disconnect_obj;
  67 static int disconnect_waitall (int count, ompi_dpm_disconnect_obj **objs);
  68 static ompi_dpm_disconnect_obj *disconnect_init(ompi_communicator_t *comm);
  69 
  70 typedef struct {
  71     opal_list_item_t super;
  72     ompi_proc_t *p;
  73 } ompi_dpm_proct_caddy_t;
  74 static OBJ_CLASS_INSTANCE(ompi_dpm_proct_caddy_t,
  75                           opal_list_item_t,
  76                           NULL, NULL);
  77 
  78 /*
  79  * Init the module
  80  */
  81 int ompi_dpm_init(void)
  82 {
  83     time_t now;
  84 
  85     /* seed our random number generator */
  86     now = time(NULL);
  87     if (!opal_srand(&rnd, now)) {
  88         return OMPI_ERROR;
  89     }
  90     return OMPI_SUCCESS;
  91 }
  92 
  93 int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
  94                             const char *port_string, bool send_first,
  95                             ompi_communicator_t **newcomm)
  96 {
  97     int k, size, rsize, rank, rc, rportlen=0;
  98     char **members = NULL, *nstring, *rport=NULL;
  99     bool dense, isnew;
 100     opal_process_name_t pname;
 101     opal_list_t ilist, mlist, rlist;
 102     opal_value_t info;
 103     opal_pmix_pdata_t pdat;
 104     opal_namelist_t *nm;
 105     opal_jobid_t jobid;
 106 
 107     ompi_communicator_t *newcomp=MPI_COMM_NULL;
 108     ompi_proc_t *proc;
 109     ompi_group_t *group=comm->c_local_group;
 110     ompi_proc_t **proc_list=NULL, **new_proc_list = NULL;
 111     int32_t i;
 112     ompi_group_t *new_group_pointer;
 113     ompi_dpm_proct_caddy_t *cd;
 114 
 115     if (NULL == opal_pmix.publish || NULL == opal_pmix.connect ||
 116         NULL == opal_pmix.unpublish ||
 117        (NULL == opal_pmix.lookup && NULL == opal_pmix.lookup_nb)) {
 118         /* print a nice message explaining we don't have support */
 119         opal_show_help("help-mpi-runtime.txt", "noconxcpt", true);
 120         return OMPI_ERR_NOT_SUPPORTED;
 121     }
 122     if (!ompi_rte_connect_accept_support(port_string)) {
 123         /* they will have printed the help message */
 124         return OMPI_ERR_NOT_SUPPORTED;
 125     }
 126 
 127     /* set default error return */
 128     *newcomm = MPI_COMM_NULL;
 129 
 130     size = ompi_comm_size ( comm );
 131     rank = ompi_comm_rank ( comm );
 132 
 133     /* the "send_first" end will append ":connect" to the port name and publish
 134      * the list of its participating procs on that key. The receiving root proc
 135      * will append ":accept" to the port name and publish the list of its
 136      * participants on that key. Each proc will then block waiting for lookup
 137      * to complete on the other's key. Once that completes, the list of remote
 138      * procs is used to complete construction of the intercommunicator. */
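         /* Illustrative shape of the value each side publishes (the joined
          * "members" list constructed below): for MPI_COMM_WORLD it is
          *     <jobid/WILDCARD name>:<nspace>:<job size>
          * while for any other communicator it is a per-process sequence of
          *     <process name>:<nspace>
          * pairs. The exact textual form of a name is whatever
          * opal_convert_process_name_to_string() produces. */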
 139 
 140     /* everyone constructs the list of members from their communicator */
 141     if (MPI_COMM_WORLD == comm) {
 142         pname.jobid = OMPI_PROC_MY_NAME->jobid;
 143         pname.vpid = OPAL_VPID_WILDCARD;
 144         rc = opal_convert_process_name_to_string(&nstring, &pname);
 145         if (OPAL_SUCCESS != rc) {
 146             return OMPI_ERROR;
 147         }
 148         opal_argv_append_nosize(&members, nstring);
 149         free(nstring);
 150         /* have to add the number of procs in the job so the remote side
 151          * can correctly add the procs by computing their names, and our nspace
 152          * so they can update their records */
 153         if (NULL == (nstring = (char*)opal_pmix.get_nspace(OMPI_PROC_MY_NAME->jobid))) {
 154             opal_argv_free(members);
 155             return OMPI_ERR_NOT_SUPPORTED;
 156         }
 157         opal_argv_append_nosize(&members, nstring);
 158         (void)opal_asprintf(&nstring, "%d", size);
 159         opal_argv_append_nosize(&members, nstring);
 160         free(nstring);
 161     } else {
 162         if (OMPI_GROUP_IS_DENSE(group)) {
 163             proc_list = group->grp_proc_pointers;
 164             dense = true;
 165         } else {
 166             proc_list = (ompi_proc_t**)calloc(group->grp_proc_count,
 167                                               sizeof(ompi_proc_t *));
 168             for (i=0 ; i<group->grp_proc_count ; i++) {
 169                 if (NULL == (proc_list[i] = ompi_group_peer_lookup(group,i))) {
 170                     OMPI_ERROR_LOG(OMPI_ERR_NOT_FOUND);
 171                     rc = OMPI_ERR_NOT_FOUND;
 172                     free(proc_list);
 173                     goto exit;
 174                 }
 175             }
 176             dense = false;
 177         }
 178         for (i=0; i < size; i++) {
 179             opal_process_name_t proc_name;
 180             if (ompi_proc_is_sentinel (proc_list[i])) {
 181                 proc_name = ompi_proc_sentinel_to_name ((uintptr_t) proc_list[i]);
 182             } else {
 183                 proc_name = proc_list[i]->super.proc_name;
 184             }
 185             rc = opal_convert_process_name_to_string(&nstring, &proc_name);
 186             if (OPAL_SUCCESS != rc) {
 187                 if (!dense) {
 188                     free(proc_list);
 189                     proc_list = NULL;
 190                 }
 191                 return OMPI_ERROR;
 192             }
 193             opal_argv_append_nosize(&members, nstring);
 194             free(nstring);
 195             if (NULL == (nstring = (char*)opal_pmix.get_nspace(proc_name.jobid))) {
 196                 opal_argv_free(members);
 197                 free (proc_list);
 198                 return OMPI_ERR_NOT_SUPPORTED;
 199             }
 200             opal_argv_append_nosize(&members, nstring);
 201         }
 202         if (!dense) {
 203             free(proc_list);
 204             proc_list = NULL;
 205         }
 206     }
 207 
 208     if (rank == root) {
 209         /* the roots for each side exchange their list of participants */
 210         OBJ_CONSTRUCT(&info, opal_value_t);
 211         OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);
 212         if (send_first) {
 213             (void)opal_asprintf(&info.key, "%s:connect", port_string);
 214             (void)opal_asprintf(&pdat.value.key, "%s:accept", port_string);
 215         } else {
 216             (void)opal_asprintf(&info.key, "%s:accept", port_string);
 217             (void)opal_asprintf(&pdat.value.key, "%s:connect", port_string);
 218         }
 219         info.type = OPAL_STRING;
 220         info.data.string = opal_argv_join(members, ':');
 221         pdat.value.type = OPAL_STRING;
 222 
 223         OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600);  // give them 10 minutes
 224         OBJ_DESTRUCT(&info);
 225         if (OPAL_SUCCESS != rc) {
 226             OBJ_DESTRUCT(&pdat);
 227             return rc;
 228         }
 229 
 230         /* save the result */
 231         rport = strdup(pdat.value.data.string);  // need this later
 232         rportlen = strlen(rport) + 1;  // retain the NULL terminator
 233         OBJ_DESTRUCT(&pdat);
 234     }
 235 
 236     /* if we aren't in a comm_spawn, the non-root members won't have
 237      * the port_string - so let's make sure everyone knows the other
 238      * side's participants */
 239 
 240     /* bcast the list-length to all processes in the local comm */
 241     rc = comm->c_coll->coll_bcast(&rportlen, 1, MPI_INT, root, comm,
 242                                  comm->c_coll->coll_bcast_module);
 243     if (OMPI_SUCCESS != rc) {
 244         free(rport);
 245         goto exit;
 246     }
 247 
 248     if (rank != root) {
 249         /* non root processes need to allocate the buffer manually */
 250         rport = (char*)malloc(rportlen);
 251         if (NULL == rport) {
 252             rc = OMPI_ERR_OUT_OF_RESOURCE;
 253             goto exit;
 254         }
 255     }
 256     /* now share the list of remote participants */
 257     rc = comm->c_coll->coll_bcast(rport, rportlen, MPI_BYTE, root, comm,
 258                                  comm->c_coll->coll_bcast_module);
 259     if (OMPI_SUCCESS != rc) {
 260         free(rport);
 261         goto exit;
 262     }
 263 
 264     /* initiate a list of participants for the connect,
 265      * starting with our own members */
 266     OBJ_CONSTRUCT(&mlist, opal_list_t);
 267     for (i=0; NULL != members[i]; i++) {
 268         nm = OBJ_NEW(opal_namelist_t);
 269         if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
 270             OMPI_ERROR_LOG(rc);
 271             opal_argv_free(members);
 272             free(rport);
 273             OPAL_LIST_DESTRUCT(&mlist);
 274             goto exit;
 275         }
 276         /* step over the nspace */
 277         ++i;
 278         if (NULL == members[i]) {
 279             /* this shouldn't happen and is an error */
 280             OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
 281             OPAL_LIST_DESTRUCT(&mlist);
 282             opal_argv_free(members);
 283             free(rport);
 284             rc = OMPI_ERR_BAD_PARAM;
 285             goto exit;
 286         }
 287         /* if the rank is wildcard, then we need to add all procs
 288          * in that job to the list */
 289         if (OPAL_VPID_WILDCARD == nm->name.vpid) {
 290             jobid = nm->name.jobid;
 291             OBJ_RELEASE(nm);
 292             for (k=0; k < size; k++) {
 293                 nm = OBJ_NEW(opal_namelist_t);
 294                 nm->name.jobid = jobid;
 295                 nm->name.vpid = k;
 296                 opal_list_append(&mlist, &nm->super);
 297             }
 298             /* now step over the size */
 299             if (NULL == members[i+1]) {
 300                 /* this shouldn't happen and is an error */
 301                 OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
 302                 OPAL_LIST_DESTRUCT(&mlist);
 303                 opal_argv_free(members);
 304                 free(rport);
 305                 rc = OMPI_ERR_BAD_PARAM;
 306                 goto exit;
 307             }
 308             ++i;
 309         } else {
 310             opal_list_append(&mlist, &nm->super);
 311         }
 312     }
 313     opal_argv_free(members);
 314     members = NULL;
 315 
 316     /* rport contains a colon-delimited list
 317      * of process names for the remote procs - convert it
 318      * into an argv array */
 319     members = opal_argv_split(rport, ':');
 320     free(rport);
 321 
 322     /* add the list of remote procs to our list, and
 323      * keep a list of them for later */
 324     OBJ_CONSTRUCT(&ilist, opal_list_t);
 325     OBJ_CONSTRUCT(&rlist, opal_list_t);
 326 
 327     for (i=0; NULL != members[i]; i++) {
 328         nm = OBJ_NEW(opal_namelist_t);
 329         if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
 330             OMPI_ERROR_LOG(rc);
 331             opal_argv_free(members);
 332             OPAL_LIST_DESTRUCT(&ilist);
 333             OPAL_LIST_DESTRUCT(&rlist);
 334             goto exit;
 335         }
 336         /* next entry is the nspace - register it */
 337         ++i;
 338         if (NULL == members[i]) {
 339             OMPI_ERROR_LOG(OMPI_ERR_NOT_SUPPORTED);
 340             opal_argv_free(members);
 341             OPAL_LIST_DESTRUCT(&ilist);
 342             OPAL_LIST_DESTRUCT(&rlist);
 343             goto exit;
 344         }
 345         opal_pmix.register_jobid(nm->name.jobid, members[i]);
 346         if (OPAL_VPID_WILDCARD == nm->name.vpid) {
 347             jobid = nm->name.jobid;
 348             OBJ_RELEASE(nm);
 349             /* if the vpid is wildcard, then we are including all ranks
 350              * of that job, and the next entry in members should be the
 351              * number of procs in the job */
 352             if (NULL == members[i+1]) {
 353                 /* just protect against the error */
 354                 OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
 355                 opal_argv_free(members);
 356                 OPAL_LIST_DESTRUCT(&ilist);
 357                 OPAL_LIST_DESTRUCT(&rlist);
 358                 rc = OMPI_ERR_BAD_PARAM;
 359                 goto exit;
 360             }
 361             rsize = strtoul(members[i+1], NULL, 10);
 362             ++i;
 363             for (k=0; k < rsize; k++) {
 364                 nm = OBJ_NEW(opal_namelist_t);
 365                 nm->name.jobid = jobid;
 366                 nm->name.vpid = k;
 367                 opal_list_append(&mlist, &nm->super);
 368                 /* see if this needs to be added to our ompi_proc_t array */
 369                 proc = ompi_proc_find_and_add(&nm->name, &isnew);
 370                 if (isnew) {
 371                     cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
 372                     cd->p = proc;
 373                     opal_list_append(&ilist, &cd->super);
 374                 }
 375                 /* either way, add to the remote list */
 376                 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
 377                 cd->p = proc;
 378                 opal_list_append(&rlist, &cd->super);
 379             }
 380         } else {
 381             opal_list_append(&mlist, &nm->super);
 382             /* see if this needs to be added to our ompi_proc_t array */
 383             proc = ompi_proc_find_and_add(&nm->name, &isnew);
 384             if (isnew) {
 385                 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
 386                 cd->p = proc;
 387                 opal_list_append(&ilist, &cd->super);
 388             }
 389             /* either way, add to the remote list */
 390             cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
 391             cd->p = proc;
 392             opal_list_append(&rlist, &cd->super);
 393         }
 394     }
 395     opal_argv_free(members);
 396 
 397     /* tell the host RTE to connect us - this will download
 398      * all known data for the nspaces of participating procs
 399      * so that add_procs will not result in a slew of lookups */
 400     rc = opal_pmix.connect(&mlist);
 401     OPAL_LIST_DESTRUCT(&mlist);
 402     if (OPAL_SUCCESS != rc) {
 403         OMPI_ERROR_LOG(rc);
 404         OPAL_LIST_DESTRUCT(&ilist);
 405         OPAL_LIST_DESTRUCT(&rlist);
 406         goto exit;
 407     }
 408     if (0 < opal_list_get_size(&ilist)) {
 409         /* convert the list of new procs to a proc_t array */
 410         new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
 411                                               sizeof(ompi_proc_t *));
 412         i = 0;
 413         OPAL_LIST_FOREACH(cd, &ilist, ompi_dpm_proct_caddy_t) {
 414             opal_value_t *kv;
 415             proc = cd->p;
 416             new_proc_list[i] = proc ;
 417             /* ompi_proc_complete_init_single() initializes and optionally retrieves
 418              * OPAL_PMIX_LOCALITY and OPAL_PMIX_HOSTNAME. since we can live without
 419              * them, we are just fine */
 420             ompi_proc_complete_init_single(proc);
 421             /* save the locality for later */
 422             kv = OBJ_NEW(opal_value_t);
 423             kv->key = strdup(OPAL_PMIX_LOCALITY);
 424             kv->type = OPAL_UINT16;
 425             kv->data.uint16 = proc->super.proc_flags;
 426             opal_pmix.store_local(&proc->super.proc_name, kv);
 427             OBJ_RELEASE(kv); // maintain accounting
 428             ++i;
 429         }
 430         /* call add_procs on the new ones */
 431         rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
 432         free(new_proc_list);
 433         new_proc_list = NULL;
 434         if (OMPI_SUCCESS != rc) {
 435             OMPI_ERROR_LOG(rc);
 436             OPAL_LIST_DESTRUCT(&ilist);
 437             goto exit;
 438         }
 439     }
 440     OPAL_LIST_DESTRUCT(&ilist);
 441 
 442     /* now deal with the remote group */
 443     rsize = opal_list_get_size(&rlist);
 444     new_group_pointer=ompi_group_allocate(rsize);
 445     if (NULL == new_group_pointer) {
 446         rc = OMPI_ERR_OUT_OF_RESOURCE;
 447         OPAL_LIST_DESTRUCT(&rlist);
 448         goto exit;
 449     }
 450     /* assign group elements */
 451     i=0;
 452     OPAL_LIST_FOREACH(cd, &rlist, ompi_dpm_proct_caddy_t) {
 453         new_group_pointer->grp_proc_pointers[i++] = cd->p;
 454         /* retain the proc */
 455         OBJ_RETAIN(cd->p);
 456     }
 457     OPAL_LIST_DESTRUCT(&rlist);
 458 
 459     /* set up communicator structure */
 460     rc = ompi_comm_set ( &newcomp,                 /* new comm */
 461                          comm,                     /* old comm */
 462                          group->grp_proc_count,    /* local_size */
 463                          NULL,                     /* local_procs */
 464                          rsize,                    /* remote_size */
 465                          NULL  ,                   /* remote_procs */
 466                          NULL,                     /* attrs */
 467                          comm->error_handler,      /* error handler */
 468                          NULL,                     /* topo component */
 469                          group,                    /* local group */
 470                          new_group_pointer         /* remote group */
 471                          );
 472     if (OMPI_SUCCESS != rc) {
 473         goto exit;
 474     }
 475 
 476     OBJ_RELEASE(new_group_pointer);
 477     new_group_pointer = MPI_GROUP_NULL;
 478 
 479     /* allocate comm_cid */
 480     rc = ompi_comm_nextcid ( newcomp,                   /* new communicator */
 481                              comm,                      /* old communicator */
 482                              NULL,                      /* bridge comm */
 483                              &root,                     /* local leader */
 484                              (void*)port_string,        /* rendezvous point */
 485                              send_first,                /* send or recv first */
 486                              OMPI_COMM_CID_INTRA_PMIX); /* mode */
 487     if (OMPI_SUCCESS != rc) {
 488         goto exit;
 489     }
 490 
 491     /* activate comm and init coll-component */
 492     rc = ompi_comm_activate ( &newcomp,                  /* new communicator */
 493                               comm,                      /* old communicator */
 494                               NULL,                      /* bridge comm */
 495                               &root,                     /* local leader */
 496                               (void*)port_string,        /* rendezvous point */
 497                               send_first,                /* send or recv first */
 498                               OMPI_COMM_CID_INTRA_PMIX); /* mode */
 499     if (OMPI_SUCCESS != rc) {
 500         goto exit;
 501     }
 502 
 503     /* Question: do we have to restart some low-level machinery
 504        to enable the use of fast communication devices
 505        between the two worlds?
 506     */
 507 
 508  exit:
 509     if (OMPI_SUCCESS != rc) {
 510         if (MPI_COMM_NULL != newcomp && NULL != newcomp) {
 511             OBJ_RELEASE(newcomp);
 512             newcomp = MPI_COMM_NULL;
 513         }
 514     }
 515 
 516     *newcomm = newcomp;
 517     return rc;
 518 }
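     /* Caller-side sketch (illustrative, not part of this file):
      * MPI_Comm_accept() and MPI_Comm_connect() both funnel into
      * ompi_dpm_connect_accept() above, differing in the send_first flag.
      *
      *     char port[MPI_MAX_PORT_NAME];
      *     MPI_Comm inter;
      *
      *     // server: open a port and wait for a peer
      *     MPI_Open_port(MPI_INFO_NULL, port);
      *     MPI_Comm_accept(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &inter);
      *
      *     // client: obtain the port string out of band, then connect
      *     MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &inter);
      */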
 519 
 520 static int construct_peers(ompi_group_t *group, opal_list_t *peers)
 521 {
 522     int i;
 523     opal_namelist_t *nm, *n2;
 524     ompi_proc_t *proct;
 525     opal_process_name_t proc_name;
 526 
 527     for (i=0; i < group->grp_proc_count; i++) {
 528         if (OMPI_GROUP_IS_DENSE(group)) {
 529             proct = group->grp_proc_pointers[i];
 530         } else {
 531             proct = ompi_group_peer_lookup(group, i);
 532         }
 533         if (NULL == proct) {
 534             OMPI_ERROR_LOG(OMPI_ERR_NOT_FOUND);
 535             return OMPI_ERR_NOT_FOUND;
 536         }
 537         if (ompi_proc_is_sentinel (proct)) {
 538             proc_name = ompi_proc_sentinel_to_name ((uintptr_t)proct);
 539         } else {
 540             proc_name = proct->super.proc_name;
 541         }
 542 
 543         /* add to the list of peers */
 544         nm = OBJ_NEW(opal_namelist_t);
 545         nm->name = proc_name;
 546         /* need to maintain an ordered list to ensure the tracker signatures
 547          * match across all procs */
 548         OPAL_LIST_FOREACH(n2, peers, opal_namelist_t) {
 549             if (opal_compare_proc(nm->name, n2->name) < 0) {
 550                 opal_list_insert_pos(peers, &n2->super, &nm->super);
 551                 nm = NULL;
 552                 break;
 553             }
 554         }
 555         if (NULL != nm) {
 556             /* append to the end */
 557             opal_list_append(peers, &nm->super);
 558         }
 559     }
 560     return OMPI_SUCCESS;
 561 }
 562 
 563 int ompi_dpm_disconnect(ompi_communicator_t *comm)
 564 {
 565     int ret;
 566     ompi_group_t *group;
 567     opal_list_t coll;
 568 
 569     /* Note that we explicitly use an RTE-based barrier (vs. an MPI
 570        barrier).  See a lengthy comment in
 571        ompi/runtime/ompi_mpi_finalize.c for a much more detailed
 572        rationale. */
 573 
 574     /* setup the collective */
 575     OBJ_CONSTRUCT(&coll, opal_list_t);
 576     /* RHC: assuming for now that this must flow across all
 577      * local and remote group members */
 578     group = comm->c_local_group;
 579     if (OMPI_SUCCESS != (ret = construct_peers(group, &coll))) {
 580         OMPI_ERROR_LOG(ret);
 581         OPAL_LIST_DESTRUCT(&coll);
 582         return ret;
 583     }
 584     /* do the same for the remote group */
 585     group = comm->c_remote_group;
 586     if (OMPI_SUCCESS != (ret = construct_peers(group, &coll))) {
 587         OMPI_ERROR_LOG(ret);
 588         OPAL_LIST_DESTRUCT(&coll);
 589         return ret;
 590     }
 591 
 592     /* ensure we tell the host RM to disconnect us - this
 593      * is a blocking operation so just use a fence */
 594     if (OMPI_SUCCESS != (ret = opal_pmix.fence(&coll, false))) {
 595         OMPI_ERROR_LOG(ret);
 596         OPAL_LIST_DESTRUCT(&coll);
 597         return ret;
 598     }
 599     OPAL_LIST_DESTRUCT(&coll);
 600 
 601     return ret;
 602 }
 603 
 604 int ompi_dpm_spawn(int count, const char *array_of_commands[],
 605                    char **array_of_argv[],
 606                    const int array_of_maxprocs[],
 607                    const MPI_Info array_of_info[],
 608                    const char *port_name)
 609 {
 610     int rc, i, j;
 611     int have_wdir=0;
 612     int flag=0;
 613     char cwd[OPAL_PATH_MAX];
 614     char host[OPAL_MAX_INFO_VAL];  /*** should define OMPI_HOST_MAX ***/
 615     char prefix[OPAL_MAX_INFO_VAL];
 616     char stdin_target[OPAL_MAX_INFO_VAL];
 617     char params[OPAL_MAX_INFO_VAL];
 618     char mapper[OPAL_MAX_INFO_VAL];
 619     char slot_list[OPAL_MAX_INFO_VAL];
 620     uint32_t ui32;
 621     bool personality = false;
 622     opal_jobid_t jobid;
 623 
 624     opal_list_t apps;
 625     opal_list_t job_info;
 626     opal_pmix_app_t *app;
 627     opal_value_t *info;
 628     bool local_spawn, non_mpi;
 629     char **envars;
 630 
 631     /* parse the info object */
 632     /* check potentially for:
 633        - "host": desired host where to spawn the processes
 634        - "hostfile": hostfile containing hosts where procs are
 635        to be spawned
 636        - "add-host": add the specified hosts to the known list
 637        of available resources and spawn these
 638        procs on them
 639        - "add-hostfile": add the hosts in the hostfile to the
 640        known list of available resources and spawn
 641        these procs on them
 642        - "env": a newline-delimited list of envar values to be
 643        placed into the app's environment (of form "foo=bar")
 644        - "ompi_prefix": the path to the root of the directory tree where ompi
 645        executables and libraries can be found on all nodes
 646        used to spawn these procs
 647        - "arch": desired architecture
 648        - "wdir": directory where the executable can be found
 649        - "path": list of directories in which to look for the executable
 650        - "file": filename where additional information is provided.
 651        - "soft": see page 92 of MPI-2.
 652        - "mapper": indicate the mapper to be used for the job
 653        - "display_map": display the map of the spawned job
 654        - "npernode": number of procs/node to spawn
 655        - "pernode": spawn one proc/node
 656        - "ppr": spawn specified number of procs per specified object
 657        - "map_by": specify object by which the procs should be mapped
 658        - "rank_by": specify object by which the procs should be ranked
 659        - "bind_to": specify object to which the procs should be bound
 660        - "ompi_preload_binary": move binaries to nodes prior to execution
 661        - "ompi_preload_files": move specified files to nodes prior to execution
 662        - "ompi_non_mpi": spawned job will not call MPI_Init
 663        - "ompi_param": list of MCA params to be in the spawned job's environment
 664        - "env": newline (\n) delimited list of envar values to be passed to spawned procs
 665     */
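         /* Caller-side sketch (illustrative): these keys arrive through the
          * MPI_Info argument of MPI_Comm_spawn / MPI_Comm_spawn_multiple,
          * e.g. (hypothetical host names and working directory):
          *
          *     MPI_Info info;
          *     MPI_Info_create(&info);
          *     MPI_Info_set(info, "host", "node01,node02");
          *     MPI_Info_set(info, "wdir", "/tmp/spawn-test");
          *     MPI_Comm_spawn("worker", MPI_ARGV_NULL, 4, info,
          *                    0, MPI_COMM_SELF, &child, MPI_ERRCODES_IGNORE);
          *
          * Each recognized key is translated below into an OPAL_PMIX_*
          * attribute on either the per-app info list or the job-level list. */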
 666 
 667     /* setup the job object */
 668     OBJ_CONSTRUCT(&job_info, opal_list_t);
 669     OBJ_CONSTRUCT(&apps, opal_list_t);
 670 
 671     /* Convert the list of commands to list of opal_pmix_app_t */
 672     for (i = 0; i < count; ++i) {
 673         app = OBJ_NEW(opal_pmix_app_t);
 674         if (NULL == app) {
 675             OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
 676             OPAL_LIST_DESTRUCT(&apps);
 677             opal_progress_event_users_decrement();
 678             return OMPI_ERR_OUT_OF_RESOURCE;
 679         }
 680         /* add the app to the job data */
 681         opal_list_append(&apps, &app->super);
 682 
 683         /* copy over the name of the executable */
 684         app->cmd = strdup(array_of_commands[i]);
 685         opal_argv_append_nosize(&app->argv, app->cmd);
 686 
 687         /* record the number of procs to be generated */
 688         app->maxprocs = array_of_maxprocs[i];
 689 
 690         /* copy over the argv array */
 691         if (MPI_ARGVS_NULL != array_of_argv &&
 692             MPI_ARGV_NULL != array_of_argv[i]) {
 693             for (j=0; NULL != array_of_argv[i][j]; j++) {
 694                 opal_argv_append_nosize(&app->argv, array_of_argv[i][j]);
 695             }
 696         }
 697 
 698         /* Add environment variable with the contact information for the
 699            child processes.
 700         */
 701         opal_setenv("OMPI_PARENT_PORT", port_name, true, &app->env);
 702         for (j = 0; NULL != environ[j]; ++j) {
 703             if (0 == strncmp(OPAL_MCA_PREFIX, environ[j], strlen(OPAL_MCA_PREFIX))) {
 704                 opal_argv_append_nosize(&app->env, environ[j]);
 705             }
 706         }
 707 
 708         /* Check for well-known info keys */
 709         have_wdir = 0;
 710         if ( array_of_info != NULL && array_of_info[i] != MPI_INFO_NULL ) {
 711 
 712             /* check for personality - this is a job-level key */
 713             ompi_info_get (array_of_info[i], "personality", sizeof(host) - 1, host, &flag);
 714             if ( flag ) {
 715                 personality = true;
 716                 info = OBJ_NEW(opal_value_t);
 717                 info->key = strdup(OPAL_PMIX_PERSONALITY);
 718                 opal_value_load(info, host, OPAL_STRING);
 719                 opal_list_append(&job_info, &info->super);
 720             }
 721 
 722             /* check for 'host' */
 723             ompi_info_get (array_of_info[i], "host", sizeof(host) - 1, host, &flag);
 724             if ( flag ) {
 725                 info = OBJ_NEW(opal_value_t);
 726                 info->key = strdup(OPAL_PMIX_HOST);
 727                 opal_value_load(info, host, OPAL_STRING);
 728                 opal_list_append(&app->info, &info->super);
 729             }
 730 
 731             /* check for 'hostfile' */
 732             ompi_info_get (array_of_info[i], "hostfile", sizeof(host) - 1, host, &flag);
 733             if ( flag ) {
 734                 info = OBJ_NEW(opal_value_t);
 735                 info->key = strdup(OPAL_PMIX_HOSTFILE);
 736                 opal_value_load(info, host, OPAL_STRING);
 737                 opal_list_append(&app->info, &info->super);
 738             }
 739 
 740             /* check for 'add-hostfile' */
 741             ompi_info_get (array_of_info[i], "add-hostfile", sizeof(host) - 1, host, &flag);
 742             if ( flag ) {
 743                 info = OBJ_NEW(opal_value_t);
 744                 info->key = strdup(OPAL_PMIX_ADD_HOSTFILE);
 745                 opal_value_load(info, host, OPAL_STRING);
 746                 opal_list_append(&app->info, &info->super);
 747             }
 748 
 749             /* check for 'add-host' */
 750             ompi_info_get (array_of_info[i], "add-host", sizeof(host) - 1, host, &flag);
 751             if ( flag ) {
 752                 info = OBJ_NEW(opal_value_t);
 753                 info->key = strdup(OPAL_PMIX_ADD_HOST);
 754                 opal_value_load(info, host, OPAL_STRING);
 755                 opal_list_append(&app->info, &info->super);
 756             }
 757 
 758             /* check for env */
 759             ompi_info_get (array_of_info[i], "env", sizeof(host)-1, host, &flag);
 760             if ( flag ) {
 761                 envars = opal_argv_split(host, '\n');
 762                 for (j=0; NULL != envars[j]; j++) {
 763                     opal_argv_append_nosize(&app->env, envars[j]);
 764                 }
 765                 opal_argv_free(envars);
 766             }
 767 
 768             /* 'path', 'arch', 'file', 'soft'  -- to be implemented */
 769 
 770             /* check for 'ompi_prefix' (OMPI-specific -- to effect the same
 771              * behavior as --prefix option to orterun)
 772              *
 773              * This is a job-level key
 774              */
 775             ompi_info_get (array_of_info[i], "ompi_prefix", sizeof(prefix) - 1, prefix, &flag);
 776             if ( flag ) {
 777                 info = OBJ_NEW(opal_value_t);
 778                 info->key = strdup(OPAL_PMIX_PREFIX);
 779                 opal_value_load(info, prefix, OPAL_STRING);
 780                 opal_list_append(&job_info, &info->super);
 781             }
 782 
 783             /* check for 'wdir' */
 784             ompi_info_get (array_of_info[i], "wdir", sizeof(cwd) - 1, cwd, &flag);
 785             if ( flag ) {
 786                 info = OBJ_NEW(opal_value_t);
 787                 info->key = strdup(OPAL_PMIX_WDIR);
 788                 opal_value_load(info, cwd, OPAL_STRING);
 789                 opal_list_append(&app->info, &info->super);
 790                 have_wdir = 1;
 791             }
 792 
 793             /* check for 'mapper' - a job-level key */
 794             ompi_info_get(array_of_info[i], "mapper", sizeof(mapper) - 1, mapper, &flag);
 795             if ( flag ) {
 796                 info = OBJ_NEW(opal_value_t);
 797                 info->key = strdup(OPAL_PMIX_MAPPER);
 798                 opal_value_load(info, mapper, OPAL_STRING);
 799                 opal_list_append(&job_info, &info->super);
 800             }
 801 
 802             /* check for 'display_map' - a job-level key */
 803             ompi_info_get_bool(array_of_info[i], "display_map", &local_spawn, &flag);
 804             if ( flag ) {
 805                 info = OBJ_NEW(opal_value_t);
 806                 info->key = strdup(OPAL_PMIX_DISPLAY_MAP);
 807                 opal_value_load(info, &local_spawn, OPAL_BOOL);
 808                 opal_list_append(&job_info, &info->super);
 809             }
 810 
 811             /* check for 'npernode' and 'ppr' - job-level key */
 812             ompi_info_get (array_of_info[i], "npernode", sizeof(slot_list) - 1, slot_list, &flag);
 813             if ( flag ) {
 814                 info = OBJ_NEW(opal_value_t);
 815                 info->key = strdup(OPAL_PMIX_PPR);
 816                 info->type = OPAL_STRING;
 817                 (void)opal_asprintf(&(info->data.string), "%s:n", slot_list);
 818                 opal_list_append(&job_info, &info->super);
 819             }
 820             ompi_info_get (array_of_info[i], "pernode", sizeof(slot_list) - 1, slot_list, &flag);
 821             if ( flag ) {
 822                 info = OBJ_NEW(opal_value_t);
 823                 info->key = strdup(OPAL_PMIX_PPR);
 824                 opal_value_load(info, "1:n", OPAL_STRING);
 825                 opal_list_append(&job_info, &info->super);
 826             }
 827             ompi_info_get (array_of_info[i], "ppr", sizeof(slot_list) - 1, slot_list, &flag);
 828             if ( flag ) {
 829                 info = OBJ_NEW(opal_value_t);
 830                 info->key = strdup(OPAL_PMIX_PPR);
 831                 opal_value_load(info, slot_list, OPAL_STRING);
 832                 opal_list_append(&job_info, &info->super);
 833             }
 834 
 835             /* check for 'map_by' - job-level key */
 836             ompi_info_get(array_of_info[i], "map_by", sizeof(slot_list) - 1, slot_list, &flag);
 837             if ( flag ) {
 838                 info = OBJ_NEW(opal_value_t);
 839                 info->key = strdup(OPAL_PMIX_MAPBY);
 840                 opal_value_load(info, slot_list, OPAL_STRING);
 841                 opal_list_append(&job_info, &info->super);
 842             }
 843 
 844             /* check for 'rank_by' - job-level key */
 845             ompi_info_get(array_of_info[i], "rank_by", sizeof(slot_list) - 1, slot_list, &flag);
 846             if ( flag ) {
 847                 info = OBJ_NEW(opal_value_t);
 848                 info->key = strdup(OPAL_PMIX_RANKBY);
 849                 opal_value_load(info, slot_list, OPAL_STRING);
 850                 opal_list_append(&job_info, &info->super);
 851             }
 852 
 853             /* check for 'bind_to' - job-level key */
 854             ompi_info_get(array_of_info[i], "bind_to", sizeof(slot_list) - 1, slot_list, &flag);
 855             if ( flag ) {
 856                 info = OBJ_NEW(opal_value_t);
 857                 info->key = strdup(OPAL_PMIX_BINDTO);
 858                 opal_value_load(info, slot_list, OPAL_STRING);
 859                 opal_list_append(&job_info, &info->super);
 860             }
 861 
 862             /* check for 'preload_binary' - job-level key */
 863             ompi_info_get_bool(array_of_info[i], "ompi_preload_binary", &local_spawn, &flag);
 864             if ( flag ) {
 865                 info = OBJ_NEW(opal_value_t);
 866                 info->key = strdup(OPAL_PMIX_PRELOAD_BIN);
 867                 opal_value_load(info, &local_spawn, OPAL_BOOL);
 868                 opal_list_append(&job_info, &info->super);
 869             }
 870 
 871             /* check for 'preload_files' - job-level key */
 872             ompi_info_get (array_of_info[i], "ompi_preload_files", sizeof(cwd) - 1, cwd, &flag);
 873             if ( flag ) {
 874                 info = OBJ_NEW(opal_value_t);
 875                 info->key = strdup(OPAL_PMIX_PRELOAD_FILES);
 876                 opal_value_load(info, cwd, OPAL_STRING);
 877                 opal_list_append(&job_info, &info->super);
 878             }
 879 
 880             /* see if this is a non-mpi job - if so, then set the flag so ORTE
 881              * knows what to do - job-level key
 882              */
 883             ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag);
 884             if (flag && non_mpi) {
 885                 info = OBJ_NEW(opal_value_t);
 886                 info->key = strdup(OPAL_PMIX_NON_PMI);
 887                 opal_value_load(info, &non_mpi, OPAL_BOOL);
 888                 opal_list_append(&job_info, &info->super);
 889             }
 890 
 891             /* see if this is an MCA param that the user wants applied to the child job */
 892             ompi_info_get (array_of_info[i], "ompi_param", sizeof(params) - 1, params, &flag);
 893             if ( flag ) {
 894                 opal_argv_append_unique_nosize(&app->env, params, true);
 895             }
 896 
 897             /* see if user specified what to do with stdin - defaults to
 898              * not forwarding stdin to child processes - job-level key
 899              */
 900             ompi_info_get (array_of_info[i], "ompi_stdin_target", sizeof(stdin_target) - 1, stdin_target, &flag);
 901             if ( flag ) {
 902                 if (0 == strcmp(stdin_target, "all")) {
 903                     ui32 = OPAL_VPID_WILDCARD;
 904                 } else if (0 == strcmp(stdin_target, "none")) {
 905                     ui32 = OPAL_VPID_INVALID;
 906                 } else {
 907                     ui32 = strtoul(stdin_target, NULL, 10);
 908                 }
 909                 info = OBJ_NEW(opal_value_t);
 910                 info->key = strdup(OPAL_PMIX_STDIN_TGT);
 911                 opal_value_load(info, &ui32, OPAL_UINT32);
 912                 opal_list_append(&job_info, &info->super);
 913             }
 914         }
 915 
 916         /* default value: If the user did not tell us where to look for the
 917          * executable, we assume the current working directory
 918          */
 919         if ( !have_wdir ) {
 920             if (OMPI_SUCCESS != (rc = opal_getcwd(cwd, OPAL_PATH_MAX))) {
 921                 OMPI_ERROR_LOG(rc);
 922                 OPAL_LIST_DESTRUCT(&apps);
 923                 opal_progress_event_users_decrement();
 924                 return rc;
 925             }
 926             info = OBJ_NEW(opal_value_t);
 927             info->key = strdup(OPAL_PMIX_WDIR);
 928             opal_value_load(info, cwd, OPAL_STRING);
 929             opal_list_append(&app->info, &info->super);
 930         }
 931 
 932         /* leave the map info alone - the launcher will
 933          * decide where to put things
 934          */
 935     } /* for (i = 0 ; i < count ; ++i) */
 936 
 937     /* default the personality - job-level key */
 938     if (!personality) {
 939         info = OBJ_NEW(opal_value_t);
 940         info->key = strdup(OPAL_PMIX_PERSONALITY);
 941         opal_value_load(info, "ompi", OPAL_STRING);
 942         opal_list_append(&job_info, &info->super);
 943     }
 944 
 945     /* spawn procs */
 946     rc = opal_pmix.spawn(&job_info, &apps, &jobid);
 947     OPAL_LIST_DESTRUCT(&job_info);
 948     OPAL_LIST_DESTRUCT(&apps);
 949 
 950     if (OPAL_SUCCESS != rc) {
 951         opal_progress_event_users_decrement();
 952         return MPI_ERR_SPAWN;
 953     }
 954 
 955     return OMPI_SUCCESS;
 956 }
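     /* Note: the children locate their parent through the OMPI_PARENT_PORT
      * environment variable set above. During MPI_Init the spawned processes
      * call ompi_dpm_dyn_init() (below), which reads that variable and performs
      * the matching ompi_dpm_connect_accept() back to the parent. */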
 957 
 958 /* Create a rendezvous tag consisting of our name + a random number */
 959 int ompi_dpm_open_port(char *port_name)
 960 {
 961     uint32_t r;
 962     char *tmp;
 963 
 964     r = opal_rand(&rnd);
 965     opal_convert_process_name_to_string(&tmp, OMPI_PROC_MY_NAME);
 966     snprintf(port_name, MPI_MAX_PORT_NAME-1, "%s:%u", tmp, r);
 967     port_name[MPI_MAX_PORT_NAME - 1] = '\0';
 968     free(tmp);
 969     return OMPI_SUCCESS;
 970 }
 971 
 972 int ompi_dpm_close_port(const char *port_name)
 973 {
 974     /* nothing to do here - user is responsible for the memory */
 975     return OMPI_SUCCESS;
 976 }
 977 
 978 int ompi_dpm_dyn_init(void)
 979 {
 980     int root=0, rc;
 981     bool send_first = true;
 982     ompi_communicator_t *newcomm=NULL;
 983     char *port_name=NULL, *tmp, *ptr;
 984 
 985     /* check for appropriate env variable */
 986     tmp = getenv("OMPI_PARENT_PORT");
 987     if (NULL == tmp) {
 988         /* nothing to do */
 989         return OMPI_SUCCESS;
 990     }
 991 
 992     /* the value passed to us may have quote marks around it to protect
 993      * the value if passed on the command line. We must remove those
 994      * to have a correct string
 995      */
 996     if ('"' == tmp[0]) {
 997         /* if the first char is a quote, then so will the last one be */
 998         tmp[strlen(tmp)-1] = '\0';
 999         ptr = &tmp[1];
1000     } else {
1001         ptr = &tmp[0];
1002     }
1003     port_name = strdup(ptr);
1004 
1005     rc = ompi_dpm_connect_accept(MPI_COMM_WORLD, root, port_name, send_first, &newcomm);
1006     free(port_name);
1007     if (OMPI_SUCCESS != rc) {
1008         return rc;
1009     }
1010 
1011     /* originally, we set comm_parent to comm_null (in comm_init);
1012      * now we have to decrease the reference counters of the
1013      * corresponding objects
1014      */
1015     OBJ_RELEASE(ompi_mpi_comm_parent->c_local_group);
1016     OBJ_RELEASE(ompi_mpi_comm_parent->error_handler);
1017     OBJ_RELEASE(ompi_mpi_comm_parent);
1018 
1019     /* Set the parent communicator */
1020     ompi_mpi_comm_parent = newcomm;
1021 
1022     /* Set name for debugging purposes */
1023     snprintf(newcomm->c_name, MPI_MAX_OBJECT_NAME, "MPI_COMM_PARENT");
1024     newcomm->c_flags |= OMPI_COMM_NAMEISSET;
1025 
1026     return OMPI_SUCCESS;
1027 }
1028 
1029 
1030 /*
1031  * finalize the module
1032  */
1033 int ompi_dpm_finalize(void)
1034 {
1035     return OMPI_SUCCESS;
1036 }
1037 
1038 
1039 /**********************************************************************/
1040 /**********************************************************************/
1041 /**********************************************************************/
1042 /* this routine runs through the list of communicators
1043    and does the disconnect for all dynamic communicators */
1044 int ompi_dpm_dyn_finalize(void)
1045 {
1046     int i,j=0, max=0;
1047     ompi_dpm_disconnect_obj **objs=NULL;
1048     ompi_communicator_t *comm=NULL;
1049 
1050     if (1 < ompi_comm_num_dyncomm) {
1051         objs = (ompi_dpm_disconnect_obj**)malloc(ompi_comm_num_dyncomm*
1052                                sizeof(ompi_dpm_disconnect_obj*));
1053         if (NULL == objs) {
1054             return OMPI_ERR_OUT_OF_RESOURCE;
1055         }
1056 
1057         max = opal_pointer_array_get_size(&ompi_mpi_communicators);
1058         for (i=3; i<max; i++) {
1059             comm = (ompi_communicator_t*)opal_pointer_array_get_item(&ompi_mpi_communicators,i);
1060             if (NULL != comm &&  OMPI_COMM_IS_DYNAMIC(comm)) {
1061                 objs[j++] = disconnect_init(comm);
1062             }
1063         }
1064 
1065         if (j != ompi_comm_num_dyncomm+1) {
1066             free(objs);
1067             return OMPI_ERROR;
1068         }
1069 
1070         disconnect_waitall(ompi_comm_num_dyncomm, objs);
1071         free(objs);
1072     }
1073 
1074     return OMPI_SUCCESS;
1075 }
1076 
1077 /* The next two routines implement a kind of non-blocking barrier.
1078 The only difference is that you can wait for the completion
1079 of more than one initiated ibarrier. This is required for waiting
1080 for all still-connected processes in MPI_Finalize.
1081 
1082 disconnect_init returns a handle, which has to be passed
1083 to disconnect_waitall. The second routine blocks until
1084 all non-blocking barriers described by the handles are finished.
1085 The communicators can then be released.
1086 */
1087 /**********************************************************************/
1088 /**********************************************************************/
1089 /**********************************************************************/
1090 
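     /* Per-peer request layout used by disconnect_init() and later flattened
      * by disconnect_waitall(): reqs[2*i] holds the zero-byte irecv from peer i
      * and reqs[2*i+1] the matching synchronous isend, so each object carries
      * 2*size pending requests in total. */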
1091 static ompi_dpm_disconnect_obj *disconnect_init(ompi_communicator_t *comm)
1092 {
1093     ompi_dpm_disconnect_obj *obj=NULL;
1094     int ret;
1095     int i;
1096 
1097     obj = (ompi_dpm_disconnect_obj*)calloc(1,sizeof(ompi_dpm_disconnect_obj));
1098     if (NULL == obj) {
1099         opal_output(0, "Could not allocate disconnect object");
1100         return NULL;
1101     }
1102 
1103     if (OMPI_COMM_IS_INTER(comm)) {
1104         obj->size = ompi_comm_remote_size(comm);
1105     } else {
1106         obj->size = ompi_comm_size(comm);
1107     }
1108 
1109     obj->comm = comm;
1110     obj->reqs = (ompi_request_t**)malloc(2*obj->size*sizeof(ompi_request_t *));
1111     if (NULL == obj->reqs) {
1112         opal_output(0, "Could not allocate request array for disconnect object");
1113         free(obj);
1114         return NULL;
1115     }
1116 
1117     /* initiate all isend_irecvs. We use a dummy buffer stored on
1118        the object, since we are sending zero size messages anyway. */
1119     for (i=0; i < obj->size; i++) {
1120         ret = MCA_PML_CALL(irecv(&(obj->buf), 0, MPI_INT, i,
1121                                  OMPI_COMM_BARRIER_TAG, comm,
1122                                  &(obj->reqs[2*i])));
1123 
1124         if (OMPI_SUCCESS != ret) {
1125             opal_output(0, "dpm_disconnect_init: error %d in irecv to process %d", ret, i);
1126             free(obj->reqs);
1127             free(obj);
1128             return NULL;
1129         }
1130         ret = MCA_PML_CALL(isend(&(obj->buf), 0, MPI_INT, i,
1131                                  OMPI_COMM_BARRIER_TAG,
1132                                  MCA_PML_BASE_SEND_SYNCHRONOUS,
1133                                  comm, &(obj->reqs[2*i+1])));
1134 
1135         if (OMPI_SUCCESS != ret) {
1136             opal_output(0, "dpm_disconnect_init: error %d in isend to process %d", ret, i);
1137             free(obj->reqs);
1138             free(obj);
1139             return NULL;
1140         }
1141     }
1142 
1143     /* return handle */
1144     return obj;
1145 }
1146 /**********************************************************************/
1147 /**********************************************************************/
1148 /**********************************************************************/
1149 /* - count how many requests are active
1150  * - generate a request array large enough to hold
1151  *   all active requests
1152  * - call waitall on the overall request array
1153  * - free the objects
1154  */
1155 static int disconnect_waitall (int count, ompi_dpm_disconnect_obj **objs)
1156 {
1157 
1158     ompi_request_t **reqs=NULL;
1159     char *treq=NULL;
1160     int totalcount = 0;
1161     int i;
1162     int ret;
1163 
1164     for (i=0; i<count; i++) {
1165         if (NULL == objs[i]) {
1166             opal_output(0, "Error in comm_disconnect_waitall");
1167             return OMPI_ERROR;
1168         }
1169 
1170         totalcount += objs[i]->size;
1171     }
1172 
1173     reqs = (ompi_request_t**)malloc(2*totalcount*sizeof(ompi_request_t *));
1174     if (NULL == reqs) {
1175         opal_output(0, "ompi_comm_disconnect_waitall: error allocating memory");
1176         return OMPI_ERROR;
1177     }
1178 
1179     /* generate a single, large array of pending requests */
1180     treq = (char *)reqs;
1181     for (i=0; i<count; i++) {
1182         memcpy(treq, objs[i]->reqs, 2*objs[i]->size * sizeof(ompi_request_t *));
1183         treq += 2*objs[i]->size * sizeof(ompi_request_t *);
1184     }
1185 
1186     /* force all non-blocking all-to-alls to finish */
1187     ret = ompi_request_wait_all(2*totalcount, reqs, MPI_STATUSES_IGNORE);
1188 
1189     /* Finally, free everything */
1190     for (i=0; i< count; i++ ) {
1191         if (NULL != objs[i]->reqs ) {
1192             free(objs[i]->reqs );
1193         }
1194         free(objs[i]);
1195     }
1196 
1197     free(reqs);
1198 
1199     return ret;
1200 }
1201 
1202 /**********************************************************************/
1203 /**********************************************************************/
1204 /**********************************************************************/
1205 static bool ompi_dpm_group_is_dyn (ompi_group_t *group, ompi_jobid_t thisjobid)
1206 {
1207     int size = group ? ompi_group_size (group) : 0;
1208 
1209     for (int i = 0 ; i < size ; ++i) {
1210         opal_process_name_t name = ompi_group_get_proc_name (group, i);
1211 
1212         if (thisjobid != ((ompi_process_name_t *) &name)->jobid) {
1213             /* at least one is different */
1214             return true;
1215         }
1216     }
1217 
1218     return false;
1219 }
1220 
1221 /* All we want to do in this function is determine if the number of
1222  * jobids in the local and/or remote group is > 1. This tells us to
1223  * set the disconnect flag. We don't actually care what the true
1224  * number -is-, only that it is > 1
1225  */
1226 void ompi_dpm_mark_dyncomm(ompi_communicator_t *comm)
1227 {
1228     bool found;
1229     ompi_jobid_t thisjobid;
1230 
1231     /* special case for MPI_COMM_NULL */
1232     if (comm == MPI_COMM_NULL) {
1233         return;
1234     }
1235 
1236     thisjobid = ompi_group_get_proc_name (comm->c_local_group, 0).jobid;
1237 
1238     /* loop over all processes in local group and check for
1239      * a different jobid
1240      */
1241     found = ompi_dpm_group_is_dyn (comm->c_local_group, thisjobid);
1242     if (!found) {
1243         /* if inter-comm, loop over all processes in remote_group
1244          * and see if any are different from thisjobid
1245          */
1246         found = ompi_dpm_group_is_dyn (comm->c_remote_group, thisjobid);
1247     }
1248 
1249     /* if a different jobid was found, set the disconnect flag*/
1250     if (found) {
1251         ompi_comm_num_dyncomm++;
1252         OMPI_COMM_SET_DYNAMIC(comm);
1253     }
1254 }
