This source file includes the following definitions:
- force_quit
- init
- finalize
- _send_notification
- hnp_notify
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 #include "orte_config.h"
  15 
  16 #include <sys/types.h>
  17 #ifdef HAVE_UNISTD_H
  18 #include <unistd.h>
  19 #endif  
  20 #include <string.h>
  21 
  22 #include "opal/util/output.h"
  23 #include "opal/mca/pmix/pmix.h"
  24 
  25 #include "orte/mca/errmgr/errmgr.h"
  26 #include "orte/mca/grpcomm/grpcomm.h"
  27 #include "orte/mca/rml/rml.h"
  28 #include "orte/mca/iof/iof.h"
  29 #include "orte/mca/plm/base/base.h"
  30 #include "orte/mca/ras/base/base.h"
  31 #include "orte/mca/rmaps/base/base.h"
  32 #include "orte/mca/routed/routed.h"
  33 #include "orte/util/session_dir.h"
  34 #include "orte/runtime/orte_quit.h"
  35 
  36 #include "orte/mca/state/state.h"
  37 #include "orte/mca/state/base/base.h"
  38 #include "orte/mca/state/base/state_private.h"
  39 #include "state_hnp.h"
  40 
  41 
  42 
  43 
  44 static int init(void);
  45 static int finalize(void);
  46 
  47 
  48 
  49 
  50 
  51 
  52 
  53 orte_state_base_module_t orte_state_hnp_module = {
  54     init,
  55     finalize,
  56     orte_state_base_activate_job_state,
  57     orte_state_base_add_job_state,
  58     orte_state_base_set_job_state_callback,
  59     orte_state_base_set_job_state_priority,
  60     orte_state_base_remove_job_state,
  61     orte_state_base_activate_proc_state,
  62     orte_state_base_add_proc_state,
  63     orte_state_base_set_proc_state_callback,
  64     orte_state_base_set_proc_state_priority,
  65     orte_state_base_remove_proc_state
  66 };
  67 
  68 static void hnp_notify(int sd, short args, void *cbdata);
  69 
  70 
  71 
  72 
  73 static orte_job_state_t launch_states[] = {
  74     ORTE_JOB_STATE_INIT,
  75     ORTE_JOB_STATE_INIT_COMPLETE,
  76     ORTE_JOB_STATE_ALLOCATE,
  77     ORTE_JOB_STATE_ALLOCATION_COMPLETE,
  78     ORTE_JOB_STATE_DAEMONS_LAUNCHED,
  79     ORTE_JOB_STATE_DAEMONS_REPORTED,
  80     ORTE_JOB_STATE_VM_READY,
  81     ORTE_JOB_STATE_MAP,
  82     ORTE_JOB_STATE_MAP_COMPLETE,
  83     ORTE_JOB_STATE_SYSTEM_PREP,
  84     ORTE_JOB_STATE_LAUNCH_APPS,
  85     ORTE_JOB_STATE_SEND_LAUNCH_MSG,
  86     ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
  87     ORTE_JOB_STATE_RUNNING,
  88     ORTE_JOB_STATE_REGISTERED,
  89     
  90     ORTE_JOB_STATE_TERMINATED,
  91     ORTE_JOB_STATE_NOTIFY_COMPLETED,
  92     ORTE_JOB_STATE_NOTIFIED,
  93     ORTE_JOB_STATE_ALL_JOBS_COMPLETE
  94 };
  95 static orte_state_cbfunc_t launch_callbacks[] = {
  96     orte_plm_base_setup_job,
  97     orte_plm_base_setup_job_complete,
  98     orte_ras_base_allocate,
  99     orte_plm_base_allocation_complete,
 100     orte_plm_base_daemons_launched,
 101     orte_plm_base_daemons_reported,
 102     orte_plm_base_vm_ready,
 103     orte_rmaps_base_map_job,
 104     orte_plm_base_mapping_complete,
 105     orte_plm_base_complete_setup,
 106     orte_plm_base_launch_apps,
 107     orte_plm_base_send_launch_msg,
 108     orte_state_base_local_launch_complete,
 109     orte_plm_base_post_launch,
 110     orte_plm_base_registered,
 111     orte_state_base_check_all_complete,
 112     hnp_notify,
 113     orte_state_base_cleanup_job,
 114     orte_quit
 115 };
 116 
 117 static orte_proc_state_t proc_states[] = {
 118     ORTE_PROC_STATE_RUNNING,
 119     ORTE_PROC_STATE_REGISTERED,
 120     ORTE_PROC_STATE_IOF_COMPLETE,
 121     ORTE_PROC_STATE_WAITPID_FIRED,
 122     ORTE_PROC_STATE_TERMINATED
 123 };
 124 static orte_state_cbfunc_t proc_callbacks[] = {
 125     orte_state_base_track_procs,
 126     orte_state_base_track_procs,
 127     orte_state_base_track_procs,
 128     orte_state_base_track_procs,
 129     orte_state_base_track_procs
 130 };
 131 
 132 static void force_quit(int fd, short args, void *cbdata)
 133 {
 134     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
 135 
 136     
 137     orte_plm.terminate_orteds();
 138     OBJ_RELEASE(caddy);
 139 }
 140 
 141 
 142 
 143 
 144 static int init(void)
 145 {
 146     int i, rc;
 147     int num_states;
 148 
 149     
 150     OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
 151     OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
 152 
 153     
 154     num_states = sizeof(launch_states) / sizeof(orte_job_state_t);
 155     for (i=0; i < num_states; i++) {
 156         if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i],
 157                                                            launch_callbacks[i],
 158                                                            ORTE_SYS_PRI))) {
 159             ORTE_ERROR_LOG(rc);
 160         }
 161     }
 162     
 163     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED,
 164                                                        orte_quit, ORTE_SYS_PRI))) {
 165         ORTE_ERROR_LOG(rc);
 166     }
 167     
 168     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
 169                                                        force_quit, ORTE_ERROR_PRI))) {
 170         ORTE_ERROR_LOG(rc);
 171     }
 172     
 173     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS,
 174                                                        orte_state_base_report_progress, ORTE_ERROR_PRI))) {
 175         ORTE_ERROR_LOG(rc);
 176     }
 177     if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
 178         orte_state_base_print_job_state_machine();
 179     }
 180 
 181     
 182 
 183 
 184     num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
 185     for (i=0; i < num_states; i++) {
 186         if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
 187                                                             proc_callbacks[i],
 188                                                             ORTE_SYS_PRI))) {
 189             ORTE_ERROR_LOG(rc);
 190         }
 191     }
 192     if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
 193         orte_state_base_print_proc_state_machine();
 194     }
 195 
 196     return ORTE_SUCCESS;
 197 }
 198 
 199 static int finalize(void)
 200 {
 201     
 202     OPAL_LIST_DESTRUCT(&orte_proc_states);
 203     
 204     OPAL_LIST_DESTRUCT(&orte_job_states);
 205 
 206     return ORTE_SUCCESS;
 207 }
 208 
 209 static void _send_notification(int status,
 210                                orte_proc_state_t state,
 211                                orte_process_name_t *proc,
 212                                orte_process_name_t *target)
 213 {
 214     opal_buffer_t *buf;
 215     orte_grpcomm_signature_t sig;
 216     int rc;
 217     opal_value_t kv, *kvptr;
 218     orte_process_name_t daemon;
 219 
 220     buf = OBJ_NEW(opal_buffer_t);
 221 
 222     opal_output_verbose(5, orte_state_base_framework.framework_output,
 223                         "%s state:hnp:sending notification %s proc %s target %s",
 224                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 225                         ORTE_ERROR_NAME(status),
 226                         ORTE_NAME_PRINT(proc),
 227                         ORTE_NAME_PRINT(target));
 228 
 229     
 230     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
 231         ORTE_ERROR_LOG(rc);
 232         OBJ_RELEASE(buf);
 233         return;
 234     }
 235 
 236     
 237     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, proc, 1, ORTE_NAME))) {
 238         ORTE_ERROR_LOG(rc);
 239         OBJ_RELEASE(buf);
 240         return;
 241     }
 242 
 243     if (OPAL_ERR_PROC_ABORTED == status) {
 244         
 245         rc = 3;
 246         if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
 247             ORTE_ERROR_LOG(rc);
 248             OBJ_RELEASE(buf);
 249             return;
 250         }
 251         
 252         OBJ_CONSTRUCT(&kv, opal_value_t);
 253         kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
 254         kv.type = OPAL_NAME;
 255         kv.data.name.jobid = proc->jobid;
 256         kv.data.name.vpid = proc->vpid;
 257         kvptr = &kv;
 258         if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
 259             ORTE_ERROR_LOG(rc);
 260             OBJ_DESTRUCT(&kv);
 261             OBJ_RELEASE(buf);
 262             return;
 263         }
 264         OBJ_DESTRUCT(&kv);
 265     } else {
 266         
 267         rc = 2;
 268         if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
 269             ORTE_ERROR_LOG(rc);
 270             OBJ_RELEASE(buf);
 271             return;
 272         }
 273     }
 274 
 275     
 276     OBJ_CONSTRUCT(&kv, opal_value_t);
 277     kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
 278     kv.type = OPAL_NAME;
 279     kv.data.name.jobid = proc->jobid;
 280     kv.data.name.vpid = proc->vpid;
 281     kvptr = &kv;
 282     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
 283         ORTE_ERROR_LOG(rc);
 284         OBJ_DESTRUCT(&kv);
 285         OBJ_RELEASE(buf);
 286         return;
 287     }
 288     OBJ_DESTRUCT(&kv);
 289 
 290     
 291     OBJ_CONSTRUCT(&kv, opal_value_t);
 292     kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE);
 293     kv.type = OPAL_NAME;
 294     kv.data.name.jobid = target->jobid;
 295     kv.data.name.vpid = target->vpid;
 296     kvptr = &kv;
 297     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
 298         ORTE_ERROR_LOG(rc);
 299         OBJ_DESTRUCT(&kv);
 300         OBJ_RELEASE(buf);
 301         return;
 302     }
 303     OBJ_DESTRUCT(&kv);
 304 
 305     
 306     if (ORTE_VPID_WILDCARD == target->vpid) {
 307         OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t);
 308         sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
 309         sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
 310         sig.signature[0].vpid = ORTE_VPID_WILDCARD;
 311         sig.sz = 1;
 312 
 313         if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
 314             ORTE_ERROR_LOG(rc);
 315         }
 316         OBJ_DESTRUCT(&sig);
 317         OBJ_RELEASE(buf);
 318     } else {
 319         
 320         daemon.jobid = ORTE_PROC_MY_NAME->jobid;
 321         daemon.vpid = orte_get_proc_daemon_vpid(target);
 322         
 323         opal_output_verbose(5, orte_state_base_framework.framework_output,
 324                             "%s state:base:sending notification %s to proc %s at daemon %s",
 325                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 326                             ORTE_ERROR_NAME(status),
 327                             ORTE_NAME_PRINT(target),
 328                             ORTE_NAME_PRINT(&daemon));
 329         if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&daemon, buf,
 330                                                           ORTE_RML_TAG_NOTIFICATION,
 331                                                           orte_rml_send_callback, NULL))) {
 332             ORTE_ERROR_LOG(rc);
 333             OBJ_RELEASE(buf);
 334         }
 335     }
 336 }
 337 
 338 static void hnp_notify(int sd, short args, void *cbdata)
 339 {
 340     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
 341     orte_job_t *jdata = caddy->jdata;
 342     orte_process_name_t parent, target, *npptr;
 343 
 344     
 345     if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
 346         
 347 
 348         npptr = &parent;
 349         if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) {
 350             
 351             target.jobid = jdata->jobid;
 352             target.vpid = ORTE_VPID_WILDCARD;
 353             _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, ORTE_NAME_WILDCARD);
 354         } else {
 355             target.jobid = jdata->jobid;
 356             target.vpid = ORTE_VPID_WILDCARD;
 357             _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, &parent);
 358         }
 359     }
 360     ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFIED);
 361 
 362     OBJ_RELEASE(caddy);
 363 }