root/orte/mca/grpcomm/direct/grpcomm_direct.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. init
  2. finalize
  3. xcast
  4. allgather
  5. allgather_recv
  6. xcast_recv
  7. barrier_release

   1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
   2 /*
   3  * Copyright (c) 2007      The Trustees of Indiana University.
   4  *                         All rights reserved.
   5  * Copyright (c) 2011      Cisco Systems, Inc.  All rights reserved.
   6  * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
   7  *                         rights reserved.
   8  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   9  * Copyright (c) 2014-2017 Research Organization for Information Science
  10  *                         and Technology (RIST). All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include "orte_config.h"
  19 #include "orte/constants.h"
  20 #include "orte/types.h"
  21 
  22 #include <string.h>
  23 
  24 #include "opal/dss/dss.h"
  25 #include "opal/class/opal_list.h"
  26 #include "opal/mca/pmix/pmix.h"
  27 #include "opal/mca/compress/compress.h"
  28 
  29 #include "orte/mca/errmgr/errmgr.h"
  30 #include "orte/mca/rml/base/base.h"
  31 #include "orte/mca/rml/base/rml_contact.h"
  32 #include "orte/mca/routed/base/base.h"
  33 #include "orte/mca/state/state.h"
  34 #include "orte/util/name_fns.h"
  35 #include "orte/util/nidmap.h"
  36 #include "orte/util/proc_info.h"
  37 
  38 #include "orte/mca/grpcomm/base/base.h"
  39 #include "grpcomm_direct.h"
  40 
  41 
/* Static APIs - the entry points published through the module
 * structure below */
static int init(void);
static void finalize(void);
static int xcast(orte_vpid_t *vpids,
                 size_t nprocs,
                 opal_buffer_t *buf);
static int allgather(orte_grpcomm_coll_t *coll,
                     opal_buffer_t *buf);

/* Module def - the initializers are positional, so their order must
 * follow the field order of orte_grpcomm_base_module_t */
orte_grpcomm_base_module_t orte_grpcomm_direct_module = {
    init,
    finalize,
    xcast,
    allgather
};

/* internal functions - RML receive callbacks that init() registers:
 *   xcast_recv      handles ORTE_RML_TAG_XCAST
 *   allgather_recv  handles ORTE_RML_TAG_ALLGATHER_DIRECT
 *   barrier_release handles ORTE_RML_TAG_COLL_RELEASE
 */
static void xcast_recv(int status, orte_process_name_t* sender,
                       opal_buffer_t* buffer, orte_rml_tag_t tag,
                       void* cbdata);
static void allgather_recv(int status, orte_process_name_t* sender,
                           opal_buffer_t* buffer, orte_rml_tag_t tag,
                           void* cbdata);
static void barrier_release(int status, orte_process_name_t* sender,
                            opal_buffer_t* buffer, orte_rml_tag_t tag,
                            void* cbdata);

