This source file includes the following definitions.
- relcbfunc
- query_cbfunc
- _local_relcb
- _local_cbfunc
- PMIx_Query_info_nb
- acb
- PMIx_Allocation_request
- PMIx_Allocation_request_nb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include <src/include/pmix_config.h>
16
17 #include <src/include/pmix_stdint.h>
18 #include <src/include/pmix_socket_errno.h>
19
20 #include <pmix.h>
21 #include <pmix_common.h>
22 #include <pmix_server.h>
23 #include <pmix_rename.h>
24
25 #include "src/threads/threads.h"
26 #include "src/util/argv.h"
27 #include "src/util/error.h"
28 #include "src/util/name_fns.h"
29 #include "src/util/output.h"
30 #include "src/mca/bfrops/bfrops.h"
31 #include "src/mca/ptl/ptl.h"
32 #include "src/common/pmix_attributes.h"
33
34 #include "src/client/pmix_client_ops.h"
35 #include "src/server/pmix_server_ops.h"
36 #include "src/include/pmix_globals.h"
37
38 static void relcbfunc(void *cbdata)
39 {
40 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
41
42 pmix_output_verbose(2, pmix_globals.debug_output,
43 "pmix:query release callback");
44
45 if (NULL != cd->info) {
46 PMIX_INFO_FREE(cd->info, cd->ninfo);
47 }
48 PMIX_RELEASE(cd);
49 }
50 static void query_cbfunc(struct pmix_peer_t *peer,
51 pmix_ptl_hdr_t *hdr,
52 pmix_buffer_t *buf, void *cbdata)
53 {
54 pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
55 pmix_status_t rc;
56 pmix_shift_caddy_t *results;
57 int cnt;
58 size_t n;
59 pmix_kval_t *kv;
60
61 pmix_output_verbose(2, pmix_globals.debug_output,
62 "pmix:query cback from server");
63
64 results = PMIX_NEW(pmix_shift_caddy_t);
65
66
67 cnt = 1;
68 PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
69 if (PMIX_SUCCESS != rc) {
70 PMIX_ERROR_LOG(rc);
71 results->status = rc;
72 goto complete;
73 }
74 if (PMIX_SUCCESS != results->status) {
75 goto complete;
76 }
77
78
79 cnt = 1;
80 PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
81 if (PMIX_SUCCESS != rc) {
82 PMIX_ERROR_LOG(rc);
83 results->status = rc;
84 goto complete;
85 }
86 if (0 < results->ninfo) {
87 PMIX_INFO_CREATE(results->info, results->ninfo);
88 cnt = results->ninfo;
89 PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
90 if (PMIX_SUCCESS != rc) {
91 PMIX_ERROR_LOG(rc);
92 results->status = rc;
93 goto complete;
94 }
95
96 for (n=0; n < results->ninfo; n++) {
97 kv = PMIX_NEW(pmix_kval_t);
98 kv->key = strdup(results->info[n].key);
99 PMIX_VALUE_CREATE(kv->value, 1);
100 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
101 kv->value, &results->info[n].value);
102
103 PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer,
104 &pmix_globals.myid, PMIX_INTERNAL,
105 kv);
106 PMIX_RELEASE(kv);
107 }
108 }
109
110 complete:
111 pmix_output_verbose(2, pmix_globals.debug_output,
112 "pmix:query cback from server releasing with status %s", PMIx_Error_string(results->status));
113
114 if (NULL != cd->cbfunc) {
115 cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
116 }
117 PMIX_RELEASE(cd);
118 }
119
120 static void _local_relcb(void *cbdata)
121 {
122 pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
123 PMIX_RELEASE(cd);
124 }
125
126 static void _local_cbfunc(int sd, short args, void *cbdata)
127 {
128 pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
129 if (NULL != cd->cbfunc) {
130 cd->cbfunc(cd->status, cd->info, cd->ninfo, cd->cbdata, _local_relcb, cd);
131 return;
132 }
133 PMIX_RELEASE(cd);
134 }
135
136 PMIX_EXPORT pmix_status_t PMIx_Query_info_nb(pmix_query_t queries[], size_t nqueries,
137 pmix_info_cbfunc_t cbfunc, void *cbdata)
138
139 {
140 pmix_query_caddy_t *cd;
141 pmix_cmd_t cmd = PMIX_QUERY_CMD;
142 pmix_buffer_t *msg;
143 pmix_status_t rc;
144 pmix_cb_t cb;
145 size_t n, p;
146 pmix_list_t results;
147 pmix_kval_t *kv, *kvnxt;
148 pmix_proc_t proc;
149
150 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
151
152 pmix_output_verbose(2, pmix_globals.debug_output,
153 "pmix:query non-blocking");
154
155 if (pmix_globals.init_cntr <= 0) {
156 PMIX_RELEASE_THREAD(&pmix_global_lock);
157 return PMIX_ERR_INIT;
158 }
159
160 if (0 == nqueries || NULL == queries) {
161 PMIX_RELEASE_THREAD(&pmix_global_lock);
162 return PMIX_ERR_BAD_PARAM;
163 }
164
165
166
167 for (n=0; n < nqueries; n++) {
168 if (NULL != queries[n].qualifiers && 0 == queries[n].nqual) {
169
170 p = 0;
171 while (!(PMIX_INFO_IS_END(&queries[n].qualifiers[p])) && p < SIZE_MAX) {
172 ++p;
173 }
174 if (SIZE_MAX == p) {
175
176 PMIX_RELEASE_THREAD(&pmix_global_lock);
177 return PMIX_ERR_BAD_PARAM;
178 }
179 queries[n].nqual = p;
180 }
181 }
182
183
184 PMIX_CONSTRUCT(&results, pmix_list_t);
185
186
187
188
189
190
191
192
193 memset(proc.nspace, 0, PMIX_MAX_NSLEN+1);
194 proc.rank = PMIX_RANK_INVALID;
195 for (n=0; n < nqueries; n++) {
196
197 if (0 == strcmp(queries[n].keys[0], PMIX_QUERY_ATTRIBUTE_SUPPORT)) {
198 cd = PMIX_NEW(pmix_query_caddy_t);
199 cd->queries = queries;
200 cd->nqueries = nqueries;
201 cd->cbfunc = cbfunc;
202 cd->cbdata = cbdata;
203 PMIX_THREADSHIFT(cd, pmix_attrs_query_support);
204
205
206
207 PMIX_RELEASE_THREAD(&pmix_global_lock);
208 return PMIX_SUCCESS;
209 }
210 for (p=0; p < queries[n].nqual; p++) {
211 if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) {
212 if (PMIX_INFO_TRUE(&queries[n].qualifiers[p])) {
213 PMIX_LIST_DESTRUCT(&results);
214 goto query;
215 }
216 } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_PROCID)) {
217 PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.proc->nspace);
218 proc.rank = queries[n].qualifiers[p].value.data.proc->rank;
219 } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_NSPACE)) {
220 PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.string);
221 } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_RANK)) {
222 proc.rank = queries[n].qualifiers[p].value.data.rank;
223 } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_HOSTNAME)) {
224 if (0 != strcmp(queries[n].qualifiers[p].value.data.string, pmix_globals.hostname)) {
225
226 PMIX_LIST_DESTRUCT(&results);
227 goto query;
228 }
229 }
230 }
231
232
233 PMIX_CONSTRUCT(&cb, pmix_cb_t);
234 cb.copy = false;
235
236 if (PMIX_RANK_INVALID == proc.rank &&
237 0 == strlen(proc.nspace)) {
238
239 cb.proc = &pmix_globals.myid;
240 } else {
241 if (0 == strlen(proc.nspace)) {
242
243 PMIX_LOAD_NSPACE(cb.proc->nspace, pmix_globals.myid.nspace);
244 }
245 if (PMIX_RANK_INVALID == proc.rank) {
246
247 proc.rank = PMIX_RANK_WILDCARD;
248 }
249 cb.proc = &proc;
250 }
251 for (p=0; NULL != queries[n].keys[p]; p++) {
252 cb.key = queries[n].keys[p];
253 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
254 if (PMIX_SUCCESS != rc) {
255
256 PMIX_LIST_DESTRUCT(&results);
257 PMIX_DESTRUCT(&cb);
258 goto query;
259 }
260
261 PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &cb.kvs, pmix_kval_t) {
262 pmix_list_remove_item(&cb.kvs, &kv->super);
263 pmix_list_append(&results, &kv->super);
264 }
265 PMIX_DESTRUCT(&cb);
266 }
267 }
268
269
270
271 cd = PMIX_NEW(pmix_query_caddy_t);
272 cd->cbfunc = cbfunc;
273 cd->cbdata = cbdata;
274 cd->status = PMIX_SUCCESS;
275 cd->ninfo = pmix_list_get_size(&results);
276 PMIX_INFO_CREATE(cd->info, cd->ninfo);
277 n = 0;
278 PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &results, pmix_kval_t) {
279 PMIX_LOAD_KEY(cd->info[n].key, kv->key);
280 rc = pmix_value_xfer(&cd->info[n].value, kv->value);
281 if (PMIX_SUCCESS != rc) {
282 cd->status = rc;
283 PMIX_INFO_FREE(cd->info, cd->ninfo);
284 break;
285 }
286 ++n;
287 }
288
289 PMIX_LIST_DESTRUCT(&results);
290
291
292
293 PMIX_THREADSHIFT(cd, _local_cbfunc);
294
295
296
297 PMIX_RELEASE_THREAD(&pmix_global_lock);
298 return PMIX_SUCCESS;
299
300
301 query:
302
303
304 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
305 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
306 PMIX_RELEASE_THREAD(&pmix_global_lock);
307 if (NULL == pmix_host_server.query) {
308
309 return PMIX_ERR_NOT_SUPPORTED;
310 }
311 pmix_output_verbose(2, pmix_globals.debug_output,
312 "pmix:query handed to RM");
313 rc = pmix_host_server.query(&pmix_globals.myid,
314 queries, nqueries,
315 cbfunc, cbdata);
316 return rc;
317 }
318
319
320 if (!pmix_globals.connected) {
321 PMIX_RELEASE_THREAD(&pmix_global_lock);
322 return PMIX_ERR_UNREACH;
323 }
324 PMIX_RELEASE_THREAD(&pmix_global_lock);
325
326
327 cd = PMIX_NEW(pmix_query_caddy_t);
328 cd->cbfunc = cbfunc;
329 cd->cbdata = cbdata;
330 msg = PMIX_NEW(pmix_buffer_t);
331 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
332 msg, &cmd, 1, PMIX_COMMAND);
333 if (PMIX_SUCCESS != rc) {
334 PMIX_ERROR_LOG(rc);
335 PMIX_RELEASE(msg);
336 PMIX_RELEASE(cd);
337 return rc;
338 }
339 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
340 msg, &nqueries, 1, PMIX_SIZE);
341 if (PMIX_SUCCESS != rc) {
342 PMIX_ERROR_LOG(rc);
343 PMIX_RELEASE(msg);
344 PMIX_RELEASE(cd);
345 return rc;
346 }
347 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
348 msg, queries, nqueries, PMIX_QUERY);
349 if (PMIX_SUCCESS != rc) {
350 PMIX_ERROR_LOG(rc);
351 PMIX_RELEASE(msg);
352 PMIX_RELEASE(cd);
353 return rc;
354 }
355
356 pmix_output_verbose(2, pmix_globals.debug_output,
357 "pmix:query sending to server");
358 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
359 msg, query_cbfunc, (void*)cd);
360 if (PMIX_SUCCESS != rc) {
361 PMIX_RELEASE(cd);
362 }
363 return rc;
364 }
365
366 static void acb(pmix_status_t status,
367 pmix_info_t *info, size_t ninfo,
368 void *cbdata,
369 pmix_release_cbfunc_t release_fn,
370 void *release_cbdata)
371 {
372 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
373 cb->status = status;
374 if (NULL != release_fn) {
375 release_fn(release_cbdata);
376 }
377 PMIX_WAKEUP_THREAD(&cb->lock);
378 }
379
380 PMIX_EXPORT pmix_status_t PMIx_Allocation_request(pmix_alloc_directive_t directive,
381 pmix_info_t *info, size_t ninfo)
382 {
383 pmix_cb_t cb;
384 pmix_status_t rc;
385
386 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
387
388 if (pmix_globals.init_cntr <= 0) {
389 PMIX_RELEASE_THREAD(&pmix_global_lock);
390 return PMIX_ERR_INIT;
391 }
392 PMIX_RELEASE_THREAD(&pmix_global_lock);
393
394 pmix_output_verbose(2, pmix_globals.debug_output,
395 "%s pmix:allocate", PMIX_NAME_PRINT(&pmix_globals.myid));
396
397
398
399
400 PMIX_CONSTRUCT(&cb, pmix_cb_t);
401 if (PMIX_SUCCESS != (rc = PMIx_Allocation_request_nb(directive, info, ninfo,
402 acb, &cb))) {
403 PMIX_DESTRUCT(&cb);
404 return rc;
405 }
406
407
408 PMIX_WAIT_THREAD(&cb.lock);
409 rc = cb.status;
410 PMIX_DESTRUCT(&cb);
411
412 pmix_output_verbose(2, pmix_globals.debug_output,
413 "pmix:allocate completed");
414
415 return rc;
416 }
417
418 PMIX_EXPORT pmix_status_t PMIx_Allocation_request_nb(pmix_alloc_directive_t directive,
419 pmix_info_t *info, size_t ninfo,
420 pmix_info_cbfunc_t cbfunc, void *cbdata)
421 {
422 pmix_buffer_t *msg;
423 pmix_cmd_t cmd = PMIX_ALLOC_CMD;
424 pmix_status_t rc;
425 pmix_query_caddy_t *cb;
426
427 pmix_output_verbose(2, pmix_globals.debug_output,
428 "pmix: allocate called");
429
430 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
431
432 if (pmix_globals.init_cntr <= 0) {
433 PMIX_RELEASE_THREAD(&pmix_global_lock);
434 return PMIX_ERR_INIT;
435 }
436
437
438
439 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
440 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
441 PMIX_RELEASE_THREAD(&pmix_global_lock);
442 if (NULL == pmix_host_server.allocate) {
443
444 return PMIX_ERR_NOT_SUPPORTED;
445 }
446 pmix_output_verbose(2, pmix_globals.debug_output,
447 "pmix:allocate handed to RM");
448 rc = pmix_host_server.allocate(&pmix_globals.myid,
449 directive,
450 info, ninfo,
451 cbfunc, cbdata);
452 return rc;
453 }
454
455
456
457
458 if (!pmix_globals.connected) {
459 PMIX_RELEASE_THREAD(&pmix_global_lock);
460 return PMIX_ERR_UNREACH;
461 }
462 PMIX_RELEASE_THREAD(&pmix_global_lock);
463
464 msg = PMIX_NEW(pmix_buffer_t);
465
466 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
467 msg, &cmd, 1, PMIX_COMMAND);
468 if (PMIX_SUCCESS != rc) {
469 PMIX_ERROR_LOG(rc);
470 PMIX_RELEASE(msg);
471 return rc;
472 }
473
474
475 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
476 msg, &directive, 1, PMIX_ALLOC_DIRECTIVE);
477 if (PMIX_SUCCESS != rc) {
478 PMIX_ERROR_LOG(rc);
479 PMIX_RELEASE(msg);
480 return rc;
481 }
482
483
484 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
485 msg, &ninfo, 1, PMIX_SIZE);
486 if (PMIX_SUCCESS != rc) {
487 PMIX_ERROR_LOG(rc);
488 PMIX_RELEASE(msg);
489 return rc;
490 }
491 if (0 < ninfo) {
492 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
493 msg, info, ninfo, PMIX_INFO);
494 if (PMIX_SUCCESS != rc) {
495 PMIX_ERROR_LOG(rc);
496 PMIX_RELEASE(msg);
497 return rc;
498 }
499 }
500
501
502
503
504 cb = PMIX_NEW(pmix_query_caddy_t);
505 cb->cbfunc = cbfunc;
506 cb->cbdata = cbdata;
507
508
509 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
510 msg, query_cbfunc, (void*)cb);
511 if (PMIX_SUCCESS != rc) {
512 PMIX_RELEASE(msg);
513 PMIX_RELEASE(cb);
514 }
515
516 return rc;
517 }