root/orte/mca/state/hnp/state_hnp.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. force_quit
  2. init
  3. finalize
  4. _send_notification
  5. hnp_notify

   1 /*
   2  * Copyright (c) 2011-2012 Los Alamos National Security, LLC.
   3  *                         All rights reserved.
   4  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   5  * Copyright (c) 2017      Research Organization for Information Science
   6  *                         and Technology (RIST). All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include "orte_config.h"
  15 
  16 #include <sys/types.h>
  17 #ifdef HAVE_UNISTD_H
  18 #include <unistd.h>
  19 #endif  /* HAVE_UNISTD_H */
  20 #include <string.h>
  21 
  22 #include "opal/util/output.h"
  23 #include "opal/mca/pmix/pmix.h"
  24 
  25 #include "orte/mca/errmgr/errmgr.h"
  26 #include "orte/mca/grpcomm/grpcomm.h"
  27 #include "orte/mca/rml/rml.h"
  28 #include "orte/mca/iof/iof.h"
  29 #include "orte/mca/plm/base/base.h"
  30 #include "orte/mca/ras/base/base.h"
  31 #include "orte/mca/rmaps/base/base.h"
  32 #include "orte/mca/routed/routed.h"
  33 #include "orte/util/session_dir.h"
  34 #include "orte/runtime/orte_quit.h"
  35 
  36 #include "orte/mca/state/state.h"
  37 #include "orte/mca/state/base/base.h"
  38 #include "orte/mca/state/base/state_private.h"
  39 #include "state_hnp.h"
  40 
  41 /*
  42  * Module functions: Global
  43  */
  44 static int init(void);
  45 static int finalize(void);
  46 
  47 /******************
  48  * HNP module - just uses base functions after
  49  * initializing the proc state machine. Job state
  50  * machine is unused by hnplication procs at this
  51  * time.
  52  ******************/
  53 orte_state_base_module_t orte_state_hnp_module = {
  54     init,
  55     finalize,
  56     orte_state_base_activate_job_state,
  57     orte_state_base_add_job_state,
  58     orte_state_base_set_job_state_callback,
  59     orte_state_base_set_job_state_priority,
  60     orte_state_base_remove_job_state,
  61     orte_state_base_activate_proc_state,
  62     orte_state_base_add_proc_state,
  63     orte_state_base_set_proc_state_callback,
  64     orte_state_base_set_proc_state_priority,
  65     orte_state_base_remove_proc_state
  66 };
  67 
  68 static void hnp_notify(int sd, short args, void *cbdata);
  69 
  70 /* defined default state machine sequence - individual
  71  * plm's must add a state for launching daemons
  72  */
  73 static orte_job_state_t launch_states[] = {
  74     ORTE_JOB_STATE_INIT,
  75     ORTE_JOB_STATE_INIT_COMPLETE,
  76     ORTE_JOB_STATE_ALLOCATE,
  77     ORTE_JOB_STATE_ALLOCATION_COMPLETE,
  78     ORTE_JOB_STATE_DAEMONS_LAUNCHED,
  79     ORTE_JOB_STATE_DAEMONS_REPORTED,
  80     ORTE_JOB_STATE_VM_READY,
  81     ORTE_JOB_STATE_MAP,
  82     ORTE_JOB_STATE_MAP_COMPLETE,
  83     ORTE_JOB_STATE_SYSTEM_PREP,
  84     ORTE_JOB_STATE_LAUNCH_APPS,
  85     ORTE_JOB_STATE_SEND_LAUNCH_MSG,
  86     ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
  87     ORTE_JOB_STATE_RUNNING,
  88     ORTE_JOB_STATE_REGISTERED,
  89     /* termination states */
  90     ORTE_JOB_STATE_TERMINATED,
  91     ORTE_JOB_STATE_NOTIFY_COMPLETED,
  92     ORTE_JOB_STATE_NOTIFIED,
  93     ORTE_JOB_STATE_ALL_JOBS_COMPLETE
  94 };
  95 static orte_state_cbfunc_t launch_callbacks[] = {
  96     orte_plm_base_setup_job,
  97     orte_plm_base_setup_job_complete,
  98     orte_ras_base_allocate,
  99     orte_plm_base_allocation_complete,
 100     orte_plm_base_daemons_launched,
 101     orte_plm_base_daemons_reported,
 102     orte_plm_base_vm_ready,
 103     orte_rmaps_base_map_job,
 104     orte_plm_base_mapping_complete,
 105     orte_plm_base_complete_setup,
 106     orte_plm_base_launch_apps,
 107     orte_plm_base_send_launch_msg,
 108     orte_state_base_local_launch_complete,
 109     orte_plm_base_post_launch,
 110     orte_plm_base_registered,
 111     orte_state_base_check_all_complete,
 112     hnp_notify,
 113     orte_state_base_cleanup_job,
 114     orte_quit
 115 };
 116 
 117 static orte_proc_state_t proc_states[] = {
 118     ORTE_PROC_STATE_RUNNING,
 119     ORTE_PROC_STATE_REGISTERED,
 120     ORTE_PROC_STATE_IOF_COMPLETE,
 121     ORTE_PROC_STATE_WAITPID_FIRED,
 122     ORTE_PROC_STATE_TERMINATED
 123 };
 124 static orte_state_cbfunc_t proc_callbacks[] = {
 125     orte_state_base_track_procs,
 126     orte_state_base_track_procs,
 127     orte_state_base_track_procs,
 128     orte_state_base_track_procs,
 129     orte_state_base_track_procs
 130 };
 131 
 132 static void force_quit(int fd, short args, void *cbdata)
 133 {
 134     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
 135 
 136     /* give us a chance to stop the orteds */
 137     orte_plm.terminate_orteds();
 138     OBJ_RELEASE(caddy);
 139 }
 140 
 141 /************************
 142  * API Definitions
 143  ************************/
 144 static int init(void)
 145 {
 146     int i, rc;
 147     int num_states;
 148 
 149     /* setup the state machines */
 150     OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
 151     OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
 152 
 153     /* setup the job state machine */
 154     num_states = sizeof(launch_states) / sizeof(orte_job_state_t);
 155     for (i=0; i < num_states; i++) {
 156         if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i],
 157                                                            launch_callbacks[i],
 158                                                            ORTE_SYS_PRI))) {
 159             ORTE_ERROR_LOG(rc);
 160         }
 161     }
 162     /* add the termination response */
 163     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED,
 164                                                        orte_quit, ORTE_SYS_PRI))) {
 165         ORTE_ERROR_LOG(rc);
 166     }
 167     /* add a default error response */
 168     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
 169                                                        force_quit, ORTE_ERROR_PRI))) {
 170         ORTE_ERROR_LOG(rc);
 171     }
 172     /* add callback to report progress, if requested */
 173     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS,
 174                                                        orte_state_base_report_progress, ORTE_ERROR_PRI))) {
 175         ORTE_ERROR_LOG(rc);
 176     }
 177     if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
 178         orte_state_base_print_job_state_machine();
 179     }
 180 
 181     /* populate the proc state machine to allow us to
 182      * track proc lifecycle changes
 183      */
 184     num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
 185     for (i=0; i < num_states; i++) {
 186         if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
 187                                                             proc_callbacks[i],
 188                                                             ORTE_SYS_PRI))) {
 189             ORTE_ERROR_LOG(rc);
 190         }
 191     }
 192     if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
 193         orte_state_base_print_proc_state_machine();
 194     }
 195 
 196     return ORTE_SUCCESS;
 197 }
 198 
 199 static int finalize(void)
 200 {
 201     /* cleanup the proc state machine */
 202     OPAL_LIST_DESTRUCT(&orte_proc_states);
 203     /* cleanup the job state machine */
 204     OPAL_LIST_DESTRUCT(&orte_job_states);
 205 
 206     return ORTE_SUCCESS;
 207 }
 208 
 209 static void _send_notification(int status,
 210                                orte_proc_state_t state,
 211                                orte_process_name_t *proc,
 212                                orte_process_name_t *target)
 213 {
 214     opal_buffer_t *buf;
 215     orte_grpcomm_signature_t sig;
 216     int rc;
 217     opal_value_t kv, *kvptr;
 218     orte_process_name_t daemon;
 219 
 220     buf = OBJ_NEW(opal_buffer_t);
 221 
 222     opal_output_verbose(5, orte_state_base_framework.framework_output,
 223                         "%s state:hnp:sending notification %s proc %s target %s",
 224                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 225                         ORTE_ERROR_NAME(status),
 226                         ORTE_NAME_PRINT(proc),
 227                         ORTE_NAME_PRINT(target));
 228 
 229     /* pack the status */
 230     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
 231         ORTE_ERROR_LOG(rc);
 232         OBJ_RELEASE(buf);
 233         return;
 234     }
 235 
 236     /* the source is the proc */
 237     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, proc, 1, ORTE_NAME))) {
 238         ORTE_ERROR_LOG(rc);
 239         OBJ_RELEASE(buf);
 240         return;
 241     }
 242 
 243     if (OPAL_ERR_PROC_ABORTED == status) {
 244         /* we will pass three opal_value_t's */
 245         rc = 3;
 246         if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
 247             ORTE_ERROR_LOG(rc);
 248             OBJ_RELEASE(buf);
 249             return;
 250         }
 251         /* pass along the affected proc(s) */
 252         OBJ_CONSTRUCT(&kv, opal_value_t);
 253         kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
 254         kv.type = OPAL_NAME;
 255         kv.data.name.jobid = proc->jobid;
 256         kv.data.name.vpid = proc->vpid;
 257         kvptr = &kv;
 258         if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
 259             ORTE_ERROR_LOG(rc);
 260             OBJ_DESTRUCT(&kv);
 261             OBJ_RELEASE(buf);
 262             return;
 263         }
 264         OBJ_DESTRUCT(&kv);
 265     } else {
 266         /* we are going to pass two opal_value_t's */
 267         rc = 2;
 268         if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
 269             ORTE_ERROR_LOG(rc);
 270             OBJ_RELEASE(buf);
 271             return;
 272         }
 273     }
 274 
 275     /* pass along the affected proc(s) */
 276     OBJ_CONSTRUCT(&kv, opal_value_t);
 277     kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
 278     kv.type = OPAL_NAME;
 279     kv.data.name.jobid = proc->jobid;
 280     kv.data.name.vpid = proc->vpid;
 281     kvptr = &kv;
 282     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
 283         ORTE_ERROR_LOG(rc);
 284         OBJ_DESTRUCT(&kv);
 285         OBJ_RELEASE(buf);
 286         return;
 287     }
 288     OBJ_DESTRUCT(&kv);
 289 
 290     /* pass along the proc(s) to be notified */
 291     OBJ_CONSTRUCT(&kv, opal_value_t);
 292     kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE);
 293     kv.type = OPAL_NAME;
 294     kv.data.name.jobid = target->jobid;
 295     kv.data.name.vpid = target->vpid;
 296     kvptr = &kv;
 297     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
 298         ORTE_ERROR_LOG(rc);
 299         OBJ_DESTRUCT(&kv);
 300         OBJ_RELEASE(buf);
 301         return;
 302     }
 303     OBJ_DESTRUCT(&kv);
 304 
 305     /* if the targets are a wildcard, then xcast it to everyone */
 306     if (ORTE_VPID_WILDCARD == target->vpid) {
 307         OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t);
 308         sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
 309         sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
 310         sig.signature[0].vpid = ORTE_VPID_WILDCARD;
 311         sig.sz = 1;
 312 
 313         if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
 314             ORTE_ERROR_LOG(rc);
 315         }
 316         OBJ_DESTRUCT(&sig);
 317         OBJ_RELEASE(buf);
 318     } else {
 319         /* get the daemon hosting the proc to be notified */
 320         daemon.jobid = ORTE_PROC_MY_NAME->jobid;
 321         daemon.vpid = orte_get_proc_daemon_vpid(target);
 322         /* send the notification to that daemon */
 323         opal_output_verbose(5, orte_state_base_framework.framework_output,
 324                             "%s state:base:sending notification %s to proc %s at daemon %s",
 325                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 326                             ORTE_ERROR_NAME(status),
 327                             ORTE_NAME_PRINT(target),
 328                             ORTE_NAME_PRINT(&daemon));
 329         if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&daemon, buf,
 330                                                           ORTE_RML_TAG_NOTIFICATION,
 331                                                           orte_rml_send_callback, NULL))) {
 332             ORTE_ERROR_LOG(rc);
 333             OBJ_RELEASE(buf);
 334         }
 335     }
 336 }
 337 
 338 static void hnp_notify(int sd, short args, void *cbdata)
 339 {
 340     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
 341     orte_job_t *jdata = caddy->jdata;
 342     orte_process_name_t parent, target, *npptr;
 343 
 344     /* if they requested notification upon completion, provide it */
 345     if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
 346         /* notify_completion => notify the parent of the termination
 347          * of this child job. So get the parent jobid info */
 348         npptr = &parent;
 349         if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) {
 350             /* notify everyone who asked for it */
 351             target.jobid = jdata->jobid;
 352             target.vpid = ORTE_VPID_WILDCARD;
 353             _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, ORTE_NAME_WILDCARD);
 354         } else {
 355             target.jobid = jdata->jobid;
 356             target.vpid = ORTE_VPID_WILDCARD;
 357             _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, &parent);
 358         }
 359     }
 360     ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFIED);
 361 
 362     OBJ_RELEASE(caddy);
 363 }

/* [<][>][^][v][top][bottom][index][help] */