This source file includes the following definitions.
- pmix_server_register_params
- eviction_cbfunc
- pmix_server_init
- pmix_server_start
- pmix_server_finalize
- send_error
- _mdxresp
- modex_resp
- pmix_server_dmdx_recv
- dccon
- dcdes
- relcbfunc
- pmix_server_dmdx_resp
- opcon
- rqcon
- rqdes
- mdcon
- mddes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include "orte_config.h"
30 #include "orte/types.h"
31 #include "opal/types.h"
32
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #ifdef HAVE_SYS_TYPES_H
37 #include <sys/types.h>
38 #endif
39 #include <fcntl.h>
40 #ifdef HAVE_NETINET_IN_H
41 #include <netinet/in.h>
42 #endif
43 #ifdef HAVE_ARPA_INET_H
44 #include <arpa/inet.h>
45 #endif
46 #ifdef HAVE_NETDB_H
47 #include <netdb.h>
48 #endif
49 #include <ctype.h>
50
51 #include "opal_stdint.h"
52 #include "opal/class/opal_hotel.h"
53 #include "opal/class/opal_list.h"
54 #include "opal/mca/base/mca_base_var.h"
55 #include "opal/mca/pmix/pmix.h"
56 #include "opal/util/opal_environ.h"
57 #include "opal/util/show_help.h"
58 #include "opal/util/error.h"
59 #include "opal/util/output.h"
60 #include "opal/util/os_path.h"
61 #include "opal/util/argv.h"
62 #include "opal/util/printf.h"
63
64 #include "orte/mca/errmgr/errmgr.h"
65 #include "orte/mca/grpcomm/grpcomm.h"
66 #include "orte/mca/rml/rml.h"
67 #include "orte/mca/rml/base/rml_contact.h"
68 #include "orte/util/name_fns.h"
69 #include "orte/util/proc_info.h"
70 #include "orte/util/session_dir.h"
71 #include "orte/util/show_help.h"
72 #include "orte/util/threads.h"
73 #include "orte/runtime/orte_globals.h"
74 #include "orte/runtime/orte_data_server.h"
75
76 #include "pmix_server.h"
77 #include "pmix_server_internal.h"
78
79
80
81
82 static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
83 opal_buffer_t *buffer,
84 orte_rml_tag_t tg, void *cbdata);
85 static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
86 opal_buffer_t *buffer,
87 orte_rml_tag_t tg, void *cbdata);
88
89 #define ORTE_PMIX_SERVER_MIN_ROOMS 4096
90
91 pmix_server_globals_t orte_pmix_server_globals = {0};
92
93 static opal_pmix_server_module_t pmix_server = {
94 .client_connected = pmix_server_client_connected_fn,
95 .client_finalized = pmix_server_client_finalized_fn,
96 .abort = pmix_server_abort_fn,
97 .fence_nb = pmix_server_fencenb_fn,
98 .direct_modex = pmix_server_dmodex_req_fn,
99 .publish = pmix_server_publish_fn,
100 .lookup = pmix_server_lookup_fn,
101 .unpublish = pmix_server_unpublish_fn,
102 .spawn = pmix_server_spawn_fn,
103 .connect = pmix_server_connect_fn,
104 .disconnect = pmix_server_disconnect_fn,
105 .register_events = pmix_server_register_events_fn,
106 .deregister_events = pmix_server_deregister_events_fn,
107 .notify_event = pmix_server_notify_event,
108 .query = pmix_server_query_fn,
109 .tool_connected = pmix_tool_connected_fn,
110 .log = pmix_server_log_fn,
111 .allocate = pmix_server_alloc_fn,
112 .job_control = pmix_server_job_ctrl_fn
113 };
114
115 void pmix_server_register_params(void)
116 {
117
118 orte_pmix_server_globals.verbosity = -1;
119 (void) mca_base_var_register ("orte", "pmix", NULL, "server_verbose",
120 "Debug verbosity for PMIx server",
121 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
122 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
123 &orte_pmix_server_globals.verbosity);
124 if (0 <= orte_pmix_server_globals.verbosity) {
125 orte_pmix_server_globals.output = opal_output_open(NULL);
126 opal_output_set_verbosity(orte_pmix_server_globals.output,
127 orte_pmix_server_globals.verbosity);
128 }
129
130 orte_pmix_server_globals.num_rooms = -1;
131 (void) mca_base_var_register ("orte", "pmix", NULL, "server_max_reqs",
132 "Maximum number of backlogged PMIx server direct modex requests",
133 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
134 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
135 &orte_pmix_server_globals.num_rooms);
136
137 orte_pmix_server_globals.timeout = 2;
138 (void) mca_base_var_register ("orte", "pmix", NULL, "server_max_wait",
139 "Maximum time (in seconds) the PMIx server should wait to service direct modex requests",
140 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
141 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
142 &orte_pmix_server_globals.timeout);
143
144
145 orte_pmix_server_globals.wait_for_server = false;
146 (void) mca_base_var_register ("orte", "pmix", NULL, "wait_for_server",
147 "Whether or not to wait for the session-level server to start",
148 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
149 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
150 &orte_pmix_server_globals.wait_for_server);
151
152
153 orte_pmix_server_globals.legacy = false;
154 (void) mca_base_var_register ("orte", "pmix", NULL, "server_usock_connections",
155 "Whether or not to support legacy usock connections",
156 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
157 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
158 &orte_pmix_server_globals.legacy);
159
160
161 orte_pmix_server_globals.session_server = false;
162 (void) mca_base_var_register ("orte", "pmix", NULL, "session_server",
163 "Whether or not to drop a session-level tool rendezvous point",
164 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
165 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
166 &orte_pmix_server_globals.session_server);
167
168
169 orte_pmix_server_globals.system_server = false;
170 (void) mca_base_var_register ("orte", "pmix", NULL, "system_server",
171 "Whether or not to drop a system-level tool rendezvous point",
172 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
173 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
174 &orte_pmix_server_globals.system_server);
175 }
176
177 static void eviction_cbfunc(struct opal_hotel_t *hotel,
178 int room_num, void *occupant)
179 {
180 pmix_server_req_t *req = (pmix_server_req_t*)occupant;
181 bool timeout = false;
182 int rc=OPAL_ERR_TIMEOUT;
183
184
185 req->timeout -= orte_pmix_server_globals.timeout;
186 if (req->timeout > 0) {
187 req->timeout -= orte_pmix_server_globals.timeout;
188 if (0 >= req->timeout) {
189 timeout = true;
190 }
191 }
192 if (!timeout) {
193
194 if (OPAL_SUCCESS == (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
195 return;
196 }
197 ORTE_ERROR_LOG(rc);
198
199 } else {
200 orte_show_help("help-orted.txt", "timedout", true, req->operation);
201 }
202
203 if (NULL != req->opcbfunc) {
204 req->opcbfunc(OPAL_ERR_TIMEOUT, req->cbdata);
205 } else if (NULL != req->mdxcbfunc) {
206 req->mdxcbfunc(OPAL_ERR_TIMEOUT, NULL, 0, req->cbdata, NULL, NULL);
207 } else if (NULL != req->spcbfunc) {
208 req->spcbfunc(OPAL_ERR_TIMEOUT, ORTE_JOBID_INVALID, req->cbdata);
209 } else if (NULL != req->lkcbfunc) {
210 req->lkcbfunc(OPAL_ERR_TIMEOUT, NULL, req->cbdata);
211 }
212 OBJ_RELEASE(req);
213 }
214
215
216
217
218 int pmix_server_init(void)
219 {
220 int rc;
221 opal_list_t info;
222 opal_value_t *kv;
223
224 if (orte_pmix_server_globals.initialized) {
225 return ORTE_SUCCESS;
226 }
227 orte_pmix_server_globals.initialized = true;
228
229
230 OBJ_CONSTRUCT(&orte_pmix_server_globals.reqs, opal_hotel_t);
231
232
233
234
235
236 if (-1 == orte_pmix_server_globals.num_rooms) {
237 orte_pmix_server_globals.num_rooms = orte_process_info.num_procs * 2;
238 if (orte_pmix_server_globals.num_rooms < ORTE_PMIX_SERVER_MIN_ROOMS) {
239 orte_pmix_server_globals.num_rooms = ORTE_PMIX_SERVER_MIN_ROOMS;
240 }
241 }
242 if (OPAL_SUCCESS != (rc = opal_hotel_init(&orte_pmix_server_globals.reqs,
243 orte_pmix_server_globals.num_rooms,
244 orte_event_base, orte_pmix_server_globals.timeout*1000000,
245 ORTE_ERROR_PRI, eviction_cbfunc))) {
246 ORTE_ERROR_LOG(rc);
247 return rc;
248 }
249 OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t);
250 orte_pmix_server_globals.server = *ORTE_NAME_INVALID;
251
252 OBJ_CONSTRUCT(&info, opal_list_t);
253
254 kv = OBJ_NEW(opal_value_t);
255 kv->key = strdup(OPAL_PMIX_SERVER_TMPDIR);
256 kv->type = OPAL_STRING;
257 kv->data.string = opal_os_path(false, orte_process_info.jobfam_session_dir, NULL);
258 opal_list_append(&info, &kv->super);
259 if (!orte_pmix_server_globals.legacy) {
260
261 kv = OBJ_NEW(opal_value_t);
262 kv->key = strdup(OPAL_PMIX_SINGLE_LISTENER);
263 kv->type = OPAL_BOOL;
264 kv->data.flag = true;
265 opal_list_append(&info, &kv->super);
266 }
267
268 kv = OBJ_NEW(opal_value_t);
269 kv->key = strdup(OPAL_PMIX_SERVER_ENABLE_MONITORING);
270 kv->type = OPAL_BOOL;
271 kv->data.flag = true;
272 opal_list_append(&info, &kv->super);
273
274
275 if (orte_pmix_server_globals.session_server) {
276 kv = OBJ_NEW(opal_value_t);
277 kv->key = strdup(OPAL_PMIX_SERVER_TOOL_SUPPORT);
278 kv->type = OPAL_BOOL;
279 kv->data.flag = true;
280 opal_list_append(&info, &kv->super);
281 }
282
283
284
285
286
287 if (orte_pmix_server_globals.system_server &&
288 (ORTE_PROC_IS_HNP || ORTE_PROC_IS_MASTER)) {
289 kv = OBJ_NEW(opal_value_t);
290 kv->key = strdup(OPAL_PMIX_SERVER_SYSTEM_SUPPORT);
291 kv->type = OPAL_BOOL;
292 kv->data.flag = true;
293 opal_list_append(&info, &kv->super);
294 }
295
296
297 if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_MASTER) {
298 kv = OBJ_NEW(opal_value_t);
299 kv->key = strdup(OPAL_PMIX_SERVER_GATEWAY);
300 kv->type = OPAL_BOOL;
301 kv->data.flag = true;
302 opal_list_append(&info, &kv->super);
303 }
304
305
306 if (ORTE_SUCCESS != (rc = opal_pmix.server_init(&pmix_server, &info))) {
307
308 return rc;
309 }
310 OPAL_LIST_DESTRUCT(&info);
311
312 return rc;
313 }
314
315 void pmix_server_start(void)
316 {
317
318 orte_data_server_init();
319
320
321 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,
322 ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL);
323
324
325 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP,
326 ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL);
327
328
329 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
330 ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
331
332
333 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
334 ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
335
336
337 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION,
338 ORTE_RML_PERSISTENT, pmix_server_notify, NULL);
339 }
340
341 void pmix_server_finalize(void)
342 {
343 if (!orte_pmix_server_globals.initialized) {
344 return;
345 }
346
347 opal_output_verbose(2, orte_pmix_server_globals.output,
348 "%s Finalizing PMIX server",
349 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
350
351
352 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX);
353 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP);
354 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP);
355 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT);
356 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION);
357
358
359 orte_data_server_finalize();
360
361
362 opal_pmix.server_finalize();
363
364
365 OBJ_DESTRUCT(&orte_pmix_server_globals.reqs);
366 OPAL_LIST_DESTRUCT(&orte_pmix_server_globals.notifications);
367 }
368
369 static void send_error(int status, opal_process_name_t *idreq,
370 orte_process_name_t *remote, int remote_room)
371 {
372 opal_buffer_t *reply;
373 int rc;
374
375 reply = OBJ_NEW(opal_buffer_t);
376
377 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &status, 1, OPAL_INT))) {
378 ORTE_ERROR_LOG(rc);
379 goto error;
380 }
381
382 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, idreq, 1, OPAL_NAME))) {
383 ORTE_ERROR_LOG(rc);
384 goto error;
385 }
386
387
388 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &remote_room, 1, OPAL_INT))) {
389 ORTE_ERROR_LOG(rc);
390 goto error;
391 }
392
393
394 orte_rml.send_buffer_nb(remote, reply,
395 ORTE_RML_TAG_DIRECT_MODEX_RESP,
396 orte_rml_send_callback, NULL);
397 return;
398 error:
399 OBJ_RELEASE(reply);
400 return;
401 }
402
403 static void _mdxresp(int sd, short args, void *cbdata)
404 {
405 pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
406 int rc;
407 opal_buffer_t *reply;
408
409 ORTE_ACQUIRE_OBJECT(req);
410
411
412 opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
413
414 reply = OBJ_NEW(opal_buffer_t);
415
416 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &req->status, 1, OPAL_INT))) {
417 ORTE_ERROR_LOG(rc);
418 OBJ_RELEASE(reply);
419 goto done;
420 }
421
422 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &req->target, 1, OPAL_NAME))) {
423 ORTE_ERROR_LOG(rc);
424 OBJ_RELEASE(reply);
425 goto done;
426 }
427
428 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &req->remote_room_num, 1, OPAL_INT))) {
429 ORTE_ERROR_LOG(rc);
430 OBJ_RELEASE(reply);
431 goto done;
432 }
433
434 opal_dss.copy_payload(reply, &req->msg);
435
436
437 orte_rml.send_buffer_nb(&req->proxy, reply,
438 ORTE_RML_TAG_DIRECT_MODEX_RESP,
439 orte_rml_send_callback, NULL);
440
441 done:
442
443 if (NULL != req->rlcbfunc) {
444 req->rlcbfunc(req->cbdata);
445 }
446 OBJ_RELEASE(req);
447 return;
448 }
449
450
451
452 static void modex_resp(int status,
453 const char *data, size_t sz, void *cbdata,
454 opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
455 {
456 pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
457 opal_buffer_t xfer;
458
459 ORTE_ACQUIRE_OBJECT(req);
460
461 req->status = status;
462
463
464 OBJ_CONSTRUCT(&xfer, opal_buffer_t);
465 opal_dss.load(&xfer, (void*)data, sz);
466 opal_dss.copy_payload(&req->msg, &xfer);
467 xfer.base_ptr = NULL;
468 OBJ_DESTRUCT(&xfer);
469
470 req->rlcbfunc = relcbfunc;
471 req->cbdata = relcbdata;
472 opal_event_set(orte_event_base, &(req->ev),
473 -1, OPAL_EV_WRITE, _mdxresp, req);
474 opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
475 ORTE_POST_OBJECT(req);
476 opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
477 }
478 static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
479 opal_buffer_t *buffer,
480 orte_rml_tag_t tg, void *cbdata)
481 {
482 int rc, room_num;
483 int32_t cnt;
484 opal_process_name_t idreq;
485 orte_process_name_t name;
486 orte_job_t *jdata;
487 orte_proc_t *proc;
488 pmix_server_req_t *req;
489
490
491
492 cnt = 1;
493 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &idreq, &cnt, OPAL_NAME))) {
494 ORTE_ERROR_LOG(rc);
495 return;
496 }
497 opal_output_verbose(2, orte_pmix_server_globals.output,
498 "%s dmdx:recv request from proc %s for proc %s",
499 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
500 ORTE_NAME_PRINT(sender),
501 ORTE_NAME_PRINT(&idreq));
502
503 cnt = 1;
504 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
505 ORTE_ERROR_LOG(rc);
506 return;
507 }
508
509 memcpy((char*)&name, (char*)&idreq, sizeof(orte_process_name_t));
510 if (NULL == (jdata = orte_get_job_data_object(name.jobid))) {
511
512
513
514
515 req = OBJ_NEW(pmix_server_req_t);
516 opal_asprintf(&req->operation, "DMDX: %s:%d", __FILE__, __LINE__);
517 req->proxy = *sender;
518 req->target = idreq;
519 req->remote_room_num = room_num;
520
521
522 ORTE_ADJUST_TIMEOUT(req);
523 if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
524 orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
525 OBJ_RELEASE(req);
526 send_error(rc, &idreq, sender, room_num);
527 }
528 return;
529 }
530 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, name.vpid))) {
531
532 send_error(ORTE_ERR_NOT_FOUND, &idreq, sender, room_num);
533 return;
534 }
535 if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
536
537 send_error(ORTE_ERR_NOT_FOUND, &idreq, sender, room_num);
538 return;
539 }
540
541
542 req = OBJ_NEW(pmix_server_req_t);
543 opal_asprintf(&req->operation, "DMDX: %s:%d", __FILE__, __LINE__);
544 req->proxy = *sender;
545 req->target = idreq;
546 req->remote_room_num = room_num;
547
548
549 ORTE_ADJUST_TIMEOUT(req);
550 if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
551 orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
552 OBJ_RELEASE(req);
553 send_error(rc, &idreq, sender, room_num);
554 return;
555 }
556
557
558 if (OPAL_SUCCESS != (rc = opal_pmix.server_dmodex_request(&idreq, modex_resp, req))) {
559 ORTE_ERROR_LOG(rc);
560 opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
561 OBJ_RELEASE(req);
562 send_error(rc, &idreq, sender, room_num);
563 return;
564 }
565 return;
566 }
567
568 typedef struct {
569 opal_object_t super;
570 char *data;
571 int32_t ndata;
572 } datacaddy_t;
573 static void dccon(datacaddy_t *p)
574 {
575 p->data = NULL;
576 }
577 static void dcdes(datacaddy_t *p)
578 {
579 if (NULL != p->data) {
580 free(p->data);
581 }
582 }
583 static OBJ_CLASS_INSTANCE(datacaddy_t,
584 opal_object_t,
585 dccon, dcdes);
586
587 static void relcbfunc(void *relcbdata)
588 {
589 datacaddy_t *d = (datacaddy_t*)relcbdata;
590
591 OBJ_RELEASE(d);
592 }
593
594 static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
595 opal_buffer_t *buffer,
596 orte_rml_tag_t tg, void *cbdata)
597 {
598 int rc, ret, room_num, rnum;
599 int32_t cnt;
600 opal_process_name_t target;
601 pmix_server_req_t *req;
602 datacaddy_t *d;
603
604 opal_output_verbose(2, orte_pmix_server_globals.output,
605 "%s dmdx:recv response from proc %s",
606 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
607 ORTE_NAME_PRINT(sender));
608
609
610 cnt = 1;
611 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
612 ORTE_ERROR_LOG(rc);
613 return;
614 }
615
616
617 cnt = 1;
618 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &cnt, OPAL_NAME))) {
619 ORTE_ERROR_LOG(rc);
620 return;
621 }
622
623
624 cnt = 1;
625 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
626 ORTE_ERROR_LOG(rc);
627 return;
628 }
629
630
631 d = OBJ_NEW(datacaddy_t);
632 if (OPAL_SUCCESS != (rc = opal_dss.unload(buffer, (void**)&d->data, &d->ndata))) {
633 ORTE_ERROR_LOG(rc);
634 return;
635 }
636
637
638 opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
639
640 if (NULL != req) {
641 if (NULL != req->mdxcbfunc) {
642 OBJ_RETAIN(d);
643 req->mdxcbfunc(ret, d->data, d->ndata, req->cbdata, relcbfunc, d);
644 }
645 OBJ_RELEASE(req);
646 }
647
648
649 for (rnum=0; rnum < orte_pmix_server_globals.reqs.num_rooms; rnum++) {
650 opal_hotel_knock(&orte_pmix_server_globals.reqs, rnum, (void**)&req);
651 if (NULL == req) {
652 continue;
653 }
654 if (req->target.jobid == target.jobid &&
655 req->target.vpid == target.vpid) {
656 if (NULL != req->mdxcbfunc) {
657 OBJ_RETAIN(d);
658 req->mdxcbfunc(ret, d->data, d->ndata, req->cbdata, relcbfunc, d);
659 }
660 opal_hotel_checkout(&orte_pmix_server_globals.reqs, rnum);
661 OBJ_RELEASE(req);
662 }
663 }
664 OBJ_RELEASE(d);
665 }
666
667 static void opcon(orte_pmix_server_op_caddy_t *p)
668 {
669 p->procs = NULL;
670 p->eprocs = NULL;
671 p->info = NULL;
672 p->cbfunc = NULL;
673 p->infocbfunc = NULL;
674 p->toolcbfunc = NULL;
675 p->cbdata = NULL;
676 }
677 OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t,
678 opal_object_t,
679 opcon, NULL);
680
681 static void rqcon(pmix_server_req_t *p)
682 {
683 p->operation = NULL;
684 p->range = OPAL_PMIX_RANGE_SESSION;
685 p->proxy = *ORTE_NAME_INVALID;
686 p->target = *ORTE_NAME_INVALID;
687 p->timeout = orte_pmix_server_globals.timeout;
688 p->jdata = NULL;
689 OBJ_CONSTRUCT(&p->msg, opal_buffer_t);
690 p->opcbfunc = NULL;
691 p->mdxcbfunc = NULL;
692 p->spcbfunc = NULL;
693 p->lkcbfunc = NULL;
694 p->rlcbfunc = NULL;
695 p->cbdata = NULL;
696 }
697 static void rqdes(pmix_server_req_t *p)
698 {
699 if (NULL != p->operation) {
700 free(p->operation);
701 }
702 if (NULL != p->jdata) {
703 OBJ_RELEASE(p->jdata);
704 }
705 OBJ_DESTRUCT(&p->msg);
706 }
707 OBJ_CLASS_INSTANCE(pmix_server_req_t,
708 opal_object_t,
709 rqcon, rqdes);
710
711 static void mdcon(orte_pmix_mdx_caddy_t *p)
712 {
713 p->sig = NULL;
714 p->cbfunc = NULL;
715 p->cbdata = NULL;
716 }
717 static void mddes(orte_pmix_mdx_caddy_t *p)
718 {
719 if (NULL != p->sig) {
720 OBJ_RELEASE(p->sig);
721 }
722 }
723 OBJ_CLASS_INSTANCE(orte_pmix_mdx_caddy_t,
724 opal_object_t,
725 mdcon, mddes);