This source file includes the following definitions.
- gccon
- gcdes
- orte_grpcomm_API_xcast
- allgather_stub
- orte_grpcomm_API_allgather
- orte_grpcomm_base_get_tracker
- create_dmns
- pack_xcast
- orte_grpcomm_base_mark_distance_recv
- orte_grpcomm_base_check_distance_recv
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 #include "orte_config.h"
  32 
  33 
  34 #include "opal/dss/dss.h"
  35 
  36 #include "opal/mca/compress/compress.h"
  37 #include "orte/util/proc_info.h"
  38 #include "orte/util/error_strings.h"
  39 #include "orte/mca/errmgr/errmgr.h"
  40 #include "orte/mca/odls/base/base.h"
  41 #include "orte/mca/rmaps/rmaps_types.h"
  42 #include "orte/mca/rml/rml.h"
  43 #include "orte/mca/routed/routed.h"
  44 #include "orte/mca/state/state.h"
  45 #include "orte/util/name_fns.h"
  46 #include "orte/util/threads.h"
  47 #include "orte/runtime/orte_globals.h"
  48 
  49 #include "orte/mca/grpcomm/grpcomm.h"
  50 #include "orte/mca/grpcomm/base/base.h"
  51 
  52 static int pack_xcast(orte_grpcomm_signature_t *sig,
  53                       opal_buffer_t *buffer,
  54                       opal_buffer_t *message,
  55                       orte_rml_tag_t tag);
  56 
  57 static int create_dmns(orte_grpcomm_signature_t *sig,
  58                        orte_vpid_t **dmns, size_t *ndmns);
  59 
  60 typedef struct {
  61     opal_object_t super;
  62     opal_event_t ev;
  63     orte_grpcomm_signature_t *sig;
  64     opal_buffer_t *buf;
  65     orte_grpcomm_cbfunc_t cbfunc;
  66     void *cbdata;
  67 } orte_grpcomm_caddy_t;
  68 static void gccon(orte_grpcomm_caddy_t *p)
  69 {
  70     p->sig = NULL;
  71     p->buf = NULL;
  72     p->cbfunc = NULL;
  73     p->cbdata = NULL;
  74 }
  75 static void gcdes(orte_grpcomm_caddy_t *p)
  76 {
  77     if (NULL != p->buf) {
  78         OBJ_RELEASE(p->buf);
  79     }
  80 }
  81 static OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t,
  82                           opal_object_t,
  83                           gccon, gcdes);
  84 
  85 int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
  86                            orte_rml_tag_t tag,
  87                            opal_buffer_t *msg)
  88 {
  89     int rc = ORTE_ERROR;
  90     opal_buffer_t *buf;
  91     orte_grpcomm_base_active_t *active;
  92     orte_vpid_t *dmns;
  93     size_t ndmns;
  94 
  95     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
  96                          "%s grpcomm:base:xcast sending %u bytes to tag %ld",
  97                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  98                          (NULL == msg) ? 0 : (unsigned int)msg->bytes_used, (long)tag));
  99 
 100     
 101 
 102 
 103     
 104     buf = OBJ_NEW(opal_buffer_t);
 105 
 106     
 107     if (ORTE_SUCCESS != (rc = create_dmns(sig, &dmns, &ndmns))) {
 108         ORTE_ERROR_LOG(rc);
 109         OBJ_RELEASE(buf);
 110         return rc;
 111     }
 112 
 113     
 114     if (ORTE_SUCCESS != (rc = pack_xcast(sig, buf, msg, tag))) {
 115         ORTE_ERROR_LOG(rc);
 116         OBJ_RELEASE(buf);
 117         if (NULL != dmns) {
 118             free(dmns);
 119         }
 120         return rc;
 121     }
 122 
 123     
 124     OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
 125         if (NULL != active->module->xcast) {
 126             if (ORTE_SUCCESS == (rc = active->module->xcast(dmns, ndmns, buf))) {
 127                 break;
 128             }
 129         }
 130     }
 131     OBJ_RELEASE(buf);  
 132     if (NULL != dmns) {
 133         free(dmns);
 134     }
 135     return rc;
 136 }
 137 
 138 static void allgather_stub(int fd, short args, void *cbdata)
 139 {
 140     orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
 141     int ret = OPAL_SUCCESS;
 142     int rc;
 143     orte_grpcomm_base_active_t *active;
 144     orte_grpcomm_coll_t *coll;
 145     uint32_t *seq_number;
 146 
 147     ORTE_ACQUIRE_OBJECT(cd);
 148 
 149     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 150                          "%s grpcomm:base:allgather stub",
 151                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 152 
 153     
 154 
 155 
 156     ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void **)&seq_number);
 157     if (OPAL_ERR_NOT_FOUND == ret) {
 158         seq_number = (uint32_t *)malloc(sizeof(uint32_t));
 159         *seq_number = 0;
 160     } else if (OPAL_SUCCESS == ret) {
 161         *seq_number = *seq_number + 1;
 162     } else {
 163         OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
 164                      "%s rpcomm:base:allgather cannot get signature from hash table",
 165                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 166         ORTE_ERROR_LOG(ret);
 167         OBJ_RELEASE(cd);
 168         return;
 169     }
 170     ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)seq_number);
 171     if (OPAL_SUCCESS != ret) {
 172         OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
 173                      "%s rpcomm:base:allgather cannot add new signature to hash table",
 174                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 175         ORTE_ERROR_LOG(ret);
 176         OBJ_RELEASE(cd);
 177         return;
 178     }
 179     coll = orte_grpcomm_base_get_tracker(cd->sig, true);
 180     if (NULL == coll) {
 181         OBJ_RELEASE(cd->sig);
 182         OBJ_RELEASE(cd);
 183         return;
 184     }
 185     OBJ_RELEASE(cd->sig);
 186     coll->cbfunc = cd->cbfunc;
 187     coll->cbdata = cd->cbdata;
 188 
 189     
 190     OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
 191         if (NULL != active->module->allgather) {
 192             if (ORTE_SUCCESS == (rc = active->module->allgather(coll, cd->buf))) {
 193                 break;
 194             }
 195         }
 196     }
 197     OBJ_RELEASE(cd);
 198 }
 199 
 200 int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
 201                                opal_buffer_t *buf,
 202                                orte_grpcomm_cbfunc_t cbfunc,
 203                                void *cbdata)
 204 {
 205     orte_grpcomm_caddy_t *cd;
 206 
 207     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 208                          "%s grpcomm:base:allgather",
 209                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 210 
 211     
 212 
 213     cd = OBJ_NEW(orte_grpcomm_caddy_t);
 214     
 215     OBJ_RETAIN(buf);
 216     opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
 217     cd->buf = buf;
 218     cd->cbfunc = cbfunc;
 219     cd->cbdata = cbdata;
 220     opal_event_set(orte_event_base, &cd->ev, -1, OPAL_EV_WRITE, allgather_stub, cd);
 221     opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
 222     ORTE_POST_OBJECT(cd);
 223     opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
 224     return ORTE_SUCCESS;
 225 }
 226 
 227 orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create)
 228 {
 229     orte_grpcomm_coll_t *coll;
 230     int rc;
 231     orte_namelist_t *nm;
 232     opal_list_t children;
 233     size_t n;
 234 
 235     
 236     OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
 237         if (NULL == sig->signature) {
 238             if (NULL == coll->sig->signature) {
 239                 
 240 
 241                 return coll;
 242             }
 243             
 244             break;
 245         }
 246         if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
 247             OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 248                                  "%s grpcomm:base:returning existing collective",
 249                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 250             return coll;
 251         }
 252     }
 253     
 254 
 255     if (!create) {
 256         OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 257                              "%s grpcomm:base: not creating new coll",
 258                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 259 
 260         return NULL;
 261     }
 262     coll = OBJ_NEW(orte_grpcomm_coll_t);
 263     opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE);
 264 
 265     if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
 266         char *tmp=NULL;
 267         (void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
 268         opal_output(0, "%s grpcomm:base: creating new coll for%s",
 269                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tmp);
 270         free(tmp);
 271     }
 272 
 273     opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);
 274 
 275     
 276     if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
 277         ORTE_ERROR_LOG(rc);
 278         return NULL;
 279     }
 280 
 281     
 282 
 283 
 284     OBJ_CONSTRUCT(&children, opal_list_t);
 285     orte_routed.get_routing_list(&children);
 286     while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
 287         for (n=0; n < coll->ndmns; n++) {
 288             if (nm->name.vpid == coll->dmns[n]) {
 289                 coll->nexpected++;
 290                 break;
 291             }
 292         }
 293         OBJ_RELEASE(nm);
 294     }
 295     OPAL_LIST_DESTRUCT(&children);
 296 
 297     
 298 
 299 
 300     for (n=0; n < coll->ndmns; n++) {
 301         if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
 302             coll->nexpected++;
 303             break;
 304         }
 305     }
 306 
 307     return coll;
 308 }
 309 
 310 static int create_dmns(orte_grpcomm_signature_t *sig,
 311                        orte_vpid_t **dmns, size_t *ndmns)
 312 {
 313     size_t n;
 314     orte_job_t *jdata;
 315     orte_proc_t *proc;
 316     orte_node_t *node;
 317     int i;
 318     opal_list_t ds;
 319     orte_namelist_t *nm;
 320     orte_vpid_t vpid;
 321     bool found;
 322     size_t nds;
 323     orte_vpid_t *dns;
 324 
 325     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 326                          "%s grpcomm:base:create_dmns called with %s signature",
 327                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 328                          (NULL == sig->signature) ? "NULL" : "NON-NULL"));
 329 
 330     
 331 
 332     if (NULL == sig->signature || ORTE_PROC_MY_NAME->jobid == sig->signature[0].jobid) {
 333         *ndmns = orte_process_info.num_procs;
 334         *dmns = NULL;
 335         return ORTE_SUCCESS;
 336     }
 337 
 338     if (ORTE_VPID_WILDCARD == sig->signature[0].vpid) {
 339         OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 340                              "%s grpcomm:base:create_dmns called for all procs in job %s",
 341                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 342                              ORTE_JOBID_PRINT(sig->signature[0].jobid)));
 343         
 344         if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
 345             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 346             ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 347             *ndmns = 0;
 348             *dmns = NULL;
 349             return ORTE_ERR_NOT_FOUND;
 350         }
 351         if (NULL == jdata->map || 0 == jdata->map->num_nodes) {
 352             
 353 
 354 
 355             if (ORTE_PROC_IS_HNP) {
 356                 dns = (orte_vpid_t*)malloc(sizeof(vpid));
 357                 dns[0] = ORTE_PROC_MY_NAME->vpid;
 358                 *ndmns = 1;
 359                 *dmns = dns;
 360                 return ORTE_SUCCESS;
 361             }
 362             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 363             ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 364             *ndmns = 0;
 365             *dmns = NULL;
 366             return ORTE_ERR_NOT_FOUND;
 367         }
 368         dns = (orte_vpid_t*)malloc(jdata->map->num_nodes * sizeof(vpid));
 369         nds = 0;
 370         for (i=0; i < jdata->map->nodes->size && (int)nds < jdata->map->num_nodes; i++) {
 371             if (NULL == (node = opal_pointer_array_get_item(jdata->map->nodes, i))) {
 372                 continue;
 373             }
 374             if (NULL == node->daemon) {
 375                 
 376                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 377                 free(dns);
 378                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 379                 *ndmns = 0;
 380                 *dmns = NULL;
 381                 return ORTE_ERR_NOT_FOUND;
 382             }
 383             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 384                                  "%s grpcomm:base:create_dmns adding daemon %s to array",
 385                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 386                                  ORTE_NAME_PRINT(&node->daemon->name)));
 387             dns[nds++] = node->daemon->name.vpid;
 388         }
 389     } else {
 390         
 391 
 392 
 393         OBJ_CONSTRUCT(&ds, opal_list_t);
 394         for (n=0; n < sig->sz; n++) {
 395             if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
 396                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 397                 OPAL_LIST_DESTRUCT(&ds);
 398                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 399                 *ndmns = 0;
 400                 *dmns = NULL;
 401                 return ORTE_ERR_NOT_FOUND;
 402             }
 403             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 404                                 "%s sign: GETTING PROC OBJECT FOR %s",
 405                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 406                                 ORTE_NAME_PRINT(&sig->signature[n])));
 407             if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
 408                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 409                 OPAL_LIST_DESTRUCT(&ds);
 410                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 411                 *ndmns = 0;
 412                 *dmns = NULL;
 413                 return ORTE_ERR_NOT_FOUND;
 414             }
 415             if (NULL == proc->node || NULL == proc->node->daemon) {
 416                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 417                 OPAL_LIST_DESTRUCT(&ds);
 418                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 419                 *ndmns = 0;
 420                 *dmns = NULL;
 421                 return ORTE_ERR_NOT_FOUND;
 422             }
 423             vpid = proc->node->daemon->name.vpid;
 424             found = false;
 425             OPAL_LIST_FOREACH(nm, &ds, orte_namelist_t) {
 426                 if (nm->name.vpid == vpid) {
 427                     found = true;
 428                     break;
 429                 }
 430             }
 431             if (!found) {
 432                 nm = OBJ_NEW(orte_namelist_t);
 433                 nm->name.vpid = vpid;
 434                 opal_list_append(&ds, &nm->super);
 435             }
 436         }
 437         if (0 == opal_list_get_size(&ds)) {
 438             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 439             OPAL_LIST_DESTRUCT(&ds);
 440             ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
 441             *ndmns = 0;
 442             *dmns = NULL;
 443             return ORTE_ERR_NOT_FOUND;
 444         }
 445         dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
 446         nds = 0;
 447         while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&ds))) {
 448             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
 449                                  "%s grpcomm:base:create_dmns adding daemon %s to array",
 450                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 451                                  ORTE_NAME_PRINT(&nm->name)));
 452             dns[nds++] = nm->name.vpid;
 453             OBJ_RELEASE(nm);
 454         }
 455         OPAL_LIST_DESTRUCT(&ds);
 456     }
 457     *dmns = dns;
 458     *ndmns = nds;
 459     return ORTE_SUCCESS;
 460 }
 461 
 462 static int pack_xcast(orte_grpcomm_signature_t *sig,
 463                       opal_buffer_t *buffer,
 464                       opal_buffer_t *message,
 465                       orte_rml_tag_t tag)
 466 {
 467     int rc;
 468     opal_buffer_t data;
 469     int8_t flag;
 470     uint8_t *cmpdata;
 471     size_t cmplen;
 472 
 473     
 474     OBJ_CONSTRUCT(&data, opal_buffer_t);
 475 
 476     
 477     if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
 478         ORTE_ERROR_LOG(rc);
 479         OBJ_DESTRUCT(&data);
 480         return rc;
 481     }
 482     
 483     if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
 484         ORTE_ERROR_LOG(rc);
 485         OBJ_DESTRUCT(&data);
 486         return rc;
 487     }
 488 
 489     
 490 
 491 
 492 
 493     if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
 494         ORTE_ERROR_LOG(rc);
 495         OBJ_DESTRUCT(&data);
 496         return rc;
 497     }
 498 
 499     
 500     if (opal_compress.compress_block((uint8_t*)data.base_ptr, data.bytes_used,
 501                                      &cmpdata, &cmplen)) {
 502         
 503         flag = 1;
 504         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
 505             ORTE_ERROR_LOG(rc);
 506             free(cmpdata);
 507             OBJ_DESTRUCT(&data);
 508             return rc;
 509         }
 510         
 511         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
 512             ORTE_ERROR_LOG(rc);
 513             free(cmpdata);
 514             OBJ_DESTRUCT(&data);
 515             return rc;
 516         }
 517         
 518         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
 519             ORTE_ERROR_LOG(rc);
 520             free(cmpdata);
 521             OBJ_DESTRUCT(&data);
 522             return rc;
 523         }
 524         
 525         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
 526             ORTE_ERROR_LOG(rc);
 527             free(cmpdata);
 528             OBJ_DESTRUCT(&data);
 529             return rc;
 530         }
 531         OBJ_DESTRUCT(&data);
 532         free(cmpdata);
 533     } else {
 534         
 535         flag = 0;
 536         if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
 537             ORTE_ERROR_LOG(rc);
 538             OBJ_DESTRUCT(&data);
 539             free(cmpdata);
 540             return rc;
 541         }
 542         
 543         opal_dss.copy_payload(buffer, &data);
 544         OBJ_DESTRUCT(&data);
 545     }
 546 
 547     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
 548                          "MSG SIZE: %lu", buffer->bytes_used));
 549     return ORTE_SUCCESS;
 550 }
 551 
 552 void orte_grpcomm_base_mark_distance_recv (orte_grpcomm_coll_t *coll,
 553                                            uint32_t distance) {
 554     opal_bitmap_set_bit (&coll->distance_mask_recv, distance);
 555 }
 556 
 557 unsigned int orte_grpcomm_base_check_distance_recv (orte_grpcomm_coll_t *coll,
 558                                                     uint32_t distance) {
 559     return opal_bitmap_is_set_bit (&coll->distance_mask_recv, distance);
 560 }