root/orte/util/comm/comm.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. quicktime_cb
  2. send_cbfunc
  3. recv_info
  4. orte_util_comm_connect_tool
  5. orte_util_comm_report_event
  6. orte_util_comm_query_job_info
  7. orte_util_comm_query_node_info
  8. orte_util_comm_query_proc_info
  9. comm_cbfunc
  10. orte_util_comm_spawn_job
  11. orte_util_comm_terminate_job
  12. orte_util_comm_halt_vm

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2010-2012 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 #include "orte_config.h"
  22 #include "orte/types.h"
  23 #include "orte/constants.h"
  24 
  25 #include <stdio.h>
  26 #include <string.h>
  27 
  28 #include "opal/util/output.h"
  29 #include "opal/threads/tsd.h"
  30 #include "opal/mca/event/event.h"
  31 #include "opal/mca/pmix/pmix.h"
  32 #include "opal/runtime/opal_progress.h"
  33 
  34 #include "opal/dss/dss.h"
  35 #include "orte/mca/errmgr/errmgr.h"
  36 #include "orte/mca/odls/odls_types.h"
  37 #include "orte/mca/rml/rml.h"
  38 #include "orte/mca/rml/rml_types.h"
  39 #include "orte/mca/rml/base/rml_contact.h"
  40 #include "orte/mca/routed/routed.h"
  41 #include "orte/util/name_fns.h"
  42 #include "orte/util/threads.h"
  43 #include "orte/runtime/orte_globals.h"
  44 #include "orte/runtime/orte_wait.h"
  45 
  46 #include "orte/util/comm/comm.h"
  47 
  48 /* internal communication handshake */
  49 /* quick timeout loop */
  50 static bool timer_fired;
  51 static opal_buffer_t answer;
  52 static opal_event_t *quicktime=NULL;
  53 static int error_exit;
  54 
  55 static void quicktime_cb(int fd, short event, void *cbdata)
  56 {
  57     /* release the timer */
  58     if (NULL != quicktime) {
  59         opal_event_free(quicktime);
  60         quicktime = NULL;
  61     }
  62 
  63     /* cancel the recv */
  64     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_TOOL);
  65 
  66     error_exit = ORTE_ERR_SILENT;
  67     /* declare it fired */
  68     timer_fired = true;
  69 }
  70 
  71 static void send_cbfunc(int status, orte_process_name_t* sender,
  72                         opal_buffer_t* buffer, orte_rml_tag_t tag,
  73                         void* cbdata)
  74 {
  75     /* cancel the timer */
  76     if (NULL != quicktime) {
  77         opal_event_free(quicktime);
  78         quicktime = NULL;
  79     }
  80     /* declare the work done */
  81     timer_fired = true;
  82     /* release the message */
  83     OBJ_RELEASE(buffer);
  84 }
  85 
  86 static void recv_info(int status, orte_process_name_t* sender,
  87                       opal_buffer_t* buffer, orte_rml_tag_t tag,
  88                       void* cbdata)
  89 {
  90     int rc;
  91 
  92     /* cancel the timer */
  93     if (NULL != quicktime) {
  94         opal_event_free (quicktime);
  95         quicktime = NULL;
  96     }
  97     /* xfer the answer */
  98     if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&answer, buffer))) {
  99         ORTE_ERROR_LOG(rc);
 100     }
 101     /* declare the work done */
 102     timer_fired = true;
 103 }
 104 
 105 
 106 /* name of attached tool */
 107 static orte_process_name_t tool;
 108 static bool tool_connected = false;
 109 
 110 /* connect a tool to us so we can send reports */
 111 int orte_util_comm_connect_tool(char *uri)
 112 {
 113     int rc;
 114     opal_value_t val;
 115 
 116     /* extract the tool's name and store it */
 117     if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(uri, &tool, NULL))) {
 118         ORTE_ERROR_LOG(rc);
 119         return rc;
 120     }
 121 
 122     /* set the contact info into the comm hash tables*/
 123     OBJ_CONSTRUCT(&val, opal_value_t);
 124     val.key = OPAL_PMIX_PROC_URI;
 125     val.type = OPAL_STRING;
 126     val.data.string = uri;
 127     if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&tool, &val))) {
 128         ORTE_ERROR_LOG(rc);
 129         val.key = NULL;
 130         val.data.string = NULL;
 131         OBJ_DESTRUCT(&val);
 132         return rc;
 133     }
 134     val.key = NULL;
 135     val.data.string = NULL;
 136     OBJ_DESTRUCT(&val);
 137 
 138     /* set the route to be direct */
 139     if (ORTE_SUCCESS != (rc = orte_routed.update_route(&tool, &tool))) {
 140         ORTE_ERROR_LOG(rc);
 141         return rc;
 142     }
 143 
 144     tool_connected = true;
 145     return ORTE_SUCCESS;
 146 }
 147 
 148 /* whether we are in step mode */
 149 static bool step=false;
 150 
 151 /* report an event to a connected tool */
 152 int orte_util_comm_report_event(orte_comm_event_t ev)
 153 {
 154     int rc, i;
 155     opal_buffer_t *buf;
 156     orte_node_t *node;
 157     struct timeval tv;
 158 
 159     /* if nothing is connected, ignore this */
 160     if (!tool_connected) {
 161         return ORTE_SUCCESS;
 162     }
 163 
 164     /* init a buffer for the data */
 165     buf = OBJ_NEW(opal_buffer_t);
 166     /* flag the type of event */
 167     opal_dss.pack(buf, &ev, 1, ORTE_COMM_EVENT);
 168 
 169     switch (ev) {
 170         case ORTE_COMM_EVENT_ALLOCATE:
 171             /* loop through nodes, storing just node names */
 172             for (i=0; i < orte_node_pool->size; i++) {
 173                 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
 174                     continue;
 175                 }
 176                 opal_dss.pack(buf, &node->name, 1, OPAL_STRING);
 177             }
 178             break;
 179 
 180         case ORTE_COMM_EVENT_MAP:
 181             break;
 182 
 183         case ORTE_COMM_EVENT_LAUNCH:
 184             break;
 185 
 186         default:
 187             ORTE_ERROR_LOG(ORTE_ERROR);
 188             OBJ_RELEASE(buf);
 189             return ORTE_ERROR;
 190             break;
 191     }
 192 
 193    /* define a max time to wait for send to complete */
 194     timer_fired = false;
 195     error_exit = ORTE_SUCCESS;
 196     quicktime = opal_event_alloc();
 197     tv.tv_sec = 0;
 198     tv.tv_usec = 100000;
 199     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 200     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 201     opal_event_evtimer_add(quicktime, &tv);
 202 
 203     /* do the send */
 204     if (0 > (rc = orte_rml.send_buffer_nb(&tool, buf, ORTE_RML_TAG_TOOL, send_cbfunc, NULL))) {
 205         ORTE_ERROR_LOG(rc);
 206         OBJ_RELEASE(buf);
 207         return rc;
 208     }
 209 
 210     while (!timer_fired) {
 211         opal_progress();
 212     }
 213 
 214     if (ORTE_SUCCESS != error_exit) {
 215         return error_exit;
 216     }
 217 
 218     if (step) {
 219         /* the caller wants to wait until an ack is received -
 220          * define a max time to wait for an answer
 221          */
 222         OBJ_CONSTRUCT(&answer, opal_buffer_t);
 223         timer_fired = false;
 224         error_exit = ORTE_SUCCESS;
 225 
 226         /* get the answer */
 227         orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 228                                 ORTE_RML_TAG_TOOL,
 229                                 ORTE_RML_NON_PERSISTENT,
 230                                 recv_info,
 231                                 NULL);
 232         /* set a timer for getting the answer */
 233         quicktime = opal_event_alloc();
 234         tv.tv_sec = 0;
 235         tv.tv_usec = 100000;
 236         opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 237         opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 238         opal_event_evtimer_add(quicktime, &tv);
 239 
 240         while (!timer_fired) {
 241             opal_progress();
 242         }
 243 
 244         /* cleanup */
 245         OBJ_DESTRUCT(&answer);
 246 
 247         if (ORTE_SUCCESS != error_exit) {
 248             return error_exit;
 249         }
 250     }
 251 
 252     return ORTE_SUCCESS;
 253 }
 254 
 255 
 256 int orte_util_comm_query_job_info(const orte_process_name_t *hnp, orte_jobid_t job,
 257                                   int *num_jobs, orte_job_t ***job_info_array)
 258 {
 259     int ret;
 260     int32_t cnt, cnt_jobs, n;
 261     opal_buffer_t *cmd;
 262     orte_daemon_cmd_flag_t command = ORTE_DAEMON_REPORT_JOB_INFO_CMD;
 263     orte_job_t **job_info;
 264     struct timeval tv;
 265 
 266     /* set default response */
 267     *num_jobs = 0;
 268     *job_info_array = NULL;
 269 
 270     /* send query to HNP */
 271     cmd = OBJ_NEW(opal_buffer_t);
 272     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD))) {
 273         ORTE_ERROR_LOG(ret);
 274         OBJ_RELEASE(cmd);
 275         return ret;
 276     }
 277     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &job, 1, ORTE_JOBID))) {
 278         ORTE_ERROR_LOG(ret);
 279         OBJ_RELEASE(cmd);
 280         return ret;
 281     }
 282 
 283    /* define a max time to wait for send to complete */
 284     timer_fired = false;
 285     error_exit = ORTE_SUCCESS;
 286     quicktime = opal_event_alloc();
 287     tv.tv_sec = 0;
 288     tv.tv_usec = 100000;
 289     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 290     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 291     opal_event_evtimer_add(quicktime, &tv);
 292 
 293     /* do the send */
 294     if (0 > (ret = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, cmd,
 295                                            ORTE_RML_TAG_DAEMON, send_cbfunc, NULL))) {
 296         ORTE_ERROR_LOG(ret);
 297         OBJ_RELEASE(cmd);
 298         return ret;
 299     }
 300 
 301     while (!timer_fired) {
 302         opal_progress();
 303     }
 304 
 305     /* setup for answer */
 306     OBJ_CONSTRUCT(&answer, opal_buffer_t);
 307 
 308     /* define a max time to wait for an answer */
 309     timer_fired = false;
 310     error_exit = ORTE_SUCCESS;
 311 
 312     /* get the answer */
 313     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 314                             ORTE_RML_TAG_TOOL,
 315                             ORTE_RML_NON_PERSISTENT,
 316                             recv_info,
 317                             NULL);
 318 
 319     /* set a timer for getting the answer */
 320     quicktime = opal_event_alloc();
 321     tv.tv_sec = 0;
 322     tv.tv_usec = 100000;
 323     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 324     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 325     opal_event_evtimer_add(quicktime, &tv);
 326 
 327     while (!timer_fired) {
 328         opal_progress();
 329     }
 330 
 331     if (ORTE_SUCCESS != error_exit) {
 332         OBJ_DESTRUCT(&answer);
 333         return error_exit;
 334     }
 335 
 336     cnt = 1;
 337     if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &cnt_jobs, &cnt, OPAL_INT32))) {
 338         ORTE_ERROR_LOG(ret);
 339         OBJ_DESTRUCT(&answer);
 340         return ret;
 341     }
 342 
 343     /* allocate the required memory */
 344     if (0 < cnt_jobs) {
 345         job_info = (orte_job_t**)malloc(cnt_jobs * sizeof(orte_job_t*));
 346         /* unpack the job data */
 347         for (n=0; n < cnt_jobs; n++) {
 348             cnt = 1;
 349             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &job_info[n], &cnt, ORTE_JOB))) {
 350                 ORTE_ERROR_LOG(ret);
 351                 OBJ_DESTRUCT(&answer);
 352                 free(job_info);
 353                 return ret;
 354             }
 355         }
 356         *job_info_array = job_info;
 357         *num_jobs = cnt_jobs;
 358     }
 359     OBJ_DESTRUCT(&answer);
 360 
 361     return ORTE_SUCCESS;
 362 }
 363 
 364 int orte_util_comm_query_node_info(const orte_process_name_t *hnp, char *node,
 365                                    int *num_nodes, orte_node_t ***node_info_array)
 366 {
 367     int ret;
 368     int32_t cnt, cnt_nodes, n;
 369     opal_buffer_t *cmd;
 370     orte_daemon_cmd_flag_t command = ORTE_DAEMON_REPORT_NODE_INFO_CMD;
 371     orte_node_t **node_info;
 372     struct timeval tv;
 373 
 374     /* set default response */
 375     *num_nodes = 0;
 376     *node_info_array = NULL;
 377 
 378     /* query the HNP for node info */
 379     cmd = OBJ_NEW(opal_buffer_t);
 380     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD))) {
 381         ORTE_ERROR_LOG(ret);
 382         OBJ_RELEASE(cmd);
 383         return ret;
 384     }
 385     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &node, 1, OPAL_STRING))) {
 386         ORTE_ERROR_LOG(ret);
 387         OBJ_RELEASE(cmd);
 388         return ret;
 389     }
 390 
 391    /* define a max time to wait for send to complete */
 392     timer_fired = false;
 393     error_exit = ORTE_SUCCESS;
 394     quicktime = opal_event_alloc();
 395     tv.tv_sec = 0;
 396     tv.tv_usec = 100000;
 397     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 398     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 399     opal_event_evtimer_add(quicktime, &tv);
 400 
 401     /* do the send */
 402     if (0 > (ret = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, cmd,
 403                                            ORTE_RML_TAG_DAEMON, send_cbfunc, NULL))) {
 404         ORTE_ERROR_LOG(ret);
 405         OBJ_RELEASE(cmd);
 406         return ret;
 407     }
 408 
 409     while (!timer_fired) {
 410         opal_progress();
 411     }
 412 
 413     /* did it succeed? */
 414     if (ORTE_SUCCESS != error_exit) {
 415         return error_exit;
 416     }
 417 
 418     /* define a max time to wait for an answer */
 419     timer_fired = false;
 420     error_exit = ORTE_SUCCESS;
 421 
 422     /* get the answer */
 423     OBJ_CONSTRUCT(&answer, opal_buffer_t);
 424     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 425                             ORTE_RML_TAG_TOOL,
 426                             ORTE_RML_NON_PERSISTENT,
 427                             recv_info,
 428                             NULL);
 429 
 430     /* set a timer for getting the answer */
 431     quicktime = opal_event_alloc();
 432     tv.tv_sec = 0;
 433     tv.tv_usec = 100000;
 434     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 435     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 436     opal_event_evtimer_add(quicktime, &tv);
 437 
 438     while (!timer_fired) {
 439         opal_progress();
 440     }
 441 
 442     if (ORTE_SUCCESS != error_exit) {
 443         OBJ_DESTRUCT(&answer);
 444         return error_exit;
 445     }
 446 
 447     cnt = 1;
 448     if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &cnt_nodes, &cnt, OPAL_INT32))) {
 449         ORTE_ERROR_LOG(ret);
 450         OBJ_DESTRUCT(&answer);
 451         return ret;
 452     }
 453 
 454     /* allocate the required memory */
 455     if (0 < cnt_nodes) {
 456         node_info = (orte_node_t**)malloc(cnt_nodes * sizeof(orte_node_t*));
 457         /* unpack the node data */
 458         for (n=0; n < cnt_nodes; n++) {
 459             cnt = 1;
 460             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &node_info[n], &cnt, ORTE_NODE))) {
 461                 ORTE_ERROR_LOG(ret);
 462                 OBJ_DESTRUCT(&answer);
 463                 free(node_info);
 464                 return ret;
 465             }
 466         }
 467         *node_info_array = node_info;
 468         *num_nodes = cnt_nodes;
 469     }
 470     OBJ_DESTRUCT(&answer);
 471 
 472     return ORTE_SUCCESS;
 473 }
 474 
 475 int orte_util_comm_query_proc_info(const orte_process_name_t *hnp, orte_jobid_t job, orte_vpid_t vpid,
 476                                    int *num_procs, orte_proc_t ***proc_info_array)
 477 {
 478     int ret;
 479     int32_t cnt, cnt_procs, n;
 480     opal_buffer_t *cmd;
 481     orte_daemon_cmd_flag_t command = ORTE_DAEMON_REPORT_PROC_INFO_CMD;
 482     orte_proc_t **proc_info;
 483     struct timeval tv;
 484     char *nodename;
 485 
 486     /* set default response */
 487     *num_procs = 0;
 488     *proc_info_array = NULL;
 489 
 490     /* query the HNP for info on the procs in this job */
 491     cmd = OBJ_NEW(opal_buffer_t);
 492     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &command, 1, ORTE_DAEMON_CMD))) {
 493         ORTE_ERROR_LOG(ret);
 494         OBJ_RELEASE(cmd);
 495         return ret;
 496     }
 497     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &job, 1, ORTE_JOBID))) {
 498         ORTE_ERROR_LOG(ret);
 499         OBJ_RELEASE(cmd);
 500         return ret;
 501     }
 502     if (ORTE_SUCCESS != (ret = opal_dss.pack(cmd, &vpid, 1, ORTE_VPID))) {
 503         ORTE_ERROR_LOG(ret);
 504         OBJ_RELEASE(cmd);
 505         return ret;
 506     }
 507 
 508     /* define a max time to wait for send to complete */
 509     timer_fired = false;
 510     error_exit = ORTE_SUCCESS;
 511     quicktime = opal_event_alloc();
 512     tv.tv_sec = 0;
 513     tv.tv_usec = 100000;
 514     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 515     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 516     opal_event_evtimer_add(quicktime, &tv);
 517 
 518     /* do the send */
 519     if (0 > (ret = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, cmd, ORTE_RML_TAG_DAEMON,
 520                                            send_cbfunc, NULL))) {
 521         ORTE_ERROR_LOG(ret);
 522         OBJ_RELEASE(cmd);
 523         return ret;
 524     }
 525 
 526     while (!timer_fired) {
 527         opal_progress();
 528     }
 529 
 530     /* did it succeed? */
 531     if (ORTE_SUCCESS != error_exit) {
 532         return error_exit;
 533     }
 534 
 535     /* define a max time to wait for an answer */
 536     timer_fired = false;
 537     error_exit = ORTE_SUCCESS;
 538 
 539     /* get the answer */
 540     OBJ_CONSTRUCT(&answer, opal_buffer_t);
 541     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 542                             ORTE_RML_TAG_TOOL,
 543                             ORTE_RML_NON_PERSISTENT,
 544                             recv_info,
 545                             NULL);
 546 
 547     /* set a timer for getting the answer */
 548     quicktime = opal_event_alloc();
 549     tv.tv_sec = 0;
 550     tv.tv_usec = 100000;
 551     opal_event_evtimer_set(orte_event_base, quicktime, quicktime_cb, NULL);
 552     opal_event_set_priority(quicktime, ORTE_ERROR_PRI);
 553     opal_event_evtimer_add(quicktime, &tv);
 554 
 555     while (!timer_fired) {
 556         opal_progress();
 557     }
 558 
 559     if (ORTE_SUCCESS != error_exit) {
 560         OBJ_DESTRUCT(&answer);
 561         return error_exit;
 562     }
 563 
 564     cnt = 1;
 565     if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &cnt_procs, &cnt, OPAL_INT32))) {
 566         ORTE_ERROR_LOG(ret);
 567         OBJ_DESTRUCT(&answer);
 568         return ret;
 569     }
 570 
 571     /* allocate the required memory */
 572     if (0 < cnt_procs) {
 573         proc_info = (orte_proc_t**)malloc(cnt_procs * sizeof(orte_proc_t*));
 574         /* unpack the procs */
 575         for (n=0; n < cnt_procs; n++) {
 576             cnt = 1;
 577             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &proc_info[n], &cnt, ORTE_PROC))) {
 578                 ORTE_ERROR_LOG(ret);
 579                 OBJ_DESTRUCT(&answer);
 580                 free(proc_info);
 581                 return ret;
 582             }
 583             /* the vpid and nodename for this proc are no longer packed
 584              * in the ORTE_PROC packing routines to save space for other
 585              * uses, so we have to unpack them separately
 586              */
 587             cnt = 1;
 588             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &proc_info[n]->pid, &cnt, OPAL_PID))) {
 589                 ORTE_ERROR_LOG(ret);
 590                 OBJ_DESTRUCT(&answer);
 591                 free(proc_info);
 592                 return ret;
 593             }
 594             cnt = 1;
 595             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&answer, &nodename, &cnt, OPAL_STRING))) {
 596                 ORTE_ERROR_LOG(ret);
 597                 OBJ_DESTRUCT(&answer);
 598                 free(proc_info);
 599                 return ret;
 600             }
 601             orte_set_attribute(&proc_info[n]->attributes, ORTE_PROC_NODENAME, ORTE_ATTR_LOCAL, nodename, OPAL_STRING);
 602         }
 603         *proc_info_array = proc_info;
 604         *num_procs = (int)cnt_procs;
 605     }
 606     OBJ_DESTRUCT(&answer);
 607 
 608     return ORTE_SUCCESS;
 609 }
 610 
 611 /* The spawn function cannot just call the plm.proxy since that won't
 612  * necessarily be open. Likewise, we can't just send the launch request
 613  * to the HNP's plm_receive as that function would return the response
 614  * to the plm_proxy tag! So we have to go another route to get this
 615  * request processed
 616  */
 617 static bool reply_waiting;
 618 
 619 static void comm_cbfunc(int status, orte_process_name_t* sender,
 620                         opal_buffer_t* buffer, orte_rml_tag_t tag,
 621                         void* cbdata)
 622 {
 623     opal_buffer_t *buf = (opal_buffer_t*)cbdata;
 624 
 625     if (NULL != buf) {
 626         opal_dss.copy_payload(buf, buffer);
 627     }
 628     reply_waiting = false;
 629 }
 630 
 631 int orte_util_comm_spawn_job(const orte_process_name_t *hnp, orte_job_t *jdata)
 632 {
 633     opal_buffer_t *buf;
 634     orte_daemon_cmd_flag_t command;
 635     orte_std_cntr_t count;
 636     int rc;
 637 
 638     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 639                          "%s util_comm_spawn_job: requesting HNP %s spawn new job",
 640                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 641                          ORTE_NAME_PRINT(hnp)));
 642 
 643     /* setup the buffer */
 644     buf = OBJ_NEW(opal_buffer_t);
 645 
 646     /* tell the HNP we are sending a launch request */
 647     command = ORTE_DAEMON_SPAWN_JOB_CMD;
 648     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
 649         ORTE_ERROR_LOG(rc);
 650         OBJ_RELEASE(buf);
 651         goto CLEANUP;
 652     }
 653 
 654     /* pack the jdata object */
 655     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &jdata, 1, ORTE_JOB))) {
 656         ORTE_ERROR_LOG(rc);
 657         OBJ_RELEASE(buf);
 658         goto CLEANUP;
 659 
 660     }
 661 
 662     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 663                          "%s util_comm_spawn_job: sending spawn cmd to HNP %s",
 664                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 665                          ORTE_NAME_PRINT(hnp)));
 666 
 667     /* tell the target HNP to launch the job */
 668     if (0 > (rc = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, buf,
 669                                           ORTE_RML_TAG_DAEMON,
 670                                           orte_rml_send_callback, NULL))) {
 671         ORTE_ERROR_LOG(rc);
 672         OBJ_RELEASE(buf);
 673         goto CLEANUP;
 674     }
 675 
 676 
 677     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 678                          "%s util_comm_spawn_job: waiting for response",
 679                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 680 
 681     /* wait for the target's response */
 682     reply_waiting = true;
 683     buf = OBJ_NEW(opal_buffer_t);
 684     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_TOOL, 0,
 685                             comm_cbfunc, buf);
 686     ORTE_WAIT_FOR_COMPLETION(reply_waiting);
 687 
 688     /* get the new jobid back in case the caller wants it */
 689     count = 1;
 690     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &(jdata->jobid), &count, ORTE_JOBID))) {
 691         ORTE_ERROR_LOG(rc);
 692         OBJ_RELEASE(buf);
 693         goto CLEANUP;
 694     }
 695     if (ORTE_JOBID_INVALID == jdata->jobid) {
 696         /* something went wrong on far end - go no further */
 697         rc = ORTE_ERR_FAILED_TO_START;
 698         OBJ_RELEASE(buf);
 699         goto CLEANUP;
 700     }
 701     OBJ_RELEASE(buf);
 702 
 703     /* good to go! */
 704 
 705 CLEANUP:
 706     return rc;
 707 }
 708 
 709 
 710 int orte_util_comm_terminate_job(const orte_process_name_t *hnp, orte_jobid_t job)
 711 {
 712     opal_buffer_t *buf;
 713     orte_daemon_cmd_flag_t command;
 714     orte_std_cntr_t count;
 715     int rc, ret = ORTE_ERROR;
 716 
 717     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 718                          "%s util_comm_spawn_job: requesting HNP %s terminate job %s",
 719                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 720                          ORTE_NAME_PRINT(hnp),
 721                          ORTE_JOBID_PRINT(job)));
 722 
 723     /* setup the buffer */
 724     buf = OBJ_NEW(opal_buffer_t);
 725 
 726     /* tell the HNP we are sending a terminate request */
 727     command = ORTE_DAEMON_TERMINATE_JOB_CMD;
 728     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
 729         ORTE_ERROR_LOG(rc);
 730         ret = rc;
 731         OBJ_RELEASE(buf);
 732         goto CLEANUP;
 733     }
 734 
 735     /* pack the jobid */
 736     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
 737         ORTE_ERROR_LOG(rc);
 738         ret = rc;
 739         OBJ_RELEASE(buf);
 740         goto CLEANUP;
 741     }
 742 
 743     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 744                          "%s util_comm_spawn_job: sending terminate cmd to HNP %s",
 745                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 746                          ORTE_NAME_PRINT(hnp)));
 747 
 748     /* tell the target HNP to terminate the job */
 749     if (0 > (rc = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, buf,
 750                                           ORTE_RML_TAG_DAEMON,
 751                                           orte_rml_send_callback, NULL))) {
 752         ORTE_ERROR_LOG(rc);
 753         ret = rc;
 754         OBJ_RELEASE(buf);
 755         goto CLEANUP;
 756     }
 757 
 758 
 759     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 760                          "%s util_comm_terminate_job: waiting for response",
 761                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 762 
 763     /* wait for the target's response */
 764     reply_waiting = true;
 765     buf = OBJ_NEW(opal_buffer_t);
 766     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 767                             ORTE_RML_TAG_TOOL, 0,
 768                             comm_cbfunc, buf);
 769     ORTE_WAIT_FOR_COMPLETION(reply_waiting);
 770 
 771     /* get the status code */
 772     count = 1;
 773     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &ret, &count, OPAL_INT))) {
 774         ORTE_ERROR_LOG(rc);
 775         ret = rc;
 776     }
 777     OBJ_RELEASE(buf);
 778 
 779 CLEANUP:
 780     return ret;
 781 }
 782 
 783 int orte_util_comm_halt_vm(const orte_process_name_t *hnp)
 784 {
 785     opal_buffer_t *buf;
 786     orte_daemon_cmd_flag_t command;
 787     int rc;
 788 
 789     OPAL_OUTPUT_VERBOSE((5, orte_debug_output,
 790                          "%s util_comm_halt_vm: ordering HNP %s terminate",
 791                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 792                          ORTE_NAME_PRINT(hnp)));
 793 
 794     /* setup the buffer */
 795     buf = OBJ_NEW(opal_buffer_t);
 796 
 797     /* tell the HNP to die */
 798     command = ORTE_DAEMON_HALT_VM_CMD;
 799     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
 800         ORTE_ERROR_LOG(rc);
 801         OBJ_RELEASE(buf);
 802         goto CLEANUP;
 803     }
 804 
 805     /* send the order */
 806     if (0 > (rc = orte_rml.send_buffer_nb((orte_process_name_t*)hnp, buf,
 807                                           ORTE_RML_TAG_DAEMON,
 808                                           orte_rml_send_callback, NULL))) {
 809         ORTE_ERROR_LOG(rc);
 810         OBJ_RELEASE(buf);
 811         goto CLEANUP;
 812     }
 813     OBJ_RELEASE(buf);
 814 
 815     /* don't bother waiting around */
 816 CLEANUP:
 817     return rc;
 818 }

/* [<][>][^][v][top][bottom][index][help] */