root/ompi/proc/proc.c


DEFINITIONS

This source file includes the following definitions.
  1. ompi_proc_construct
  2. ompi_proc_destruct
  3. ompi_proc_allocate
  4. ompi_proc_complete_init_single
  5. ompi_proc_lookup
  6. ompi_proc_for_name_nolock
  7. ompi_proc_for_name
  8. ompi_proc_init
  9. ompi_proc_compare_vid
  10. ompi_proc_complete_init
  11. ompi_proc_finalize
  12. ompi_proc_world_size
  13. ompi_proc_get_allocated
  14. ompi_proc_world
  15. ompi_proc_all
  16. ompi_proc_self
  17. ompi_proc_find
  18. ompi_proc_refresh
  19. ompi_proc_pack
  20. ompi_proc_find_and_add
  21. ompi_proc_unpack

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2011 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2006 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2015 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2012-2015 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2013-2015 Intel, Inc. All rights reserved
 * Copyright (c) 2014-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2015-2017 Mellanox Technologies. All rights reserved.
 *
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include <string.h>
#include <strings.h>

#include "ompi/constants.h"
#include "opal/datatype/opal_convertor.h"
#include "opal/threads/mutex.h"
#include "opal/dss/dss.h"
#include "opal/util/arch.h"
#include "opal/util/show_help.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/util/argv.h"

#include "ompi/proc/proc.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#include "ompi/mca/pml/pml.h"

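/*
 * Module-level bookkeeping: every ompi_proc_t that has been allocated lives
 * on ompi_proc_list and is also indexed in ompi_proc_hash, keyed by its full
 * opal_process_name_t (jobid + vpid).  ompi_proc_lock serializes access to
 * both structures.
 */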
opal_list_t  ompi_proc_list = {{0}};
static opal_mutex_t ompi_proc_lock;
static opal_hash_table_t ompi_proc_hash;

ompi_proc_t* ompi_proc_local_proc = NULL;

static void ompi_proc_construct(ompi_proc_t* proc);
static void ompi_proc_destruct(ompi_proc_t* proc);
static ompi_proc_t *ompi_proc_for_name_nolock (const opal_process_name_t proc_name);

OBJ_CLASS_INSTANCE(
    ompi_proc_t,
    opal_proc_t,
    ompi_proc_construct,
    ompi_proc_destruct
);


void ompi_proc_construct(ompi_proc_t* proc)
{
    bzero(proc->proc_endpoints, sizeof(proc->proc_endpoints));

    /* By default, every process is assumed to have the same architecture as the
     * local one, i.e. we start out assuming a homogeneous environment. Later, once
     * the RTE can tell us the architecture of the remote nodes, the convertors are
     * reset to the correct architecture.
     */
    OBJ_RETAIN( ompi_mpi_local_convertor );
    proc->super.proc_convertor = ompi_mpi_local_convertor;
}


void ompi_proc_destruct(ompi_proc_t* proc)
{
    /* All convertors are created with OBJ_NEW, so we can simply call OBJ_RELEASE. All of
     * them, except the local convertor, will eventually get destroyed here. If the
     * reference counts are correct, the local convertor (whose reference count was
     * increased by the datatype engine) will not be destroyed here; it is destroyed
     * later, when ompi_datatype_finalize is called.
     */
    OBJ_RELEASE( proc->super.proc_convertor );
    if (NULL != proc->super.proc_hostname) {
        free(proc->super.proc_hostname);
    }
    opal_mutex_lock (&ompi_proc_lock);
    opal_list_remove_item(&ompi_proc_list, (opal_list_item_t*)proc);
    opal_hash_table_remove_value_ptr (&ompi_proc_hash, &proc->super.proc_name, sizeof (proc->super.proc_name));
    opal_mutex_unlock (&ompi_proc_lock);
}

/**
 * Allocate a new ompi_proc_t for the given jobid/vpid
 *
 * @param[in]  jobid Job identifier
 * @param[in]  vpid  Process identifier
 * @param[out] procp New ompi_proc_t structure
 *
 * This function allocates a new ompi_proc_t and inserts it into
 * the process list and hash table.
 */
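/*
 * Note: all callers other than ompi_proc_init() hold ompi_proc_lock when
 * calling this routine, since it modifies both the global process list and
 * the hash table.
 */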
static int ompi_proc_allocate (ompi_jobid_t jobid, ompi_vpid_t vpid, ompi_proc_t **procp) {
    ompi_proc_t *proc = OBJ_NEW(ompi_proc_t);

    opal_list_append(&ompi_proc_list, (opal_list_item_t*)proc);

    OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = jobid;
    OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid = vpid;

    opal_hash_table_set_value_ptr (&ompi_proc_hash, &proc->super.proc_name, sizeof (proc->super.proc_name),
                                   proc);

    /* by default we consider the process to be remote */
    proc->super.proc_flags = OPAL_PROC_NON_LOCAL;
    *procp = proc;

    return OMPI_SUCCESS;
}

/**
 * Finish setting up an ompi_proc_t
 *
 * @param[in] proc ompi process structure
 *
 * This function contains the core code of ompi_proc_complete_init() and
 * ompi_proc_refresh(). The tasks performed by this function include
 * retrieving the hostname (if below the modex cutoff), determining the
 * remote architecture, and calculating the locality of the process.
 */
int ompi_proc_complete_init_single (ompi_proc_t *proc)
{
    int ret;

    if ((OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid == OMPI_PROC_MY_NAME->jobid) &&
        (OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid  == OMPI_PROC_MY_NAME->vpid)) {
        /* nothing else to do */
        return OMPI_SUCCESS;
    }

    /* we can retrieve the hostname at no cost because it
     * was provided at startup - but make it optional so
     * we don't chase after it if some system doesn't
     * provide it */
    proc->super.proc_hostname = NULL;
    OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_HOSTNAME, &proc->super.proc_name,
                                   (char**)&(proc->super.proc_hostname), OPAL_STRING);

#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    /* get the remote architecture - this might force a modex except
     * for those environments where the RM provides it */
    {
        uint32_t *ui32ptr;
        ui32ptr = &(proc->super.proc_arch);
        OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_ARCH, &proc->super.proc_name,
                              (void**)&ui32ptr, OPAL_UINT32);
        if (OPAL_SUCCESS == ret) {
            /* if arch is different than mine, create a new convertor for this proc */
            if (proc->super.proc_arch != opal_local_arch) {
                OBJ_RELEASE(proc->super.proc_convertor);
                proc->super.proc_convertor = opal_convertor_create(proc->super.proc_arch, 0);
            }
        } else if (OMPI_ERR_NOT_IMPLEMENTED == ret) {
            proc->super.proc_arch = opal_local_arch;
        } else {
            return ret;
        }
    }
#else
    /* must be same arch as my own */
    proc->super.proc_arch = opal_local_arch;
#endif

    return OMPI_SUCCESS;
}

opal_proc_t *ompi_proc_lookup (const opal_process_name_t proc_name)
{
    ompi_proc_t *proc = NULL;
    int ret;

    /* try to lookup the value in the hash table */
    ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);

    if (OPAL_SUCCESS == ret) {
        return &proc->super;
    }

    return NULL;
}

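/*
 * Unlike ompi_proc_lookup() above, the two functions below create and
 * initialize a new ompi_proc_t on demand when the named process is not yet
 * known.  The _nolock variant assumes the caller already holds
 * ompi_proc_lock.
 */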
static ompi_proc_t *ompi_proc_for_name_nolock (const opal_process_name_t proc_name)
{
    ompi_proc_t *proc = NULL;
    int ret;

    /* double-check that another competing thread has not added this proc */
    ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
    if (OPAL_SUCCESS == ret) {
        goto exit;
    }

    /* allocate a new ompi_proc_t object for the process and insert it into the process table */
    ret = ompi_proc_allocate (proc_name.jobid, proc_name.vpid, &proc);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* allocation failed */
        goto exit;
    }

    /* finish filling in the important proc data fields */
    ret = ompi_proc_complete_init_single (proc);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        goto exit;
    }
exit:
    return proc;
}

opal_proc_t *ompi_proc_for_name (const opal_process_name_t proc_name)
{
    ompi_proc_t *proc = NULL;
    int ret;

    /* try to lookup the value in the hash table */
    ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
    if (OPAL_SUCCESS == ret) {
        return &proc->super;
    }

    opal_mutex_lock (&ompi_proc_lock);
    proc = ompi_proc_for_name_nolock (proc_name);
    opal_mutex_unlock (&ompi_proc_lock);

    return (opal_proc_t *) proc;
}

int ompi_proc_init(void)
{
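    /* If the job is below the add_procs cutoff we will eventually create an
     * ompi_proc_t for every process, so size the hash table accordingly;
     * otherwise procs are created lazily and we start with 1024 buckets. */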
    int opal_proc_hash_init_size = (ompi_process_info.num_procs < ompi_add_procs_cutoff) ? ompi_process_info.num_procs :
        1024;
    ompi_proc_t *proc;
    int ret;

    OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
    OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&ompi_proc_hash, opal_hash_table_t);

    ret = opal_hash_table_init (&ompi_proc_hash, opal_proc_hash_init_size);
    if (OPAL_SUCCESS != ret) {
        return ret;
    }

    /* create a proc for the local process */
    ret = ompi_proc_allocate (OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, &proc);
    if (OMPI_SUCCESS != ret) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* set local process data */
    ompi_proc_local_proc = proc;
    proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
    proc->super.proc_hostname = strdup(ompi_process_info.nodename);
    proc->super.proc_arch = opal_local_arch;
    /* Register the local proc with OPAL */
    opal_proc_local_set(&proc->super);
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    /* add our arch to the modex */
    OPAL_MODEX_SEND_VALUE(ret, OPAL_PMIX_GLOBAL,
                          OPAL_PMIX_ARCH, &opal_local_arch, OPAL_UINT32);
    if (OPAL_SUCCESS != ret) {
        return ret;
    }
#endif

    return OMPI_SUCCESS;
}

static int ompi_proc_compare_vid (opal_list_item_t **a, opal_list_item_t **b)
{
    ompi_proc_t *proca = (ompi_proc_t *) *a;
    ompi_proc_t *procb = (ompi_proc_t *) *b;

    if (proca->super.proc_name.vpid > procb->super.proc_name.vpid) {
        return 1;
    } else {
        return -1;
    }

    /* they should never be equal */
}

/**
 * Process creation is split into two steps. The second step is the
 * important one: it sets the properties of the remote process, such as
 * the architecture, node name and locality flags.
 *
 * This function must be called __only__ after the modex exchange has
 * been performed, so that the modex can carry the data instead of
 * requiring the runtime to provide it.
 */
int ompi_proc_complete_init(void)
{
    opal_process_name_t wildcard_rank;
    ompi_proc_t *proc;
    int ret, errcode = OMPI_SUCCESS;
    char *val;

    opal_mutex_lock (&ompi_proc_lock);

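    /* OPAL_PMIX_LOCAL_PEERS, keyed by the wildcard rank of our own job,
     * returns a comma-separated list of the vpids of all procs that share
     * our node. */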
    /* Add all local peers first */
    wildcard_rank.jobid = OMPI_PROC_MY_NAME->jobid;
    wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
    /* retrieve the local peers */
    OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_PEERS,
                          &wildcard_rank, &val, OPAL_STRING);
    if (OPAL_SUCCESS == ret && NULL != val) {
        char **peers = opal_argv_split(val, ',');
        int i;
        free(val);
        for (i=0; NULL != peers[i]; i++) {
            ompi_vpid_t local_rank = strtoul(peers[i], NULL, 10);
            uint16_t u16, *u16ptr = &u16;
            if (OMPI_PROC_MY_NAME->vpid == local_rank) {
                continue;
            }
            ret = ompi_proc_allocate (OMPI_PROC_MY_NAME->jobid, local_rank, &proc);
            if (OMPI_SUCCESS != ret) {
                /* do not return while still holding ompi_proc_lock,
                 * and do not leak the peers array */
                opal_argv_free(peers);
                opal_mutex_unlock (&ompi_proc_lock);
                return ret;
            }
            /* get the locality information - all RTEs are required
             * to provide this information at startup */
            OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_LOCALITY, &proc->super.proc_name, &u16ptr, OPAL_UINT16);
            if (OPAL_SUCCESS == ret) {
                proc->super.proc_flags = u16;
            }
        }
        opal_argv_free(peers);
    }

    /* Complete initialization of node-local procs */
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        ret = ompi_proc_complete_init_single (proc);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            errcode = ret;
            break;
        }
    }

    /* if cutoff is larger than # of procs - add all processes
     * NOTE that local procs will be automatically skipped as they
     * are already in the hash table
     */
    if (ompi_process_info.num_procs < ompi_add_procs_cutoff) {
        /* since ompi_proc_for_name locks internally,
         * we need to release the lock here
         */
        opal_mutex_unlock (&ompi_proc_lock);

        for (ompi_vpid_t i = 0 ; i < ompi_process_info.num_procs ; ++i ) {
            opal_process_name_t proc_name;
            proc_name.jobid = OMPI_PROC_MY_NAME->jobid;
            proc_name.vpid = i;
            (void) ompi_proc_for_name (proc_name);
        }

        /* acquire lock back for the next step - sort */
        opal_mutex_lock (&ompi_proc_lock);
    }

    opal_list_sort (&ompi_proc_list, ompi_proc_compare_vid);

    opal_mutex_unlock (&ompi_proc_lock);

    return errcode;
}

int ompi_proc_finalize (void)
{
    ompi_proc_t *proc;

    /* Unregister the local proc from OPAL */
    opal_proc_local_set(NULL);

    /* remove all items from list and destroy them. Since we cannot know
     * the reference count of the procs for certain, it is possible that
     * a single OBJ_RELEASE won't drive the count to zero, and hence will
     * not release the memory. Accordingly, we cycle through the list here,
     * calling release on each item.
     *
     * This will cycle until it forces the reference count of each item
     * to zero, thus causing the destructor to run - which will remove
     * the item from the list!
     *
     * We cannot do this under the thread lock as the destructor will
     * call it when removing the item from the list. However, this function
     * is ONLY called from MPI_Finalize, and all threads are prohibited from
     * calling an MPI function once ANY thread has called MPI_Finalize. Of
     * course, multiple threads are allowed to call MPI_Finalize, so this
     * function may get called multiple times by various threads. We believe
     * it is thread safe to do so...though it may not -appear- to be so
     * without walking through the entire list/destructor sequence.
     */
    while ((ompi_proc_t *)opal_list_get_end(&ompi_proc_list) != (proc = (ompi_proc_t *)opal_list_get_first(&ompi_proc_list))) {
        OBJ_RELEASE(proc);
    }
    /* now destruct the list and thread lock */
    OBJ_DESTRUCT(&ompi_proc_list);
    OBJ_DESTRUCT(&ompi_proc_lock);
    OBJ_DESTRUCT(&ompi_proc_hash);

    return OMPI_SUCCESS;
}

int ompi_proc_world_size (void)
{
    return ompi_process_info.num_procs;
}

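/*
 * ompi_proc_get_allocated() returns only the procs of the local jobid that
 * have already been allocated (i.e. exist on ompi_proc_list); it does not
 * create any new entries and it does not retain the objects it returns.
 */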
ompi_proc_t **ompi_proc_get_allocated (size_t *size)
{
    ompi_proc_t **procs;
    ompi_proc_t *proc;
    size_t count = 0;
    ompi_rte_cmp_bitmask_t mask;
    ompi_process_name_t my_name;

    /* check bozo case */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }
    mask = OMPI_RTE_CMP_JOBID;
    my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

    /* First count how many match this jobid */
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) {
            ++count;
        }
    }

    /* allocate an array */
    procs = (ompi_proc_t**) malloc(count * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        opal_mutex_unlock (&ompi_proc_lock);
        return NULL;
    }

    /* now save only the procs that match this jobid */
    count = 0;
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) {
            /* DO NOT RETAIN THIS OBJECT - the reference count on this
             * object will be adjusted by external callers. The intent
             * here is to allow the reference count to drop to zero if
             * the app no longer desires to communicate with this proc.
             * For example, the proc may call comm_disconnect on all
             * communicators involving this proc. In such cases, we want
             * the proc object to be removed from the list. By not incrementing
             * the reference count here, we allow this to occur.
             *
             * We don't implement that yet, but we are still safe for now as
             * the OBJ_NEW in ompi_proc_init owns the initial reference
             * count which cannot be released until ompi_proc_finalize is
             * called.
             */
            procs[count++] = proc;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);

    *size = count;
    return procs;
}

ompi_proc_t **ompi_proc_world (size_t *size)
{
    ompi_proc_t **procs;
    size_t count = 0;

    /* check bozo case */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }

    /* First count how many match this jobid (we already know this from our process info) */
    count = ompi_process_info.num_procs;

    /* allocate an array */
    procs = (ompi_proc_t **) malloc (count * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        return NULL;
    }

    /* now get/allocate all the procs in this jobid */
    for (size_t i = 0 ; i < count ; ++i) {
        opal_process_name_t name = {.jobid = OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name)->jobid,
                                    .vpid = i};

        /* DO NOT RETAIN THIS OBJECT - the reference count on this
         * object will be adjusted by external callers. The intent
         * here is to allow the reference count to drop to zero if
         * the app no longer desires to communicate with this proc.
         * For example, the proc may call comm_disconnect on all
         * communicators involving this proc. In such cases, we want
         * the proc object to be removed from the list. By not incrementing
         * the reference count here, we allow this to occur.
         *
         * We don't implement that yet, but we are still safe for now as
         * the OBJ_NEW in ompi_proc_init owns the initial reference
         * count which cannot be released until ompi_proc_finalize is
         * called.
         */
        procs[i] = (ompi_proc_t*)ompi_proc_for_name (name);
    }

    *size = count;

    return procs;
}


ompi_proc_t** ompi_proc_all(size_t* size)
{
    ompi_proc_t **procs =
        (ompi_proc_t**) malloc(opal_list_get_size(&ompi_proc_list) * sizeof(ompi_proc_t*));
    ompi_proc_t *proc;
    size_t count = 0;

    if (NULL == procs) {
        return NULL;
    }

    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        /* We know this isn't consistent with the behavior in ompi_proc_world,
         * but we are leaving the RETAIN for now because the code using this function
         * assumes that the results need to be released when done. It will
         * be cleaned up later as the "fix" will impact other places in
         * the code
         */
        OBJ_RETAIN(proc);
        procs[count++] = proc;
    }
    opal_mutex_unlock (&ompi_proc_lock);
    *size = count;
    return procs;
}


ompi_proc_t** ompi_proc_self(size_t* size)
{
    ompi_proc_t **procs = (ompi_proc_t**) malloc(sizeof(ompi_proc_t*));
    if (NULL == procs) {
        return NULL;
    }
    /* We know this isn't consistent with the behavior in ompi_proc_world,
     * but we are leaving the RETAIN for now because the code using this function
     * assumes that the results need to be released when done. It will
     * be cleaned up later as the "fix" will impact other places in
     * the code
     */
    OBJ_RETAIN(ompi_proc_local_proc);
    *procs = ompi_proc_local_proc;
    *size = 1;
    return procs;
}

ompi_proc_t * ompi_proc_find ( const ompi_process_name_t * name )
{
    ompi_proc_t *proc, *rproc=NULL;
    ompi_rte_cmp_bitmask_t mask;

    /* return the proc-struct which matches this jobid+process id */
    mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
            rproc = proc;
            break;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);

    return rproc;
}


int ompi_proc_refresh(void)
{
    ompi_proc_t *proc = NULL;
    ompi_vpid_t i = 0;
    int ret=OMPI_SUCCESS;

    opal_mutex_lock (&ompi_proc_lock);

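    /* Walk every known proc: reset its jobid to the current job, clear its
     * locality flags, and re-run the per-proc initialization.  The local
     * proc (matching our own vpid) is special-cased and re-registered with
     * OPAL. */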
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        /* Does not change: proc->super.proc_name.vpid */
        OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = OMPI_PROC_MY_NAME->jobid;

        /* Make sure to clear the local flag before we set it below */
        proc->super.proc_flags = 0;

        if (i == OMPI_PROC_MY_NAME->vpid) {
            ompi_proc_local_proc = proc;
            proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
            proc->super.proc_hostname = ompi_process_info.nodename;
            proc->super.proc_arch = opal_local_arch;
            opal_proc_local_set(&proc->super);
        } else {
            ret = ompi_proc_complete_init_single (proc);
            if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                break;
            }
        }
    }

    opal_mutex_unlock (&ompi_proc_lock);

    return ret;
}

int
ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
               opal_buffer_t* buf)
{
    int rc;
    char *nspace;

    opal_mutex_lock (&ompi_proc_lock);

    /* cycle through the provided array, packing the OMPI level
     * data for each proc. This data may or may not be included
     * in any subsequent modex operation, so we include it here
     * to ensure completion of a connect/accept handshake. See
     * the ompi/mca/dpm framework for an example of where and how
     * this info is used.
     *
     * Eventually, we will review the procedures that call this
     * function to see if duplication of communication can be
     * reduced. For now, just go ahead and pack the info so it
     * can be sent.
     */
    for (int i = 0 ; i < proclistsize ; ++i) {
        ompi_proc_t *proc = proclist[i];

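        /* proclist entries may be sentinels (a process name encoded directly
         * in the pointer value) rather than real ompi_proc_t pointers;
         * convert them to a real proc before dereferencing. */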
        if (ompi_proc_is_sentinel (proc)) {
            proc = ompi_proc_for_name_nolock (ompi_proc_sentinel_to_name ((uintptr_t) proc));
        }

        /* send proc name */
        rc = opal_dss.pack(buf, &(proc->super.proc_name), 1, OMPI_NAME);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
        /* retrieve and send the corresponding nspace for this job
         * as the remote side may not know the translation */
        nspace = (char*)opal_pmix.get_nspace(proc->super.proc_name.jobid);
        rc = opal_dss.pack(buf, &nspace, 1, OPAL_STRING);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
        /* pack architecture flag */
        rc = opal_dss.pack(buf, &(proc->super.proc_arch), 1, OPAL_UINT32);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
        /* pass the name of the host this proc is on */
        rc = opal_dss.pack(buf, &(proc->super.proc_hostname), 1, OPAL_STRING);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);
    return OMPI_SUCCESS;
}

ompi_proc_t *
ompi_proc_find_and_add(const ompi_process_name_t * name, bool* isnew)
{
    ompi_proc_t *proc, *rproc = NULL;
    ompi_rte_cmp_bitmask_t mask;

    /* return the proc-struct which matches this jobid+process id */
    mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
            rproc = proc;
            *isnew = false;
            break;
        }
    }

    /* if we didn't find this proc in the list, create a new
     * proc_t and append it to the list
     */
    if (NULL == rproc) {
        *isnew = true;
        ompi_proc_allocate (name->jobid, name->vpid, &rproc);
    }

    opal_mutex_unlock (&ompi_proc_lock);

    return rproc;
}


int
ompi_proc_unpack(opal_buffer_t* buf,
                 int proclistsize, ompi_proc_t ***proclist,
                 int *newproclistsize, ompi_proc_t ***newproclist)
{
    size_t newprocs_len = 0;
    ompi_proc_t **plist=NULL, **newprocs = NULL;

    /* do not free plist *ever*, since it is used in the remote group
       structure of a communicator */
    plist = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
    if ( NULL == plist ) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    /* free this on the way out */
    newprocs = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
    if (NULL == newprocs) {
        free(plist);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* cycle through the array of provided procs and unpack
     * their info - as packed by ompi_proc_pack
     */
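    /* The unpack order must mirror ompi_proc_pack(): process name, PMIx
     * nspace string, architecture flag, then hostname. */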
    for (int i = 0; i < proclistsize ; ++i){
        int32_t count=1;
        ompi_process_name_t new_name;
        uint32_t new_arch;
        char *new_hostname;
        bool isnew = false;
        int rc;
        char *nspace;

        rc = opal_dss.unpack(buf, &new_name, &count, OMPI_NAME);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        rc = opal_dss.unpack(buf, &nspace, &count, OPAL_STRING);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        opal_pmix.register_jobid(new_name.jobid, nspace);
        free(nspace);
        rc = opal_dss.unpack(buf, &new_arch, &count, OPAL_UINT32);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        rc = opal_dss.unpack(buf, &new_hostname, &count, OPAL_STRING);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        /* see if this proc is already on our ompi_proc_list */
        plist[i] = ompi_proc_find_and_add(&new_name, &isnew);
        if (isnew) {
            /* if not, then it was added, so update the values
             * in the proc_t struct with the info that was passed
             * to us
             */
            newprocs[newprocs_len++] = plist[i];

            /* update all the values */
            plist[i]->super.proc_arch = new_arch;
            /* if arch is different than mine, create a new convertor for this proc */
            if (plist[i]->super.proc_arch != opal_local_arch) {
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
                OBJ_RELEASE(plist[i]->super.proc_convertor);
                plist[i]->super.proc_convertor = opal_convertor_create(plist[i]->super.proc_arch, 0);
#else
                opal_show_help("help-mpi-runtime.txt",
                               "heterogeneous-support-unavailable",
                               true, ompi_process_info.nodename,
                               new_hostname == NULL ? "<hostname unavailable>" :
                               new_hostname);
                free(plist);
                free(newprocs);
                return OMPI_ERR_NOT_SUPPORTED;
#endif
            }

            if (NULL != new_hostname) {
                if (0 == strcmp(ompi_proc_local_proc->super.proc_hostname, new_hostname)) {
                    plist[i]->super.proc_flags |= (OPAL_PROC_ON_NODE | OPAL_PROC_ON_CU | OPAL_PROC_ON_CLUSTER);
                }

                /* Save the hostname */
                plist[i]->super.proc_hostname = new_hostname;
            }
        } else if (NULL != new_hostname) {
            free(new_hostname);
        }
    }

    if (NULL != newproclistsize) *newproclistsize = newprocs_len;
    if (NULL != newproclist) {
        *newproclist = newprocs;
    } else if (newprocs != NULL) {
        free(newprocs);
    }

    *proclist = plist;
    return OMPI_SUCCESS;
}
