root/orte/mca/routed/base/routed_base_fns.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_routed_base_xcast_routing
  2. orte_routed_base_process_callback

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011-2012 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "orte_config.h"
  24 #include "orte/constants.h"
  25 #include "orte/types.h"
  26 
  27 #include "opal/dss/dss.h"
  28 #include "opal/util/argv.h"
  29 
  30 #include "orte/mca/errmgr/errmgr.h"
  31 #include "orte/mca/ess/ess.h"
  32 #include "orte/mca/odls/odls_types.h"
  33 #include "orte/mca/rml/rml.h"
  34 #include "orte/mca/state/state.h"
  35 #include "orte/runtime/orte_globals.h"
  36 #include "orte/runtime/orte_wait.h"
  37 
  38 #include "orte/mca/routed/base/base.h"
  39 
  40 void orte_routed_base_xcast_routing(opal_list_t *coll, opal_list_t *my_children)
  41 {
  42     orte_routed_tree_t *child;
  43     orte_namelist_t *nm;
  44     int i;
  45     orte_proc_t *proc;
  46     orte_job_t *daemons;
  47 
  48     /* if we are the HNP and an abnormal termination is underway,
  49      * then send it directly to everyone
  50      */
  51     if (ORTE_PROC_IS_HNP) {
  52         if (orte_abnormal_term_ordered || !orte_routing_is_enabled) {
  53             daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
  54             for (i=1; i < daemons->procs->size; i++) {
  55                 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, i))) {
  56                     continue;
  57                 }
  58                 /* exclude anyone known not alive */
  59                 if (ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_ALIVE)) {
  60                     nm = OBJ_NEW(orte_namelist_t);
  61                     nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
  62                     nm->name.vpid = proc->name.vpid;
  63                     opal_list_append(coll, &nm->super);
  64                 }
  65             }
  66             /* if nobody is known alive, then we need to die */
  67             if (0 == opal_list_get_size(coll)) {
  68                 ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
  69             }
  70         } else {
  71             /* the xcast always goes to our children */
  72             OPAL_LIST_FOREACH(child, my_children, orte_routed_tree_t) {
  73                 nm = OBJ_NEW(orte_namelist_t);
  74                 nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
  75                 nm->name.vpid = child->vpid;
  76                 opal_list_append(coll, &nm->super);
  77             }
  78         }
  79     } else {
  80         /* I am a daemon - route to my children */
  81         OPAL_LIST_FOREACH(child, my_children, orte_routed_tree_t) {
  82             nm = OBJ_NEW(orte_namelist_t);
  83             nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
  84             nm->name.vpid = child->vpid;
  85             opal_list_append(coll, &nm->super);
  86         }
  87     }
  88 }
  89 
  90 int orte_routed_base_process_callback(orte_jobid_t job, opal_buffer_t *buffer)
  91 {
  92     orte_proc_t *proc;
  93     orte_job_t *jdata;
  94     orte_std_cntr_t cnt;
  95     char *rml_uri;
  96     orte_vpid_t vpid;
  97     int rc;
  98 
  99     /* lookup the job object for this process */
 100     if (NULL == (jdata = orte_get_job_data_object(job))) {
 101         /* came from a different job family - this is an error */
 102         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 103         return ORTE_ERR_NOT_FOUND;
 104     }
 105 
 106     /* unpack the data for each entry */
 107     cnt = 1;
 108     while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &cnt, ORTE_VPID))) {
 109 
 110         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {
 111             ORTE_ERROR_LOG(rc);
 112             continue;
 113         }
 114 
 115         OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
 116                              "%s routed_base:callback got uri %s for job %s rank %s",
 117                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 118                              (NULL == rml_uri) ? "NULL" : rml_uri,
 119                              ORTE_JOBID_PRINT(job), ORTE_VPID_PRINT(vpid)));
 120 
 121         if (NULL == rml_uri) {
 122             /* should not happen */
 123             ORTE_ERROR_LOG(ORTE_ERR_FATAL);
 124             return ORTE_ERR_FATAL;
 125         }
 126 
 127         if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
 128             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 129             continue;
 130         }
 131 
 132         /* update the record */
 133         proc->rml_uri = strdup(rml_uri);
 134         free(rml_uri);
 135 
 136         cnt = 1;
 137     }
 138     if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 139         ORTE_ERROR_LOG(rc);
 140         return rc;
 141     }
 142 
 143     return ORTE_SUCCESS;
 144 }

/* [<][>][^][v][top][bottom][index][help] */