root/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_notification.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. PMIx_Notify_event
  2. notify_event_cbfunc
  3. notify_event_cache
  4. notify_server_of_event
  5. progress_local_event_hdlr
  6. pmix_invoke_local_event_hdlr
  7. local_cbfunc
  8. _notify_client_event
  9. pmix_server_notify_client_of_event
  10. pmix_notify_check_range
  11. pmix_notify_check_affected
  12. pmix_event_timeout_cb
  13. pmix_prep_event_chain
  14. sevcon
  15. sevdes
  16. accon
  17. evcon
  18. evdes
  19. chcon
  20. chdes

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2017-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2017      IBM Corporation. All rights reserved.
   7  *
   8  * $COPYRIGHT$
   9  *
  10  * Additional copyrights may follow
  11  *
  12  * $HEADER$
  13  */
  14 #include <src/include/pmix_config.h>
  15 
  16 #include <pmix.h>
  17 #include <pmix_common.h>
  18 #include <pmix_server.h>
  19 #include <pmix_rename.h>
  20 
  21 #include "src/threads/threads.h"
  22 #include "src/util/error.h"
  23 #include "src/util/output.h"
  24 
  25 #include "src/mca/bfrops/bfrops.h"
  26 #include "src/client/pmix_client_ops.h"
  27 #include "src/server/pmix_server_ops.h"
  28 #include "src/include/pmix_globals.h"
  29 
  30 static pmix_status_t notify_server_of_event(pmix_status_t status,
  31                                             const pmix_proc_t *source,
  32                                             pmix_data_range_t range,
  33                                             const pmix_info_t info[], size_t ninfo,
  34                                             pmix_op_cbfunc_t cbfunc, void *cbdata);
  35 
  36 /* if we are a client, we call this function to notify the server of
  37  * an event. If we are a server, our host RM will call this function
  38  * to notify us of an event */
  39 PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status,
  40                                             const pmix_proc_t *source,
  41                                             pmix_data_range_t range,
  42                                             const pmix_info_t info[], size_t ninfo,
  43                                             pmix_op_cbfunc_t cbfunc, void *cbdata)
  44 {
  45     int rc;
  46 
  47     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
  48 
  49     if (pmix_globals.init_cntr <= 0) {
  50         PMIX_RELEASE_THREAD(&pmix_global_lock);
  51         return PMIX_ERR_INIT;
  52     }
  53 
  54     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
  55         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
  56         PMIX_RELEASE_THREAD(&pmix_global_lock);
  57 
  58         pmix_output_verbose(2, pmix_server_globals.event_output,
  59                             "pmix_server_notify_event source = %s:%d event_status = %s",
  60                             (NULL == source) ? "UNKNOWN" : source->nspace,
  61                             (NULL == source) ? PMIX_RANK_WILDCARD : source->rank, PMIx_Error_string(status));
  62 
  63         rc = pmix_server_notify_client_of_event(status, source, range,
  64                                                 info, ninfo,
  65                                                 cbfunc, cbdata);
  66 
  67         if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
  68             PMIX_ERROR_LOG(rc);
  69         }
  70         return rc;
  71     }
  72 
  73     /* if we aren't connected, don't attempt to send */
  74     if (!pmix_globals.connected) {
  75         PMIX_RELEASE_THREAD(&pmix_global_lock);
  76         return PMIX_ERR_UNREACH;
  77     }
  78     PMIX_RELEASE_THREAD(&pmix_global_lock);
  79     pmix_output_verbose(2, pmix_client_globals.event_output,
  80                         "pmix_client_notify_event source = %s:%d event_status =%d",
  81                         (NULL == source) ? pmix_globals.myid.nspace : source->nspace,
  82                         (NULL == source) ? pmix_globals.myid.rank : source->rank, status);
  83 
  84     rc = notify_server_of_event(status, source, range,
  85                                 info, ninfo,
  86                                 cbfunc, cbdata);
  87     if (PMIX_SUCCESS != rc) {
  88         PMIX_ERROR_LOG(rc);
  89     }
  90     return rc;
  91 }
  92 
  93 static void notify_event_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
  94                                 pmix_buffer_t *buf, void *cbdata)
  95 {
  96     pmix_status_t rc, ret;
  97     int32_t cnt = 1;
  98     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
  99 
 100     /* unpack the status */
 101     PMIX_BFROPS_UNPACK(rc, pr, buf, &ret, &cnt, PMIX_STATUS);
 102     if (PMIX_SUCCESS != rc) {
 103         PMIX_ERROR_LOG(rc);
 104         ret = rc;
 105     }
 106     /* do the cback */
 107     if (NULL != cb->cbfunc.opfn) {
 108         cb->cbfunc.opfn(ret, cb->cbdata);
 109     }
 110     PMIX_RELEASE(cb);
 111 }
 112 
 113 static pmix_status_t notify_event_cache(pmix_notify_caddy_t *cd)
 114 {
 115     pmix_status_t rc;
 116     int j;
 117     pmix_notify_caddy_t *pk;
 118     int idx;
 119     time_t etime;
 120 
 121     /* add to our cache */
 122     rc = pmix_hotel_checkin(&pmix_globals.notifications, cd, &cd->room);
 123     /* if there wasn't room, then search for the longest tenured
 124      * occupant and evict them */
 125     if (PMIX_SUCCESS != rc) {
 126         etime = 0;
 127         idx = -1;
 128         for (j=0; j < pmix_globals.max_events; j++) {
 129             pmix_hotel_knock(&pmix_globals.notifications, j, (void**)&pk);
 130             if (NULL == pk) {
 131                 /* hey, there is room! */
 132                 pmix_hotel_checkin_with_res(&pmix_globals.notifications, cd, &cd->room);
 133                 return PMIX_SUCCESS;
 134             }
 135             /* check the age */
 136             if (0 == j) {
 137                 etime = pk->ts;
 138                 idx = j;
 139             } else {
 140                 if (difftime(pk->ts, etime) < 0) {
 141                     etime = pk->ts;
 142                     idx = j;
 143                 }
 144             }
 145         }
 146         if (0 <= idx) {
 147             /* we found the oldest occupant - evict it */
 148             pmix_hotel_checkout_and_return_occupant(&pmix_globals.notifications, idx, (void**)&pk);
 149             PMIX_RELEASE(pk);
 150             rc = pmix_hotel_checkin(&pmix_globals.notifications, cd, &cd->room);
 151         }
 152     }
 153     return rc;
 154 }
 155 
 156 /* as a client, we pass the notification to our server */
 157 static pmix_status_t notify_server_of_event(pmix_status_t status,
 158                                             const pmix_proc_t *source,
 159                                             pmix_data_range_t range,
 160                                             const pmix_info_t info[], size_t ninfo,
 161                                             pmix_op_cbfunc_t cbfunc, void *cbdata)
 162 {
 163     pmix_status_t rc;
 164     pmix_buffer_t *msg = NULL;
 165     pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
 166     pmix_cb_t *cb;
 167     pmix_event_chain_t *chain;
 168     size_t n;
 169     pmix_notify_caddy_t *cd;
 170 
 171     pmix_output_verbose(2, pmix_client_globals.event_output,
 172                         "[%s:%d] client: notifying server %s:%d of status %s for range %s",
 173                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 174                         pmix_client_globals.myserver->info->pname.nspace,
 175                         pmix_client_globals.myserver->info->pname.rank,
 176                         PMIx_Error_string(status), PMIx_Data_range_string(range));
 177 
 178     if (PMIX_RANGE_PROC_LOCAL != range) {
 179         /* create the msg object */
 180         msg = PMIX_NEW(pmix_buffer_t);
 181         if (NULL == msg) {
 182             return PMIX_ERR_NOMEM;
 183         }
 184         /* pack the command */
 185         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &cmd, 1, PMIX_COMMAND);
 186         if (PMIX_SUCCESS != rc) {
 187             PMIX_ERROR_LOG(rc);
 188             goto cleanup;
 189         }
 190         /* pack the status */
 191         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &status, 1, PMIX_STATUS);
 192         if (PMIX_SUCCESS != rc) {
 193             PMIX_ERROR_LOG(rc);
 194             goto cleanup;
 195         }
 196         /* no need to pack the source as it is us */
 197 
 198         /* pack the range */
 199         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &range, 1, PMIX_DATA_RANGE);
 200         if (PMIX_SUCCESS != rc) {
 201             PMIX_ERROR_LOG(rc);
 202             goto cleanup;
 203         }
 204         /* pack the info */
 205         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &ninfo, 1, PMIX_SIZE);
 206         if (PMIX_SUCCESS != rc) {
 207             PMIX_ERROR_LOG(rc);
 208             goto cleanup;
 209         }
 210         if (0 < ninfo) {
 211             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, info, ninfo, PMIX_INFO);
 212             if (PMIX_SUCCESS != rc) {
 213                 PMIX_ERROR_LOG(rc);
 214                 goto cleanup;
 215             }
 216         }
 217     }
 218 
 219     /* setup for our own local callbacks */
 220     chain = PMIX_NEW(pmix_event_chain_t);
 221     chain->status = status;
 222     pmix_strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
 223     chain->source.rank = pmix_globals.myid.rank;
 224     /* we always leave space for event hdlr name and a callback object */
 225     chain->nallocated = ninfo + 2;
 226     PMIX_INFO_CREATE(chain->info, chain->nallocated);
 227     /* prep the chain for processing */
 228     pmix_prep_event_chain(chain, info, ninfo, true);
 229 
 230     /* we need to cache this event so we can pass it into
 231      * ourselves should someone later register for it */
 232     cd = PMIX_NEW(pmix_notify_caddy_t);
 233     cd->status = status;
 234     if (NULL == source) {
 235         pmix_strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
 236         cd->source.rank = PMIX_RANK_UNDEF;
 237     } else {
 238         pmix_strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
 239         cd->source.rank = source->rank;
 240     }
 241     cd->range = range;
 242     if (0 < chain->ninfo) {
 243         cd->ninfo = chain->ninfo;
 244         PMIX_INFO_CREATE(cd->info, cd->ninfo);
 245         cd->nondefault = chain->nondefault;
 246        /* need to copy the info */
 247         for (n=0; n < cd->ninfo; n++) {
 248             PMIX_INFO_XFER(&cd->info[n], &chain->info[n]);
 249         }
 250     }
 251     if (NULL != chain->targets) {
 252         cd->ntargets = chain->ntargets;
 253         PMIX_PROC_CREATE(cd->targets, cd->ntargets);
 254         memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
 255     }
 256     if (NULL != chain->affected) {
 257         cd->naffected = chain->naffected;
 258         PMIX_PROC_CREATE(cd->affected, cd->naffected);
 259         if (NULL == cd->affected) {
 260             cd->naffected = 0;
 261             rc = PMIX_ERR_NOMEM;
 262             goto cleanup;
 263         }
 264         memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
 265     }
 266     /* cache it */
 267     rc = notify_event_cache(cd);
 268     if (PMIX_SUCCESS != rc) {
 269         PMIX_ERROR_LOG(rc);
 270         PMIX_RELEASE(cd);
 271         goto cleanup;
 272     }
 273 
 274     if (PMIX_RANGE_PROC_LOCAL != range && NULL != msg) {
 275         /* create a callback object as we need to pass it to the
 276          * recv routine so we know which callback to use when
 277          * the server acks/nacks the register events request. The
 278          * server will _not_ send this notification back to us,
 279          * so we handle it locally */
 280         cb = PMIX_NEW(pmix_cb_t);
 281         cb->cbfunc.opfn = cbfunc;
 282         cb->cbdata = cbdata;
 283         /* send to the server */
 284         pmix_output_verbose(2, pmix_client_globals.event_output,
 285                             "[%s:%d] client: notifying server %s:%d - sending",
 286                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
 287                             pmix_client_globals.myserver->info->pname.nspace,
 288                             pmix_client_globals.myserver->info->pname.rank);
 289         PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 290                            msg, notify_event_cbfunc, cb);
 291         if (PMIX_SUCCESS != rc) {
 292             PMIX_ERROR_LOG(rc);
 293             PMIX_RELEASE(cb);
 294             goto cleanup;
 295         }
 296     } else if (NULL != cbfunc) {
 297         cbfunc(PMIX_SUCCESS, cbdata);
 298     }
 299 
 300     /* now notify any matching registered callbacks we have */
 301     pmix_invoke_local_event_hdlr(chain);
 302 
 303     return PMIX_SUCCESS;
 304 
 305   cleanup:
 306     pmix_output_verbose(2, pmix_client_globals.event_output,
 307                         "client: notifying server - unable to send");
 308     if (NULL != msg) {
 309         PMIX_RELEASE(msg);
 310     }
 311     /* we were unable to send anything, so we just return the error */
 312     return rc;
 313 }
 314 
 315 
 316 static void progress_local_event_hdlr(pmix_status_t status,
 317                                       pmix_info_t *results, size_t nresults,
 318                                       pmix_op_cbfunc_t cbfunc, void *thiscbdata,
 319                                       void *notification_cbdata)
 320 {
 321     /* this may be in the host's thread, so we need to threadshift it
 322      * before accessing our internal data */
 323 
 324     pmix_event_chain_t *chain = (pmix_event_chain_t*)notification_cbdata;
 325     size_t n, nsave, cnt;
 326     pmix_info_t *newinfo;
 327     pmix_list_item_t *item;
 328     pmix_event_hdlr_t *nxt;
 329 
 330     /* aggregate the results per RFC0018 - first search the
 331      * prior chained results to see if any keys have been NULL'd
 332      * as this indicates that info struct should be removed */
 333     nsave = 0;
 334     for (n=0; n < chain->nresults; n++) {
 335         if (0 < strlen(chain->results[n].key)) {
 336             ++nsave;
 337         }
 338     }
 339     /* we have to at least record the status returned by each
 340      * stage of the event handler chain, so we have to reallocate
 341      * the array to make space */
 342 
 343     /* add in any new results plus space for the returned status */
 344     nsave += nresults + 1;
 345     /* create the new space */
 346     PMIX_INFO_CREATE(newinfo, nsave);
 347     /* transfer over the prior data */
 348     cnt = 0;
 349     for (n=0; n < chain->nresults; n++) {
 350         if (0 < strlen(chain->results[n].key)) {
 351             PMIX_INFO_XFER(&newinfo[cnt], &chain->results[n]);
 352             ++cnt;
 353         }
 354     }
 355 
 356     /* save this handler's returned status */
 357     if (NULL != chain->evhdlr->name) {
 358         pmix_strncpy(newinfo[cnt].key, chain->evhdlr->name, PMIX_MAX_KEYLEN);
 359     } else {
 360         pmix_strncpy(newinfo[cnt].key, "UNKNOWN", PMIX_MAX_KEYLEN);
 361     }
 362     newinfo[cnt].value.type = PMIX_STATUS;
 363     newinfo[cnt].value.data.status = status;
 364     ++cnt;
 365     /* transfer across the new results */
 366     for (n=0; n < nresults; n++) {
 367         PMIX_INFO_XFER(&newinfo[cnt], &results[n]);
 368         ++cnt;
 369     }
 370     /* release the prior results */
 371     if (0 < chain->nresults) {
 372         PMIX_INFO_FREE(chain->results, chain->nresults);
 373     }
 374     /* pass along the new ones */
 375     chain->results = newinfo;
 376     chain->nresults = cnt;
 377     /* clear any loaded name and object */
 378     chain->ninfo = chain->nallocated - 2;
 379     PMIX_INFO_DESTRUCT(&chain->info[chain->nallocated-2]);
 380     PMIX_INFO_DESTRUCT(&chain->info[chain->nallocated-1]);
 381 
 382     /* if the caller indicates that the chain is completed,
 383      * or we completed the "last" event */
 384     if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) {
 385         goto complete;
 386     }
 387     item = NULL;
 388 
 389     /* see if we need to continue, starting with the single code events */
 390     if (1 == chain->evhdlr->ncodes) {
 391         /* the last handler was for a single code - see if there are
 392          * any others that match this event */
 393         item = &chain->evhdlr->super;
 394         while (pmix_list_get_end(&pmix_globals.events.single_events) != (item = pmix_list_get_next(item))) {
 395             nxt = (pmix_event_hdlr_t*)item;
 396             if (nxt->codes[0] == chain->status &&
 397                 pmix_notify_check_range(&nxt->rng, &chain->source) &&
 398                 pmix_notify_check_affected(nxt->affected, nxt->naffected,
 399                                            chain->affected, chain->naffected)) {
 400                 chain->evhdlr = nxt;
 401                 /* reset our count to the info provided by the caller */
 402                 chain->ninfo = chain->nallocated - 2;
 403                 /* if the handler has a name, then provide it */
 404                 if (NULL != chain->evhdlr->name) {
 405                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 406                     chain->ninfo++;
 407                 }
 408 
 409                 /* if there is an evhdlr cbobject, provide it */
 410                 if (NULL != chain->evhdlr->cbobject) {
 411                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 412                     chain->ninfo++;
 413                 }
 414                 nxt->evhdlr(nxt->index,
 415                             chain->status, &chain->source,
 416                             chain->info, chain->ninfo,
 417                             chain->results, chain->nresults,
 418                             progress_local_event_hdlr, (void*)chain);
 419                 return;
 420             }
 421         }
 422         /* if we get here, then there are no more single code
 423          * events that match */
 424         item = pmix_list_get_begin(&pmix_globals.events.multi_events);
 425     }
 426 
 427     /* see if we need to continue with the multi code events */
 428     if (NULL != chain->evhdlr->codes || NULL != item) {
 429         /* the last handler was for a multi-code event, or we exhausted
 430          * all the single code events */
 431         if (NULL == item) {
 432             /* if the last handler was multi-code, then start from that point */
 433             item = &chain->evhdlr->super;
 434         }
 435         while (pmix_list_get_end(&pmix_globals.events.multi_events) != (item = pmix_list_get_next(item))) {
 436             nxt = (pmix_event_hdlr_t*)item;
 437             if (!pmix_notify_check_range(&nxt->rng, &chain->source) ||
 438                 !pmix_notify_check_affected(nxt->affected, nxt->naffected,
 439                                             chain->affected, chain->naffected)) {
 440                 continue;
 441             }
 442             for (n=0; n < nxt->ncodes; n++) {
 443                 /* if this event handler provided a range, check to see if
 444                  * the source fits within it */
 445                 if (nxt->codes[n] == chain->status) {
 446                     chain->evhdlr = nxt;
 447                     /* reset our count to the info provided by the caller */
 448                     chain->ninfo = chain->nallocated - 2;
 449                     /* if the handler has a name, then provide it */
 450                     if (NULL != chain->evhdlr->name) {
 451                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 452                         chain->ninfo++;
 453                     }
 454 
 455                     /* if there is an evhdlr cbobject, provide it */
 456                     if (NULL != chain->evhdlr->cbobject) {
 457                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 458                         chain->ninfo++;
 459                     }
 460                     nxt->evhdlr(nxt->index,
 461                                 chain->status, &chain->source,
 462                                 chain->info, chain->ninfo,
 463                                 chain->results, chain->nresults,
 464                                 progress_local_event_hdlr, (void*)chain);
 465                     return;
 466                 }
 467             }
 468         }
 469         /* if we get here, then there are no more multi-mode
 470          * events that match */
 471         item = pmix_list_get_begin(&pmix_globals.events.default_events);
 472     }
 473 
 474     /* if they didn't want it to go to a default handler, then ignore them */
 475     if (!chain->nondefault) {
 476         if (NULL == item) {
 477             item = &chain->evhdlr->super;
 478         }
 479         if (pmix_list_get_end(&pmix_globals.events.default_events) != (item = pmix_list_get_next(item))) {
 480             nxt = (pmix_event_hdlr_t*)item;
 481             /* if this event handler provided a range, check to see if
 482              * the source fits within it */
 483             if (pmix_notify_check_range(&nxt->rng, &chain->source) &&
 484                 pmix_notify_check_affected(nxt->affected, nxt->naffected,
 485                                            chain->affected, chain->naffected)) {
 486                 chain->evhdlr = nxt;
 487                 /* reset our count to the info provided by the caller */
 488                 chain->ninfo = chain->nallocated - 2;
 489                 /* if the handler has a name, then provide it */
 490                 if (NULL != chain->evhdlr->name) {
 491                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 492                     chain->ninfo++;
 493                 }
 494 
 495                 /* if there is an evhdlr cbobject, provide it */
 496                 if (NULL != chain->evhdlr->cbobject) {
 497                     PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 498                     chain->ninfo++;
 499                 }
 500                 nxt->evhdlr(nxt->index,
 501                             chain->status, &chain->source,
 502                             chain->info, chain->ninfo,
 503                             chain->results, chain->nresults,
 504                             progress_local_event_hdlr, (void*)chain);
 505                 return;
 506             }
 507         }
 508     }
 509 
 510     /* if we registered a "last" handler, and it fits the given range
 511      * and code, then invoke it now */
 512     if (NULL != pmix_globals.events.last &&
 513         pmix_notify_check_range(&pmix_globals.events.last->rng, &chain->source) &&
 514         pmix_notify_check_affected(pmix_globals.events.last->affected, pmix_globals.events.last->naffected,
 515                                    chain->affected, chain->naffected)) {
 516         chain->endchain = true;  // ensure we don't do this again
 517         if (1 == pmix_globals.events.last->ncodes &&
 518             pmix_globals.events.last->codes[0] == chain->status) {
 519             chain->evhdlr = pmix_globals.events.last;
 520             /* reset our count to the info provided by the caller */
 521             chain->ninfo = chain->nallocated - 2;
 522             /* if the handler has a name, then provide it */
 523             if (NULL != chain->evhdlr->name) {
 524                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 525                 chain->ninfo++;
 526             }
 527 
 528             /* if there is an evhdlr cbobject, provide it */
 529             if (NULL != chain->evhdlr->cbobject) {
 530                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 531                 chain->ninfo++;
 532             }
 533             chain->evhdlr->evhdlr(chain->evhdlr->index,
 534                                   chain->status, &chain->source,
 535                                   chain->info, chain->ninfo,
 536                                   chain->results, chain->nresults,
 537                                   progress_local_event_hdlr, (void*)chain);
 538             return;
 539         } else if (NULL != pmix_globals.events.last->codes) {
 540             /* need to check if this code is included in the array */
 541             for (n=0; n < pmix_globals.events.last->ncodes; n++) {
 542                 if (pmix_globals.events.last->codes[n] == chain->status) {
 543                     chain->evhdlr = pmix_globals.events.last;
 544                     /* reset our count to the info provided by the caller */
 545                     chain->ninfo = chain->nallocated - 2;
 546                     /* if the handler has a name, then provide it */
 547                     if (NULL != chain->evhdlr->name) {
 548                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 549                         chain->ninfo++;
 550                     }
 551 
 552                     /* if there is an evhdlr cbobject, provide it */
 553                     if (NULL != chain->evhdlr->cbobject) {
 554                         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 555                         chain->ninfo++;
 556                     }
 557                     chain->evhdlr->evhdlr(chain->evhdlr->index,
 558                                           chain->status, &chain->source,
 559                                           chain->info, chain->ninfo,
 560                                           chain->results, chain->nresults,
 561                                           progress_local_event_hdlr, (void*)chain);
 562                     return;
 563                 }
 564             }
 565         } else {
 566             /* gets run for all codes */
 567             chain->evhdlr = pmix_globals.events.last;
 568             /* reset our count to the info provided by the caller */
 569             chain->ninfo = chain->nallocated - 2;
 570             /* if the handler has a name, then provide it */
 571             if (NULL != chain->evhdlr->name) {
 572                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 573                 chain->ninfo++;
 574             }
 575 
 576             /* if there is an evhdlr cbobject, provide it */
 577             if (NULL != chain->evhdlr->cbobject) {
 578                 PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 579                 chain->ninfo++;
 580             }
 581             chain->evhdlr->evhdlr(chain->evhdlr->index,
 582                                   chain->status, &chain->source,
 583                                   chain->info, chain->ninfo,
 584                                   chain->results, chain->nresults,
 585                                   progress_local_event_hdlr, (void*)chain);
 586             return;
 587         }
 588     }
 589 
 590   complete:
 591     /* we still have to call their final callback */
 592     if (NULL != chain->final_cbfunc) {
 593         chain->final_cbfunc(PMIX_SUCCESS, chain->final_cbdata);
 594         return;
 595     }
 596     /* maintain acctng */
 597     PMIX_RELEASE(chain);
 598     /* let the caller know that we are done with their callback */
 599     if (NULL != cbfunc) {
 600         cbfunc(PMIX_SUCCESS, thiscbdata);
 601     }
 602 }
 603 
 604 /* given notification of an event, cycle thru our list of
 605  * registered callbacks and invoke the matching ones. Note
 606  * that we will invoke the callbacks in order from single
 607  * to multi-event to default, keeping a log of any returned
 608  * info and passing it down to the next invoked event handler.
 609  * Thus, each handler is given the opportunity to see what
 610  * prior handlers did, and decide if anything further needs
 611  * to be done.
 612  */
 613 void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
 614 {
 615     /* We need to parse thru each registered handler and determine
 616      * which one(s) to call for the specific error */
 617     size_t i;
 618     pmix_event_hdlr_t *evhdlr;
 619     pmix_status_t rc = PMIX_SUCCESS;
 620     bool found;
 621 
 622     pmix_output_verbose(2, pmix_client_globals.event_output,
 623                         "%s:%d invoke_local_event_hdlr for status %s",
 624                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 625                         PMIx_Error_string(chain->status));
 626 
 627     /* sanity check */
 628     if (NULL == chain->info) {
 629         /* should never happen as space must always be
 630          * reserved for handler name and callback object*/
 631         rc = PMIX_ERR_BAD_PARAM;
 632         goto complete;
 633     }
 634 
 635     /* if we are not a target, then we can simply ignore this event */
 636     if (NULL != chain->targets) {
 637         found = false;
 638         for (i=0; i < chain->ntargets; i++) {
 639             if (PMIX_CHECK_PROCID(&chain->targets[i], &pmix_globals.myid)) {
 640                 found = true;
 641                 break;
 642             }
 643         }
 644         if (!found) {
 645             goto complete;
 646         }
 647     }
 648 
 649     /* if we registered a "first" handler, and it fits the given range,
 650      * then invoke it first */
 651     if (NULL != pmix_globals.events.first) {
 652         if (1 == pmix_globals.events.first->ncodes &&
 653             pmix_globals.events.first->codes[0] == chain->status &&
 654             pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source) &&
 655             pmix_notify_check_affected(pmix_globals.events.first->affected, pmix_globals.events.first->naffected,
 656                                        chain->affected, chain->naffected)) {
 657             /* invoke the handler */
 658             chain->evhdlr = pmix_globals.events.first;
 659             goto invk;
 660         } else if (NULL != pmix_globals.events.first->codes) {
 661             /* need to check if this code is included in the array */
 662             found = false;
 663             for (i=0; i < pmix_globals.events.first->ncodes; i++) {
 664                 if (pmix_globals.events.first->codes[i] == chain->status) {
 665                     found = true;
 666                     break;
 667                 }
 668             }
 669             /* if this event handler provided a range, check to see if
 670              * the source fits within it */
 671             if (found && pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source)) {
 672                 /* invoke the handler */
 673                 chain->evhdlr = pmix_globals.events.first;
 674                 goto invk;
 675             }
 676         } else {
 677             /* take all codes for a default handler */
 678             if (pmix_notify_check_range(&pmix_globals.events.first->rng, &chain->source)) {
 679                 /* invoke the handler */
 680                 chain->evhdlr = pmix_globals.events.first;
 681                 goto invk;
 682             }
 683         }
 684         /* get here if there is no match, so fall thru */
 685     }
 686 
 687     /* cycle thru the single-event registrations first */
 688     PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.single_events, pmix_event_hdlr_t) {
 689         if (evhdlr->codes[0] == chain->status) {
 690             if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
 691                 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
 692                                            chain->affected, chain->naffected)) {
 693                 /* invoke the handler */
 694                 chain->evhdlr = evhdlr;
 695                 goto invk;
 696             }
 697         }
 698     }
 699 
 700     /* if we didn't find any match in the single-event registrations,
 701      * then cycle thru the multi-event registrations next */
 702     PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.multi_events, pmix_event_hdlr_t) {
 703         for (i=0; i < evhdlr->ncodes; i++) {
 704             if (evhdlr->codes[i] == chain->status) {
 705                 if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
 706                     pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
 707                                                chain->affected, chain->naffected)) {
 708                     /* invoke the handler */
 709                     chain->evhdlr = evhdlr;
 710                     goto invk;
 711                 }
 712             }
 713         }
 714     }
 715 
 716     /* if they didn't want it to go to a default handler, then ignore them */
 717     if (!chain->nondefault) {
 718         /* pass it to any default handlers */
 719         PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
 720             if (pmix_notify_check_range(&evhdlr->rng, &chain->source) &&
 721                 pmix_notify_check_affected(evhdlr->affected, evhdlr->naffected,
 722                                            chain->affected, chain->naffected)) {
 723                 /* invoke the handler */
 724                 chain->evhdlr = evhdlr;
 725                 goto invk;
 726             }
 727         }
 728     }
 729 
 730     /* if we registered a "last" handler, and it fits the given range
 731      * and code, then invoke it now */
 732     if (NULL != pmix_globals.events.last &&
 733         pmix_notify_check_range(&pmix_globals.events.last->rng, &chain->source) &&
 734         pmix_notify_check_affected(pmix_globals.events.last->affected, pmix_globals.events.last->naffected,
 735                                    chain->affected, chain->naffected)) {
 736         chain->endchain = true;  // ensure we don't do this again
 737         if (1 == pmix_globals.events.last->ncodes &&
 738             pmix_globals.events.last->codes[0] == chain->status) {
 739             chain->evhdlr = pmix_globals.events.last;
 740             goto invk;
 741         } else if (NULL != pmix_globals.events.last->codes) {
 742             /* need to check if this code is included in the array */
 743             for (i=0; i < pmix_globals.events.last->ncodes; i++) {
 744                 if (pmix_globals.events.last->codes[i] == chain->status) {
 745                     chain->evhdlr = pmix_globals.events.last;
 746                     goto invk;
 747                 }
 748             }
 749         } else {
 750             /* gets run for all codes */
 751             chain->evhdlr = pmix_globals.events.last;
 752             goto invk;
 753         }
 754     }
 755 
 756     /* if we got here, then nothing was found */
 757 
 758   complete:
 759     /* we still have to call their final callback */
 760     if (NULL != chain->final_cbfunc) {
 761         chain->final_cbfunc(rc, chain->final_cbdata);
 762     } else {
 763         PMIX_RELEASE(chain);
 764     }
 765     return;
 766 
 767 
 768   invk:
 769     /* start with the chain holding only the given info */
 770     chain->ninfo = chain->nallocated - 2;
 771 
 772     /* if the handler has a name, then provide it */
 773     if (NULL != chain->evhdlr->name) {
 774         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_HDLR_NAME, chain->evhdlr->name, PMIX_STRING);
 775         chain->ninfo++;
 776     }
 777 
 778     /* if there is an evhdlr cbobject, provide it */
 779     if (NULL != chain->evhdlr->cbobject) {
 780         PMIX_INFO_LOAD(&chain->info[chain->ninfo], PMIX_EVENT_RETURN_OBJECT, chain->evhdlr->cbobject, PMIX_POINTER);
 781         chain->ninfo++;
 782     }
 783 
 784     /* invoke the handler */
 785     pmix_output_verbose(2, pmix_client_globals.event_output,
 786                         "[%s:%d] INVOKING EVHDLR %s", __FILE__, __LINE__,
 787                         (NULL == chain->evhdlr->name) ?
 788                         "NULL" : chain->evhdlr->name);
 789     chain->evhdlr->evhdlr(chain->evhdlr->index,
 790                           chain->status, &chain->source,
 791                           chain->info, chain->ninfo,
 792                           NULL, 0,
 793                           progress_local_event_hdlr, (void*)chain);
 794     return;
 795 }
 796 
 797 static void local_cbfunc(pmix_status_t status, void *cbdata)
 798 {
 799     pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
 800 
 801     if (NULL != cd->cbfunc) {
 802         cd->cbfunc(status, cd->cbdata);
 803     }
 804     PMIX_RELEASE(cd);
 805 }
 806 
 807 static void _notify_client_event(int sd, short args, void *cbdata)
 808 {
 809     pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
 810     pmix_regevents_info_t *reginfoptr;
 811     pmix_peer_events_info_t *pr;
 812     pmix_event_chain_t *chain;
 813     size_t n, nleft;
 814     bool matched, holdcd;
 815     pmix_buffer_t *bfr;
 816     pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
 817     pmix_status_t rc;
 818     pmix_list_t trk;
 819     pmix_namelist_t *nm;
 820     pmix_namespace_t *nptr, *tmp;
 821     pmix_range_trkr_t rngtrk;
 822     pmix_proc_t proc;
 823 
 824     /* need to acquire the object from its originating thread */
 825     PMIX_ACQUIRE_OBJECT(cd);
 826 
 827     pmix_output_verbose(2, pmix_server_globals.event_output,
 828                         "pmix_server: _notify_client_event notifying clients of event %s range %s type %s",
 829                         PMIx_Error_string(cd->status),
 830                         PMIx_Data_range_string(cd->range),
 831                         cd->nondefault ? "NONDEFAULT" : "OPEN");
 832 
 833     /* check for caching instructions */
 834     holdcd = true;
 835     if (0 < cd->ninfo) {
 836         /* check for caching instructions */
 837         for (n=0; n < cd->ninfo; n++) {
 838             if (PMIX_CHECK_KEY(&cd->info[n], PMIX_EVENT_DO_NOT_CACHE)) {
 839                 if (PMIX_INFO_TRUE(&cd->info[n])) {
 840                     holdcd = false;
 841                 }
 842                 break;
 843             }
 844         }
 845     }
 846     if (holdcd) {
 847         /* we cannot know if everyone who wants this notice has had a chance
 848          * to register for it - the notice may be coming too early. So cache
 849          * the message until all local procs have received it, or it ages to
 850          * the point where it gets pushed out by more recent events */
 851         PMIX_RETAIN(cd);
 852         rc = notify_event_cache(cd);
 853         if (PMIX_SUCCESS != rc) {
 854             PMIX_ERROR_LOG(rc);
 855         }
 856     }
 857 
 858     /* we may also have registered for events, so setup to check this
 859      * against our registrations */
 860     chain = PMIX_NEW(pmix_event_chain_t);
 861     chain->status = cd->status;
 862     pmix_strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
 863     chain->source.rank = cd->source.rank;
 864     /* we always leave space for a callback object and
 865      * the evhandler name. */
 866     chain->nallocated = cd->ninfo + 2;
 867     PMIX_INFO_CREATE(chain->info, chain->nallocated);
 868     /* prep the chain for processing */
 869     pmix_prep_event_chain(chain, cd->info, cd->ninfo, true);
 870 
 871     /* copy setup to the cd object */
 872     cd->nondefault = chain->nondefault;
 873     if (NULL != chain->targets) {
 874         cd->ntargets = chain->ntargets;
 875         PMIX_PROC_CREATE(cd->targets, cd->ntargets);
 876         memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
 877         /* compute the number of targets that need to be notified */
 878         nleft = 0;
 879         for (n=0; n < cd->ntargets; n++) {
 880             /* if this is a single proc, then increment by one */
 881             if (PMIX_RANK_VALID >= cd->targets[n].rank) {
 882                 ++nleft;
 883             } else {
 884                 /* look up the nspace for this proc */
 885                 nptr = NULL;
 886                 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
 887                     if (PMIX_CHECK_NSPACE(tmp->nspace, cd->targets[n].nspace)) {
 888                         nptr = tmp;
 889                         break;
 890                     }
 891                 }
 892                 /* if we don't yet know it, then nothing to do */
 893                 if (NULL == nptr) {
 894                     nleft = SIZE_MAX;
 895                     break;
 896                 }
 897                 /* might notify all local members */
 898                 nleft += nptr->nlocalprocs;
 899             }
 900         }
 901         cd->nleft = nleft;
 902     }
 903     if (NULL != chain->affected) {
 904         cd->naffected = chain->naffected;
 905         PMIX_PROC_CREATE(cd->affected, cd->naffected);
 906         if (NULL == cd->affected) {
 907             cd->naffected = 0;
 908             /* notify the caller */
 909             if (NULL != cd->cbfunc) {
 910                 cd->cbfunc(PMIX_ERR_NOMEM, cd->cbdata);
 911             }
 912             PMIX_RELEASE(cd);
 913             PMIX_RELEASE(chain);
 914             return;
 915         }
 916         memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
 917     }
 918 
 919     /* if they provided a PMIX_EVENT_CUSTOM_RANGE info object but
 920      * specified a range other than PMIX_RANGE_CUSTOM, then this
 921      * is an error */
 922     if (PMIX_RANGE_CUSTOM != cd->range && NULL != cd->targets) {
 923         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 924         /* notify the caller */
 925         if (NULL != cd->cbfunc) {
 926             cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata);
 927         }
 928         PMIX_RELEASE(cd);
 929         PMIX_RELEASE(chain);
 930         return;
 931     }
 932 
 933     /* check to see if this is a group_complete notification
 934      * indicating that a group has asynchronously been formed.
 935      * If it is, then we need to track the group */
 936     if (PMIX_GROUP_CONSTRUCT_COMPLETE == cd->status) {
 937         char *grpid = NULL;
 938         pmix_group_t *grp;
 939         /* must include the group id */
 940         for (n=0; n < cd->ninfo; n++) {
 941             if (PMIX_CHECK_KEY(&cd->info[n], PMIX_GROUP_ID)) {
 942                 grpid = cd->info[n].value.data.string;
 943                 break;
 944             }
 945         }
 946         if (NULL == grpid) {
 947             /* failed to provide the ID */
 948             PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 949             /* notify the caller */
 950             if (NULL != cd->cbfunc) {
 951                 cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata);
 952             }
 953             PMIX_RELEASE(cd);
 954             PMIX_RELEASE(chain);
 955             return;
 956         }
 957         /* must include members */
 958         if (NULL == cd->targets || 0 == cd->ntargets) {
 959             PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 960             /* notify the caller */
 961             if (NULL != cd->cbfunc) {
 962                 cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata);
 963             }
 964             PMIX_RELEASE(cd);
 965             PMIX_RELEASE(chain);
 966             return;
 967         }
 968         grp = PMIX_NEW(pmix_group_t);
 969         grp->grpid = strdup(grpid);
 970         grp->nmbrs = cd->ntargets;
 971         PMIX_PROC_CREATE(grp->members, grp->nmbrs);
 972         memcpy(grp->members, cd->targets, cd->ntargets * sizeof(pmix_proc_t));
 973         pmix_list_append(&pmix_server_globals.groups, &grp->super);
 974     }
 975 
 976     holdcd = false;
 977     if (PMIX_RANGE_PROC_LOCAL != cd->range) {
 978         PMIX_CONSTRUCT(&trk, pmix_list_t);
 979         rngtrk.procs = NULL;
 980         rngtrk.nprocs = 0;
 981         /* cycle across our registered events and send the message to
 982          * any client who registered for it */
 983         PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
 984             if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
 985                 cd->status == reginfoptr->code) {
 986                 PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) {
 987                     /* if this client was the source of the event, then
 988                      * don't send it back as they will have processed it
 989                      * when they generated it */
 990                     if (PMIX_CHECK_PROCID(&cd->source, &pr->peer->info->pname)) {
 991                         continue;
 992                     }
 993                     /* if we have already notified this client, then don't do it again */
 994                     matched = false;
 995                     PMIX_LIST_FOREACH(nm, &trk, pmix_namelist_t) {
 996                         if (nm->pname == &pr->peer->info->pname) {
 997                             matched = true;
 998                             break;
 999                         }
1000                     }
1001                     if (matched) {
1002                         continue;
1003                     }
1004                     /* check if the affected procs (if given) match those they
1005                      * wanted to know about */
1006                     if (!pmix_notify_check_affected(cd->affected, cd->naffected,
1007                                                     pr->affected, pr->naffected)) {
1008                         continue;
1009                     }
1010                     /* check the range */
1011                     if (NULL == cd->targets) {
1012                         rngtrk.procs = &cd->source;
1013                         rngtrk.nprocs = 1;
1014                     } else {
1015                         rngtrk.procs = cd->targets;
1016                         rngtrk.nprocs = cd->ntargets;
1017                     }
1018                     rngtrk.range = cd->range;
1019                     PMIX_LOAD_PROCID(&proc, pr->peer->info->pname.nspace, pr->peer->info->pname.rank);
1020                     if (!pmix_notify_check_range(&rngtrk, &proc)) {
1021                         continue;
1022                     }
1023                     pmix_output_verbose(2, pmix_server_globals.event_output,
1024                                         "pmix_server: notifying client %s:%u on status %s",
1025                                         pr->peer->info->pname.nspace, pr->peer->info->pname.rank,
1026                                         PMIx_Error_string(cd->status));
1027 
1028                     /* record that we notified this client */
1029                     nm = PMIX_NEW(pmix_namelist_t);
1030                     nm->pname = &pr->peer->info->pname;
1031                     pmix_list_append(&trk, &nm->super);
1032 
1033                     bfr = PMIX_NEW(pmix_buffer_t);
1034                     if (NULL == bfr) {
1035                         continue;
1036                     }
1037                     /* pack the command */
1038                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cmd, 1, PMIX_COMMAND);
1039                     if (PMIX_SUCCESS != rc) {
1040                         PMIX_ERROR_LOG(rc);
1041                         PMIX_RELEASE(bfr);
1042                         continue;
1043                     }
1044 
1045                     /* pack the status */
1046                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->status, 1, PMIX_STATUS);
1047                     if (PMIX_SUCCESS != rc) {
1048                         PMIX_ERROR_LOG(rc);
1049                         PMIX_RELEASE(bfr);
1050                         continue;
1051                     }
1052 
1053                     /* pack the source */
1054                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->source, 1, PMIX_PROC);
1055                     if (PMIX_SUCCESS != rc) {
1056                         PMIX_ERROR_LOG(rc);
1057                         PMIX_RELEASE(bfr);
1058                         continue;
1059                     }
1060                     /* pack any info */
1061                     PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->ninfo, 1, PMIX_SIZE);
1062                     if (PMIX_SUCCESS != rc) {
1063                         PMIX_ERROR_LOG(rc);
1064                         PMIX_RELEASE(bfr);
1065                         continue;
1066                     }
1067 
1068                     if (0 < cd->ninfo) {
1069                         PMIX_BFROPS_PACK(rc, pr->peer, bfr, cd->info, cd->ninfo, PMIX_INFO);
1070                         if (PMIX_SUCCESS != rc) {
1071                             PMIX_ERROR_LOG(rc);
1072                             PMIX_RELEASE(bfr);
1073                             continue;
1074                         }
1075                     }
1076                     PMIX_SERVER_QUEUE_REPLY(rc, pr->peer, 0, bfr);
1077                     if (PMIX_SUCCESS != rc) {
1078                         PMIX_RELEASE(bfr);
1079                     }
1080                     if (NULL != cd->targets && 0 < cd->nleft) {
1081                         /* track the number of targets we have left to notify */
1082                         --cd->nleft;
1083                         /* if the event was cached and this is the last one,
1084                          * then evict this event from the cache */
1085                         if (0 == cd->nleft) {
1086                             pmix_hotel_checkout(&pmix_globals.notifications, cd->room);
1087                             holdcd = false;
1088                             break;
1089                         }
1090                     }
1091                 }
1092             }
1093         }
1094         PMIX_LIST_DESTRUCT(&trk);
1095         if (PMIX_RANGE_LOCAL != cd->range && PMIX_CHECK_PROCID(&cd->source, &pmix_globals.myid)) {
1096             /* if we are the source, then we need to post this upwards as
1097              * well so the host RM can broadcast it as necessary */
1098             if (NULL != pmix_host_server.notify_event) {
1099                 /* mark that we sent it upstairs so we don't release
1100                  * the caddy until we return from the host RM */
1101                 holdcd = true;
1102                 pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
1103                                               cd->info, cd->ninfo, local_cbfunc, cd);
1104             }
1105         }
1106     }
1107 
1108     /* process it ourselves */
1109     pmix_invoke_local_event_hdlr(chain);
1110 
1111     if (!holdcd) {
1112         /* notify the caller */
1113         if (NULL != cd->cbfunc) {
1114             cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
1115         }
1116         PMIX_RELEASE(cd);
1117     }
1118 }
1119 
1120 
1121 /* as a server, we must do two things:
1122  *
1123  * (a) notify all clients that have registered for this event
1124  *
1125  * (b) callback any of our own functions that have registered
1126  *     for this event
1127  */
1128 pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
1129                                                  const pmix_proc_t *source,
1130                                                  pmix_data_range_t range,
1131                                                  const pmix_info_t info[], size_t ninfo,
1132                                                  pmix_op_cbfunc_t cbfunc, void *cbdata)
1133 {
1134     pmix_notify_caddy_t *cd;
1135     size_t n;
1136 
1137     pmix_output_verbose(2, pmix_server_globals.event_output,
1138                         "pmix_server: notify client of event %s",
1139                         PMIx_Error_string(status));
1140 
1141     if (NULL != info) {
1142         for (n=0; n < ninfo; n++) {
1143             if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_PROXY) &&
1144                 PMIX_CHECK_PROCID(info[n].value.data.proc, &pmix_globals.myid)) {
1145                 return PMIX_OPERATION_SUCCEEDED;
1146             }
1147         }
1148     }
1149 
1150     cd = PMIX_NEW(pmix_notify_caddy_t);
1151     cd->status = status;
1152     if (NULL == source) {
1153         pmix_strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
1154         cd->source.rank = PMIX_RANK_UNDEF;
1155     } else {
1156         pmix_strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
1157         cd->source.rank = source->rank;
1158     }
1159     cd->range = range;
1160     /* have to copy the info to preserve it for future when cached */
1161     if (0 < ninfo && NULL != info) {
1162         cd->ninfo = ninfo;
1163         PMIX_INFO_CREATE(cd->info, cd->ninfo);
1164         /* need to copy the info */
1165         for (n=0; n < cd->ninfo; n++) {
1166             PMIX_INFO_XFER(&cd->info[n], &info[n]);
1167         }
1168     }
1169 
1170     /* track the eventual callback info */
1171     cd->cbfunc = cbfunc;
1172     cd->cbdata = cbdata;
1173 
1174     pmix_output_verbose(2, pmix_server_globals.event_output,
1175                         "pmix_server_notify_event status =%d, source = %s:%d, ninfo =%lu",
1176                          status, cd->source.nspace, cd->source.rank, ninfo);
1177 
1178     /* we have to push this into our event library to avoid
1179      * potential threading issues */
1180     PMIX_THREADSHIFT(cd, _notify_client_event);
1181     return PMIX_SUCCESS;
1182 }
1183 
1184 bool pmix_notify_check_range(pmix_range_trkr_t *rng,
1185                              const pmix_proc_t *proc)
1186 {
1187     size_t n;
1188 
1189     if (PMIX_RANGE_UNDEF == rng->range ||
1190         PMIX_RANGE_GLOBAL == rng->range ||
1191         PMIX_RANGE_SESSION == rng->range ||
1192         PMIX_RANGE_LOCAL == rng->range) { // assume RM took care of session & local for now
1193         return true;
1194     }
1195     if (PMIX_RANGE_NAMESPACE == rng->range) {
1196         for (n=0; n < rng->nprocs; n++) {
1197             if (PMIX_CHECK_NSPACE(rng->procs[n].nspace, proc->nspace)) {
1198                 return true;
1199             }
1200         }
1201         return false;
1202     }
1203     if (PMIX_RANGE_PROC_LOCAL == rng->range) {
1204         for (n=0; n < rng->nprocs; n++) {
1205             if (PMIX_CHECK_PROCID(&rng->procs[n], proc)) {
1206                 return true;
1207             }
1208         }
1209         return false;
1210     }
1211     if (PMIX_RANGE_CUSTOM == rng->range) {
1212         /* see if this proc was included */
1213         for (n=0; n < rng->nprocs; n++) {
1214             if (0 != strncmp(rng->procs[n].nspace, proc->nspace, PMIX_MAX_NSLEN)) {
1215                 continue;
1216             }
1217             if (PMIX_RANK_WILDCARD == rng->procs[n].rank ||
1218                 rng->procs[n].rank == proc->rank) {
1219                 return true;
1220             }
1221         }
1222         /* if we get here, then this proc isn't in range */
1223         return false;
1224     }
1225 
1226     /* if it is anything else, then reject it */
1227     return false;
1228 }
1229 
1230 bool pmix_notify_check_affected(pmix_proc_t *interested, size_t ninterested,
1231                                 pmix_proc_t *affected, size_t naffected)
1232 {
1233     size_t m, n;
1234 
1235     /* if they didn't restrict their interests, then accept it */
1236     if (NULL == interested) {
1237         return true;
1238     }
1239     /* if we weren't given the affected procs, then accept it */
1240     if (NULL == affected) {
1241         return true;
1242     }
1243     /* check if the two overlap */
1244     for (n=0; n < naffected; n++) {
1245         for (m=0; m < ninterested; m++) {
1246             if (PMIX_CHECK_PROCID(&affected[n], &interested[m])) {
1247                 return true;
1248             }
1249         }
1250     }
1251     /* if we get here, then this proc isn't in range */
1252     return false;
1253 
1254 }
1255 
1256 void pmix_event_timeout_cb(int fd, short flags, void *arg)
1257 {
1258     pmix_event_chain_t *ch = (pmix_event_chain_t*)arg;
1259 
1260     /* need to acquire the object from its originating thread */
1261     PMIX_ACQUIRE_OBJECT(ch);
1262 
1263     ch->timer_active = false;
1264 
1265     /* remove it from the list */
1266     pmix_list_remove_item(&pmix_globals.cached_events, &ch->super);
1267 
1268     /* process this event thru the regular channels */
1269     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
1270         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
1271         pmix_server_notify_client_of_event(ch->status, &ch->source,
1272                                            ch->range, ch->info, ch->ninfo,
1273                                            ch->final_cbfunc, ch->final_cbdata);
1274     } else {
1275         pmix_invoke_local_event_hdlr(ch);
1276     }
1277 }
1278 
1279 pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain,
1280                                     const pmix_info_t *info, size_t ninfo,
1281                                     bool xfer)
1282 {
1283     size_t n;
1284 
1285     if (NULL != info && 0 < ninfo) {
1286         chain->ninfo = ninfo;
1287         if (NULL == chain->info) {
1288             PMIX_INFO_CREATE(chain->info, chain->ninfo);
1289         }
1290        /* need to copy the info */
1291         for (n=0; n < ninfo; n++) {
1292             if (xfer) {
1293                 /* chain doesn't already have a copy of the info */
1294                 PMIX_INFO_XFER(&chain->info[n], &info[n]);
1295             }
1296             /* look for specific directives */
1297             if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
1298                 chain->nondefault = PMIX_INFO_TRUE(&info[n]);
1299             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_CUSTOM_RANGE)) {
1300                 /* provides an array of pmix_proc_t identifying the procs
1301                  * that are to receive this notification, or a single pmix_proc_t  */
1302                 if (PMIX_DATA_ARRAY == info[n].value.type &&
1303                     NULL != info[n].value.data.darray &&
1304                     NULL != info[n].value.data.darray->array) {
1305                     chain->ntargets = info[n].value.data.darray->size;
1306                     PMIX_PROC_CREATE(chain->targets, chain->ntargets);
1307                     memcpy(chain->targets, info[n].value.data.darray->array, chain->ntargets * sizeof(pmix_proc_t));
1308                 } else if (PMIX_PROC == info[n].value.type) {
1309                     chain->ntargets = 1;
1310                     PMIX_PROC_CREATE(chain->targets, chain->ntargets);
1311                     memcpy(chain->targets, info[n].value.data.proc, sizeof(pmix_proc_t));
1312                 } else {
1313                     /* this is an error */
1314                     PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1315                     return PMIX_ERR_BAD_PARAM;
1316                 }
1317             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
1318                 PMIX_PROC_CREATE(chain->affected, 1);
1319                 if (NULL == chain->affected) {
1320                     return PMIX_ERR_NOMEM;
1321                 }
1322                 chain->naffected = 1;
1323                 memcpy(chain->affected, info[n].value.data.proc, sizeof(pmix_proc_t));
1324             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) {
1325                 chain->naffected = info[n].value.data.darray->size;
1326                 PMIX_PROC_CREATE(chain->affected, chain->naffected);
1327                 if (NULL == chain->affected) {
1328                     chain->naffected = 0;
1329                     return PMIX_ERR_NOMEM;
1330                 }
1331                 memcpy(chain->affected, info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
1332             }
1333         }
1334     }
1335     return PMIX_SUCCESS;
1336 }
1337 
1338 /****    CLASS INSTANTIATIONS    ****/
1339 
1340 static void sevcon(pmix_event_hdlr_t *p)
1341 {
1342     p->name = NULL;
1343     p->index = UINT_MAX;
1344     p->precedence = PMIX_EVENT_ORDER_NONE;
1345     p->locator = NULL;
1346     p->rng.range = PMIX_RANGE_UNDEF;
1347     p->rng.procs = NULL;
1348     p->rng.nprocs = 0;
1349     p->affected = NULL;
1350     p->naffected = 0;
1351     p->evhdlr = NULL;
1352     p->cbobject = NULL;
1353     p->codes = NULL;
1354     p->ncodes = 0;
1355 }
1356 static void sevdes(pmix_event_hdlr_t *p)
1357 {
1358     if (NULL != p->name) {
1359         free(p->name);
1360     }
1361     if (NULL != p->locator) {
1362         free(p->locator);
1363     }
1364     if (NULL != p->rng.procs) {
1365         free(p->rng.procs);
1366     }
1367     if (NULL != p->affected) {
1368         PMIX_PROC_FREE(p->affected, p->naffected);
1369     }
1370     if (NULL != p->codes) {
1371         free(p->codes);
1372     }
1373 }
1374 PMIX_CLASS_INSTANCE(pmix_event_hdlr_t,
1375                     pmix_list_item_t,
1376                     sevcon, sevdes);
1377 
1378 static void accon(pmix_active_code_t *p)
1379 {
1380     p->nregs = 0;
1381 }
1382 PMIX_CLASS_INSTANCE(pmix_active_code_t,
1383                     pmix_list_item_t,
1384                     accon, NULL);
1385 
1386 static void evcon(pmix_events_t *p)
1387 {
1388     p->nhdlrs = 0;
1389     p->first = NULL;
1390     p->last = NULL;
1391     PMIX_CONSTRUCT(&p->actives, pmix_list_t);
1392     PMIX_CONSTRUCT(&p->single_events, pmix_list_t);
1393     PMIX_CONSTRUCT(&p->multi_events, pmix_list_t);
1394     PMIX_CONSTRUCT(&p->default_events, pmix_list_t);
1395 }
1396 static void evdes(pmix_events_t *p)
1397 {
1398     if (NULL != p->first) {
1399         PMIX_RELEASE(p->first);
1400     }
1401     if (NULL != p->last) {
1402         PMIX_RELEASE(p->last);
1403     }
1404     PMIX_LIST_DESTRUCT(&p->actives);
1405     PMIX_LIST_DESTRUCT(&p->single_events);
1406     PMIX_LIST_DESTRUCT(&p->multi_events);
1407     PMIX_LIST_DESTRUCT(&p->default_events);
1408 }
1409 PMIX_CLASS_INSTANCE(pmix_events_t,
1410                     pmix_object_t,
1411                     evcon, evdes);
1412 
1413 static void chcon(pmix_event_chain_t *p)
1414 {
1415     p->timer_active = false;
1416     memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1);
1417     p->source.rank = PMIX_RANK_UNDEF;
1418     p->nondefault = false;
1419     p->endchain = false;
1420     p->targets = NULL;
1421     p->ntargets = 0;
1422     p->range = PMIX_RANGE_UNDEF;
1423     p->affected = NULL;
1424     p->naffected = 0;
1425     p->info = NULL;
1426     p->ninfo = 0;
1427     p->nallocated = 0;
1428     p->results = NULL;
1429     p->nresults = 0;
1430     p->evhdlr = NULL;
1431     p->final_cbfunc = NULL;
1432     p->final_cbdata = NULL;
1433 }
1434 static void chdes(pmix_event_chain_t *p)
1435 {
1436     if (p->timer_active) {
1437         pmix_event_del(&p->ev);
1438     }
1439     if (NULL != p->targets) {
1440         PMIX_PROC_FREE(p->targets, p->ntargets);
1441     }
1442     if (NULL != p->affected) {
1443         PMIX_PROC_FREE(p->affected, p->naffected);
1444     }
1445     if (NULL != p->info) {
1446         PMIX_INFO_FREE(p->info, p->nallocated);
1447     }
1448     if (NULL != p->results) {
1449         PMIX_INFO_FREE(p->results, p->nresults);
1450     }
1451 }
1452 PMIX_CLASS_INSTANCE(pmix_event_chain_t,
1453                     pmix_list_item_t,
1454                     chcon, chdes);

/* [<][>][^][v][top][bottom][index][help] */