root/orte/mca/grpcomm/base/grpcomm_base_stubs.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. gccon
  2. gcdes
  3. orte_grpcomm_API_xcast
  4. allgather_stub
  5. orte_grpcomm_API_allgather
  6. orte_grpcomm_base_get_tracker
  7. create_dmns
  8. pack_xcast
  9. orte_grpcomm_base_mark_distance_recv
  10. orte_grpcomm_base_check_distance_recv

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2005 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
  14  *                         reserved.
  15  * Copyright (c) 2016-2019 Intel, Inc.  All rights reserved.
  16  * Copyright (c) 2017      Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 /** @file:
  25  *
  26  */
  27 
  28 /*
  29  * includes
  30  */
  31 #include "orte_config.h"
  32 
  33 
  34 #include "opal/dss/dss.h"
  35 
  36 #include "opal/mca/compress/compress.h"
  37 #include "orte/util/proc_info.h"
  38 #include "orte/util/error_strings.h"
  39 #include "orte/mca/errmgr/errmgr.h"
  40 #include "orte/mca/odls/base/base.h"
  41 #include "orte/mca/rmaps/rmaps_types.h"
  42 #include "orte/mca/rml/rml.h"
  43 #include "orte/mca/routed/routed.h"
  44 #include "orte/mca/state/state.h"
  45 #include "orte/util/name_fns.h"
  46 #include "orte/util/threads.h"
  47 #include "orte/runtime/orte_globals.h"
  48 
  49 #include "orte/mca/grpcomm/grpcomm.h"
  50 #include "orte/mca/grpcomm/base/base.h"
  51 
  52 static int pack_xcast(orte_grpcomm_signature_t *sig,
  53                       opal_buffer_t *buffer,
  54                       opal_buffer_t *message,
  55                       orte_rml_tag_t tag);
  56 
  57 static int create_dmns(orte_grpcomm_signature_t *sig,
  58                        orte_vpid_t **dmns, size_t *ndmns);
  59 
  60 typedef struct {
  61     opal_object_t super;
  62     opal_event_t ev;
  63     orte_grpcomm_signature_t *sig;
  64     opal_buffer_t *buf;
  65     orte_grpcomm_cbfunc_t cbfunc;
  66     void *cbdata;
  67 } orte_grpcomm_caddy_t;
  68 static void gccon(orte_grpcomm_caddy_t *p)
  69 {
  70     p->sig = NULL;
  71     p->buf = NULL;
  72     p->cbfunc = NULL;
  73     p->cbdata = NULL;
  74 }
  75 static void gcdes(orte_grpcomm_caddy_t *p)
  76 {
  77     if (NULL != p->buf) {
  78         OBJ_RELEASE(p->buf);
  79     }
  80 }
  81 static OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t,
  82                           opal_object_t,
  83                           gccon, gcdes);
  84 
  85 int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
  86                            orte_rml_tag_t tag,
  87                            opal_buffer_t *msg)
  88 {
  89     int rc = ORTE_ERROR;
  90     opal_buffer_t *buf;
  91     orte_grpcomm_base_active_t *active;
  92     orte_vpid_t *dmns;
  93     size_t ndmns;
  94 
  95     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
  96                          "%s grpcomm:base:xcast sending %u bytes to tag %ld",
  97                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  98                          (NULL == msg) ? 0 : (unsigned int)msg->bytes_used, (long)tag));
  99 
 100     /* this function does not access any framework-global data, and
 101      * so it does not require us to push it into the event library */
 102 
 103     /* prep the output buffer */
 104     buf = OBJ_NEW(opal_buffer_t);
 105 
 106     /* create the array of participating daemons */
 107     if (ORTE_SUCCESS != (rc = create_dmns(sig, &dmns, &ndmns))) {
 108         ORTE_ERROR_LOG(rc);
 109         OBJ_RELEASE(buf);
 110         return rc;
 111     }
 112 
 113     /* setup the payload */
 114     if (ORTE_SUCCESS != (rc = pack_xcast(sig, buf, msg, tag))) {
 115         ORTE_ERROR_LOG(rc);
 116         OBJ_RELEASE(buf);
 117         if (NULL != dmns) {
 118             free(dmns);
 119         }
 120         return rc;
 121     }
 122 
 123     /* cycle thru the actives and see who can send it */
 124     OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
 125         if (NULL != active->module->xcast) {
 126             if (ORTE_SUCCESS == (rc = active->module->xcast(dmns, ndmns, buf))) {
 127                 break;
 128             }
 129         }
 130     }
 131     OBJ_RELEASE(buf);  // if the module needs to keep the buf, it should OBJ_RETAIN it
 132     if (NULL != dmns) {
 133         free(dmns);
 134     }
 135     return rc;
 136 }
 137 
 138 static void allgather_stub(int fd, short args, void *cbdata)
 139 {
 140     orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
 141     int ret = OPAL_SUCCESS;
 142     int rc;
 143     orte_grpcomm_base_active_t *active;
 144     orte_grpcomm_coll_t *coll;
 145     uint32_t *seq_number;
 146 
 147     ORTE_ACQUIRE_OBJECT(cd);
 148 
 149     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 150                          "%s grpcomm:base:allgather stub",
 151                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 152 
 153     /* retrieve an existing tracker, create it if not
 154      * already found. The allgather module is responsible
 155      * for releasing it upon completion of the collective */
 156     ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void **)&seq_number);
 157     if (OPAL_ERR_NOT_FOUND == ret) {
 158         seq_number = (uint32_t *)malloc(sizeof(uint32_t));
 159         *seq_number = 0;
 160     } else if (OPAL_SUCCESS == ret) {
 161         *seq_number = *seq_number + 1;
 162     } else {
 163         OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
 164                      "%s rpcomm:base:allgather cannot get signature from hash table",
 165                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 166         ORTE_ERROR_LOG(ret);
 167         OBJ_RELEASE(cd);
 168         return;
 169     }
 170     ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)seq_number);
 171     if (OPAL_SUCCESS != ret) {
 172         OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
 173                      "%s rpcomm:base:allgather cannot add new signature to hash table",
 174                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 175         ORTE_ERROR_LOG(ret);
 176         OBJ_RELEASE(cd);
 177         return;
 178     }
 179     coll = orte_grpcomm_base_get_tracker(cd->sig, true);
 180     if (NULL == coll) {
 181         OBJ_RELEASE(cd->sig);
 182         OBJ_RELEASE(cd);
 183         return;
 184     }
 185     OBJ_RELEASE(cd->sig);
 186     coll->cbfunc = cd->cbfunc;
 187     coll->cbdata = cd->cbdata;
 188 
 189     /* cycle thru the actives and see who can process it */
 190     OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
 191         if (NULL != active->module->allgather) {
 192             if (ORTE_SUCCESS == (rc = active->module->allgather(coll, cd->buf))) {
 193                 break;
 194             }
 195         }
 196     }
 197     OBJ_RELEASE(cd);
 198 }
 199 
 200 int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
 201                                opal_buffer_t *buf,
 202                                orte_grpcomm_cbfunc_t cbfunc,
 203                                void *cbdata)
 204 {
 205     orte_grpcomm_caddy_t *cd;
 206 
 207     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 208                          "%s grpcomm:base:allgather",
 209                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 210 
 211     /* must push this into the event library to ensure we can
 212      * access framework-global data safely */
 213     cd = OBJ_NEW(orte_grpcomm_caddy_t);
 214     /* ensure the data doesn't go away */
 215     OBJ_RETAIN(buf);
 216     opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
 217     cd->buf = buf;
 218     cd->cbfunc = cbfunc;
 219     cd->cbdata = cbdata;
 220     opal_event_set(orte_event_base, &cd->ev, -1, OPAL_EV_WRITE, allgather_stub, cd);
 221     opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
 222     ORTE_POST_OBJECT(cd);
 223     opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
 224     return ORTE_SUCCESS;
 225 }
 226 
 227 orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create)
 228 {
 229     orte_grpcomm_coll_t *coll;
 230     int rc;
 231     orte_namelist_t *nm;
 232     opal_list_t children;
 233     size_t n;
 234 
 235     /* search the existing tracker list to see if this already exists */
 236     OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
 237         if (NULL == sig->signature) {
 238             if (NULL == coll->sig->signature) {
 239                 /* only one collective can operate at a time
 240                  * across every process in the system */
 241                 return coll;
 242             }
 243             /* if only one is NULL, then we can't possibly match */
 244             break;
 245         }
 246         if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
 247             OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 248                                  "%s grpcomm:base:returning existing collective",
 249                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 250             return coll;
 251         }
 252     }
 253     /* if we get here, then this is a new collective - so create
 254      * the tracker for it */
 255     if (!create) {
 256         OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 257                              "%s grpcomm:base: not creating new coll",
 258                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 259 
 260         return NULL;
 261     }
 262     coll = OBJ_NEW(orte_grpcomm_coll_t);
 263     opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE);
 264 
 265     if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
 266         char *tmp=NULL;
 267         (void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
 268         opal_output(0, "%s grpcomm:base: creating new coll for%s",
 269                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tmp);
 270         free(tmp);
 271     }
 272 
 273     opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);
 274 
 275     /* now get the daemons involved */
 276     if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
 277         ORTE_ERROR_LOG(rc);
 278         return NULL;
 279     }
 280 
 281     /* cycle thru the array of daemons and compare them to our
 282      * children in the routing tree, counting the ones that match
 283      * so we know how many daemons we should receive contributions from */
 284     OBJ_CONSTRUCT(&children, opal_list_t);
 285     orte_routed.get_routing_list(&children);
 286     while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
 287         for (n=0; n < coll->ndmns; n++) {
 288             if (nm->name.vpid == coll->dmns[n]) {
 289                 coll->nexpected++;
 290                 break;
 291             }
 292         }
 293         OBJ_RELEASE(nm);
 294     }
 295     OPAL_LIST_DESTRUCT(&children);
 296 
 297     /* see if I am in the array of participants - note that I may
 298      * be in the rollup tree even though I'm not participating
 299      * in the collective itself */
 300     for (n=0; n < coll->ndmns; n++) {
 301         if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
 302             coll->nexpected++;
 303             break;
 304         }
 305     }
 306 
 307     return coll;
 308 }
 309 
 310 static int create_dmns(orte_grpcomm_signature_t *sig,
 311                        orte_vpid_t **dmns, size_t *ndmns)
 312 {
 313     size_t n;
 314     orte_job_t *jdata;
 315     orte_proc_t *proc;
 316     orte_node_t *node;
 317     int i;
 318     opal_list_t ds;
 319     orte_namelist_t *nm;
 320     orte_vpid_t vpid;
 321     bool found;
 322     size_t nds;
 323     orte_vpid_t *dns;
 324 
 325     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 326                          "%s grpcomm:base:create_dmns called with %s signature",
 327                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 328                          (NULL == sig->signature) ? "NULL" : "NON-NULL"));
 329 
 330     /* if NULL == procs, or the target jobid is our own,
 331      * then all daemons are participating */
 332     if (NULL == sig->signature || ORTE_PROC_MY_NAME->jobid == sig->signature[0].jobid) {
 333         *ndmns = orte_process_info.num_procs;
 334         *dmns = NULL;
 335         return ORTE_SUCCESS;
 336     }
 337 
 338     if (ORTE_VPID_WILDCARD == sig->signature[0].vpid) {
 339         OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 340                              "%s grpcomm:base:create_dmns called for all procs in job %s",
 341                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 342                              ORTE_JOBID_PRINT(sig->signature[0].jobid)));
 343         /* all daemons hosting this jobid are participating */
 344         if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
 345             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 346             ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 347             *ndmns = 0;
 348             *dmns = NULL;
 349             return ORTE_ERR_NOT_FOUND;
 350         }
 351         if (NULL == jdata->map || 0 == jdata->map->num_nodes) {
 352             /* we haven't generated a job map yet - if we are the HNP,
 353              * then we should only involve ourselves. Otherwise, we have
 354              * no choice but to abort to avoid hangs */
 355             if (ORTE_PROC_IS_HNP) {
 356                 dns = (orte_vpid_t*)malloc(sizeof(vpid));
 357                 dns[0] = ORTE_PROC_MY_NAME->vpid;
 358                 *ndmns = 1;
 359                 *dmns = dns;
 360                 return ORTE_SUCCESS;
 361             }
 362             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 363             ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 364             *ndmns = 0;
 365             *dmns = NULL;
 366             return ORTE_ERR_NOT_FOUND;
 367         }
 368         dns = (orte_vpid_t*)malloc(jdata->map->num_nodes * sizeof(vpid));
 369         nds = 0;
 370         for (i=0; i < jdata->map->nodes->size && (int)nds < jdata->map->num_nodes; i++) {
 371             if (NULL == (node = opal_pointer_array_get_item(jdata->map->nodes, i))) {
 372                 continue;
 373             }
 374             if (NULL == node->daemon) {
 375                 /* should never happen */
 376                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 377                 free(dns);
 378                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 379                 *ndmns = 0;
 380                 *dmns = NULL;
 381                 return ORTE_ERR_NOT_FOUND;
 382             }
 383             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 384                                  "%s grpcomm:base:create_dmns adding daemon %s to array",
 385                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 386                                  ORTE_NAME_PRINT(&node->daemon->name)));
 387             dns[nds++] = node->daemon->name.vpid;
 388         }
 389     } else {
 390         /* lookup the daemon for each proc and add it to the list, checking to
 391          * ensure any daemon only gets added once. Yes, this isn't a scalable
 392          * algo - someone can come up with something better! */
 393         OBJ_CONSTRUCT(&ds, opal_list_t);
 394         for (n=0; n < sig->sz; n++) {
 395             if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
 396                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 397                 OPAL_LIST_DESTRUCT(&ds);
 398                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 399                 *ndmns = 0;
 400                 *dmns = NULL;
 401                 return ORTE_ERR_NOT_FOUND;
 402             }
 403             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 404                                 "%s sign: GETTING PROC OBJECT FOR %s",
 405                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 406                                 ORTE_NAME_PRINT(&sig->signature[n])));
 407             if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
 408                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 409                 OPAL_LIST_DESTRUCT(&ds);
 410                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 411                 *ndmns = 0;
 412                 *dmns = NULL;
 413                 return ORTE_ERR_NOT_FOUND;
 414             }
 415             if (NULL == proc->node || NULL == proc->node->daemon) {
 416                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 417                 OPAL_LIST_DESTRUCT(&ds);
 418                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 419                 *ndmns = 0;
 420                 *dmns = NULL;
 421                 return ORTE_ERR_NOT_FOUND;
 422             }
 423             vpid = proc->node->daemon->name.vpid;
 424             found = false;
 425             OPAL_LIST_FOREACH(nm, &ds, orte_namelist_t) {
 426                 if (nm->name.vpid == vpid) {
 427                     found = true;
 428                     break;
 429                 }
 430             }
 431             if (!found) {
 432                 nm = OBJ_NEW(orte_namelist_t);
 433                 nm->name.vpid = vpid;
 434                 opal_list_append(&ds, &nm->super);
 435             }
 436         }
 437         if (0 == opal_list_get_size(&ds)) {
 438             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 439             OPAL_LIST_DESTRUCT(&ds);
 440             ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 441             *ndmns = 0;
 442             *dmns = NULL;
 443             return ORTE_ERR_NOT_FOUND;
 444         }
 445         dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
 446         nds = 0;
 447         while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&ds))) {
 448             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 449                                  "%s grpcomm:base:create_dmns adding daemon %s to array",
 450                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 451                                  ORTE_NAME_PRINT(&nm->name)));
 452             dns[nds++] = nm->name.vpid;
 453             OBJ_RELEASE(nm);
 454         }
 455         OPAL_LIST_DESTRUCT(&ds);
 456     }
 457     *dmns = dns;
 458     *ndmns = nds;
 459     return ORTE_SUCCESS;
 460 }
 461 
 462 static int pack_xcast(orte_grpcomm_signature_t *sig,
 463                       opal_buffer_t *buffer,
 464                       opal_buffer_t *message,
 465                       orte_rml_tag_t tag)
 466 {
 467     int rc;
 468     opal_buffer_t data;
 469     int8_t flag;
 470     uint8_t *cmpdata;
 471     size_t cmplen;
 472 
 473     /* setup an intermediate buffer */
 474     OBJ_CONSTRUCT(&data, opal_buffer_t);
 475 
 476     /* pass along the signature */
 477     if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
 478         ORTE_ERROR_LOG(rc);
 479         OBJ_DESTRUCT(&data);
 480         return rc;
 481     }
 482     /* pass the final tag */
 483     if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
 484         ORTE_ERROR_LOG(rc);
 485         OBJ_DESTRUCT(&data);
 486         return rc;
 487     }
 488 
 489     /* copy the payload into the new buffer - this is non-destructive, so our
 490      * caller is still responsible for releasing any memory in the buffer they
 491      * gave to us
 492      */
 493     if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
 494         ORTE_ERROR_LOG(rc);
 495         OBJ_DESTRUCT(&data);
 496         return rc;
 497     }
 498 
 499     /* see if we want to compress this message */
 500     if (opal_compress.compress_block((uint8_t*)data.base_ptr, data.bytes_used,
 501                                      &cmpdata, &cmplen)) {
 502         /* the data was compressed - mark that we compressed it */
 503         flag = 1;
 504         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
 505             ORTE_ERROR_LOG(rc);
 506             free(cmpdata);
 507             OBJ_DESTRUCT(&data);
 508             return rc;
 509         }
 510         /* pack the compressed length */
 511         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
 512             ORTE_ERROR_LOG(rc);
 513             free(cmpdata);
 514             OBJ_DESTRUCT(&data);
 515             return rc;
 516         }
 517         /* pack the uncompressed length */
 518         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
 519             ORTE_ERROR_LOG(rc);
 520             free(cmpdata);
 521             OBJ_DESTRUCT(&data);
 522             return rc;
 523         }
 524         /* pack the compressed info */
 525         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
 526             ORTE_ERROR_LOG(rc);
 527             free(cmpdata);
 528             OBJ_DESTRUCT(&data);
 529             return rc;
 530         }
 531         OBJ_DESTRUCT(&data);
 532         free(cmpdata);
 533     } else {
 534         /* mark that it was not compressed */
 535         flag = 0;
 536         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
 537             ORTE_ERROR_LOG(rc);
 538             OBJ_DESTRUCT(&data);
 539             free(cmpdata);
 540             return rc;
 541         }
 542         /* transfer the payload across */
 543         opal_dss.copy_payload(buffer, &data);
 544         OBJ_DESTRUCT(&data);
 545     }
 546 
 547     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 548                          "MSG SIZE: %lu", buffer->bytes_used));
 549     return ORTE_SUCCESS;
 550 }
 551 
 552 void orte_grpcomm_base_mark_distance_recv (orte_grpcomm_coll_t *coll,
 553                                            uint32_t distance) {
 554     opal_bitmap_set_bit (&coll->distance_mask_recv, distance);
 555 }
 556 
 557 unsigned int orte_grpcomm_base_check_distance_recv (orte_grpcomm_coll_t *coll,
 558                                                     uint32_t distance) {
 559     return opal_bitmap_is_set_bit (&coll->distance_mask_recv, distance);
 560 }

/* [<][>][^][v][top][bottom][index][help] */