root/opal/mca/pmix/pmix4x/pmix/src/client/pmix_client_group.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. gtcon
  2. gtdes
  3. PMIx_Group_construct
  4. PMIx_Group_construct_nb
  5. PMIx_Group_destruct
  6. PMIx_Group_destruct_nb
  7. chaincbfunc
  8. relcbfunc
  9. invite_handler
  10. regcbfunc
  11. PMIx_Group_invite
  12. PMIx_Group_invite_nb
  13. PMIx_Group_join
  14. PMIx_Group_join_nb
  15. op_cbfunc
  16. relfn
  17. grp_cbfunc
  18. info_cbfunc

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014      Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016      Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 
  22 #include <pmix.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/include/pmix_globals.h"
  26 #include "src/mca/gds/base/base.h"
  27 
  28 #ifdef HAVE_STRING_H
  29 #include <string.h>
  30 #endif
  31 #include <fcntl.h>
  32 #ifdef HAVE_UNISTD_H
  33 #include <unistd.h>
  34 #endif
  35 #ifdef HAVE_SYS_SOCKET_H
  36 #include <sys/socket.h>
  37 #endif
  38 #ifdef HAVE_SYS_UN_H
  39 #include <sys/un.h>
  40 #endif
  41 #ifdef HAVE_SYS_UIO_H
  42 #include <sys/uio.h>
  43 #endif
  44 #ifdef HAVE_SYS_TYPES_H
  45 #include <sys/types.h>
  46 #endif
  47 #include PMIX_EVENT_HEADER
  48 
  49 #include "src/class/pmix_list.h"
  50 #include "src/mca/bfrops/bfrops.h"
  51 #include "src/util/argv.h"
  52 #include "src/util/error.h"
  53 #include "src/util/output.h"
  54 #include "src/threads/threads.h"
  55 #include "src/mca/gds/gds.h"
  56 #include "src/mca/ptl/ptl.h"
  57 
  58 #include "pmix_client_ops.h"
  59 
  60 /* define a tracking object for group operations */
  61 typedef struct {
  62     pmix_object_t super;
  63     pmix_lock_t lock;
  64     pmix_status_t status;
  65     size_t ref;
  66     size_t accepted;
  67     pmix_proc_t *members;
  68     size_t nmembers;
  69     pmix_info_t *info;
  70     size_t ninfo;
  71     pmix_info_t *results;
  72     size_t nresults;
  73     pmix_op_cbfunc_t opcbfunc;
  74     pmix_info_cbfunc_t cbfunc;
  75     void *cbdata;
  76 } pmix_group_tracker_t;
  77 
  78 static void gtcon(pmix_group_tracker_t *p)
  79 {
  80     PMIX_CONSTRUCT_LOCK(&p->lock);
  81     p->status = PMIX_SUCCESS;
  82     p->ref = 0;
  83     p->accepted = 0;
  84     p->members = NULL;
  85     p->nmembers = 0;
  86     p->info = NULL;
  87     p->ninfo = 0;
  88     p->results = NULL;
  89     p->nresults = 0;
  90     p->cbfunc = NULL;
  91     p->opcbfunc = NULL;
  92     p->cbdata = NULL;
  93 }
  94 static void gtdes(pmix_group_tracker_t *p)
  95 {
  96     PMIX_DESTRUCT_LOCK(&p->lock);
  97     if (NULL != p->members) {
  98         PMIX_PROC_FREE(p->members, p->nmembers);
  99     }
 100     if (NULL != p->info) {
 101         PMIX_INFO_FREE(p->info, p->ninfo);
 102     }
 103 }
 104 PMIX_CLASS_INSTANCE(pmix_group_tracker_t,
 105                     pmix_object_t,
 106                     gtcon, gtdes);
 107 
 108 /* callback for wait completion */
 109 static void grp_cbfunc(struct pmix_peer_t *pr,
 110                         pmix_ptl_hdr_t *hdr,
 111                         pmix_buffer_t *buf, void *cbdata);
 112 static void op_cbfunc(pmix_status_t status, void *cbdata);
 113 
 114 static void info_cbfunc(pmix_status_t status,
 115                         pmix_info_t *info, size_t ninfo,
 116                         void *cbdata,
 117                         pmix_release_cbfunc_t release_fn,
 118                         void *release_cbdata);
 119 
 120 
 121 PMIX_EXPORT pmix_status_t PMIx_Group_construct(const char grp[],
 122                                                const pmix_proc_t procs[], size_t nprocs,
 123                                                const pmix_info_t info[], size_t ninfo,
 124                                                pmix_info_t **results, size_t *nresults)
 125 {
 126     pmix_status_t rc;
 127     pmix_group_tracker_t *cb;
 128 
 129     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 130 
 131     pmix_output_verbose(2, pmix_client_globals.connect_output,
 132                         "pmix: group_construct called");
 133 
 134     if (pmix_globals.init_cntr <= 0) {
 135         PMIX_RELEASE_THREAD(&pmix_global_lock);
 136         return PMIX_ERR_INIT;
 137     }
 138 
 139     /* if we aren't connected, don't attempt to send */
 140     if (!pmix_globals.connected) {
 141         PMIX_RELEASE_THREAD(&pmix_global_lock);
 142         return PMIX_ERR_UNREACH;
 143     }
 144     PMIX_RELEASE_THREAD(&pmix_global_lock);
 145 
 146     /* create a callback object as we need to pass it to the
 147      * recv routine so we know which callback to use when
 148      * the return message is recvd */
 149     cb = PMIX_NEW(pmix_group_tracker_t);
 150 
 151     /* push the message into our event base to send to the server */
 152     if (PMIX_SUCCESS != (rc = PMIx_Group_construct_nb(grp, procs, nprocs, info, ninfo, info_cbfunc, cb))) {
 153         PMIX_RELEASE(cb);
 154         return rc;
 155     }
 156 
 157     /* wait for the connect to complete */
 158     PMIX_WAIT_THREAD(&cb->lock);
 159     rc = cb->status;
 160     /* user takes responsibility for releasing any results */
 161     *results = cb->results;
 162     *nresults =  cb->nresults;
 163     PMIX_RELEASE(cb);
 164 
 165     pmix_output_verbose(2, pmix_globals.debug_output,
 166                         "pmix: group construct completed");
 167 
 168     return rc;
 169 }
 170 
 171 PMIX_EXPORT pmix_status_t PMIx_Group_construct_nb(const char grp[],
 172                                                   const pmix_proc_t procs[], size_t nprocs,
 173                                                   const pmix_info_t info[], size_t ninfo,
 174                                                   pmix_info_cbfunc_t cbfunc, void *cbdata)
 175 {
 176     pmix_buffer_t *msg = NULL;
 177     pmix_cmd_t cmd = PMIX_GROUP_CONSTRUCT_CMD;
 178     pmix_status_t rc;
 179     pmix_group_tracker_t *cb = NULL;
 180     size_t n, num;
 181     bool embed = true;
 182     pmix_info_t *iptr = NULL;
 183 
 184     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 185 
 186     pmix_output_verbose(2, pmix_client_globals.connect_output,
 187                         "pmix:group_construct_nb called");
 188 
 189     if (pmix_globals.init_cntr <= 0) {
 190         PMIX_RELEASE_THREAD(&pmix_global_lock);
 191         return PMIX_ERR_INIT;
 192     }
 193 
 194     /* if we aren't connected, don't attempt to send */
 195     if (!pmix_globals.connected) {
 196         PMIX_RELEASE_THREAD(&pmix_global_lock);
 197         return PMIX_ERR_UNREACH;
 198     }
 199     PMIX_RELEASE_THREAD(&pmix_global_lock);
 200 
 201     /* check for bozo input */
 202     if (NULL == procs || 0 >= nprocs) {
 203         return PMIX_ERR_BAD_PARAM;
 204     }
 205 
 206     /* need to add the fence request to the provided info
 207      * structs as this is a blocking operation - only add
 208      * it if the user didn't specify this themselves */
 209     for (n=0; n < ninfo; n++) {
 210         if (PMIX_CHECK_KEY(&info[n], PMIX_EMBED_BARRIER)) {
 211             embed = PMIX_INFO_TRUE(&info[n]);
 212             break;
 213         }
 214     }
 215     if (embed) {
 216         PMIX_INFO_CREATE(iptr, ninfo + 1);
 217         num = ninfo + 1;
 218         for (n=0; n < ninfo; n++) {
 219             PMIX_INFO_XFER(&iptr[n], &info[n]);
 220         }
 221         PMIX_INFO_LOAD(&iptr[ninfo], PMIX_EMBED_BARRIER, &embed, PMIX_BOOL);
 222     } else {
 223         iptr = (pmix_info_t*)info;
 224         num = ninfo;
 225     }
 226 
 227     msg = PMIX_NEW(pmix_buffer_t);
 228     /* pack the cmd */
 229     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 230                      msg, &cmd, 1, PMIX_COMMAND);
 231     if (PMIX_SUCCESS != rc) {
 232         PMIX_ERROR_LOG(rc);
 233         goto done;
 234     }
 235 
 236     /* pack the group ID */
 237     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 238                      msg, &grp, 1, PMIX_STRING);
 239     if (PMIX_SUCCESS != rc) {
 240         PMIX_ERROR_LOG(rc);
 241         goto done;
 242     }
 243 
 244     /* pack the number of procs */
 245     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 246                      msg, &nprocs, 1, PMIX_SIZE);
 247     if (PMIX_SUCCESS != rc) {
 248         PMIX_ERROR_LOG(rc);
 249         goto done;
 250     }
 251     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 252                      msg, procs, nprocs, PMIX_PROC);
 253     if (PMIX_SUCCESS != rc) {
 254         PMIX_ERROR_LOG(rc);
 255         goto done;
 256     }
 257 
 258     /* pack the info structs */
 259     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 260                      msg, &num, 1, PMIX_SIZE);
 261     if (PMIX_SUCCESS != rc) {
 262         PMIX_ERROR_LOG(rc);
 263         PMIX_RELEASE(msg);
 264         goto done;
 265     }
 266     if (0 < num) {
 267         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 268                          msg, iptr, num, PMIX_INFO);
 269         if (PMIX_SUCCESS != rc) {
 270             PMIX_ERROR_LOG(rc);
 271             PMIX_RELEASE(msg);
 272             goto done;
 273         }
 274     }
 275 
 276     /* create a callback object as we need to pass it to the
 277      * recv routine so we know which callback to use when
 278      * the return message is recvd */
 279     cb = PMIX_NEW(pmix_group_tracker_t);
 280     cb->cbfunc = cbfunc;
 281     cb->cbdata = cbdata;
 282 
 283     /* push the message into our event base to send to the server */
 284     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 285                        msg, grp_cbfunc, (void*)cb);
 286     if (PMIX_SUCCESS != rc) {
 287         PMIX_RELEASE(cb);
 288     }
 289 
 290   done:
 291     if (embed && NULL != iptr) {
 292         PMIX_INFO_FREE(iptr, num);
 293     }
 294     if (PMIX_SUCCESS != rc && NULL != msg) {
 295         PMIX_RELEASE(msg);
 296     }
 297     return rc;
 298 }
 299 
 300 PMIX_EXPORT pmix_status_t PMIx_Group_destruct(const char grp[],
 301                                               const pmix_info_t info[], size_t ninfo)
 302 {
 303     pmix_status_t rc;
 304     pmix_group_tracker_t cb;
 305 
 306     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 307 
 308     pmix_output_verbose(2, pmix_client_globals.connect_output,
 309                         "pmix: group_destruct called");
 310 
 311     if (pmix_globals.init_cntr <= 0) {
 312         PMIX_RELEASE_THREAD(&pmix_global_lock);
 313         return PMIX_ERR_INIT;
 314     }
 315 
 316     /* if we aren't connected, don't attempt to send */
 317     if (!pmix_globals.connected) {
 318         PMIX_RELEASE_THREAD(&pmix_global_lock);
 319         return PMIX_ERR_UNREACH;
 320     }
 321     PMIX_RELEASE_THREAD(&pmix_global_lock);
 322 
 323     /* create a callback object as we need to pass it to the
 324      * recv routine so we know which callback to use when
 325      * the return message is recvd */
 326     PMIX_CONSTRUCT(&cb, pmix_group_tracker_t);
 327 
 328     /* push the message into our event base to send to the server */
 329     if (PMIX_SUCCESS != (rc = PMIx_Group_destruct_nb(grp, info, ninfo, op_cbfunc, (void*)&cb))) {
 330         PMIX_ERROR_LOG(rc);
 331         PMIX_DESTRUCT(&cb);
 332         return rc;
 333     }
 334 
 335     /* wait for the connect to complete */
 336     PMIX_WAIT_THREAD(&cb.lock);
 337     rc = cb.status;
 338     PMIX_DESTRUCT(&cb);
 339 
 340     pmix_output_verbose(2, pmix_client_globals.connect_output,
 341                         "pmix: group destruct completed");
 342 
 343     return rc;
 344 }
 345 
 346 PMIX_EXPORT pmix_status_t PMIx_Group_destruct_nb(const char grp[],
 347                                                  const pmix_info_t info[], size_t ninfo,
 348                                                  pmix_op_cbfunc_t cbfunc, void *cbdata)
 349 {
 350     pmix_buffer_t *msg = NULL;
 351     pmix_cmd_t cmd = PMIX_GROUP_DESTRUCT_CMD;
 352     pmix_status_t rc;
 353     pmix_group_tracker_t *cb = NULL;
 354     size_t n, num;
 355     bool embed = true;
 356     pmix_info_t *iptr = NULL;
 357 
 358     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 359 
 360     pmix_output_verbose(2, pmix_client_globals.connect_output,
 361                         "pmix:group_destruct_nb called");
 362 
 363     if (pmix_globals.init_cntr <= 0) {
 364         PMIX_RELEASE_THREAD(&pmix_global_lock);
 365         return PMIX_ERR_INIT;
 366     }
 367 
 368     /* if we aren't connected, don't attempt to send */
 369     if (!pmix_globals.connected) {
 370         PMIX_RELEASE_THREAD(&pmix_global_lock);
 371         return PMIX_ERR_UNREACH;
 372     }
 373     PMIX_RELEASE_THREAD(&pmix_global_lock);
 374 
 375     /* check for bozo input */
 376     if (NULL == grp) {
 377         return PMIX_ERR_BAD_PARAM;
 378     }
 379 
 380     /* need to add the fence request to the provided info
 381      * structs as this is a blocking operation - only add
 382      * it if the user didn't specify this themselves */
 383     for (n=0; n < ninfo; n++) {
 384         if (PMIX_CHECK_KEY(&info[n], PMIX_EMBED_BARRIER)) {
 385             embed = PMIX_INFO_TRUE(&info[n]);
 386             break;
 387         }
 388     }
 389     if (embed) {
 390         PMIX_INFO_CREATE(iptr, ninfo + 1);
 391         num = ninfo + 1;
 392         for (n=0; n < ninfo; n++) {
 393             PMIX_INFO_XFER(&iptr[n], &info[n]);
 394         }
 395         PMIX_INFO_LOAD(&iptr[ninfo], PMIX_EMBED_BARRIER, &embed, PMIX_BOOL);
 396     } else {
 397         iptr = (pmix_info_t*)info;
 398         num = ninfo;
 399     }
 400 
 401     msg = PMIX_NEW(pmix_buffer_t);
 402     /* pack the cmd */
 403     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 404                      msg, &cmd, 1, PMIX_COMMAND);
 405     if (PMIX_SUCCESS != rc) {
 406         PMIX_ERROR_LOG(rc);
 407         goto done;
 408     }
 409 
 410     /* pack the group ID */
 411     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 412                      msg, &grp, 1, PMIX_STRING);
 413     if (PMIX_SUCCESS != rc) {
 414         PMIX_ERROR_LOG(rc);
 415         goto done;
 416     }
 417 
 418     /* pack the info structs */
 419     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 420                      msg, &ninfo, 1, PMIX_SIZE);
 421     if (PMIX_SUCCESS != rc) {
 422         PMIX_ERROR_LOG(rc);
 423         PMIX_RELEASE(msg);
 424         goto done;
 425     }
 426     if (0 < ninfo) {
 427         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 428                          msg, info, ninfo, PMIX_INFO);
 429         if (PMIX_SUCCESS != rc) {
 430             PMIX_ERROR_LOG(rc);
 431             PMIX_RELEASE(msg);
 432             goto done;
 433         }
 434     }
 435 
 436     /* create a callback object as we need to pass it to the
 437      * recv routine so we know which callback to use when
 438      * the return message is recvd */
 439     cb = PMIX_NEW(pmix_group_tracker_t);
 440     cb->opcbfunc = cbfunc;
 441     cb->cbdata = cbdata;
 442 
 443     /* push the message into our event base to send to the server */
 444     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 445                        msg, grp_cbfunc, (void*)cb);
 446     if (PMIX_SUCCESS != rc) {
 447         PMIX_RELEASE(cb);
 448     }
 449 
 450   done:
 451     if (embed && NULL != iptr) {
 452         PMIX_INFO_FREE(iptr, num);
 453     }
 454     if (PMIX_SUCCESS != rc && NULL != msg) {
 455         PMIX_RELEASE(msg);
 456     }
 457     return rc;
 458 }
 459 
 460 static void chaincbfunc(pmix_status_t status, void *cbdata)
 461 {
 462     pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
 463 
 464     if (NULL != cb) {
 465         PMIX_RELEASE(cb);
 466     }
 467 }
 468 
 469 static void relcbfunc(void *cbdata)
 470 {
 471     pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
 472 
 473     PMIX_RELEASE(cb);
 474 }
 475 
 476 static void invite_handler(size_t evhdlr_registration_id,
 477                            pmix_status_t status,
 478                            const pmix_proc_t *source,
 479                            pmix_info_t info[], size_t ninfo,
 480                            pmix_info_t *results, size_t nresults,
 481                            pmix_event_notification_cbfunc_fn_t cbfunc,
 482                            void *cbdata)
 483 {
 484     pmix_group_tracker_t *cb = NULL;
 485     pmix_proc_t *affected = NULL;
 486     size_t n;
 487     pmix_status_t rc = PMIX_GROUP_INVITE_DECLINED;
 488     size_t contextid = SIZE_MAX;
 489 
 490     /* find the object we asked to be returned with event */
 491     for (n=0; n < ninfo; n++) {
 492         if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_RETURN_OBJECT)) {
 493             if (PMIX_POINTER != info[n].value.type) {
 494                 /* this is an unrecoverable error - need to abort */
 495             }
 496             cb = (pmix_group_tracker_t*)info[n].value.data.ptr;
 497         } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
 498             if (PMIX_PROC != info[n].value.type) {
 499                 /* this is an unrecoverable error - need to abort */
 500             }
 501             affected = info[n].value.data.proc;
 502         }  else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_CONTEXT_ID)) {
 503             PMIX_VALUE_GET_NUMBER(rc, &info[n].value, contextid, size_t);
 504         }
 505     }
 506     if (NULL == cb) {
 507         pmix_output(0, "[%s:%d] INVITE HANDLER NULL OBJECT", pmix_globals.myid.nspace, pmix_globals.myid.rank);
 508         /* this is an unrecoverable error - need to abort */
 509         /* always must continue the chain */
 510         cbfunc(rc, NULL, 0, chaincbfunc, NULL, cbdata);
 511         return;
 512     }
 513 
 514     /* if the status is accepted, then record it */
 515     if (PMIX_GROUP_INVITE_ACCEPTED == status) {
 516         cb->accepted++;
 517         /* allow the chain to continue */
 518         rc = PMIX_SUCCESS;
 519         goto complete;
 520     }
 521 
 522     /* if the reporting process terminated, then issue the corresponding
 523      * group event - it only goes to this process */
 524     if (PMIX_PROC_TERMINATED == status) {
 525         cb->ninfo = 2;
 526         PMIX_INFO_CREATE(cb->info, cb->ninfo);
 527         PMIX_INFO_LOAD(&cb->info[0], PMIX_EVENT_AFFECTED_PROC, affected, PMIX_PROC);
 528         PMIX_INFO_LOAD(&cb->info[1], PMIX_GROUP_CONTEXT_ID, &contextid, PMIX_SIZE);
 529         rc = PMIx_Notify_event(PMIX_GROUP_INVITE_FAILED,
 530                                &pmix_globals.myid,
 531                                PMIX_RANGE_PROC_LOCAL,
 532                                cb->info, cb->ninfo,
 533                                chaincbfunc, (void*)cb);
 534         if (PMIX_SUCCESS != rc) {
 535             /* this is an unrecoverable error - need to abort */
 536             pmix_output(0, "[%s:%d] INVITE HANDLER ERROR", pmix_globals.myid.nspace, pmix_globals.myid.rank);
 537         }
 538         PMIX_INFO_FREE(cb->info, cb->ninfo);
 539         goto complete;
 540     }
 541 
 542     /* otherwise, we consider the process as having "declined" and
 543      * ignore it here - if the user registered a handler for that
 544      * case, then the chain will eventually call it
 545      */
 546 
 547   complete:
 548     /* check for complete */
 549     if (cb->accepted == cb->nmembers) {
 550         /* execute the completion callback */
 551         if (NULL != cb->cbfunc) {
 552             cb->cbfunc(PMIX_SUCCESS, NULL, 0, cb->cbdata, relcbfunc, cb);
 553             rc = PMIX_EVENT_ACTION_COMPLETE;
 554         }
 555     }
 556 
 557     /* always must continue the chain */
 558     cbfunc(rc, cb->results, cb->nresults, chaincbfunc, cb, cbdata);
 559     return;
 560 }
 561 
 562 static void regcbfunc(pmix_status_t status,
 563                       size_t refid,
 564                       void *cbdata)
 565 {
 566     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 567 
 568     cb->status = status;
 569     cb->errhandler_ref = refid;
 570     PMIX_WAKEUP_THREAD(&cb->lock);
 571 }
 572 
 573 PMIX_EXPORT pmix_status_t PMIx_Group_invite(const char grp[],
 574                                             const pmix_proc_t procs[], size_t nprocs,
 575                                             const pmix_info_t info[], size_t ninfo,
 576                                             pmix_info_t **results, size_t *nresults)
 577 {
 578     pmix_group_tracker_t cb;
 579     pmix_status_t rc;
 580     size_t n;
 581 
 582     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 583     if (pmix_globals.init_cntr <= 0) {
 584         PMIX_RELEASE_THREAD(&pmix_global_lock);
 585         return PMIX_ERR_INIT;
 586     }
 587 
 588     /* if we aren't connected, then we cannot notify */
 589     if (!pmix_globals.connected) {
 590         PMIX_RELEASE_THREAD(&pmix_global_lock);
 591         return PMIX_ERR_UNREACH;
 592     }
 593     PMIX_RELEASE_THREAD(&pmix_global_lock);
 594 
 595     /* check for bozo input */
 596     if (NULL == grp || NULL == procs) {
 597         return PMIX_ERR_BAD_PARAM;
 598     }
 599 
 600     PMIX_CONSTRUCT(&cb, pmix_group_tracker_t);
 601 
 602     rc = PMIx_Group_invite_nb(grp, procs, nprocs, info, ninfo, info_cbfunc, (void*)&cb);
 603     if (PMIX_SUCCESS != rc) {
 604         PMIX_DESTRUCT(&cb);
 605         return rc;
 606     }
 607 
 608     /* wait for the group construction to complete or fail */
 609     PMIX_WAIT_THREAD(&cb.lock);
 610     rc = cb.status;
 611     *results = cb.results;
 612     *nresults = cb.nresults;
 613     PMIX_DESTRUCT(&cb);
 614 
 615     /* announce group completion, including list of final
 616      * members - only sent to members. Servers will intercept
 617      * and update their membership lists */
 618     PMIX_CONSTRUCT(&cb, pmix_group_tracker_t);
 619     PMIX_INFO_CREATE(cb.info, 3);
 620     if (NULL == cb.info) {
 621         PMIX_DESTRUCT(&cb);
 622         return PMIX_ERR_NOMEM;
 623     }
 624     cb.ninfo = 3;
 625     n = 0;
 626     (void)strncpy(cb.info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN);
 627     cb.info[n].value.type = PMIX_DATA_ARRAY;
 628     PMIX_DATA_ARRAY_CREATE(cb.info[n].value.data.darray, nprocs, PMIX_PROC);
 629     if (NULL == cb.info[n].value.data.darray ||
 630         NULL == cb.info[n].value.data.darray->array) {
 631         PMIX_DESTRUCT(&cb);
 632         return PMIX_ERR_NOMEM;
 633     }
 634     memcpy(cb.info[n].value.data.darray->array, procs, nprocs * sizeof(pmix_proc_t));
 635     ++n;
 636     /* mark that this only goes to non-default handlers */
 637     PMIX_INFO_LOAD(&cb.info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
 638     ++n;
 639     /* provide the group ID */
 640     PMIX_INFO_LOAD(&cb.info[n], PMIX_GROUP_ID, grp, PMIX_STRING);
 641 
 642     /* notify members */
 643     rc = PMIx_Notify_event(PMIX_GROUP_CONSTRUCT_COMPLETE,
 644                            &pmix_globals.myid,
 645                            PMIX_RANGE_CUSTOM,
 646                            cb.info, cb.ninfo,
 647                            op_cbfunc, (void*)&cb);
 648     if (PMIX_SUCCESS != rc) {
 649         PMIX_DESTRUCT(&cb);
 650         return rc;
 651     }
 652 
 653     /* wait for the notify to get out */
 654     PMIX_WAIT_THREAD(&cb.lock);
 655     rc = cb.status;
 656     PMIX_DESTRUCT(&cb);
 657     return rc;
 658 }
 659 
 660 PMIX_EXPORT pmix_status_t PMIx_Group_invite_nb(const char grp[],
 661                                                const pmix_proc_t procs[], size_t nprocs,
 662                                                const pmix_info_t info[], size_t ninfo,
 663                                                pmix_info_cbfunc_t cbfunc, void *cbdata)
 664 {
 665     pmix_group_tracker_t *cb;
 666     pmix_cb_t lock;
 667     pmix_status_t codes[] = {
 668         PMIX_GROUP_INVITE_ACCEPTED,
 669         PMIX_GROUP_INVITE_DECLINED,
 670         PMIX_PROC_TERMINATED
 671     };
 672     size_t ncodes, n;
 673     pmix_info_t myinfo[2];
 674     pmix_status_t rc;
 675 
 676     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 677     if (pmix_globals.init_cntr <= 0) {
 678         PMIX_RELEASE_THREAD(&pmix_global_lock);
 679         return PMIX_ERR_INIT;
 680     }
 681 
 682     /* if we aren't connected, then we cannot notify */
 683     if (!pmix_globals.connected) {
 684         PMIX_RELEASE_THREAD(&pmix_global_lock);
 685         return PMIX_ERR_UNREACH;
 686     }
 687     PMIX_RELEASE_THREAD(&pmix_global_lock);
 688 
 689     /* check for bozo input */
 690     if (NULL == grp || NULL == procs) {
 691         return PMIX_ERR_BAD_PARAM;
 692     }
 693 
 694     cb = PMIX_NEW(pmix_group_tracker_t);
 695     if (NULL == cb) {
 696         return PMIX_ERR_NOMEM;
 697     }
 698     cb->cbfunc = cbfunc;
 699     cb->cbdata = cbdata;
 700     cb->accepted = 1;  // obviously, we accept
 701 
 702     /* compute the number of proposed members */
 703     for (n=0; n < nprocs; n++) {
 704         if (PMIX_RANK_WILDCARD == procs[n].rank) {
 705             /* must get the number of procs in this nspace */
 706         } else {
 707             cb->nmembers++;
 708         }
 709     }
 710 
 711     /* register an event handler specifically to respond
 712      * to accept responses */
 713     PMIX_INFO_LOAD(&myinfo[0], PMIX_EVENT_RETURN_OBJECT, cb, PMIX_POINTER);
 714     PMIX_INFO_LOAD(&myinfo[1], PMIX_EVENT_HDLR_PREPEND, NULL, PMIX_BOOL);
 715     ncodes = sizeof(codes)/sizeof(pmix_status_t);
 716     PMIX_CONSTRUCT(&lock, pmix_cb_t);
 717     PMIx_Register_event_handler(codes, ncodes, myinfo, 2,
 718                                 invite_handler, regcbfunc, &lock);
 719     PMIX_WAIT_THREAD(&lock.lock);
 720     rc = lock.status;
 721     cb->ref = lock.errhandler_ref;
 722     PMIX_DESTRUCT(&lock);
 723     PMIX_INFO_DESTRUCT(&myinfo[0]);
 724     PMIX_INFO_DESTRUCT(&myinfo[1]);
 725     if (PMIX_SUCCESS != rc) {
 726         PMIX_RELEASE(cb);
 727         return rc;
 728     }
 729 
 730     /* check for directives */
 731     if (NULL != info) {
 732         for (n=0; n < ninfo; n++) {
 733             if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
 734                 /* setup a timer */
 735                 break;
 736             }
 737         }
 738     }
 739 
 740     /* limit the range to just the procs we are inviting */
 741     PMIX_INFO_CREATE(cb->info, 3);
 742     if (NULL == cb->info) {
 743         PMIX_CONSTRUCT(&lock, pmix_cb_t);
 744         PMIx_Deregister_event_handler(cb->ref,
 745                                       op_cbfunc, &lock);
 746         PMIX_WAIT_THREAD(&lock.lock);
 747         PMIX_DESTRUCT(&lock);
 748         PMIX_RELEASE(cb);
 749         return PMIX_ERR_NOMEM;
 750     }
 751     cb->ninfo = 3;
 752     n = 0;
 753     (void)strncpy(cb->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN);
 754     cb->info[n].value.type = PMIX_DATA_ARRAY;
 755     PMIX_DATA_ARRAY_CREATE(cb->info[n].value.data.darray, nprocs, PMIX_PROC);
 756     if (NULL == cb->info[n].value.data.darray ||
 757         NULL == cb->info[n].value.data.darray->array) {
 758         PMIX_CONSTRUCT(&lock, pmix_cb_t);
 759         PMIx_Deregister_event_handler(cb->ref,
 760                                       op_cbfunc, &lock);
 761         PMIX_WAIT_THREAD(&lock.lock);
 762         PMIX_DESTRUCT(&lock);
 763         PMIX_RELEASE(cb);
 764         return PMIX_ERR_NOMEM;
 765     }
 766     memcpy(cb->info[n].value.data.darray->array, procs, nprocs * sizeof(pmix_proc_t));
 767     ++n;
 768     /* mark that this only goes to non-default handlers */
 769     PMIX_INFO_LOAD(&cb->info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
 770     ++n;
 771     /* provide the group ID */
 772     PMIX_INFO_LOAD(&cb->info[n], PMIX_GROUP_ID, grp, PMIX_STRING);
 773 
 774     /* notify everyone of the invitation */
 775     PMIX_CONSTRUCT(&lock, pmix_cb_t);
 776     rc = PMIx_Notify_event(PMIX_GROUP_INVITED,
 777                            &pmix_globals.myid,
 778                            PMIX_RANGE_CUSTOM,
 779                            cb->info, cb->ninfo,
 780                            op_cbfunc, (void*)&lock);
 781     PMIX_WAIT_THREAD(&lock.lock);
 782     rc = lock.status;
 783     PMIX_DESTRUCT(&lock);
 784     if (PMIX_SUCCESS != rc) {
 785         PMIX_CONSTRUCT(&lock, pmix_cb_t);
 786         PMIx_Deregister_event_handler(cb->ref,
 787                                       op_cbfunc, &lock);
 788         PMIX_WAIT_THREAD(&lock.lock);
 789         PMIX_DESTRUCT(&lock);
 790         PMIX_RELEASE(cb);
 791     }
 792 
 793     return rc;
 794 }
 795 
 796 PMIX_EXPORT pmix_status_t PMIx_Group_join(const char grp[],
 797                                           const pmix_proc_t *leader,
 798                                           pmix_group_opt_t opt,
 799                                           const pmix_info_t info[], size_t ninfo,
 800                                           pmix_info_t **results, size_t *nresults)
 801 {
 802     pmix_status_t rc;
 803     pmix_group_tracker_t *cb;
 804 
 805     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 806     if (pmix_globals.init_cntr <= 0) {
 807         PMIX_RELEASE_THREAD(&pmix_global_lock);
 808         return PMIX_ERR_INIT;
 809     }
 810 
 811     /* if we aren't connected, don't attempt to send */
 812     if (!pmix_globals.connected) {
 813         PMIX_RELEASE_THREAD(&pmix_global_lock);
 814         return PMIX_ERR_UNREACH;
 815     }
 816     PMIX_RELEASE_THREAD(&pmix_global_lock);
 817 
 818     /* create a callback object as we need to pass it to the
 819      * recv routine so we know which lock to release when
 820      * the return message is recvd */
 821     cb = PMIX_NEW(pmix_group_tracker_t);
 822 
 823     if (PMIX_SUCCESS != (rc = PMIx_Group_join_nb(grp, leader, opt, info, ninfo, info_cbfunc, cb))) {
 824         PMIX_RELEASE(cb);
 825         return rc;
 826     }
 827 
 828     /* wait for the group construction to complete */
 829     PMIX_WAIT_THREAD(&cb->lock);
 830     rc = cb->status;
 831     PMIX_RELEASE(cb);
 832 
 833     pmix_output_verbose(2, pmix_client_globals.connect_output,
 834                         "pmix: group construction completed");
 835 
 836     return rc;
 837 }
 838 
 839 PMIX_EXPORT pmix_status_t PMIx_Group_join_nb(const char grp[],
 840                                              const pmix_proc_t *leader,
 841                                              pmix_group_opt_t opt,
 842                                              const pmix_info_t info[], size_t ninfo,
 843                                              pmix_info_cbfunc_t cbfunc, void *cbdata)
 844 {
 845     pmix_status_t rc;
 846     pmix_group_tracker_t *cb;
 847     pmix_status_t code;
 848     size_t n;
 849     pmix_data_range_t range;
 850 
 851     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 852 
 853     pmix_output_verbose(2, pmix_client_globals.connect_output,
 854                         "[%s:%d] pmix: join nb called",
 855                         pmix_globals.myid.nspace, pmix_globals.myid.rank);
 856 
 857     if (pmix_globals.init_cntr <= 0) {
 858         PMIX_RELEASE_THREAD(&pmix_global_lock);
 859         return PMIX_ERR_INIT;
 860     }
 861 
 862     /* if we aren't connected, then we cannot notify */
 863     if (!pmix_globals.connected) {
 864         PMIX_RELEASE_THREAD(&pmix_global_lock);
 865         return PMIX_ERR_UNREACH;
 866     }
 867     PMIX_RELEASE_THREAD(&pmix_global_lock);
 868 
 869     /* create a callback object as we need to pass it to the
 870      * recv routine so we know which lock to release when
 871      * the notification is done */
 872     cb = PMIX_NEW(pmix_group_tracker_t);
 873     cb->cbdata = cbdata;
 874 
 875     /* check for directives */
 876     if (NULL != info) {
 877         for (n=0; n < ninfo; n++) {
 878             if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
 879                 /* setup a timer */
 880                 break;
 881             }
 882         }
 883     }
 884 
 885     /* set the code according to their request */
 886     if (PMIX_GROUP_ACCEPT == opt) {
 887         code = PMIX_GROUP_INVITE_ACCEPTED;
 888     } else {
 889         code = PMIX_GROUP_INVITE_DECLINED;
 890     }
 891 
 892     /* only notify the leader so we don't hit all procs */
 893     if (NULL != leader) {
 894         range = PMIX_RANGE_CUSTOM;
 895         PMIX_INFO_CREATE(cb->info, 1);
 896         if (NULL == cb->info) {
 897             PMIX_RELEASE(cb);
 898             return PMIX_ERR_NOMEM;
 899         }
 900         PMIX_INFO_LOAD(&cb->info[0], PMIX_EVENT_CUSTOM_RANGE, leader, PMIX_PROC);
 901         cb->ninfo = 1;
 902     } else {
 903         range = PMIX_RANGE_SESSION;
 904     }
 905 
 906     rc = PMIx_Notify_event(code,
 907                            &pmix_globals.myid,
 908                            range,
 909                            cb->info, cb->ninfo,
 910                            op_cbfunc, (void*)cb);
 911     if (PMIX_SUCCESS != rc) {
 912         PMIX_RELEASE(cb);
 913     }
 914     pmix_output_verbose(2, pmix_client_globals.connect_output,
 915                         "[%s:%d] pmix: group invite %s",
 916                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 917                         (PMIX_GROUP_INVITE_ACCEPTED == code) ? "ACCEPTED" : "DECLINED");
 918 
 919     return rc;
 920 }
 921 
 922 static void op_cbfunc(pmix_status_t status, void *cbdata)
 923 {
 924     pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
 925 
 926     cb->status = status;
 927     if (NULL != cb->opcbfunc) {
 928         cb->opcbfunc(status, cb->cbdata);
 929     }
 930     PMIX_POST_OBJECT(cb);
 931     PMIX_WAKEUP_THREAD(&cb->lock);
 932 }
 933 
 934 static void relfn(void *cbdata)
 935 {
 936     pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
 937     PMIX_RELEASE(cb);
 938 }
 939 static void grp_cbfunc(struct pmix_peer_t *pr,
 940                         pmix_ptl_hdr_t *hdr,
 941                         pmix_buffer_t *buf, void *cbdata)
 942 {
 943     pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
 944     pmix_status_t rc;
 945     pmix_status_t ret;
 946     int32_t cnt;
 947     size_t ctxid, ninfo=0;
 948     pmix_info_t info, *iptr=NULL;
 949 
 950     pmix_output_verbose(2, pmix_client_globals.connect_output,
 951                         "pmix:client recv callback activated with %d bytes",
 952                         (NULL == buf) ? -1 : (int)buf->bytes_used);
 953 
 954     if (NULL == buf) {
 955         ret = PMIX_ERR_BAD_PARAM;
 956         goto report;
 957     }
 958 
 959     /* a zero-byte buffer indicates that this recv is being
 960      * completed due to a lost connection */
 961     if (PMIX_BUFFER_IS_EMPTY(buf)) {
 962         ret = PMIX_ERR_UNREACH;
 963         goto report;
 964     }
 965 
 966     /* unpack the returned status */
 967     cnt = 1;
 968     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 969                        buf, &ret, &cnt, PMIX_STATUS);
 970     if (PMIX_SUCCESS != rc) {
 971         PMIX_ERROR_LOG(rc);
 972         ret = rc;
 973     }
 974     /* unpack any ctxid that was provided - it is okay if
 975      * this attempts to unpack past end of buffer */
 976     cnt = 1;
 977     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 978                        buf, &ctxid, &cnt, PMIX_SIZE);
 979     if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 980         PMIX_ERROR_LOG(rc);
 981         ret = rc;
 982     } else {
 983         PMIX_INFO_LOAD(&info, PMIX_GROUP_CONTEXT_ID, &ctxid, PMIX_SIZE);
 984         iptr = &info;
 985         ninfo = 1;
 986     }
 987 
 988   report:
 989     if (NULL != cb->cbfunc) {
 990         cb->cbfunc(ret, iptr, ninfo, cb->cbdata, relfn, cb);
 991         return;
 992     } else if (NULL != cb->opcbfunc) {
 993         cb->opcbfunc(ret, cb->cbdata);
 994         return;
 995     }
 996     PMIX_RELEASE(cb);
 997 }
 998 
 999 static void info_cbfunc(pmix_status_t status,
1000                         pmix_info_t *info, size_t ninfo,
1001                         void *cbdata,
1002                         pmix_release_cbfunc_t release_fn,
1003                         void *release_cbdata)
1004 {
1005     pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
1006     size_t n;
1007 
1008     /* see if anything was returned - e.g., a context id */
1009     cb->status = status;
1010     /* copy/save any returned info */
1011     if (NULL != info) {
1012         cb->nresults = ninfo;
1013         PMIX_INFO_CREATE(cb->results, cb->nresults);
1014         for (n=0; n < ninfo; n++) {
1015             PMIX_INFO_XFER(&cb->results[n], &info[n]);
1016         }
1017     }
1018     if (NULL != release_fn) {
1019         release_fn(release_cbdata);
1020     }
1021     PMIX_POST_OBJECT(cb);
1022     PMIX_WAKEUP_THREAD(&cb->lock);
1023 }

/* [<][>][^][v][top][bottom][index][help] */