This source file includes following definitions.
- test_init
- test_finalize
- allocate
- setup_local_network
- setup_fork
- child_finalized
- local_app_finalized
- deregister_nspace
- collect_inventory
- deliver_inventory
1
2
3
4
5
6
7
8
9
10
11
12 #include <src/include/pmix_config.h>
13
14 #include <string.h>
15 #ifdef HAVE_UNISTD_H
16 #include <unistd.h>
17 #endif
18 #ifdef HAVE_SYS_TYPES_H
19 #include <sys/types.h>
20 #endif
21 #ifdef HAVE_SYS_STAT_H
22 #include <sys/stat.h>
23 #endif
24 #ifdef HAVE_FCNTL_H
25 #include <fcntl.h>
26 #endif
27 #include <time.h>
28
29 #include <pmix_common.h>
30
31 #include "src/mca/base/pmix_mca_base_var.h"
32 #include "src/include/pmix_socket_errno.h"
33 #include "src/include/pmix_globals.h"
34 #include "src/class/pmix_list.h"
35 #include "src/util/alfg.h"
36 #include "src/util/argv.h"
37 #include "src/util/error.h"
38 #include "src/util/name_fns.h"
39 #include "src/util/output.h"
40 #include "src/util/pmix_environ.h"
41 #include "src/mca/preg/preg.h"
42
43 #include "src/mca/pnet/pnet.h"
44 #include "src/mca/pnet/base/base.h"
45 #include "pnet_test.h"
46
47 static pmix_status_t test_init(void);
48 static void test_finalize(void);
49 static pmix_status_t allocate(pmix_namespace_t *nptr,
50 pmix_info_t info[], size_t ninfo,
51 pmix_list_t *ilist);
52 static pmix_status_t setup_local_network(pmix_namespace_t *nptr,
53 pmix_info_t info[],
54 size_t ninfo);
55 static pmix_status_t setup_fork(pmix_namespace_t *nptr,
56 const pmix_proc_t *proc,
57 char ***env);
58 static void child_finalized(pmix_proc_t *peer);
59 static void local_app_finalized(pmix_namespace_t *nptr);
60 static void deregister_nspace(pmix_namespace_t *nptr);
61 static pmix_status_t collect_inventory(pmix_info_t directives[], size_t ndirs,
62 pmix_inventory_cbfunc_t cbfunc, void *cbdata);
63 static pmix_status_t deliver_inventory(pmix_info_t info[], size_t ninfo,
64 pmix_info_t directives[], size_t ndirs,
65 pmix_op_cbfunc_t cbfunc, void *cbdata);
66
67 pmix_pnet_module_t pmix_test_module = {
68 .name = "test",
69 .init = test_init,
70 .finalize = test_finalize,
71 .allocate = allocate,
72 .setup_local_network = setup_local_network,
73 .setup_fork = setup_fork,
74 .child_finalized = child_finalized,
75 .local_app_finalized = local_app_finalized,
76 .deregister_nspace = deregister_nspace,
77 .collect_inventory = collect_inventory,
78 .deliver_inventory = deliver_inventory
79 };
80
81 static pmix_status_t test_init(void)
82 {
83 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
84 "pnet: test init");
85 return PMIX_SUCCESS;
86 }
87
88 static void test_finalize(void)
89 {
90 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
91 "pnet: test finalize");
92 }
93
94
95
96
97 static pmix_status_t allocate(pmix_namespace_t *nptr,
98 pmix_info_t info[], size_t ninfo,
99 pmix_list_t *ilist)
100 {
101 pmix_kval_t *kv;
102 bool seckey = false, envars = false;
103 pmix_list_t mylist;
104 size_t n, nreqs=0;
105 pmix_info_t *requests = NULL;
106 char *idkey = NULL;
107 uint64_t unique_key = 12345;
108 pmix_buffer_t buf;
109 pmix_status_t rc;
110 pmix_pnet_job_t *jptr, *job;
111 pmix_pnet_node_t *nd;
112 pmix_pnet_local_procs_t *lptr, *lp;
113
114 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
115 "pnet:test:allocate for nspace %s key %s",
116 nptr->nspace, info->key);
117
118
119
120 if (!PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
121 return PMIX_SUCCESS;
122 }
123
124 if (NULL == info) {
125 return PMIX_ERR_TAKE_NEXT_OPTION;
126 }
127
128
129 for (n=0; n < ninfo; n++) {
130 if (PMIX_CHECK_KEY(&info[n], PMIX_SETUP_APP_ENVARS) ||
131 PMIX_CHECK_KEY(&info[n], PMIX_SETUP_APP_ALL)) {
132 envars = PMIX_INFO_TRUE(&info[n]);
133 } else if (PMIX_CHECK_KEY(&info[n], PMIX_ALLOC_NETWORK_ID)) {
134
135
136
137 if (PMIX_DATA_ARRAY != info->value.type ||
138 NULL == info->value.data.darray ||
139 PMIX_INFO != info->value.data.darray->type ||
140 NULL == info->value.data.darray->array) {
141
142 goto process;
143 }
144 requests = (pmix_info_t*)info->value.data.darray->array;
145 nreqs = info->value.data.darray->size;
146 }
147 }
148
149 if (envars) {
150 kv = PMIX_NEW(pmix_kval_t);
151 if (NULL == kv) {
152 return PMIX_ERR_NOMEM;
153 }
154 kv->key = strdup(PMIX_SET_ENVAR);
155 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
156 if (NULL == kv->value) {
157 PMIX_RELEASE(kv);
158 return PMIX_ERR_NOMEM;
159 }
160 kv->value->type = PMIX_ENVAR;
161 PMIX_ENVAR_LOAD(&kv->value->data.envar, "PMIX_TEST_ENVAR", "1", ':');
162 pmix_list_append(ilist, &kv->super);
163 }
164
165 if (NULL == requests) {
166 return PMIX_ERR_TAKE_NEXT_OPTION;
167 }
168
169 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
170 "pnet:test:allocate alloc_network for nspace %s",
171 nptr->nspace);
172
173
174 for (n=0; n < nreqs; n++) {
175 if (0 == strncmp(requests[n].key, PMIX_ALLOC_NETWORK_ID, PMIX_MAX_KEYLEN)) {
176
177 if (PMIX_STRING != requests[n].value.type ||
178 NULL == requests[n].value.data.string) {
179 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
180 return PMIX_ERR_BAD_PARAM;
181 }
182 idkey = requests[n].value.data.string;
183 } else if (0 == strncasecmp(requests[n].key, PMIX_ALLOC_NETWORK_SEC_KEY, PMIX_MAX_KEYLEN)) {
184 seckey = PMIX_INFO_TRUE(&requests[n]);
185 }
186 }
187
188 process:
189
190 if (NULL == idkey) {
191 idkey = "TESTKEY";
192 }
193 PMIX_CONSTRUCT(&mylist, pmix_list_t);
194
195
196 kv = PMIX_NEW(pmix_kval_t);
197 if (NULL == kv) {
198 return PMIX_ERR_NOMEM;
199 }
200 kv->key = strdup(PMIX_ALLOC_NETWORK_ID);
201 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
202 if (NULL == kv->value) {
203 PMIX_RELEASE(kv);
204 return PMIX_ERR_NOMEM;
205 }
206 kv->value->type = PMIX_STRING;
207 kv->value->data.string = strdup(idkey);
208 pmix_list_append(&mylist, &kv->super);
209
210 if (seckey) {
211 kv = PMIX_NEW(pmix_kval_t);
212 if (NULL == kv) {
213 return PMIX_ERR_NOMEM;
214 }
215 kv->key = strdup(PMIX_ALLOC_NETWORK_SEC_KEY);
216 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
217 if (NULL == kv->value) {
218 PMIX_RELEASE(kv);
219 return PMIX_ERR_NOMEM;
220 }
221 kv->value->type = PMIX_BYTE_OBJECT;
222 kv->value->data.bo.bytes = (char*)malloc(sizeof(uint64_t));
223 if (NULL == kv->value->data.bo.bytes) {
224 PMIX_RELEASE(kv);
225 return PMIX_ERR_NOMEM;
226 }
227 memcpy(kv->value->data.bo.bytes, &unique_key, sizeof(uint64_t));
228 kv->value->data.bo.size = sizeof(uint64_t);
229 pmix_list_append(&mylist, &kv->super);
230 }
231
232
233 job = NULL;
234 PMIX_LIST_FOREACH(jptr, &pmix_pnet_globals.jobs, pmix_pnet_job_t) {
235 if (0 == strcmp(jptr->nspace, nptr->nspace)) {
236 job = jptr;
237 break;
238 }
239 }
240 if (NULL != job) {
241 pmix_output(0, "ALLOCATE RESOURCES FOR JOB %s", job->nspace);
242 for (n=0; (int)n < job->nodes.size; n++) {
243 if (NULL == (nd = (pmix_pnet_node_t*)pmix_pointer_array_get_item(&job->nodes, n))) {
244 continue;
245 }
246 lp = NULL;
247 PMIX_LIST_FOREACH(lptr, &nd->local_jobs, pmix_pnet_local_procs_t) {
248 if (0 == strcmp(job->nspace, lptr->nspace)) {
249 lp = lptr;
250 break;
251 }
252 }
253 if (NULL == lp) {
254 pmix_output(0, "\t NODE %s 0 RANKS", nd->name);
255 } else {
256 pmix_output(0, "\tNODE %s %d RANKS", nd->name, (int)lp->np);
257 }
258 }
259 }
260
261 n = pmix_list_get_size(&mylist);
262 if (0 < n) {
263 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
264
265 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, &n, 1, PMIX_SIZE);
266
267 while (NULL != (kv = (pmix_kval_t*)pmix_list_remove_first(&mylist))) {
268 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL);
269 PMIX_RELEASE(kv);
270 if (PMIX_SUCCESS != rc) {
271 PMIX_DESTRUCT(&buf);
272 PMIX_LIST_DESTRUCT(&mylist);
273 return rc;
274 }
275 }
276 PMIX_LIST_DESTRUCT(&mylist);
277 kv = PMIX_NEW(pmix_kval_t);
278 kv->key = strdup("pmix-pnet-test-blob");
279 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
280 if (NULL == kv->value) {
281 PMIX_RELEASE(kv);
282 PMIX_DESTRUCT(&buf);
283 return PMIX_ERR_NOMEM;
284 }
285 kv->value->type = PMIX_BYTE_OBJECT;
286 PMIX_UNLOAD_BUFFER(&buf, kv->value->data.bo.bytes, kv->value->data.bo.size);
287 PMIX_DESTRUCT(&buf);
288 pmix_list_append(ilist, &kv->super);
289 }
290
291 return PMIX_SUCCESS;
292 }
293
294 static pmix_status_t setup_local_network(pmix_namespace_t *nptr,
295 pmix_info_t info[],
296 size_t ninfo)
297 {
298 size_t n, m, nkvals;
299 char *nodestring, **nodes;
300 pmix_proc_t *procs;
301 size_t nprocs;
302 pmix_buffer_t bkt;
303 int32_t cnt;
304 pmix_kval_t *kv;
305 pmix_status_t rc;
306 pmix_info_t *jinfo, stinfo;
307 char *idkey = NULL;
308
309 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
310 "pnet:test:setup_local_network");
311
312
313 pmix_output(0, "pnet:test setup_local_network NSPACE %s", (NULL == nptr) ? "NULL" : nptr->nspace);
314 if (NULL == nptr) {
315 return PMIX_SUCCESS;
316 }
317 pmix_preg.resolve_nodes(nptr->nspace, &nodestring);
318 if (NULL == nodestring) {
319 return PMIX_SUCCESS;
320 }
321 pmix_preg.parse_nodes(nodestring, &nodes);
322 pmix_output(0, "pnet:test setup_local_network NODES %s", (NULL == nodes) ? "NULL" : "NON-NULL");
323 if (NULL == nodes) {
324 free(nodestring);
325 return PMIX_SUCCESS;
326 }
327 for (n=0; NULL != nodes[n]; n++) {
328 pmix_output(0, "pnet:test setup_local_network NODE: %s", nodes[n]);
329 }
330
331 for (n=0; NULL != nodes[n]; n++) {
332
333 pmix_preg.resolve_peers(nodes[n], nptr->nspace, &procs, &nprocs);
334 if (NULL == procs) {
335 continue;
336 }
337 for (m=0; m < nprocs; m++) {
338 pmix_output(0, "pnet:test setup_local_network NODE %s: peer %s:%d", nodes[n], procs[m].nspace, procs[m].rank);
339 }
340
341 free(procs);
342 }
343
344 if (NULL != info) {
345 for (n=0; n < ninfo; n++) {
346
347 if (0 == strncmp(info[n].key, "pmix-pnet-test-blob", PMIX_MAX_KEYLEN)) {
348
349 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &bkt,
350 info[n].value.data.bo.bytes,
351 info[n].value.data.bo.size);
352
353 cnt = 1;
354 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
355 &bkt, &nkvals, &cnt, PMIX_SIZE);
356
357 PMIX_INFO_CONSTRUCT(&stinfo);
358 pmix_strncpy(stinfo.key, idkey, PMIX_MAX_KEYLEN);
359 stinfo.value.type = PMIX_DATA_ARRAY;
360 PMIX_DATA_ARRAY_CREATE(stinfo.value.data.darray, nkvals, PMIX_INFO);
361 jinfo = (pmix_info_t*)stinfo.value.data.darray->array;
362
363
364 kv = PMIX_NEW(pmix_kval_t);
365 cnt = 1;
366 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
367 &bkt, kv, &cnt, PMIX_KVAL);
368 m = 0;
369 while (PMIX_SUCCESS == rc) {
370 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
371 "recvd KEY %s %s", kv->key,
372 (PMIX_STRING == kv->value->type) ? kv->value->data.string : "NON-STRING");
373
374 pmix_strncpy(jinfo[m].key, kv->key, PMIX_MAX_KEYLEN);
375 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
376 &jinfo[m].value, kv->value);
377
378 if (NULL == idkey &&
379 0 == strncmp(kv->key, PMIX_ALLOC_NETWORK_ID, PMIX_MAX_KEYLEN)) {
380 idkey = strdup(kv->value->data.string);
381 }
382 ++m;
383 PMIX_RELEASE(kv);
384 kv = PMIX_NEW(pmix_kval_t);
385 cnt = 1;
386 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
387 &bkt, kv, &cnt, PMIX_KVAL);
388 }
389
390 info[n].value.data.bo.bytes = bkt.base_ptr;
391 info[n].value.data.bo.size = bkt.bytes_used;
392 bkt.base_ptr = NULL;
393 bkt.bytes_used = 0;
394
395
396 if (NULL == idkey) {
397 PMIX_INFO_FREE(jinfo, nkvals);
398 return PMIX_ERR_BAD_PARAM;
399 }
400
401 PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, nptr,
402 &stinfo, 1);
403 PMIX_INFO_DESTRUCT(&stinfo);
404 }
405 }
406 }
407 if (NULL != idkey) {
408 free(idkey);
409 }
410 return PMIX_SUCCESS;
411 }
412
413 static pmix_status_t setup_fork(pmix_namespace_t *nptr,
414 const pmix_proc_t *proc,
415 char ***env)
416 {
417 pmix_cb_t cb;
418 pmix_status_t rc;
419 pmix_kval_t *kv;
420 uint16_t localrank;
421
422 PMIX_CONSTRUCT(&cb, pmix_cb_t);
423
424 cb.key = strdup(PMIX_LOCAL_RANK);
425
426 cb.copy = false;
427
428 cb.scope = PMIX_SCOPE_UNDEF;
429
430 cb.proc = (pmix_proc_t*)proc;
431
432 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
433 if (PMIX_SUCCESS != rc) {
434 if (PMIX_ERR_INVALID_NAMESPACE != rc) {
435 PMIX_ERROR_LOG(rc);
436 }
437 PMIX_DESTRUCT(&cb);
438 return rc;
439 }
440
441 if (1 != pmix_list_get_size(&cb.kvs)) {
442 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
443 PMIX_DESTRUCT(&cb);
444 return PMIX_ERR_BAD_PARAM;
445 }
446 kv = (pmix_kval_t*)pmix_list_get_first(&cb.kvs);
447 if (PMIX_UINT16 != kv->value->type) {
448 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
449 PMIX_DESTRUCT(&cb);
450 return PMIX_ERR_BAD_PARAM;
451 }
452 localrank = kv->value->data.uint16;
453
454 pmix_output(0, "pnet:test LOCAL RANK FOR PROC %s: %d", PMIX_NAME_PRINT(proc), (int)localrank);
455
456 PMIX_DESTRUCT(&cb);
457 return PMIX_SUCCESS;
458 }
459
460 static void child_finalized(pmix_proc_t *peer)
461 {
462 pmix_output(0, "pnet:test CHILD %s:%d FINALIZED",
463 peer->nspace, peer->rank);
464 }
465
466 static void local_app_finalized(pmix_namespace_t *nptr)
467 {
468 pmix_output(0, "pnet:test NSPACE %s LOCALLY FINALIZED", nptr->nspace);
469 }
470
471 static void deregister_nspace(pmix_namespace_t *nptr)
472 {
473 pmix_output(0, "pnet:test DEREGISTER NSPACE %s", nptr->nspace);
474 }
475
476 static pmix_status_t collect_inventory(pmix_info_t directives[], size_t ndirs,
477 pmix_inventory_cbfunc_t cbfunc, void *cbdata)
478 {
479 pmix_output(0, "pnet:test COLLECT INVENTORY");
480 return PMIX_ERR_NOT_SUPPORTED;
481 }
482
483 static pmix_status_t deliver_inventory(pmix_info_t info[], size_t ninfo,
484 pmix_info_t directives[], size_t ndirs,
485 pmix_op_cbfunc_t cbfunc, void *cbdata)
486 {
487 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
488 "pnet:test deliver inventory");
489
490 return PMIX_ERR_NOT_SUPPORTED;
491 }