root/orte/mca/plm/base/plm_base_receive.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_plm_base_comm_start
  2. orte_plm_base_comm_stop
  3. orte_plm_base_recv
  4. orte_plm_base_receive_process_msg

   1 /* -*- C -*-
   2  *
   3  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2011 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2011      Los Alamos National Security, LLC.
  14  *                         All rights reserved.
  15  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
  16  * Copyright (c) 2017      Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 /** @file:
  25  *
  26  */
  27 
  28 /*
  29  * includes
  30  */
  31 #include "orte_config.h"
  32 
  33 #include <string.h>
  34 #ifdef HAVE_SYS_TIME_H
  35 #include <sys/time.h>
  36 #endif
  37 
  38 #include "orte/mca/mca.h"
  39 #include "opal/dss/dss.h"
  40 #include "opal/threads/threads.h"
  41 #include "opal/util/argv.h"
  42 #include "opal/util/opal_environ.h"
  43 
  44 #include "orte/constants.h"
  45 #include "orte/types.h"
  46 #include "orte/util/proc_info.h"
  47 #include "orte/util/error_strings.h"
  48 #include "orte/mca/errmgr/errmgr.h"
  49 #include "orte/mca/ess/ess.h"
  50 #include "orte/mca/rml/rml.h"
  51 #include "orte/mca/rml/rml_types.h"
  52 #include "orte/mca/routed/routed.h"
  53 #include "orte/mca/ras/base/base.h"
  54 #include "orte/util/name_fns.h"
  55 #include "orte/mca/state/state.h"
  56 #include "orte/runtime/orte_globals.h"
  57 #include "orte/runtime/orte_quit.h"
  58 
  59 #include "orte/mca/plm/plm_types.h"
  60 #include "orte/mca/plm/plm.h"
  61 #include "orte/mca/plm/base/plm_private.h"
  62 #include "orte/mca/plm/base/base.h"
  63 
  64 static bool recv_issued=false;
  65 
  66 int orte_plm_base_comm_start(void)
  67 {
  68     if (recv_issued) {
  69         return ORTE_SUCCESS;
  70     }
  71 
  72     OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
  73                          "%s plm:base:receive start comm",
  74                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
  75 
  76     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  77                             ORTE_RML_TAG_PLM,
  78                             ORTE_RML_PERSISTENT,
  79                             orte_plm_base_recv,
  80                             NULL);
  81     if (ORTE_PROC_IS_HNP) {
  82         orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  83                                 ORTE_RML_TAG_ORTED_CALLBACK,
  84                                 ORTE_RML_PERSISTENT,
  85                                 orte_plm_base_daemon_callback, NULL);
  86         orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  87                                 ORTE_RML_TAG_REPORT_REMOTE_LAUNCH,
  88                                 ORTE_RML_PERSISTENT,
  89                                 orte_plm_base_daemon_failed, NULL);
  90         orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  91                                 ORTE_RML_TAG_TOPOLOGY_REPORT,
  92                                 ORTE_RML_PERSISTENT,
  93                                 orte_plm_base_daemon_topology, NULL);
  94     }
  95     recv_issued = true;
  96 
  97     return ORTE_SUCCESS;
  98 }
  99 
 100 
 101 int orte_plm_base_comm_stop(void)
 102 {
 103     if (!recv_issued) {
 104         return ORTE_SUCCESS;
 105     }
 106 
 107     OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 108                          "%s plm:base:receive stop comm",
 109                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 110 
 111     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLM);
 112     if (ORTE_PROC_IS_HNP) {
 113         orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_CALLBACK);
 114         orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_REPORT_REMOTE_LAUNCH);
 115         orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_TOPOLOGY_REPORT);
 116     }
 117     recv_issued = false;
 118 
 119     return ORTE_SUCCESS;
 120 }
 121 
 122 
 123 /* process incoming messages in order of receipt */
 124 void orte_plm_base_recv(int status, orte_process_name_t* sender,
 125                         opal_buffer_t* buffer, orte_rml_tag_t tag,
 126                         void* cbdata)
 127 {
 128     orte_plm_cmd_flag_t command;
 129     orte_std_cntr_t count;
 130     orte_jobid_t job;
 131     orte_job_t *jdata, *parent;
 132     opal_buffer_t *answer;
 133     orte_vpid_t vpid;
 134     orte_proc_t *proc;
 135     orte_proc_state_t state;
 136     orte_exit_code_t exit_code;
 137     int32_t rc=ORTE_SUCCESS, ret;
 138     orte_app_context_t *app, *child_app;
 139     orte_process_name_t name, *nptr;
 140     pid_t pid;
 141     bool running;
 142     int i, room;
 143     char **env;
 144     char *prefix_dir;
 145 
 146     OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 147                          "%s plm:base:receive processing msg",
 148                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 149 
 150     count = 1;
 151     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_PLM_CMD))) {
 152         ORTE_ERROR_LOG(rc);
 153         goto CLEANUP;
 154     }
 155 
 156     switch (command) {
 157     case ORTE_PLM_LAUNCH_JOB_CMD:
 158         OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 159                              "%s plm:base:receive job launch command from %s",
 160                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 161                              ORTE_NAME_PRINT(sender)));
 162 
 163         /* unpack the job object */
 164         count = 1;
 165         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jdata, &count, ORTE_JOB))) {
 166             ORTE_ERROR_LOG(rc);
 167             goto ANSWER_LAUNCH;
 168         }
 169 
 170         /* record the sender so we know who to respond to */
 171         jdata->originator.jobid = sender->jobid;
 172         jdata->originator.vpid = sender->vpid;
 173 
 174         /* get the name of the actual spawn parent - i.e., the proc that actually
 175          * requested the spawn */
 176         nptr = &name;
 177         if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&nptr, OPAL_NAME)) {
 178             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 179             rc = ORTE_ERR_NOT_FOUND;
 180             goto ANSWER_LAUNCH;
 181         }
 182 
 183         /* get the parent's job object */
 184         if (NULL != (parent = orte_get_job_data_object(name.jobid))) {
 185             /* if the prefix was set in the parent's job, we need to transfer
 186              * that prefix to the child's app_context so any further launch of
 187              * orteds can find the correct binary. There always has to be at
 188              * least one app_context in both parent and child, so we don't
 189              * need to check that here. However, be sure not to overwrite
 190              * the prefix if the user already provided it!
 191              */
 192             app = (orte_app_context_t*)opal_pointer_array_get_item(parent->apps, 0);
 193             child_app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
 194             prefix_dir = NULL;
 195             if (orte_get_attribute(&app->attributes, ORTE_APP_PREFIX_DIR, (void**)&prefix_dir, OPAL_STRING) &&
 196                 !orte_get_attribute(&child_app->attributes, ORTE_APP_PREFIX_DIR, NULL, OPAL_STRING)) {
 197                 orte_set_attribute(&child_app->attributes, ORTE_APP_PREFIX_DIR, ORTE_ATTR_GLOBAL, prefix_dir, OPAL_STRING);
 198             }
 199             if (NULL != prefix_dir) {
 200                 free(prefix_dir);
 201             }
 202         }
 203 
 204         /* if the user asked to forward any envars, cycle through the app contexts
 205          * in the comm_spawn request and add them
 206          */
 207         if (NULL != orte_forwarded_envars) {
 208             for (i=0; i < jdata->apps->size; i++) {
 209                 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
 210                     continue;
 211                 }
 212                 env = opal_environ_merge(orte_forwarded_envars, app->env);
 213                 opal_argv_free(app->env);
 214                 app->env = env;
 215             }
 216         }
 217 
 218         OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 219                              "%s plm:base:receive adding hosts",
 220                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 221 
 222         /* process any add-hostfile and add-host options that were provided */
 223         if (ORTE_SUCCESS != (rc = orte_ras_base_add_hosts(jdata))) {
 224             ORTE_ERROR_LOG(rc);
 225             goto ANSWER_LAUNCH;
 226         }
 227 
 228         if (NULL != parent) {
 229             if (NULL == parent->bookmark) {
 230                 /* find the sender's node in the job map */
 231                 if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(parent->procs, sender->vpid))) {
 232                     /* set the bookmark so the child starts from that place - this means
 233                      * that the first child process could be co-located with the proc
 234                      * that called comm_spawn, assuming slots remain on that node. Otherwise,
 235                      * the procs will start on the next available node
 236                      */
 237                     jdata->bookmark = proc->node;
 238                 }
 239             } else {
 240                 jdata->bookmark = parent->bookmark;
 241             }
 242             /* provide the parent's last object */
 243             jdata->bkmark_obj = parent->bkmark_obj;
 244         }
 245 
 246         /* launch it */
 247         OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 248                              "%s plm:base:receive calling spawn",
 249                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 250         if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
 251             ORTE_ERROR_LOG(rc);
 252             goto ANSWER_LAUNCH;
 253         }
 254         break;
 255     ANSWER_LAUNCH:
 256         OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 257                              "%s plm:base:receive - error on launch: %d",
 258                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rc));
 259 
 260         /* setup the response */
 261         answer = OBJ_NEW(opal_buffer_t);
 262 
 263         /* pack the error code to be returned */
 264         if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT32))) {
 265             ORTE_ERROR_LOG(ret);
 266         }
 267 
 268         /* pack an invalid jobid */
 269         job = ORTE_JOBID_INVALID;
 270         if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &job, 1, ORTE_JOBID))) {
 271             ORTE_ERROR_LOG(ret);
 272         }
 273         /* pack the room number of the request */
 274         if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ROOM_NUM, (void**)&room, OPAL_INT)) {
 275             if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &room, 1, OPAL_INT))) {
 276                 ORTE_ERROR_LOG(ret);
 277             }
 278         }
 279 
 280         /* send the response back to the sender */
 281         if (0 > (ret = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_LAUNCH_RESP,
 282                                                orte_rml_send_callback, NULL))) {
 283             ORTE_ERROR_LOG(ret);
 284             OBJ_RELEASE(answer);
 285         }
 286         break;
 287 
 288     case ORTE_PLM_UPDATE_PROC_STATE:
 289         opal_output_verbose(5, orte_plm_base_framework.framework_output,
 290                             "%s plm:base:receive update proc state command from %s",
 291                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 292                             ORTE_NAME_PRINT(sender));
 293         count = 1;
 294         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
 295 
 296             opal_output_verbose(5, orte_plm_base_framework.framework_output,
 297                                 "%s plm:base:receive got update_proc_state for job %s",
 298                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 299                                 ORTE_JOBID_PRINT(job));
 300 
 301             name.jobid = job;
 302             running = false;
 303             /* get the job object */
 304             jdata = orte_get_job_data_object(job);
 305             count = 1;
 306             while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID))) {
 307                 if (ORTE_VPID_INVALID == vpid) {
 308                     /* flag indicates that this job is complete - move on */
 309                     break;
 310                 }
 311                 name.vpid = vpid;
 312                 /* unpack the pid */
 313                 count = 1;
 314                 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &pid, &count, OPAL_PID))) {
 315                     ORTE_ERROR_LOG(rc);
 316                     goto CLEANUP;
 317                 }
 318                 /* unpack the state */
 319                 count = 1;
 320                 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &state, &count, ORTE_PROC_STATE))) {
 321                     ORTE_ERROR_LOG(rc);
 322                     goto CLEANUP;
 323                 }
 324                 if (ORTE_PROC_STATE_RUNNING == state) {
 325                     running = true;
 326                 }
 327                 /* unpack the exit code */
 328                 count = 1;
 329                 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
 330                     ORTE_ERROR_LOG(rc);
 331                     goto CLEANUP;
 332                 }
 333 
 334                 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 335                                      "%s plm:base:receive got update_proc_state for vpid %lu state %s exit_code %d",
 336                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 337                                      (unsigned long)vpid, orte_proc_state_to_str(state), (int)exit_code));
 338 
 339                 if (NULL != jdata) {
 340                     /* get the proc data object */
 341                     if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
 342                         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 343                         ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
 344                         goto CLEANUP;
 345                     }
 346                     /* NEVER update the proc state before activating the state machine - let
 347                      * the state cbfunc update it as it may need to compare this
 348                      * state against the prior proc state */
 349                     proc->pid = pid;
 350                     proc->exit_code = exit_code;
 351                     ORTE_ACTIVATE_PROC_STATE(&name, state);
 352                 }
 353             }
 354             /* record that we heard back from a daemon during app launch */
 355             if (running && NULL != jdata) {
 356                 jdata->num_daemons_reported++;
 357                 if (orte_report_launch_progress) {
 358                     if (0 == jdata->num_daemons_reported % 100 ||
 359                         jdata->num_daemons_reported == orte_process_info.num_procs) {
 360                         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS);
 361                     }
 362                 }
 363             }
 364             /* prepare for next job */
 365             count = 1;
 366         }
 367         if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 368             ORTE_ERROR_LOG(rc);
 369         } else {
 370             rc = ORTE_SUCCESS;
 371         }
 372         break;
 373 
 374     case ORTE_PLM_REGISTERED_CMD:
 375         count=1;
 376         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
 377             ORTE_ERROR_LOG(rc);
 378             goto CLEANUP;
 379         }
 380         name.jobid = job;
 381         /* get the job object */
 382         if (NULL == (jdata = orte_get_job_data_object(job))) {
 383             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 384             rc = ORTE_ERR_NOT_FOUND;
 385             goto CLEANUP;
 386         }
 387         count=1;
 388         while (ORTE_SUCCESS == opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID)) {
 389             name.vpid = vpid;
 390             ORTE_ACTIVATE_PROC_STATE(&name, ORTE_PROC_STATE_REGISTERED);
 391             count=1;
 392         }
 393         break;
 394 
 395     default:
 396         ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
 397         rc = ORTE_ERR_VALUE_OUT_OF_BOUNDS;
 398         break;
 399     }
 400 
 401   CLEANUP:
 402     /* see if an error occurred - if so, wakeup the HNP so we can exit */
 403     if (ORTE_PROC_IS_HNP && ORTE_SUCCESS != rc) {
 404         jdata = NULL;
 405         ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
 406     }
 407 
 408     OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
 409                          "%s plm:base:receive done processing commands",
 410                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 411 }
 412 
 413 /* where HNP messages come */
 414 void orte_plm_base_receive_process_msg(int fd, short event, void *data)
 415 {
 416     assert(0);
 417 }

/* [<][>][^][v][top][bottom][index][help] */