root/orte/mca/state/orted/state_orted.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. init
  2. finalize
  3. track_jobs
  4. track_procs
  5. pack_state_for_proc
  6. pack_state_update

   1 /*
   2  * Copyright (c) 2011-2017 Los Alamos National Security, LLC.
   3  *                         All rights reserved.
   4  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   5  * $COPYRIGHT$
   6  *
   7  * Additional copyrights may follow
   8  *
   9  * $HEADER$
  10  */
  11 
  12 #include "orte_config.h"
  13 
  14 #include <sys/types.h>
  15 #ifdef HAVE_UNISTD_H
  16 #include <unistd.h>
  17 #endif  /* HAVE_UNISTD_H */
  18 #include <string.h>
  19 
  20 #include "opal/util/output.h"
  21 #include "opal/dss/dss.h"
  22 #include "opal/mca/pmix/pmix.h"
  23 
  24 #include "orte/mca/errmgr/errmgr.h"
  25 #include "orte/mca/iof/base/base.h"
  26 #include "orte/mca/odls/base/base.h"
  27 #include "orte/mca/rmaps/rmaps_types.h"
  28 #include "orte/mca/rml/rml.h"
  29 #include "orte/mca/routed/routed.h"
  30 #include "orte/util/session_dir.h"
  31 #include "orte/util/threads.h"
  32 #include "orte/orted/pmix/pmix_server_internal.h"
  33 #include "orte/runtime/orte_data_server.h"
  34 #include "orte/runtime/orte_quit.h"
  35 
  36 #include "orte/mca/state/state.h"
  37 #include "orte/mca/state/base/base.h"
  38 #include "orte/mca/state/base/state_private.h"
  39 #include "state_orted.h"
  40 
  41 /*
  42  * Module functions: Global
  43  */
  44 static int init(void);
  45 static int finalize(void);
  46 
  47 /******************
  48  * ORTED module
  49  ******************/
  50 orte_state_base_module_t orte_state_orted_module = {
  51     init,
  52     finalize,
  53     orte_state_base_activate_job_state,
  54     orte_state_base_add_job_state,
  55     orte_state_base_set_job_state_callback,
  56     orte_state_base_set_job_state_priority,
  57     orte_state_base_remove_job_state,
  58     orte_state_base_activate_proc_state,
  59     orte_state_base_add_proc_state,
  60     orte_state_base_set_proc_state_callback,
  61     orte_state_base_set_proc_state_priority,
  62     orte_state_base_remove_proc_state
  63 };
  64 
  65 /* Local functions */
  66 static void track_jobs(int fd, short argc, void *cbdata);
  67 static void track_procs(int fd, short argc, void *cbdata);
  68 static int pack_state_update(opal_buffer_t *buf, orte_job_t *jdata);
  69 
  70 /* defined default state machines */
  71 static orte_job_state_t job_states[] = {
  72     ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
  73 };
  74 static orte_state_cbfunc_t job_callbacks[] = {
  75     track_jobs
  76 };
  77 
  78 static orte_proc_state_t proc_states[] = {
  79     ORTE_PROC_STATE_RUNNING,
  80     ORTE_PROC_STATE_REGISTERED,
  81     ORTE_PROC_STATE_IOF_COMPLETE,
  82     ORTE_PROC_STATE_WAITPID_FIRED,
  83     ORTE_PROC_STATE_TERMINATED
  84 };
  85 static orte_state_cbfunc_t proc_callbacks[] = {
  86     track_procs,
  87     track_procs,
  88     track_procs,
  89     track_procs,
  90     track_procs
  91 };
  92 
  93 /************************
  94  * API Definitions
  95  ************************/
  96 static int init(void)
  97 {
  98     int num_states, i, rc;
  99 
 100     /* setup the state machine */
 101     OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
 102     OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
 103 
 104     num_states = sizeof(job_states) / sizeof(orte_job_state_t);
 105     for (i=0; i < num_states; i++) {
 106         if (ORTE_SUCCESS != (rc = orte_state.add_job_state(job_states[i],
 107                                                            job_callbacks[i],
 108                                                            ORTE_SYS_PRI))) {
 109             ORTE_ERROR_LOG(rc);
 110         }
 111     }
 112     /* add a default error response */
 113     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
 114                                                        orte_quit, ORTE_ERROR_PRI))) {
 115         ORTE_ERROR_LOG(rc);
 116     }
 117     /* add a state for when we are ordered to terminate */
 118     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED,
 119                                                        orte_quit, ORTE_SYS_PRI))) {
 120         ORTE_ERROR_LOG(rc);
 121     }
 122     if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
 123         orte_state_base_print_job_state_machine();
 124     }
 125 
 126     /* populate the proc state machine to allow us to
 127      * track proc lifecycle changes
 128      */
 129     num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
 130     for (i=0; i < num_states; i++) {
 131         if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
 132                                                             proc_callbacks[i],
 133                                                             ORTE_SYS_PRI))) {
 134             ORTE_ERROR_LOG(rc);
 135         }
 136     }
 137     if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
 138         orte_state_base_print_proc_state_machine();
 139     }
 140     return ORTE_SUCCESS;
 141 }
 142 
 143 static int finalize(void)
 144 {
 145     opal_list_item_t *item;
 146 
 147     /* cleanup the state machines */
 148     while (NULL != (item = opal_list_remove_first(&orte_job_states))) {
 149         OBJ_RELEASE(item);
 150     }
 151     OBJ_DESTRUCT(&orte_job_states);
 152     while (NULL != (item = opal_list_remove_first(&orte_proc_states))) {
 153         OBJ_RELEASE(item);
 154     }
 155     OBJ_DESTRUCT(&orte_proc_states);
 156 
 157     return ORTE_SUCCESS;
 158 }
 159 
 160 static void track_jobs(int fd, short argc, void *cbdata)
 161 {
 162     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
 163     opal_buffer_t *alert;
 164     orte_plm_cmd_flag_t cmd;
 165     int rc, i;
 166     orte_proc_state_t running = ORTE_PROC_STATE_RUNNING;
 167     orte_proc_t *child;
 168     orte_vpid_t null=ORTE_VPID_INVALID;
 169 
 170     ORTE_ACQUIRE_OBJECT(caddy);
 171 
 172     if (ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE == caddy->job_state) {
 173         OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
 174                             "%s state:orted:track_jobs sending local launch complete for job %s",
 175                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 176                             ORTE_JOBID_PRINT(caddy->jdata->jobid)));
 177         /* update the HNP with all proc states for this job */
 178        alert = OBJ_NEW(opal_buffer_t);
 179          /* pack update state command */
 180         cmd = ORTE_PLM_UPDATE_PROC_STATE;
 181         if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
 182             ORTE_ERROR_LOG(rc);
 183             OBJ_RELEASE(alert);
 184             goto cleanup;
 185         }
 186         /* pack the jobid */
 187         if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &caddy->jdata->jobid, 1, ORTE_JOBID))) {
 188             ORTE_ERROR_LOG(rc);
 189             OBJ_RELEASE(alert);
 190             goto cleanup;
 191         }
 192         for (i=0; i < orte_local_children->size; i++) {
 193             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 194                 continue;
 195             }
 196             /* if this child is part of the job... */
 197             if (child->name.jobid == caddy->jdata->jobid) {
 198                 /* pack the child's vpid */
 199                 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &(child->name.vpid), 1, ORTE_VPID))) {
 200                     ORTE_ERROR_LOG(rc);
 201                     OBJ_RELEASE(alert);
 202                     goto cleanup;
 203                 }
 204                 /* pack the pid */
 205                 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->pid, 1, OPAL_PID))) {
 206                     ORTE_ERROR_LOG(rc);
 207                     OBJ_RELEASE(alert);
 208                     goto cleanup;
 209                 }
 210                 /* if this proc failed to start, then send that info */
 211                 if (ORTE_PROC_STATE_UNTERMINATED < child->state) {
 212                     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->state, 1, ORTE_PROC_STATE))) {
 213                         ORTE_ERROR_LOG(rc);
 214                         OBJ_RELEASE(alert);
 215                         goto cleanup;
 216                     }
 217                 } else {
 218                     /* pack the RUNNING state to avoid any race conditions */
 219                     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &running, 1, ORTE_PROC_STATE))) {
 220                         ORTE_ERROR_LOG(rc);
 221                         OBJ_RELEASE(alert);
 222                         goto cleanup;
 223                     }
 224                 }
 225                 /* pack its exit code */
 226                 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->exit_code, 1, ORTE_EXIT_CODE))) {
 227                     ORTE_ERROR_LOG(rc);
 228                     OBJ_RELEASE(alert);
 229                     goto cleanup;
 230                 }
 231             }
 232         }
 233 
 234         /* flag that this job is complete so the receiver can know */
 235         if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &null, 1, ORTE_VPID))) {
 236             ORTE_ERROR_LOG(rc);
 237             OBJ_RELEASE(alert);
 238             goto cleanup;
 239         }
 240 
 241         /* send it */
 242         if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
 243                                               ORTE_RML_TAG_PLM,
 244                                               orte_rml_send_callback, NULL))) {
 245             ORTE_ERROR_LOG(rc);
 246             OBJ_RELEASE(alert);
 247         }
 248     }
 249 
 250   cleanup:
 251     OBJ_RELEASE(caddy);
 252 }
 253 
 254 static void track_procs(int fd, short argc, void *cbdata)
 255 {
 256     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
 257     orte_process_name_t *proc;
 258     orte_proc_state_t state;
 259     orte_job_t *jdata;
 260     orte_proc_t *pdata, *pptr;
 261     opal_buffer_t *alert;
 262     int rc, i;
 263     orte_plm_cmd_flag_t cmd;
 264     orte_std_cntr_t index;
 265     orte_job_map_t *map;
 266     orte_node_t *node;
 267     orte_process_name_t target;
 268 
 269     ORTE_ACQUIRE_OBJECT(caddy);
 270     proc = &caddy->name;
 271     state = caddy->proc_state;
 272 
 273     OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
 274                          "%s state:orted:track_procs called for proc %s state %s",
 275                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 276                          ORTE_NAME_PRINT(proc),
 277                          orte_proc_state_to_str(state)));
 278 
 279     /* get the job object for this proc */
 280     if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
 281         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 282         goto cleanup;
 283     }
 284     pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
 285 
 286     if (ORTE_PROC_STATE_RUNNING == state) {
 287         /* update the proc state */
 288         pdata->state = state;
 289         jdata->num_launched++;
 290         if (jdata->num_launched == jdata->num_local_procs) {
 291             /* tell the state machine that all local procs for this job
 292              * were launched so that it can do whatever it needs to do,
 293              * like send a state update message for all procs to the HNP
 294              */
 295             ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
 296         }
 297         /* don't update until we are told that all are done */
 298     } else if (ORTE_PROC_STATE_REGISTERED == state) {
 299         /* update the proc state */
 300         pdata->state = state;
 301         jdata->num_reported++;
 302         if (jdata->num_reported == jdata->num_local_procs) {
 303             /* once everyone registers, notify the HNP */
 304 
 305             OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
 306                                  "%s state:orted: notifying HNP all local registered",
 307                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 308 
 309             alert = OBJ_NEW(opal_buffer_t);
 310             /* pack registered command */
 311             cmd = ORTE_PLM_REGISTERED_CMD;
 312             if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
 313                 ORTE_ERROR_LOG(rc);
 314                 goto cleanup;
 315             }
 316             /* pack the jobid */
 317             if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &proc->jobid, 1, ORTE_JOBID))) {
 318                 ORTE_ERROR_LOG(rc);
 319                 goto cleanup;
 320             }
 321             /* pack all the local child vpids */
 322             for (i=0; i < orte_local_children->size; i++) {
 323                 if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 324                     continue;
 325                 }
 326                 if (pptr->name.jobid == proc->jobid) {
 327                     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &pptr->name.vpid, 1, ORTE_VPID))) {
 328                         ORTE_ERROR_LOG(rc);
 329                         goto cleanup;
 330                     }
 331                 }
 332             }
 333             /* send it */
 334             if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
 335                                                   ORTE_RML_TAG_PLM,
 336                                                   orte_rml_send_callback, NULL))) {
 337                 ORTE_ERROR_LOG(rc);
 338             } else {
 339                 rc = ORTE_SUCCESS;
 340             }
 341         }
 342     } else if (ORTE_PROC_STATE_IOF_COMPLETE == state) {
 343         /* do NOT update the proc state as this can hit
 344          * while we are still trying to notify the HNP of
 345          * successful launch for short-lived procs
 346          */
 347         ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE);
 348         /* Release the stdin IOF file descriptor for this child, if one
 349          * was defined. File descriptors for the other IOF channels - stdout,
 350          * stderr, and stddiag - were released when their associated pipes
 351          * were cleared and closed due to termination of the process
 352          * Do this after we handle termination in case the IOF needs
 353          * to check to see if all procs from the job are actually terminated
 354          */
 355         if (NULL != orte_iof.close) {
 356             orte_iof.close(proc, ORTE_IOF_STDALL);
 357         }
 358         if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID) &&
 359             !ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_RECORDED)) {
 360             ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
 361         }
 362     } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) {
 363         /* do NOT update the proc state as this can hit
 364          * while we are still trying to notify the HNP of
 365          * successful launch for short-lived procs
 366          */
 367         ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID);
 368         if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE) &&
 369             !ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_RECORDED)) {
 370             ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
 371         }
 372     } else if (ORTE_PROC_STATE_TERMINATED == state) {
 373         /* if this proc has not already recorded as terminated, then
 374          * update the accounting here */
 375         if (!ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_RECORDED)) {
 376             jdata->num_terminated++;
 377         }
 378         /* update the proc state */
 379         ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_RECORDED);
 380         ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
 381         pdata->state = state;
 382         /* Clean up the session directory as if we were the process
 383          * itself.  This covers the case where the process died abnormally
 384          * and didn't cleanup its own session directory.
 385          */
 386         orte_session_dir_finalize(proc);
 387         /* if we are trying to terminate and our routes are
 388          * gone, then terminate ourselves IF no local procs
 389          * remain (might be some from another job)
 390          */
 391         if (orte_orteds_term_ordered &&
 392             0 == orte_routed.num_routes()) {
 393             for (i=0; i < orte_local_children->size; i++) {
 394                 if (NULL != (pdata = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
 395                     ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_ALIVE)) {
 396                     /* at least one is still alive */
 397                     OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
 398                                          "%s state:orted all routes gone but proc %s still alive",
 399                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 400                                          ORTE_NAME_PRINT(&pdata->name)));
 401                     goto cleanup;
 402                 }
 403             }
 404             /* call our appropriate exit procedure */
 405             OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
 406                                  "%s state:orted all routes and children gone - exiting",
 407                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 408             ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
 409             goto cleanup;
 410         }
 411         /* track job status */
 412         if (jdata->num_terminated == jdata->num_local_procs &&
 413             !orte_get_attribute(&jdata->attributes, ORTE_JOB_TERM_NOTIFIED, NULL, OPAL_BOOL)) {
 414             /* pack update state command */
 415             cmd = ORTE_PLM_UPDATE_PROC_STATE;
 416             alert = OBJ_NEW(opal_buffer_t);
 417             if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
 418                 ORTE_ERROR_LOG(rc);
 419                 goto cleanup;
 420             }
 421             /* pack the job info */
 422             if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata))) {
 423                 ORTE_ERROR_LOG(rc);
 424             }
 425             /* send it */
 426             OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
 427                                  "%s state:orted: SENDING JOB LOCAL TERMINATION UPDATE FOR JOB %s",
 428                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 429                                  ORTE_JOBID_PRINT(jdata->jobid)));
 430             if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
 431                                                   ORTE_RML_TAG_PLM,
 432                                                   orte_rml_send_callback, NULL))) {
 433                 ORTE_ERROR_LOG(rc);
 434             }
 435             /* mark that we sent it so we ensure we don't do it again */
 436             orte_set_attribute(&jdata->attributes, ORTE_JOB_TERM_NOTIFIED, ORTE_ATTR_LOCAL, NULL, OPAL_BOOL);
 437             /* cleanup the procs as these are gone */
 438             for (i=0; i < orte_local_children->size; i++) {
 439                 if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 440                     continue;
 441                 }
 442                 /* if this child is part of the job... */
 443                 if (pptr->name.jobid == jdata->jobid) {
 444                     /* clear the entry in the local children */
 445                     opal_pointer_array_set_item(orte_local_children, i, NULL);
 446                     OBJ_RELEASE(pptr);  // maintain accounting
 447                 }
 448             }
 449             /* tell the IOF that the job is complete */
 450             if (NULL != orte_iof.complete) {
 451                 orte_iof.complete(jdata);
 452             }
 453 
 454             /* tell the PMIx subsystem the job is complete */
 455             if (NULL != opal_pmix.server_deregister_nspace) {
 456                 opal_pmix.server_deregister_nspace(jdata->jobid, NULL, NULL);
 457             }
 458 
 459             /* release the resources */
 460             if (NULL != jdata->map) {
 461                 map = jdata->map;
 462                 for (index = 0; index < map->nodes->size; index++) {
 463                     if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) {
 464                         continue;
 465                     }
 466                     OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
 467                                          "%s state:orted releasing procs from node %s",
 468                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 469                                          node->name));
 470                     for (i = 0; i < node->procs->size; i++) {
 471                         if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
 472                             continue;
 473                         }
 474                         if (pptr->name.jobid != jdata->jobid) {
 475                             /* skip procs from another job */
 476                             continue;
 477                         }
 478                         if (!ORTE_FLAG_TEST(pptr, ORTE_PROC_FLAG_TOOL)) {
 479                             node->slots_inuse--;
 480                             node->num_procs--;
 481                         }
 482                         OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
 483                                              "%s state:orted releasing proc %s from node %s",
 484                                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 485                                              ORTE_NAME_PRINT(&pptr->name), node->name));
 486                         /* set the entry in the node array to NULL */
 487                         opal_pointer_array_set_item(node->procs, i, NULL);
 488                         /* release the proc once for the map entry */
 489                         OBJ_RELEASE(pptr);
 490                     }
 491                     /* set the node location to NULL */
 492                     opal_pointer_array_set_item(map->nodes, index, NULL);
 493                     /* maintain accounting */
 494                     OBJ_RELEASE(node);
 495                     /* flag that the node is no longer in a map */
 496                     ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
 497                 }
 498                 OBJ_RELEASE(map);
 499                 jdata->map = NULL;
 500             }
 501 
 502             /* if requested, check fd status for leaks */
 503             if (orte_state_base_run_fdcheck) {
 504                 orte_state_base_check_fds(jdata);
 505             }
 506 
 507             /* if ompi-server is around, then notify it to purge
 508              * any session-related info */
 509             if (NULL != orte_data_server_uri) {
 510                 target.jobid = jdata->jobid;
 511                 target.vpid = ORTE_VPID_WILDCARD;
 512                 orte_state_base_notify_data_server(&target);
 513             }
 514 
 515             /* cleanup the job info */
 516             opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
 517             OBJ_RELEASE(jdata);
 518         }
 519     }
 520 
 521   cleanup:
 522     OBJ_RELEASE(caddy);
 523 }
 524 
 525 static int pack_state_for_proc(opal_buffer_t *alert, orte_proc_t *child)
 526 {
 527     int rc;
 528 
 529     /* pack the child's vpid */
 530     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &(child->name.vpid), 1, ORTE_VPID))) {
 531         ORTE_ERROR_LOG(rc);
 532         return rc;
 533     }
 534     /* pack the pid */
 535     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->pid, 1, OPAL_PID))) {
 536         ORTE_ERROR_LOG(rc);
 537         return rc;
 538     }
 539     /* pack its state */
 540     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->state, 1, ORTE_PROC_STATE))) {
 541         ORTE_ERROR_LOG(rc);
 542         return rc;
 543     }
 544     /* pack its exit code */
 545     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->exit_code, 1, ORTE_EXIT_CODE))) {
 546         ORTE_ERROR_LOG(rc);
 547         return rc;
 548     }
 549 
 550     return ORTE_SUCCESS;
 551 }
 552 
 553 static int pack_state_update(opal_buffer_t *alert, orte_job_t *jdata)
 554 {
 555     int i, rc;
 556     orte_proc_t *child;
 557     orte_vpid_t null=ORTE_VPID_INVALID;
 558 
 559     /* pack the jobid */
 560     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &jdata->jobid, 1, ORTE_JOBID))) {
 561         ORTE_ERROR_LOG(rc);
 562         return rc;
 563     }
 564     for (i=0; i < orte_local_children->size; i++) {
 565         if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 566             continue;
 567         }
 568         /* if this child is part of the job... */
 569         if (child->name.jobid == jdata->jobid) {
 570             if (ORTE_SUCCESS != (rc = pack_state_for_proc(alert, child))) {
 571                 ORTE_ERROR_LOG(rc);
 572                 return rc;
 573             }
 574         }
 575     }
 576     /* flag that this job is complete so the receiver can know */
 577     if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &null, 1, ORTE_VPID))) {
 578         ORTE_ERROR_LOG(rc);
 579         return rc;
 580     }
 581 
 582     return ORTE_SUCCESS;
 583 }

/* [<][>][^][v][top][bottom][index][help] */