This source file includes following definitions.
- rscon
- rsdes
- regevents_cbfunc
- reg_cbfunc
- _send_to_server
- _add_hdlr
- check_cached_events
- reg_event_hdlr
- PMIx_Register_event_handler
- dereg_event_hdlr
- PMIx_Deregister_event_handler
1
2
3
4
5
6
7
8
9
10
11
12 #include <src/include/pmix_config.h>
13
14 #include <pmix.h>
15 #include <pmix_common.h>
16 #include <pmix_server.h>
17 #include <pmix_rename.h>
18
19 #include "src/threads/threads.h"
20 #include "src/util/error.h"
21 #include "src/util/output.h"
22
23 #include "src/client/pmix_client_ops.h"
24 #include "src/server/pmix_server_ops.h"
25 #include "src/include/pmix_globals.h"
26 #include "src/mca/bfrops/bfrops.h"
27 #include "src/event/pmix_event.h"
28
29 typedef struct {
30 pmix_object_t super;
31 volatile bool active;
32 pmix_event_t ev;
33 size_t index;
34 bool firstoverall;
35 bool enviro;
36 pmix_list_t *list;
37 pmix_event_hdlr_t *hdlr;
38 void *cd;
39 pmix_status_t *codes;
40 size_t ncodes;
41 pmix_info_t *info;
42 size_t ninfo;
43 pmix_proc_t *affected;
44 size_t naffected;
45 pmix_notification_fn_t evhdlr;
46 pmix_hdlr_reg_cbfunc_t evregcbfn;
47 void *cbdata;
48 } pmix_rshift_caddy_t;
49 static void rscon(pmix_rshift_caddy_t *p)
50 {
51 p->firstoverall = false;
52 p->enviro = false;
53 p->list = NULL;
54 p->hdlr = NULL;
55 p->cd = NULL;
56 p->codes = NULL;
57 p->ncodes = 0;
58 p->info = NULL;
59 p->ninfo = 0;
60 p->affected = NULL;
61 p->naffected = 0;
62 p->evhdlr = NULL;
63 p->evregcbfn = NULL;
64 p->cbdata = NULL;
65 }
66 static void rsdes(pmix_rshift_caddy_t *p)
67 {
68 if (0 < p->ncodes) {
69 free(p->codes);
70 }
71 if (NULL != p->cd) {
72 PMIX_RELEASE(p->cd);
73 }
74 }
75 PMIX_CLASS_INSTANCE(pmix_rshift_caddy_t,
76 pmix_object_t,
77 rscon, rsdes);
78
79 static void check_cached_events(pmix_rshift_caddy_t *cd);
80
81
82
83 static void regevents_cbfunc(struct pmix_peer_t *peer, pmix_ptl_hdr_t *hdr,
84 pmix_buffer_t *buf, void *cbdata)
85 {
86 pmix_rshift_caddy_t *rb = (pmix_rshift_caddy_t*)cbdata;
87 pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)rb->cd;
88 pmix_status_t rc, ret;
89 int cnt;
90 size_t index = rb->index;
91
92 pmix_output_verbose(2, pmix_client_globals.event_output,
93 "pmix: regevents callback recvd");
94
95
96 cnt = 1;
97 PMIX_BFROPS_UNPACK(rc, peer, buf, &ret, &cnt, PMIX_STATUS);
98 if ((PMIX_SUCCESS != rc) ||
99 (PMIX_SUCCESS != ret)) {
100 if (PMIX_SUCCESS != rc) {
101 PMIX_ERROR_LOG(rc);
102 } else {
103 PMIX_ERROR_LOG(ret);
104 }
105
106
107
108 if (NULL == rb->list) {
109 if (NULL != rb->hdlr) {
110 PMIX_RELEASE(rb->hdlr);
111 }
112 if (rb->firstoverall) {
113 pmix_globals.events.first = NULL;
114 } else {
115 pmix_globals.events.last = NULL;
116 }
117 } else if (NULL != rb->hdlr) {
118 pmix_list_remove_item(rb->list, &rb->hdlr->super);
119 PMIX_RELEASE(rb->hdlr);
120 }
121 ret = PMIX_ERR_SERVER_FAILED_REQUEST;
122 index = UINT_MAX;
123 }
124
125
126 if (NULL != cd && NULL != cd->evregcbfn) {
127 cd->evregcbfn(ret, index, cd->cbdata);
128 }
129 if (NULL != cd) {
130
131 check_cached_events(cd);
132 }
133
134
135
136 if (NULL!= rb->info) {
137 PMIX_INFO_FREE(rb->info, rb->ninfo);
138 }
139 if (NULL != rb->codes) {
140 free(rb->codes);
141 }
142 PMIX_RELEASE(rb);
143 }
144
145 static void reg_cbfunc(pmix_status_t status, void *cbdata)
146 {
147 pmix_rshift_caddy_t *rb = (pmix_rshift_caddy_t*)cbdata;
148 pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)rb->cd;
149 pmix_status_t rc = status;
150 size_t index = rb->index;
151
152 if (PMIX_SUCCESS != status) {
153
154 if (NULL == rb->list) {
155 if (NULL != rb->hdlr) {
156 PMIX_RELEASE(rb->hdlr);
157 }
158 if (rb->firstoverall) {
159 pmix_globals.events.first = NULL;
160 } else {
161 pmix_globals.events.last = NULL;
162 }
163 } else if (NULL != rb->hdlr) {
164 pmix_list_remove_item(rb->list, &rb->hdlr->super);
165 PMIX_RELEASE(rb->hdlr);
166 }
167 rc = PMIX_ERR_SERVER_FAILED_REQUEST;
168 index = UINT_MAX;
169 }
170
171 if (NULL != cd && NULL != cd->evregcbfn) {
172
173 cd->evregcbfn(rc, index, cd->cbdata);
174 }
175
176
177
178 if (NULL!= rb->info) {
179 PMIX_INFO_FREE(rb->info, rb->ninfo);
180 }
181 if (NULL != rb->codes) {
182 free(rb->codes);
183 }
184 PMIX_RELEASE(rb);
185 }
186
187 static pmix_status_t _send_to_server(pmix_rshift_caddy_t *rcd)
188 {
189 pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)rcd->cd;
190 pmix_status_t rc;
191 pmix_buffer_t *msg;
192 pmix_cmd_t cmd=PMIX_REGEVENTS_CMD;
193
194 msg = PMIX_NEW(pmix_buffer_t);
195
196 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &cmd, 1, PMIX_COMMAND);
197 if (PMIX_SUCCESS != rc) {
198 PMIX_ERROR_LOG(rc);
199 return rc;
200 }
201
202 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &cd->ncodes, 1, PMIX_SIZE);
203 if (PMIX_SUCCESS != rc) {
204 PMIX_ERROR_LOG(rc);
205 return rc;
206 }
207
208 if (0 < cd->ncodes) {
209 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, cd->codes, cd->ncodes, PMIX_STATUS);
210 if (PMIX_SUCCESS != rc) {
211 PMIX_ERROR_LOG(rc);
212 return rc;
213 }
214 }
215
216
217 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &rcd->ninfo, 1, PMIX_SIZE);
218 if (PMIX_SUCCESS != rc) {
219 PMIX_ERROR_LOG(rc);
220 return rc;
221 }
222
223 if (0 < rcd->ninfo) {
224 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, rcd->info, rcd->ninfo, PMIX_INFO);
225 if (PMIX_SUCCESS != rc) {
226 PMIX_ERROR_LOG(rc);
227 return rc;
228 }
229 }
230 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver, msg, regevents_cbfunc, rcd);
231 if (PMIX_SUCCESS != rc) {
232 PMIX_ERROR_LOG(rc);
233 PMIX_RELEASE(msg);
234 }
235
236 return rc;
237 }
238
239 static pmix_status_t _add_hdlr(pmix_rshift_caddy_t *cd, pmix_list_t *xfer)
240 {
241 pmix_rshift_caddy_t *cd2;
242 pmix_info_caddy_t *ixfer;
243 size_t n;
244 bool registered, need_register = false;
245 pmix_active_code_t *active;
246 pmix_status_t rc;
247
248 pmix_output_verbose(2, pmix_client_globals.event_output,
249 "pmix: _add_hdlr");
250
251
252 if (NULL == cd->codes) {
253 registered = false;
254 PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
255 if (PMIX_MAX_ERR_CONSTANT == active->code) {
256
257 registered = true;
258 ++active->nregs;
259 break;
260 }
261 }
262 if (!registered) {
263 active = PMIX_NEW(pmix_active_code_t);
264 active->code = PMIX_MAX_ERR_CONSTANT;
265 active->nregs = 1;
266 pmix_list_append(&pmix_globals.events.actives, &active->super);
267
268 need_register = true;
269 }
270 } else {
271 for (n=0; n < cd->ncodes; n++) {
272 registered = false;
273 PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
274 if (active->code == cd->codes[n]) {
275 registered = true;
276 ++active->nregs;
277 break;
278 }
279 }
280 if (!registered) {
281 active = PMIX_NEW(pmix_active_code_t);
282 active->code = cd->codes[n];
283 active->nregs = 1;
284 pmix_list_append(&pmix_globals.events.actives, &active->super);
285
286 need_register = true;
287 }
288 }
289 }
290
291
292 cd2 = PMIX_NEW(pmix_rshift_caddy_t);
293 cd2->index = cd->index;
294 cd2->firstoverall = cd->firstoverall;
295 cd2->list = cd->list;
296 cd2->hdlr = cd->hdlr;
297 PMIX_RETAIN(cd);
298 cd2->cd = cd;
299 cd2->ninfo = pmix_list_get_size(xfer);
300 if (0 < cd2->ninfo) {
301 PMIX_INFO_CREATE(cd2->info, cd2->ninfo);
302 n=0;
303 PMIX_LIST_FOREACH(ixfer, xfer, pmix_info_caddy_t) {
304 PMIX_INFO_XFER(&cd2->info[n], ixfer->info);
305 ++n;
306 }
307 }
308
309
310
311
312
313 if ((!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) || PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) &&
314 pmix_globals.connected &&
315 !PMIX_PROC_IS_V1(pmix_client_globals.myserver) &&
316 (need_register || 0 < pmix_list_get_size(xfer))) {
317 pmix_output_verbose(2, pmix_client_globals.event_output,
318 "pmix: _add_hdlr sending to server");
319
320
321 if (PMIX_SUCCESS != (rc = _send_to_server(cd2))) {
322 pmix_output_verbose(2, pmix_client_globals.event_output,
323 "pmix: add_hdlr - pack send_to_server failed status=%d", rc);
324 if (NULL != cd2->info) {
325 PMIX_INFO_FREE(cd2->info, cd2->ninfo);
326 }
327 PMIX_RELEASE(cd2);
328 return rc;
329 }
330 return PMIX_ERR_WOULD_BLOCK;
331 }
332
333
334
335 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
336 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer) && cd->enviro &&
337 NULL != pmix_host_server.register_events) {
338 pmix_output_verbose(2, pmix_client_globals.event_output,
339 "pmix: _add_hdlr registering with server");
340 rc = pmix_host_server.register_events(cd->codes, cd->ncodes,
341 cd2->info, cd2->ninfo,
342 reg_cbfunc, cd2);
343 if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
344 if (NULL != cd2->info) {
345 PMIX_INFO_FREE(cd2->info, cd2->ninfo);
346 }
347 PMIX_RELEASE(cd2);
348 return rc;
349 }
350 return PMIX_SUCCESS;
351 } else {
352 if (NULL != cd2->info) {
353 PMIX_INFO_FREE(cd2->info, cd2->ninfo);
354 }
355 PMIX_RELEASE(cd2);
356 }
357
358 return PMIX_SUCCESS;
359 }
360
361 static void check_cached_events(pmix_rshift_caddy_t *cd)
362 {
363 size_t n;
364 pmix_notify_caddy_t *ncd;
365 bool found, matched;
366 pmix_event_chain_t *chain;
367 int j;
368
369 for (j=0; j < pmix_globals.max_events; j++) {
370 pmix_hotel_knock(&pmix_globals.notifications, j, (void**)&ncd);
371 if (NULL == ncd) {
372 continue;
373 }
374 found = false;
375 if (NULL == cd->codes) {
376 if (!ncd->nondefault) {
377
378 found = true;
379 }
380 } else {
381 for (n=0; n < cd->ncodes; n++) {
382 if (cd->codes[n] == ncd->status) {
383 found = true;
384 break;
385 }
386 }
387 }
388 if (!found) {
389 continue;
390 }
391
392 if (NULL != ncd->targets) {
393 matched = false;
394 for (n=0; n < ncd->ntargets; n++) {
395 if (PMIX_CHECK_PROCID(&pmix_globals.myid, &ncd->targets[n])) {
396 matched = true;
397 break;
398 }
399 }
400 if (!matched) {
401
402 continue;
403 }
404 }
405
406 if (!pmix_notify_check_affected(cd->affected, cd->naffected,
407 ncd->affected, ncd->naffected)) {
408 continue;
409 }
410
411 chain = PMIX_NEW(pmix_event_chain_t);
412 chain->status = ncd->status;
413 pmix_strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
414 chain->source.rank = pmix_globals.myid.rank;
415
416 chain->nallocated = ncd->ninfo + 2;
417 PMIX_INFO_CREATE(chain->info, chain->nallocated);
418 if (0 < cd->ninfo) {
419 chain->ninfo = ncd->ninfo;
420
421 for (n=0; n < ncd->ninfo; n++) {
422 PMIX_INFO_XFER(&chain->info[n], &ncd->info[n]);
423 if (0 == strncmp(ncd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
424 chain->nondefault = true;
425 } else if (0 == strncmp(ncd->info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) {
426 PMIX_PROC_CREATE(chain->affected, 1);
427 if (NULL == chain->affected) {
428 PMIX_RELEASE(chain);
429 return;
430 }
431 chain->naffected = 1;
432 memcpy(chain->affected, ncd->info[n].value.data.proc, sizeof(pmix_proc_t));
433 } else if (0 == strncmp(ncd->info[n].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) {
434 chain->naffected = ncd->info[n].value.data.darray->size;
435 PMIX_PROC_CREATE(chain->affected, chain->naffected);
436 if (NULL == chain->affected) {
437 chain->naffected = 0;
438 PMIX_RELEASE(chain);
439 return;
440 }
441 memcpy(chain->affected, ncd->info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
442 }
443 }
444 }
445
446
447 pmix_hotel_checkout(&pmix_globals.notifications, ncd->room);
448
449 PMIX_RELEASE(ncd);
450
451
452
453 chain->endchain = true;
454
455 pmix_invoke_local_event_hdlr(chain);
456 }
457 }
458
459 static void reg_event_hdlr(int sd, short args, void *cbdata)
460 {
461 pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)cbdata;
462 size_t index = 0, n;
463 pmix_status_t rc;
464 pmix_event_hdlr_t *evhdlr, *ev;
465 uint8_t location = PMIX_EVENT_ORDER_NONE;
466 char *name = NULL, *locator = NULL;
467 bool firstoverall=false, lastoverall=false;
468 bool found;
469 pmix_list_t xfer;
470 pmix_info_caddy_t *ixfer;
471 void *cbobject = NULL;
472 pmix_data_range_t range = PMIX_RANGE_UNDEF;
473 pmix_proc_t *parray = NULL;
474 size_t nprocs = 0;
475
476
477 PMIX_ACQUIRE_OBJECT(cd);
478
479 pmix_output_verbose(2, pmix_client_globals.event_output,
480 "pmix: register event_hdlr with %d infos", (int)cd->ninfo);
481
482 PMIX_CONSTRUCT(&xfer, pmix_list_t);
483
484
485 if (NULL != cd->info) {
486 for (n=0; n < cd->ninfo; n++) {
487 if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_FIRST, PMIX_MAX_KEYLEN)) {
488
489 firstoverall = PMIX_INFO_TRUE(&cd->info[n]);
490 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_LAST, PMIX_MAX_KEYLEN)) {
491
492 lastoverall = PMIX_INFO_TRUE(&cd->info[n]);
493 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_PREPEND, PMIX_MAX_KEYLEN)) {
494
495 if (PMIX_INFO_TRUE(&cd->info[n])) {
496 location = PMIX_EVENT_ORDER_PREPEND;
497 }
498 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_APPEND, PMIX_MAX_KEYLEN)) {
499
500 if (PMIX_INFO_TRUE(&cd->info[n])) {
501 location = PMIX_EVENT_ORDER_APPEND;
502 }
503 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_NAME, PMIX_MAX_KEYLEN)) {
504 name = cd->info[n].value.data.string;
505 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) {
506 cbobject = cd->info[n].value.data.ptr;
507 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_FIRST_IN_CATEGORY, PMIX_MAX_KEYLEN)) {
508 if (PMIX_INFO_TRUE(&cd->info[n])) {
509 location = PMIX_EVENT_ORDER_FIRST;
510 }
511 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_LAST_IN_CATEGORY, PMIX_MAX_KEYLEN)) {
512 if (PMIX_INFO_TRUE(&cd->info[n])) {
513 location = PMIX_EVENT_ORDER_LAST;
514 }
515 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_BEFORE, PMIX_MAX_KEYLEN)) {
516 location = PMIX_EVENT_ORDER_BEFORE;
517 locator = cd->info[n].value.data.string;
518 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_AFTER, PMIX_MAX_KEYLEN)) {
519 location = PMIX_EVENT_ORDER_AFTER;
520 locator = cd->info[n].value.data.string;
521 } else if (0 == strncmp(cd->info[n].key, PMIX_RANGE, PMIX_MAX_KEYLEN)) {
522 range = cd->info[n].value.data.range;
523 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
524 parray = (pmix_proc_t*)cd->info[n].value.data.darray->array;
525 nprocs = cd->info[n].value.data.darray->size;
526 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) {
527 cd->affected = cd->info[n].value.data.proc;
528 cd->naffected = 1;
529 ixfer = PMIX_NEW(pmix_info_caddy_t);
530 ixfer->info = &cd->info[n];
531 ixfer->ninfo = 1;
532 pmix_list_append(&xfer, &ixfer->super);
533 } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) {
534 cd->affected = (pmix_proc_t*)cd->info[n].value.data.darray->array;
535 cd->naffected = cd->info[n].value.data.darray->size;
536 ixfer = PMIX_NEW(pmix_info_caddy_t);
537 ixfer->info = &cd->info[n];
538 ixfer->ninfo = 1;
539 pmix_list_append(&xfer, &ixfer->super);
540 } else {
541 ixfer = PMIX_NEW(pmix_info_caddy_t);
542 ixfer->info = &cd->info[n];
543 ixfer->ninfo = 1;
544 pmix_list_append(&xfer, &ixfer->super);
545 }
546 }
547 }
548
549
550 for (n=0; n < cd->ncodes; n++) {
551 if (PMIX_SYSTEM_EVENT(cd->codes[n])) {
552 cd->enviro = true;
553 break;
554 }
555 }
556
557
558
559
560 if (firstoverall || lastoverall) {
561 if ((firstoverall && NULL != pmix_globals.events.first) ||
562 (lastoverall && NULL != pmix_globals.events.last)) {
563
564 index = UINT_MAX;
565 rc = PMIX_ERR_EVENT_REGISTRATION;
566 goto ack;
567 }
568 evhdlr = PMIX_NEW(pmix_event_hdlr_t);
569 if (NULL == evhdlr) {
570 index = UINT_MAX;
571 rc = PMIX_ERR_EVENT_REGISTRATION;
572 goto ack;
573 }
574 if (NULL != name) {
575 evhdlr->name = strdup(name);
576 }
577 index = pmix_globals.events.nhdlrs;
578 evhdlr->index = index;
579 ++pmix_globals.events.nhdlrs;
580 evhdlr->rng.range = range;
581 if (NULL != parray && 0 < nprocs) {
582 evhdlr->rng.nprocs = nprocs;
583 PMIX_PROC_CREATE(evhdlr->rng.procs, nprocs);
584 if (NULL == evhdlr->rng.procs) {
585 index = UINT_MAX;
586 rc = PMIX_ERR_EVENT_REGISTRATION;
587 PMIX_RELEASE(evhdlr);
588 goto ack;
589 }
590 memcpy(evhdlr->rng.procs, parray, nprocs * sizeof(pmix_proc_t));
591 }
592 if (NULL != cd->affected && 0 < cd->naffected) {
593 evhdlr->naffected = cd->naffected;
594 PMIX_PROC_CREATE(evhdlr->affected, cd->naffected);
595 if (NULL == evhdlr->affected) {
596 index = UINT_MAX;
597 rc = PMIX_ERR_EVENT_REGISTRATION;
598 PMIX_RELEASE(evhdlr);
599 goto ack;
600 }
601 memcpy(evhdlr->affected, cd->affected, cd->naffected * sizeof(pmix_proc_t));
602 }
603 evhdlr->evhdlr = cd->evhdlr;
604 evhdlr->cbobject = cbobject;
605 if (NULL != cd->codes) {
606 evhdlr->codes = (pmix_status_t*)malloc(cd->ncodes * sizeof(pmix_status_t));
607 if (NULL == evhdlr->codes) {
608 PMIX_RELEASE(evhdlr);
609 index = UINT_MAX;
610 rc = PMIX_ERR_EVENT_REGISTRATION;
611 goto ack;
612 }
613 memcpy(evhdlr->codes, cd->codes, cd->ncodes * sizeof(pmix_status_t));
614 evhdlr->ncodes = cd->ncodes;
615 }
616 if (firstoverall) {
617 pmix_globals.events.first = evhdlr;
618 } else {
619 pmix_globals.events.last = evhdlr;
620 }
621 cd->index = index;
622 cd->list = NULL;
623 cd->hdlr = evhdlr;
624 cd->firstoverall = firstoverall;
625 rc = _add_hdlr(cd, &xfer);
626 PMIX_LIST_DESTRUCT(&xfer);
627 if (PMIX_SUCCESS != rc &&
628 PMIX_ERR_WOULD_BLOCK != rc) {
629
630 --pmix_globals.events.nhdlrs;
631 rc = PMIX_ERR_EVENT_REGISTRATION;
632 index = UINT_MAX;
633 if (firstoverall) {
634 pmix_globals.events.first = NULL;
635 } else {
636 pmix_globals.events.last = NULL;
637 }
638 PMIX_RELEASE(evhdlr);
639 goto ack;
640 }
641 if (PMIX_ERR_WOULD_BLOCK == rc) {
642
643 PMIX_RELEASE(cd);
644 return;
645 }
646 goto ack;
647 }
648
649
650
651 evhdlr = PMIX_NEW(pmix_event_hdlr_t);
652 if (NULL == evhdlr) {
653 index = UINT_MAX;
654 rc = PMIX_ERR_EVENT_REGISTRATION;
655 goto ack;
656 }
657 if (NULL != name) {
658 evhdlr->name = strdup(name);
659 }
660 index = pmix_globals.events.nhdlrs;
661 evhdlr->index = index;
662 ++pmix_globals.events.nhdlrs;
663 evhdlr->precedence = location;
664 evhdlr->locator = locator;
665 evhdlr->rng.range = range;
666 if (NULL != parray && 0 < nprocs) {
667 evhdlr->rng.nprocs = nprocs;
668 PMIX_PROC_CREATE(evhdlr->rng.procs, nprocs);
669 if (NULL == evhdlr->rng.procs) {
670 index = UINT_MAX;
671 rc = PMIX_ERR_EVENT_REGISTRATION;
672 PMIX_RELEASE(evhdlr);
673 goto ack;
674 }
675 memcpy(evhdlr->rng.procs, parray, nprocs * sizeof(pmix_proc_t));
676 }
677 if (NULL != cd->affected && 0 < cd->naffected) {
678 evhdlr->naffected = cd->naffected;
679 PMIX_PROC_CREATE(evhdlr->affected, cd->naffected);
680 if (NULL == evhdlr->affected) {
681 index = UINT_MAX;
682 rc = PMIX_ERR_EVENT_REGISTRATION;
683 PMIX_RELEASE(evhdlr);
684 goto ack;
685 }
686 memcpy(evhdlr->affected, cd->affected, cd->naffected * sizeof(pmix_proc_t));
687 }
688 evhdlr->evhdlr = cd->evhdlr;
689 evhdlr->cbobject = cbobject;
690 if (NULL == cd->codes) {
691
692 cd->list = &pmix_globals.events.default_events;
693 } else {
694 evhdlr->codes = (pmix_status_t*)malloc(cd->ncodes * sizeof(pmix_status_t));
695 if (NULL == evhdlr->codes) {
696 PMIX_RELEASE(evhdlr);
697 index = UINT_MAX;
698 rc = PMIX_ERR_EVENT_REGISTRATION;
699 goto ack;
700 }
701 memcpy(evhdlr->codes, cd->codes, cd->ncodes * sizeof(pmix_status_t));
702 evhdlr->ncodes = cd->ncodes;
703 if (1 == cd->ncodes) {
704 cd->list = &pmix_globals.events.single_events;
705 } else {
706 cd->list = &pmix_globals.events.multi_events;
707 }
708 }
709
710 cd->index = index;
711 cd->hdlr = evhdlr;
712 cd->firstoverall = false;
713
714
715 if (PMIX_RANGE_PROC_LOCAL == range) {
716 rc = PMIX_SUCCESS;
717 } else {
718 rc = _add_hdlr(cd, &xfer);
719 }
720 PMIX_LIST_DESTRUCT(&xfer);
721 if (PMIX_SUCCESS != rc &&
722 PMIX_ERR_WOULD_BLOCK != rc) {
723
724 --pmix_globals.events.nhdlrs;
725 rc = PMIX_ERR_EVENT_REGISTRATION;
726 index = UINT_MAX;
727 PMIX_RELEASE(evhdlr);
728 goto ack;
729 }
730
731
732
733
734 if (0 == pmix_list_get_size(cd->list) ||
735 PMIX_EVENT_ORDER_NONE == location) {
736 pmix_list_prepend(cd->list, &evhdlr->super);
737 } else if (PMIX_EVENT_ORDER_FIRST == location) {
738
739 ev = (pmix_event_hdlr_t*)pmix_list_get_first(cd->list);
740 if (PMIX_EVENT_ORDER_FIRST == ev->precedence) {
741
742 --pmix_globals.events.nhdlrs;
743 rc = PMIX_ERR_EVENT_REGISTRATION;
744 index = UINT_MAX;
745 PMIX_RELEASE(evhdlr);
746 goto ack;
747 }
748
749 pmix_list_prepend(cd->list, &evhdlr->super);
750 } else if (PMIX_EVENT_ORDER_LAST == location) {
751
752 ev = (pmix_event_hdlr_t*)pmix_list_get_last(cd->list);
753 if (PMIX_EVENT_ORDER_LAST == ev->precedence) {
754
755 --pmix_globals.events.nhdlrs;
756 rc = PMIX_ERR_EVENT_REGISTRATION;
757 index = UINT_MAX;
758 PMIX_RELEASE(evhdlr);
759 goto ack;
760 }
761
762 pmix_list_append(cd->list, &evhdlr->super);
763 } else if (PMIX_EVENT_ORDER_PREPEND == location) {
764
765
766
767 ev = (pmix_event_hdlr_t*)pmix_list_get_first(cd->list);
768 if (PMIX_EVENT_ORDER_FIRST == ev->precedence) {
769 ev = (pmix_event_hdlr_t*)pmix_list_get_next(&ev->super);
770 if (NULL != ev) {
771 pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
772 } else {
773
774 pmix_list_append(cd->list, &evhdlr->super);
775 }
776 } else {
777 pmix_list_prepend(cd->list, &evhdlr->super);
778 }
779 } else if (PMIX_EVENT_ORDER_APPEND == location) {
780
781
782
783 ev = (pmix_event_hdlr_t*)pmix_list_get_last(cd->list);
784 if (PMIX_EVENT_ORDER_LAST == ev->precedence) {
785 pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
786 } else {
787 pmix_list_append(cd->list, &evhdlr->super);
788 }
789 } else {
790
791 found = false;
792 PMIX_LIST_FOREACH(ev, cd->list, pmix_event_hdlr_t) {
793 if (NULL == ev->name) {
794 continue;
795 }
796 if (0 == strcmp(ev->name, name)) {
797 if (PMIX_EVENT_ORDER_BEFORE == location) {
798
799 pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
800 } else {
801
802 ev = (pmix_event_hdlr_t*)pmix_list_get_next(&ev->super);
803 if (NULL != ev) {
804 pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
805 } else {
806
807 pmix_list_append(cd->list, &evhdlr->super);
808 }
809 }
810 found = true;
811 break;
812 }
813 }
814
815
816
817
818
819 if (!found) {
820
821 --pmix_globals.events.nhdlrs;
822 rc = PMIX_ERR_EVENT_REGISTRATION;
823 index = UINT_MAX;
824 PMIX_RELEASE(evhdlr);
825 goto ack;
826 }
827 }
828 if (PMIX_ERR_WOULD_BLOCK == rc) {
829
830 PMIX_RELEASE(cd);
831 return;
832 }
833
834 ack:
835
836
837 if (NULL != cd->evregcbfn) {
838 cd->evregcbfn(rc, index, cd->cbdata);
839 }
840
841
842 check_cached_events(cd);
843 if (NULL != cd->codes) {
844 free(cd->codes);
845 cd->codes = NULL;
846 }
847
848
849 PMIX_RELEASE(cd);
850 }
851
852 PMIX_EXPORT void PMIx_Register_event_handler(pmix_status_t codes[], size_t ncodes,
853 pmix_info_t info[], size_t ninfo,
854 pmix_notification_fn_t event_hdlr,
855 pmix_hdlr_reg_cbfunc_t cbfunc,
856 void *cbdata)
857 {
858 pmix_rshift_caddy_t *cd;
859 size_t n;
860
861 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
862
863 if (pmix_globals.init_cntr <= 0) {
864 PMIX_RELEASE_THREAD(&pmix_global_lock);
865 if (NULL != cbfunc) {
866 cbfunc(PMIX_ERR_INIT, 0, cbdata);
867 }
868 return;
869 }
870 PMIX_RELEASE_THREAD(&pmix_global_lock);
871
872
873
874 cd = PMIX_NEW(pmix_rshift_caddy_t);
875
876
877
878 if (0 < ncodes) {
879 cd->codes = (pmix_status_t*)malloc(ncodes * sizeof(pmix_status_t));
880 if (NULL == cd->codes) {
881
882 PMIX_RELEASE(cd);
883 if (NULL != cbfunc) {
884 cbfunc(PMIX_ERR_NOMEM, SIZE_MAX, cbdata);
885 }
886 return;
887 }
888 for (n=0; n < ncodes; n++) {
889 cd->codes[n] = codes[n];
890 }
891 }
892 cd->ncodes = ncodes;
893 cd->info = info;
894 cd->ninfo = ninfo;
895 cd->evhdlr = event_hdlr;
896 cd->evregcbfn = cbfunc;
897 cd->cbdata = cbdata;
898
899 pmix_output_verbose(2, pmix_client_globals.event_output,
900 "pmix_register_event_hdlr shifting to progress thread");
901
902 PMIX_THREADSHIFT(cd, reg_event_hdlr);
903 }
904
905 static void dereg_event_hdlr(int sd, short args, void *cbdata)
906 {
907 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
908 pmix_buffer_t *msg = NULL;
909 pmix_event_hdlr_t *evhdlr, *ev;
910 pmix_cmd_t cmd = PMIX_DEREGEVENTS_CMD;
911 pmix_status_t rc = PMIX_SUCCESS;
912 pmix_status_t wildcard = PMIX_MAX_ERR_CONSTANT;
913 size_t n;
914 pmix_active_code_t *active;
915
916
917 PMIX_ACQUIRE_OBJECT(cd);
918
919
920
921 if ((!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) || PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) &&
922 pmix_globals.connected) {
923 msg = PMIX_NEW(pmix_buffer_t);
924 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
925 msg, &cmd, 1, PMIX_COMMAND);
926 if (PMIX_SUCCESS != rc) {
927 PMIX_RELEASE(msg);
928 goto cleanup;
929 }
930 }
931
932
933 if ((NULL != pmix_globals.events.first && pmix_globals.events.first->index == cd->ref) ||
934 (NULL != pmix_globals.events.last && pmix_globals.events.last->index == cd->ref)) {
935
936 if (NULL != pmix_globals.events.first && pmix_globals.events.first->index == cd->ref) {
937 ev = pmix_globals.events.first;
938 } else {
939 ev = pmix_globals.events.last;
940 }
941 if (NULL != msg) {
942
943
944 if (NULL == ev->codes) {
945 if (0 == pmix_list_get_size(&pmix_globals.events.default_events)) {
946
947 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
948 msg, &wildcard, 1, PMIX_STATUS);
949 if (PMIX_SUCCESS != rc) {
950 PMIX_RELEASE(msg);
951 goto cleanup;
952 }
953 }
954 } else {
955 for (n=0; n < ev->ncodes; n++) {
956
957 PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
958 if (active->code == ev->codes[n]) {
959 --active->nregs;
960 if (0 == active->nregs) {
961 pmix_list_remove_item(&pmix_globals.events.actives, &active->super);
962
963 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
964 msg, &active->code, 1, PMIX_STATUS);
965 if (PMIX_SUCCESS != rc) {
966 PMIX_RELEASE(active);
967 PMIX_RELEASE(msg);
968 goto cleanup;
969 }
970 PMIX_RELEASE(active);
971 }
972 break;
973 }
974 }
975 }
976 }
977 }
978 if (ev == pmix_globals.events.first) {
979 pmix_globals.events.first = NULL;
980 } else {
981 pmix_globals.events.last = NULL;
982 }
983 PMIX_RELEASE(ev);
984 goto cleanup;
985 }
986
987
988 PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
989 if (evhdlr->index == cd->ref) {
990
991 pmix_list_remove_item(&pmix_globals.events.default_events, &evhdlr->super);
992 if (NULL != msg) {
993
994
995 if (0 == pmix_list_get_size(&pmix_globals.events.default_events)) {
996 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
997 msg, &wildcard, 1, PMIX_STATUS);
998 if (PMIX_SUCCESS != rc) {
999 PMIX_RELEASE(msg);
1000 goto cleanup;
1001 }
1002 }
1003 }
1004 PMIX_RELEASE(evhdlr);
1005 goto report;
1006 }
1007 }
1008 PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.single_events, pmix_event_hdlr_t) {
1009 if (evhdlr->index == cd->ref) {
1010
1011 pmix_list_remove_item(&pmix_globals.events.single_events, &evhdlr->super);
1012 if (NULL != msg) {
1013
1014 PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
1015 if (active->code == evhdlr->codes[0]) {
1016 --active->nregs;
1017 if (0 == active->nregs) {
1018 pmix_list_remove_item(&pmix_globals.events.actives, &active->super);
1019 if (NULL != msg) {
1020
1021 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1022 msg, &active->code, 1, PMIX_STATUS);
1023 if (PMIX_SUCCESS != rc) {
1024 PMIX_RELEASE(active);
1025 PMIX_RELEASE(msg);
1026 goto cleanup;
1027 }
1028 }
1029 PMIX_RELEASE(active);
1030 }
1031 break;
1032 }
1033 }
1034 }
1035 PMIX_RELEASE(evhdlr);
1036 goto report;
1037 }
1038 }
1039 PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.multi_events, pmix_event_hdlr_t) {
1040 if (evhdlr->index == cd->ref) {
1041
1042 pmix_list_remove_item(&pmix_globals.events.multi_events, &evhdlr->super);
1043 for (n=0; n < evhdlr->ncodes; n++) {
1044
1045 PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
1046 if (active->code == evhdlr->codes[n]) {
1047 --active->nregs;
1048 if (0 == active->nregs) {
1049 pmix_list_remove_item(&pmix_globals.events.actives, &active->super);
1050 if (NULL != msg) {
1051
1052 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1053 msg, &active->code, 1, PMIX_STATUS);
1054 if (PMIX_SUCCESS != rc) {
1055 PMIX_RELEASE(active);
1056 PMIX_RELEASE(msg);
1057 goto cleanup;
1058 }
1059 }
1060 PMIX_RELEASE(active);
1061 }
1062 break;
1063 }
1064 }
1065 }
1066 PMIX_RELEASE(evhdlr);
1067 goto report;
1068 }
1069 }
1070
1071 if (NULL != msg) {
1072 PMIX_RELEASE(msg);
1073 }
1074 goto cleanup;
1075
1076 report:
1077 if (NULL != msg) {
1078
1079 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver, msg, NULL, NULL);
1080 if (PMIX_SUCCESS != rc) {
1081 PMIX_ERROR_LOG(rc);
1082 }
1083 }
1084
1085 cleanup:
1086
1087 if (NULL != cd->cbfunc.opcbfn) {
1088 cd->cbfunc.opcbfn(rc, cd->cbdata);
1089 }
1090 PMIX_RELEASE(cd);
1091 }
1092
1093 PMIX_EXPORT void PMIx_Deregister_event_handler(size_t event_hdlr_ref,
1094 pmix_op_cbfunc_t cbfunc,
1095 void *cbdata)
1096 {
1097 pmix_shift_caddy_t *cd;
1098
1099 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1100 if (pmix_globals.init_cntr <= 0) {
1101 PMIX_RELEASE_THREAD(&pmix_global_lock);
1102 if (NULL != cbfunc) {
1103 cbfunc(PMIX_ERR_INIT, cbdata);
1104 }
1105 return;
1106 }
1107 PMIX_RELEASE_THREAD(&pmix_global_lock);
1108
1109
1110 cd = PMIX_NEW(pmix_shift_caddy_t);
1111 cd->cbfunc.opcbfn = cbfunc;
1112 cd->cbdata = cbdata;
1113 cd->ref = event_hdlr_ref;
1114
1115 pmix_output_verbose(2, pmix_client_globals.event_output,
1116 "pmix_deregister_event_hdlr shifting to progress thread");
1117 PMIX_THREADSHIFT(cd, dereg_event_hdlr);
1118 }