/* internal variables */
/* list constructed in init() and destructed in finalize() */
static opal_list_t tracker;
  72 
  73 /**
  74  * Initialize the module
  75  */
  76 static int init(void)
  77 {
  78     OBJ_CONSTRUCT(&tracker, opal_list_t);
  79 
  80     /* post the receives */
  81     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  82                             ORTE_RML_TAG_XCAST,
  83                             ORTE_RML_PERSISTENT,
  84                             xcast_recv, NULL);
  85     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  86                             ORTE_RML_TAG_ALLGATHER_DIRECT,
  87                             ORTE_RML_PERSISTENT,
  88                             allgather_recv, NULL);
  89     /* setup recv for barrier release */
  90     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  91                             ORTE_RML_TAG_COLL_RELEASE,
  92                             ORTE_RML_PERSISTENT,
  93                             barrier_release, NULL);
  94 
  95     return OPAL_SUCCESS;
  96 }
  97 
  98 /**
  99  * Finalize the module
 100  */
 101 static void finalize(void)
 102 {
 103     OPAL_LIST_DESTRUCT(&tracker);
 104     return;
 105 }
 106 
 107 static int xcast(orte_vpid_t *vpids,
 108                  size_t nprocs,
 109                  opal_buffer_t *buf)
 110 {
 111     int rc;
 112 
 113     /* send it to the HNP (could be myself) for relay */
 114     OBJ_RETAIN(buf);  // we'll let the RML release it
 115     if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_XCAST,
 116                                           orte_rml_send_callback, NULL))) {
 117         ORTE_ERROR_LOG(rc);
 118         OBJ_RELEASE(buf);
 119         return rc;
 120     }
 121     return ORTE_SUCCESS;
 122 }
 123 
 124 static int allgather(orte_grpcomm_coll_t *coll,
 125                      opal_buffer_t *buf)
 126 {
 127     int rc;
 128     opal_buffer_t *relay;
 129 
 130     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 131                          "%s grpcomm:direct: allgather",
 132                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 133 
 134     /* the base functions pushed us into the event library
 135      * before calling us, so we can safely access global data
 136      * at this point */
 137 
 138     relay = OBJ_NEW(opal_buffer_t);
 139     /* pack the signature */
 140     if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &coll->sig, 1, ORTE_SIGNATURE))) {
 141         ORTE_ERROR_LOG(rc);
 142         OBJ_RELEASE(relay);
 143         return rc;
 144     }
 145 
 146     /* pass along the payload */
 147     opal_dss.copy_payload(relay, buf);
 148 
 149     /* send this to ourselves for processing */
 150     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 151                          "%s grpcomm:direct:allgather sending to ourself",
 152                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 153 
 154     /* send the info to ourselves for tracking */
 155     rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
 156                                  ORTE_RML_TAG_ALLGATHER_DIRECT,
 157                                  orte_rml_send_callback, NULL);
 158     return rc;
 159 }
 160 
 161 static void allgather_recv(int status, orte_process_name_t* sender,
 162                            opal_buffer_t* buffer, orte_rml_tag_t tag,
 163                            void* cbdata)
 164 {
 165     int32_t cnt;
 166     int rc, ret;
 167     orte_grpcomm_signature_t *sig;
 168     opal_buffer_t *reply;
 169     orte_grpcomm_coll_t *coll;
 170 
 171     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 172                          "%s grpcomm:direct allgather recvd from %s",
 173                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 174                          ORTE_NAME_PRINT(sender)));
 175 
 176     /* unpack the signature */
 177     cnt = 1;
 178     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
 179         ORTE_ERROR_LOG(rc);
 180         return;
 181     }
 182 
 183     /* check for the tracker and create it if not found */
 184     if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, true))) {
 185         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 186         OBJ_RELEASE(sig);
 187         return;
 188     }
 189 
 190     /* increment nprocs reported for collective */
 191     coll->nreported++;
 192     /* capture any provided content */
 193     opal_dss.copy_payload(&coll->bucket, buffer);
 194 
 195     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 196                          "%s grpcomm:direct allgather recv nexpected %d nrep %d",
 197                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 198                          (int)coll->nexpected, (int)coll->nreported));
 199 
 200     /* see if everyone has reported */
 201     if (coll->nreported == coll->nexpected) {
 202         if (ORTE_PROC_IS_HNP) {
 203             OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 204                                  "%s grpcomm:direct allgather HNP reports complete",
 205                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 206             /* the allgather is complete - send the xcast */
 207             reply = OBJ_NEW(opal_buffer_t);
 208             /* pack the signature */
 209             if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
 210                 ORTE_ERROR_LOG(rc);
 211                 OBJ_RELEASE(reply);
 212                 OBJ_RELEASE(sig);
 213                 return;
 214             }
 215             /* pack the status - success since the allgather completed. This
 216              * would be an error if we timeout instead */
 217             ret = ORTE_SUCCESS;
 218             if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
 219                 ORTE_ERROR_LOG(rc);
 220                 OBJ_RELEASE(reply);
 221                 OBJ_RELEASE(sig);
 222                 return;
 223             }
 224             /* transfer the collected bucket */
 225             opal_dss.copy_payload(reply, &coll->bucket);
 226             /* send the release via xcast */
 227             (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
 228             OBJ_RELEASE(reply);
 229         } else {
 230             OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 231                                  "%s grpcomm:direct allgather rollup complete - sending to %s",
 232                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 233                                  ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
 234             /* relay the bucket upward */
 235             reply = OBJ_NEW(opal_buffer_t);
 236             /* pack the signature */
 237             if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
 238                 ORTE_ERROR_LOG(rc);
 239                 OBJ_RELEASE(reply);
 240                 OBJ_RELEASE(sig);
 241                 return;
 242             }
 243             /* transfer the collected bucket */
 244             opal_dss.copy_payload(reply, &coll->bucket);
 245             /* send the info to our parent */
 246             rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
 247                                          ORTE_RML_TAG_ALLGATHER_DIRECT,
 248                                          orte_rml_send_callback, NULL);
 249         }
 250     }
 251     OBJ_RELEASE(sig);
 252 }
 253 
 254 static void xcast_recv(int status, orte_process_name_t* sender,
 255                        opal_buffer_t* buffer, orte_rml_tag_t tg,
 256                        void* cbdata)
 257 {
 258     opal_list_item_t *item;
 259     orte_namelist_t *nm;
 260     int ret, cnt;
 261     opal_buffer_t *relay=NULL, *rly;
 262     orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
 263     opal_buffer_t wireup, datbuf, *data;
 264     opal_byte_object_t *bo;
 265     int8_t flag;
 266     orte_job_t *jdata;
 267     orte_proc_t *rec;
 268     opal_list_t coll;
 269     orte_grpcomm_signature_t *sig;
 270     orte_rml_tag_t tag;
 271     size_t inlen, cmplen;
 272     uint8_t *packed_data, *cmpdata;
 273     int32_t nvals, i;
 274     opal_value_t kv, *kval;
 275     orte_process_name_t dmn;
 276 
 277     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 278                          "%s grpcomm:direct:xcast:recv: with %d bytes",
 279                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 280                          (int)buffer->bytes_used));
 281 
 282     /* we need a passthru buffer to send to our children - we leave it
 283      * as compressed data */
 284     rly = OBJ_NEW(opal_buffer_t);
 285     opal_dss.copy_payload(rly, buffer);
 286     OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
 287     /* setup the relay list */
 288     OBJ_CONSTRUCT(&coll, opal_list_t);
 289 
 290     /* unpack the flag to see if this payload is compressed */
 291     cnt=1;
 292     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
 293         ORTE_ERROR_LOG(ret);
 294         ORTE_FORCED_TERMINATE(ret);
 295         OBJ_DESTRUCT(&datbuf);
 296         OBJ_DESTRUCT(&coll);
 297         OBJ_RELEASE(rly);
 298         return;
 299     }
 300     if (flag) {
 301         /* unpack the data size */
 302         cnt=1;
 303         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
 304             ORTE_ERROR_LOG(ret);
 305             ORTE_FORCED_TERMINATE(ret);
 306             OBJ_DESTRUCT(&datbuf);
 307             OBJ_DESTRUCT(&coll);
 308             OBJ_RELEASE(rly);
 309             return;
 310         }
 311         /* unpack the unpacked data size */
 312         cnt=1;
 313         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
 314             ORTE_ERROR_LOG(ret);
 315             ORTE_FORCED_TERMINATE(ret);
 316             OBJ_DESTRUCT(&datbuf);
 317             OBJ_DESTRUCT(&coll);
 318             OBJ_RELEASE(rly);
 319             return;
 320         }
 321         /* allocate the space */
 322         packed_data = (uint8_t*)malloc(inlen);
 323         /* unpack the data blob */
 324         cnt = inlen;
 325         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, packed_data, &cnt, OPAL_UINT8))) {
 326             ORTE_ERROR_LOG(ret);
 327             free(packed_data);
 328             ORTE_FORCED_TERMINATE(ret);
 329             OBJ_DESTRUCT(&datbuf);
 330             OBJ_DESTRUCT(&coll);
 331             OBJ_RELEASE(rly);
 332             return;
 333         }
 334         /* decompress the data */
 335         if (opal_compress.decompress_block(&cmpdata, cmplen,
 336                                        packed_data, inlen)) {
 337             /* the data has been uncompressed */
 338             opal_dss.load(&datbuf, cmpdata, cmplen);
 339             data = &datbuf;
 340         } else {
 341             data = buffer;
 342         }
 343         free(packed_data);
 344     } else {
 345         data = buffer;
 346     }
 347 
 348     /* get the signature that we do not need */
 349     cnt=1;
 350     if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
 351         ORTE_ERROR_LOG(ret);
 352         OBJ_DESTRUCT(&datbuf);
 353         OBJ_DESTRUCT(&coll);
 354         OBJ_RELEASE(rly);
 355         ORTE_FORCED_TERMINATE(ret);
 356         return;
 357     }
 358     OBJ_RELEASE(sig);
 359 
 360     /* get the target tag */
 361     cnt=1;
 362     if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
 363         ORTE_ERROR_LOG(ret);
 364         OBJ_DESTRUCT(&datbuf);
 365         OBJ_DESTRUCT(&coll);
 366         OBJ_RELEASE(rly);
 367         ORTE_FORCED_TERMINATE(ret);
 368         return;
 369     }
 370 
 371     /* if this is headed for the daemon command processor,
 372      * then we first need to check for add_local_procs
 373      * as that command includes some needed wireup info */
 374     if (ORTE_RML_TAG_DAEMON == tag) {
 375         /* peek at the command */
 376         cnt=1;
 377         if (ORTE_SUCCESS == (ret = opal_dss.unpack(data, &command, &cnt, ORTE_DAEMON_CMD))) {
 378             /* if it is an exit cmd, then flag that we are quitting so we will properly
 379              * handle connection losses from our downstream peers */
 380             if (ORTE_DAEMON_EXIT_CMD == command ||
 381                 ORTE_DAEMON_HALT_VM_CMD == command) {
 382                 orte_orteds_term_ordered = true;
 383                 if (ORTE_DAEMON_HALT_VM_CMD == command) {
 384                     /* this is an abnormal termination */
 385                     orte_abnormal_term_ordered = true;
 386                 }
 387                 /* copy the msg for relay to ourselves */
 388                 relay = OBJ_NEW(opal_buffer_t);
 389                 /* repack the command */
 390                 if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
 391                     ORTE_ERROR_LOG(ret);
 392                     goto relay;
 393                 }
 394                 opal_dss.copy_payload(relay, data);
 395             } else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
 396                        ORTE_DAEMON_DVM_NIDMAP_CMD == command ||
 397                        ORTE_DAEMON_DVM_ADD_PROCS == command) {
 398                 /* setup our internal relay buffer */
 399                 relay = OBJ_NEW(opal_buffer_t);
 400                 /* repack the command */
 401                 if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
 402                     ORTE_ERROR_LOG(ret);
 403                     goto relay;
 404                 }
 405                 /* unpack flag indicating if nidmap included */
 406                 cnt = 1;
 407                 if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
 408                     ORTE_ERROR_LOG(ret);
 409                     goto relay;
 410                 }
 411                 if (1 == flag) {
 412                     if (ORTE_SUCCESS != (ret = orte_util_decode_nidmap(data))) {
 413                         ORTE_ERROR_LOG(ret);
 414                         goto relay;
 415                     }
 416                     if (!ORTE_PROC_IS_HNP) {
 417                         /* update the routing plan - the HNP already did
 418                          * it when it computed the VM, so don't waste time
 419                          * re-doing it here */
 420                         orte_routed.update_routing_plan();
 421                     }
 422                     /* routing is now possible */
 423                     orte_routed_base.routing_enabled = true;
 424 
 425                     /* unpack the wireup byte object */
 426                     cnt=1;
 427                     if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
 428                         ORTE_ERROR_LOG(ret);
 429                         goto relay;
 430                     }
 431                     if (0 < bo->size) {
 432                         /* load it into a buffer */
 433                         OBJ_CONSTRUCT(&wireup, opal_buffer_t);
 434                         opal_dss.load(&wireup, bo->bytes, bo->size);
 435                         /* decode it, pushing the info into our database */
 436                         if (opal_pmix.legacy_get()) {
 437                             OBJ_CONSTRUCT(&kv, opal_value_t);
 438                             kv.key = OPAL_PMIX_PROC_URI;
 439                             kv.type = OPAL_STRING;
 440                             cnt=1;
 441                             while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
 442                                 cnt = 1;
 443                                 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv.data.string, &cnt, OPAL_STRING))) {
 444                                     ORTE_ERROR_LOG(ret);
 445                                     break;
 446                                 }
 447                                 if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, &kv))) {
 448                                     ORTE_ERROR_LOG(ret);
 449                                     free(kv.data.string);
 450                                     break;
 451                                 }
 452                                 free(kv.data.string);
 453                                 kv.data.string = NULL;
 454                             }
 455                             if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
 456                                 ORTE_ERROR_LOG(ret);
 457                             }
 458                         } else {
 459                            cnt=1;
 460                            while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
 461                                cnt = 1;
 462                                if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
 463                                    ORTE_ERROR_LOG(ret);
 464                                    break;
 465                                }
 466                                for (i=0; i < nvals; i++) {
 467                                 cnt = 1;
 468                                 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kval, &cnt, OPAL_VALUE))) {
 469                                     ORTE_ERROR_LOG(ret);
 470                                     break;
 471                                 }
 472                                 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 473                                                      "%s STORING MODEX DATA FOR PROC %s KEY %s",
 474                                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 475                                                      ORTE_NAME_PRINT(&dmn), kval->key));
 476                                 if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kval))) {
 477                                     ORTE_ERROR_LOG(ret);
 478                                     OBJ_RELEASE(kval);
 479                                     break;
 480                                 }
 481                                 OBJ_RELEASE(kval);
 482                             }
 483                             }
 484                             if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
 485                                 ORTE_ERROR_LOG(ret);
 486                             }
 487                         }
 488                         /* done with the wireup buffer - dump it */
 489                         OBJ_DESTRUCT(&wireup);
 490                     }
 491                     free(bo);
 492                 }
 493                 /* copy the remainder of the payload - we don't pass wiring info
 494                  * to the odls */
 495                 opal_dss.copy_payload(relay, data);
 496             } else {
 497                 relay = OBJ_NEW(opal_buffer_t);
 498                 /* repack the command */
 499                 if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
 500                     ORTE_ERROR_LOG(ret);
 501                     goto relay;
 502                 }
 503                 /* copy the msg for relay to ourselves */
 504                 opal_dss.copy_payload(relay, data);
 505             }
 506         } else {
 507             ORTE_ERROR_LOG(ret);
 508             goto CLEANUP;
 509         }
 510     } else {
 511         /* copy the msg for relay to ourselves */
 512         relay = OBJ_NEW(opal_buffer_t);
 513         opal_dss.copy_payload(relay, data);
 514     }
 515 
 516   relay:
 517     if (!orte_do_not_launch) {
 518         /* get the list of next recipients from the routed module */
 519         orte_routed.get_routing_list(&coll);
 520 
 521         /* if list is empty, no relay is required */
 522         if (opal_list_is_empty(&coll)) {
 523             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 524                                  "%s grpcomm:direct:send_relay - recipient list is empty!",
 525                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 526             goto CLEANUP;
 527         }
 528 
 529         /* send the message to each recipient on list, deconstructing it as we go */
 530         while (NULL != (item = opal_list_remove_first(&coll))) {
 531             nm = (orte_namelist_t*)item;
 532 
 533             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 534                                  "%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
 535                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
 536                                  ORTE_NAME_PRINT(&nm->name)));
 537             OBJ_RETAIN(rly);
 538             /* check the state of the recipient - no point
 539              * sending to someone not alive
 540              */
 541             jdata = orte_get_job_data_object(nm->name.jobid);
 542             if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
 543                 if (!orte_abnormal_term_ordered && !orte_orteds_term_ordered) {
 544                     opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
 545                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
 546                 }
 547                 OBJ_RELEASE(rly);
 548                 OBJ_RELEASE(item);
 549                 ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
 550                 continue;
 551             }
 552             if ((ORTE_PROC_STATE_RUNNING < rec->state &&
 553                 ORTE_PROC_STATE_CALLED_ABORT != rec->state) ||
 554                 !ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
 555                 if (!orte_abnormal_term_ordered && !orte_orteds_term_ordered) {
 556                     opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay: %s ",
 557                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name),
 558                                 ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE) ? orte_proc_state_to_str(rec->state) : "NOT ALIVE");
 559                 }
 560                 OBJ_RELEASE(rly);
 561                 OBJ_RELEASE(item);
 562                 ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
 563                 continue;
 564             }
 565             if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(&nm->name, rly, ORTE_RML_TAG_XCAST,
 566                                                                orte_rml_send_callback, NULL))) {
 567                 ORTE_ERROR_LOG(ret);
 568                 OBJ_RELEASE(rly);
 569                 OBJ_RELEASE(item);
 570                 ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
 571                 continue;
 572             }
 573             OBJ_RELEASE(item);
 574         }
 575     }
 576 
 577  CLEANUP:
 578     /* cleanup */
 579     OPAL_LIST_DESTRUCT(&coll);
 580     OBJ_RELEASE(rly);  // retain accounting
 581 
 582     /* now pass the relay buffer to myself for processing - don't
 583      * inject it into the RML system via send as that will compete
 584      * with the relay messages down in the OOB. Instead, pass it
 585      * directly to the RML message processor */
 586     if (ORTE_DAEMON_DVM_NIDMAP_CMD != command) {
 587         ORTE_RML_POST_MESSAGE(ORTE_PROC_MY_NAME, tag, 1,
 588                               relay->base_ptr, relay->bytes_used);
 589         relay->base_ptr = NULL;
 590         relay->bytes_used = 0;
 591     }
 592     if (NULL != relay) {
 593         OBJ_RELEASE(relay);
 594     }
 595     OBJ_DESTRUCT(&datbuf);
 596 }
 597 
 598 static void barrier_release(int status, orte_process_name_t* sender,
 599                             opal_buffer_t* buffer, orte_rml_tag_t tag,
 600                             void* cbdata)
 601 {
 602     int32_t cnt;
 603     int rc, ret;
 604     orte_grpcomm_signature_t *sig;
 605     orte_grpcomm_coll_t *coll;
 606 
 607     OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 608                          "%s grpcomm:direct: barrier release called with %d bytes",
 609                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buffer->bytes_used));
 610 
 611     /* unpack the signature */
 612     cnt = 1;
 613     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
 614         ORTE_ERROR_LOG(rc);
 615         return;
 616     }
 617 
 618     /* unpack the return status */
 619     cnt = 1;
 620     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
 621         ORTE_ERROR_LOG(rc);
 622         return;
 623     }
 624 
 625     /* check for the tracker - it is not an error if not
 626      * found as that just means we wre not involved
 627      * in the collective */
 628     if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, false))) {
 629         OBJ_RELEASE(sig);
 630         return;
 631     }
 632 
 633     /* execute the callback */
 634     if (NULL != coll->cbfunc) {
 635         coll->cbfunc(ret, buffer, coll->cbdata);
 636     }
 637     opal_list_remove_item(&orte_grpcomm_base.ongoing, &coll->super);
 638     OBJ_RELEASE(coll);
 639     OBJ_RELEASE(sig);
 640 }

/* [<][>][^][v][top][bottom][index][help] */