This source file includes the following definitions.
- pmix_pnet_base_allocate
- pmix_pnet_base_setup_local_network
- pmix_pnet_base_setup_fork
- pmix_pnet_base_child_finalized
- pmix_pnet_base_local_app_finalized
- pmix_pnet_base_deregister_nspace
- cicbfunc
- pmix_pnet_base_collect_inventory
- dlcbfunc
- pmix_pnet_base_deliver_inventory
- pmix_pnet_base_harvest_envars
- process_maps
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include <src/include/pmix_config.h>
17
18 #include <pmix_common.h>
19 #include "src/include/pmix_globals.h"
20
21 #include "src/class/pmix_list.h"
22 #include "src/mca/preg/preg.h"
23 #include "src/util/argv.h"
24 #include "src/util/error.h"
25 #include "src/util/pmix_environ.h"
26 #include "src/server/pmix_server_ops.h"
27
28 #include "src/mca/pnet/base/base.h"
29
30
31 static pmix_status_t process_maps(char *nspace, char *nregex, char *pregex);
32
33
34
35
36
37 pmix_status_t pmix_pnet_base_allocate(char *nspace,
38 pmix_info_t info[], size_t ninfo,
39 pmix_list_t *ilist)
40 {
41 pmix_pnet_base_active_module_t *active;
42 pmix_status_t rc = PMIX_SUCCESS;
43 pmix_namespace_t *nptr, *ns;
44 size_t n;
45 char *nregex, *pregex;
46 char *params[2] = {"PMIX_MCA_", NULL};
47
48 if (!pmix_pnet_globals.initialized) {
49 return PMIX_ERR_INIT;
50 }
51
52 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
53 "pnet:allocate called");
54
55
56 if (NULL == nspace || NULL == ilist) {
57 return PMIX_ERR_BAD_PARAM;
58 }
59 if (PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
60 nptr = NULL;
61
62
63 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
64 if (0 == strcmp(ns->nspace, nspace)) {
65 nptr = ns;
66 break;
67 }
68 }
69 if (NULL == nptr) {
70
71 nptr = PMIX_NEW(pmix_namespace_t);
72 if (NULL == nptr) {
73 return PMIX_ERR_NOMEM;
74 }
75 nptr->nspace = strdup(nspace);
76 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
77 }
78
79 if (NULL != info) {
80
81 nregex = NULL;
82 pregex = NULL;
83 for (n=0; n < ninfo; n++) {
84 if (0 == strncmp(info[n].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN)) {
85 nregex = info[n].value.data.string;
86 } else if (0 == strncmp(info[n].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN)) {
87 pregex = info[n].value.data.string;
88 }
89 }
90 if (NULL != nregex && NULL != pregex) {
91
92
93
94
95
96
97
98 rc = process_maps(nspace, nregex, pregex);
99 if (PMIX_SUCCESS != rc) {
100 return rc;
101 }
102 }
103
104 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
105 if (NULL != active->module->allocate) {
106 if (PMIX_SUCCESS == (rc = active->module->allocate(nptr, info, ninfo, ilist))) {
107 break;
108 }
109 if (PMIX_ERR_TAKE_NEXT_OPTION != rc) {
110
111 return rc;
112 }
113 }
114 }
115 }
116 }
117
118
119 rc = pmix_pnet_base_harvest_envars(params, NULL, ilist);
120
121 return rc;
122 }
123
124
125 pmix_status_t pmix_pnet_base_setup_local_network(char *nspace,
126 pmix_info_t info[],
127 size_t ninfo)
128 {
129 pmix_pnet_base_active_module_t *active;
130 pmix_status_t rc;
131 pmix_namespace_t *nptr, *ns;
132
133 if (!pmix_pnet_globals.initialized) {
134 return PMIX_ERR_INIT;
135 }
136
137 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
138 "pnet: setup_local_network called");
139
140
141 if (NULL == nspace) {
142 return PMIX_ERR_BAD_PARAM;
143 }
144
145
146 nptr = NULL;
147 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
148 if (0 == strcmp(ns->nspace, nspace)) {
149 nptr = ns;
150 break;
151 }
152 }
153 if (NULL == nptr) {
154
155 nptr = PMIX_NEW(pmix_namespace_t);
156 if (NULL == nptr) {
157 return PMIX_ERR_NOMEM;
158 }
159 nptr->nspace = strdup(nspace);
160 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
161 }
162
163 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
164 if (NULL != active->module->setup_local_network) {
165 if (PMIX_SUCCESS != (rc = active->module->setup_local_network(nptr, info, ninfo))) {
166 return rc;
167 }
168 }
169 }
170
171 return PMIX_SUCCESS;
172 }
173
174
175 pmix_status_t pmix_pnet_base_setup_fork(const pmix_proc_t *proc, char ***env)
176 {
177 pmix_pnet_base_active_module_t *active;
178 pmix_status_t rc;
179 pmix_namespace_t *nptr, *ns;
180
181 if (!pmix_pnet_globals.initialized) {
182 return PMIX_ERR_INIT;
183 }
184
185
186 if (NULL == proc || NULL == env) {
187 return PMIX_ERR_BAD_PARAM;
188 }
189
190
191 nptr = NULL;
192 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
193 if (0 == strcmp(ns->nspace, proc->nspace)) {
194 nptr = ns;
195 break;
196 }
197 }
198 if (NULL == nptr) {
199
200 nptr = PMIX_NEW(pmix_namespace_t);
201 if (NULL == nptr) {
202 return PMIX_ERR_NOMEM;
203 }
204 nptr->nspace = strdup(proc->nspace);
205 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
206 }
207
208 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
209 if (NULL != active->module->setup_fork) {
210 if (PMIX_SUCCESS != (rc = active->module->setup_fork(nptr, proc, env))) {
211 return rc;
212 }
213 }
214 }
215
216 return PMIX_SUCCESS;
217 }
218
219 void pmix_pnet_base_child_finalized(pmix_proc_t *peer)
220 {
221 pmix_pnet_base_active_module_t *active;
222
223 if (!pmix_pnet_globals.initialized) {
224 return;
225 }
226
227
228 if (NULL == peer) {
229 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
230 return;
231 }
232
233 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
234 if (NULL != active->module->child_finalized) {
235 active->module->child_finalized(peer);
236 }
237 }
238
239 return;
240 }
241
242 void pmix_pnet_base_local_app_finalized(pmix_namespace_t *nptr)
243 {
244 pmix_pnet_base_active_module_t *active;
245
246 if (!pmix_pnet_globals.initialized) {
247 return;
248 }
249
250
251 if (NULL == nptr) {
252 return;
253 }
254
255 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
256 if (NULL != active->module->local_app_finalized) {
257 active->module->local_app_finalized(nptr);
258 }
259 }
260
261 return;
262 }
263
264 void pmix_pnet_base_deregister_nspace(char *nspace)
265 {
266 pmix_pnet_base_active_module_t *active;
267 pmix_namespace_t *nptr, *ns;
268 pmix_pnet_job_t *job;
269 pmix_pnet_node_t *node;
270
271 if (!pmix_pnet_globals.initialized) {
272 return;
273 }
274
275
276 if (NULL == nspace) {
277 return;
278 }
279
280
281 nptr = NULL;
282 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
283 if (0 == strcmp(ns->nspace, nspace)) {
284 nptr = ns;
285 break;
286 }
287 }
288 if (NULL == nptr) {
289
290 return;
291 }
292
293 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
294 if (NULL != active->module->deregister_nspace) {
295 active->module->deregister_nspace(nptr);
296 }
297 }
298
299 PMIX_LIST_FOREACH(job, &pmix_pnet_globals.jobs, pmix_pnet_job_t) {
300 if (0 == strcmp(nspace, job->nspace)) {
301 pmix_list_remove_item(&pmix_pnet_globals.jobs, &job->super);
302 PMIX_RELEASE(job);
303 break;
304 }
305 }
306
307 PMIX_LIST_FOREACH(node, &pmix_pnet_globals.nodes, pmix_pnet_node_t) {
308 pmix_pnet_local_procs_t *lp;
309 PMIX_LIST_FOREACH(lp, &node->local_jobs, pmix_pnet_local_procs_t) {
310 if (0 == strcmp(nspace, lp->nspace)) {
311 pmix_list_remove_item(&node->local_jobs, &lp->super);
312 PMIX_RELEASE(lp);
313 break;
314 }
315 }
316 }
317 }
318
319 static void cicbfunc(pmix_status_t status,
320 pmix_list_t *inventory,
321 void *cbdata)
322 {
323 pmix_inventory_rollup_t *rollup = (pmix_inventory_rollup_t*)cbdata;
324 pmix_kval_t *kv;
325
326 PMIX_ACQUIRE_THREAD(&rollup->lock);
327
328 if (PMIX_SUCCESS != status && PMIX_SUCCESS == rollup->status) {
329 rollup->status = status;
330 }
331
332 if (NULL != inventory) {
333 while (NULL != (kv = (pmix_kval_t*)pmix_list_remove_first(inventory))) {
334 pmix_list_append(&rollup->payload, &kv->super);
335 }
336 }
337
338 rollup->replies++;
339
340 if (rollup->replies < rollup->requests) {
341
342 PMIX_RELEASE_THREAD(&rollup->lock);
343 return;
344 }
345
346
347 PMIX_RELEASE_THREAD(&rollup->lock);
348 if (NULL != rollup->cbfunc) {
349 rollup->cbfunc(rollup->status, &rollup->payload, rollup->cbdata);
350 }
351 PMIX_RELEASE(rollup);
352 return;
353 }
354
355 void pmix_pnet_base_collect_inventory(pmix_info_t directives[], size_t ndirs,
356 pmix_inventory_cbfunc_t cbfunc, void *cbdata)
357 {
358 pmix_pnet_base_active_module_t *active;
359 pmix_inventory_rollup_t *myrollup;
360 pmix_status_t rc;
361
362
363
364
365
366
367
368
369 if (!pmix_pnet_globals.initialized) {
370
371 if (NULL != cbfunc) {
372 cbfunc(PMIX_ERR_INIT, NULL, cbdata);
373 }
374 return;
375 }
376
377 myrollup = PMIX_NEW(pmix_inventory_rollup_t);
378 if (NULL == myrollup) {
379
380 if (NULL != cbfunc) {
381 cbfunc(PMIX_ERR_NOMEM, NULL, cbdata);
382 }
383 return;
384 }
385 myrollup->cbfunc = cbfunc;
386 myrollup->cbdata = cbdata;
387
388
389
390
391 PMIX_ACQUIRE_THREAD(&myrollup->lock);
392
393 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
394 if (NULL != active->module->collect_inventory) {
395 pmix_output_verbose(5, pmix_pnet_base_framework.framework_output,
396 "COLLECTING %s", active->module->name);
397 rc = active->module->collect_inventory(directives, ndirs, cicbfunc, (void*)myrollup);
398
399
400
401 if (PMIX_OPERATION_IN_PROGRESS == rc) {
402 myrollup->requests++;
403 } else if (PMIX_SUCCESS != rc &&
404 PMIX_ERR_TAKE_NEXT_OPTION != rc &&
405 PMIX_ERR_NOT_SUPPORTED != rc) {
406
407
408
409 if (PMIX_SUCCESS == myrollup->status) {
410 myrollup->status = rc;
411 }
412 }
413 }
414 }
415 if (0 == myrollup->requests) {
416
417 PMIX_RELEASE_THREAD(&myrollup->lock);
418 if (NULL != cbfunc) {
419 cbfunc(myrollup->status, &myrollup->payload, cbdata);
420 }
421 PMIX_RELEASE(myrollup);
422 return;
423 }
424
425 PMIX_RELEASE_THREAD(&myrollup->lock);
426 return;
427 }
428
429 static void dlcbfunc(pmix_status_t status,
430 void *cbdata)
431 {
432 pmix_inventory_rollup_t *rollup = (pmix_inventory_rollup_t*)cbdata;
433
434 PMIX_ACQUIRE_THREAD(&rollup->lock);
435
436 if (PMIX_SUCCESS != status && PMIX_SUCCESS == rollup->status) {
437 rollup->status = status;
438 }
439
440 rollup->replies++;
441
442 if (rollup->replies < rollup->requests) {
443
444 PMIX_RELEASE_THREAD(&rollup->lock);
445 return;
446 }
447
448
449 PMIX_RELEASE_THREAD(&rollup->lock);
450 if (NULL != rollup->opcbfunc) {
451 rollup->opcbfunc(rollup->status, rollup->cbdata);
452 }
453 PMIX_RELEASE(rollup);
454 return;
455 }
456
457 void pmix_pnet_base_deliver_inventory(pmix_info_t info[], size_t ninfo,
458 pmix_info_t directives[], size_t ndirs,
459 pmix_op_cbfunc_t cbfunc, void *cbdata)
460 {
461 pmix_pnet_base_active_module_t *active;
462 pmix_inventory_rollup_t *myrollup;
463 pmix_status_t rc;
464
465
466
467
468
469
470
471
472 if (!pmix_pnet_globals.initialized) {
473
474 if (NULL != cbfunc) {
475 cbfunc(PMIX_ERR_INIT, cbdata);
476 }
477 return;
478 }
479
480 myrollup = PMIX_NEW(pmix_inventory_rollup_t);
481 if (NULL == myrollup) {
482
483 if (NULL != cbfunc) {
484 cbfunc(PMIX_ERR_NOMEM, cbdata);
485 }
486 return;
487 }
488 myrollup->opcbfunc = cbfunc;
489 myrollup->cbdata = cbdata;
490
491
492
493
494 PMIX_ACQUIRE_THREAD(&myrollup->lock);
495
496 PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
497 if (NULL != active->module->deliver_inventory) {
498 pmix_output_verbose(5, pmix_pnet_base_framework.framework_output,
499 "DELIVERING TO %s", active->module->name);
500 rc = active->module->deliver_inventory(info, ninfo, directives, ndirs, dlcbfunc, (void*)myrollup);
501
502
503 if (PMIX_OPERATION_IN_PROGRESS == rc) {
504 myrollup->requests++;
505 } else if (PMIX_SUCCESS != rc &&
506 PMIX_ERR_TAKE_NEXT_OPTION != rc &&
507 PMIX_ERR_NOT_SUPPORTED != rc) {
508
509
510
511 if (PMIX_SUCCESS == myrollup->status) {
512 myrollup->status = rc;
513 }
514 }
515 }
516 }
517 if (0 == myrollup->requests) {
518
519 PMIX_RELEASE_THREAD(&myrollup->lock);
520 if (NULL != cbfunc) {
521 cbfunc(myrollup->status, cbdata);
522 }
523 PMIX_RELEASE(myrollup);
524 return;
525 }
526
527 PMIX_RELEASE_THREAD(&myrollup->lock);
528 return;
529 }
530
531 pmix_status_t pmix_pnet_base_harvest_envars(char **incvars, char **excvars,
532 pmix_list_t *ilist)
533 {
534 int i, j;
535 size_t len;
536 pmix_kval_t *kv, *next;
537 char *cs_env, *string_key;
538
539
540 for (j=0; NULL != incvars[j]; j++) {
541 len = strlen(incvars[j]);
542 if ('*' == incvars[j][len-1]) {
543 --len;
544 }
545 for (i = 0; NULL != environ[i]; ++i) {
546 if (0 == strncmp(environ[i], incvars[j], len)) {
547 cs_env = strdup(environ[i]);
548 kv = PMIX_NEW(pmix_kval_t);
549 if (NULL == kv) {
550 free(cs_env);
551 return PMIX_ERR_OUT_OF_RESOURCE;
552 }
553 kv->key = strdup(PMIX_SET_ENVAR);
554 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
555 if (NULL == kv->value) {
556 PMIX_RELEASE(kv);
557 free(cs_env);
558 return PMIX_ERR_OUT_OF_RESOURCE;
559 }
560 kv->value->type = PMIX_ENVAR;
561 string_key = strchr(cs_env, '=');
562 if (NULL == string_key) {
563 free(cs_env);
564 PMIX_RELEASE(kv);
565 return PMIX_ERR_BAD_PARAM;
566 }
567 *string_key = '\0';
568 ++string_key;
569 pmix_output_verbose(5, pmix_pnet_base_framework.framework_output,
570 "pnet: adding envar %s", cs_env);
571 PMIX_ENVAR_LOAD(&kv->value->data.envar, cs_env, string_key, ':');
572 pmix_list_append(ilist, &kv->super);
573 free(cs_env);
574 }
575 }
576 }
577
578
579 if (NULL != excvars) {
580 for (j=0; NULL != excvars[j]; j++) {
581 len = strlen(excvars[j]);
582 if ('*' == excvars[j][len-1]) {
583 --len;
584 }
585 PMIX_LIST_FOREACH_SAFE(kv, next, ilist, pmix_kval_t) {
586 if (0 == strncmp(kv->value->data.envar.envar, excvars[j], len)) {
587 pmix_output_verbose(5, pmix_pnet_base_framework.framework_output,
588 "pnet: excluding envar %s", kv->value->data.envar.envar);
589 pmix_list_remove_item(ilist, &kv->super);
590 PMIX_RELEASE(kv);
591 }
592 }
593 }
594 }
595 return PMIX_SUCCESS;
596 }
597
598 static pmix_status_t process_maps(char *nspace, char *nregex, char *pregex)
599 {
600 char **nodes, **procs, **ranks;
601 pmix_status_t rc;
602 size_t m, n;
603 pmix_pnet_job_t *jptr, *job;
604 pmix_pnet_node_t *nd, *ndptr;
605 pmix_pnet_local_procs_t *lp;
606 bool needcheck;
607
608 PMIX_ACQUIRE_THREAD(&pmix_pnet_globals.lock);
609
610
611 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(nregex, &nodes))) {
612 PMIX_ERROR_LOG(rc);
613 PMIX_RELEASE_THREAD(&pmix_pnet_globals.lock);
614 return rc;
615 }
616
617
618 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(pregex, &procs))) {
619 PMIX_ERROR_LOG(rc);
620 pmix_argv_free(nodes);
621 PMIX_RELEASE_THREAD(&pmix_pnet_globals.lock);
622 return rc;
623 }
624
625
626 job = NULL;
627 if (0 < pmix_list_get_size(&pmix_pnet_globals.jobs)) {
628 PMIX_LIST_FOREACH(jptr, &pmix_pnet_globals.jobs, pmix_pnet_job_t) {
629 if (0 == strcmp(nspace, jptr->nspace)) {
630 job = jptr;
631 break;
632 }
633 }
634 }
635 if (NULL == job) {
636 job = PMIX_NEW(pmix_pnet_job_t);
637 job->nspace = strdup(nspace);
638 pmix_list_append(&pmix_pnet_globals.jobs, &job->super);
639 }
640
641 if (0 < pmix_list_get_size(&pmix_pnet_globals.nodes)) {
642 needcheck = true;
643 } else {
644 needcheck = false;
645 }
646 for (n=0; NULL != nodes[n]; n++) {
647 if (needcheck) {
648
649 nd = NULL;
650 PMIX_LIST_FOREACH(ndptr, &pmix_pnet_globals.nodes, pmix_pnet_node_t) {
651 if (0 == strcmp(nodes[n], ndptr->name)) {
652 nd = ndptr;
653 break;
654 }
655 }
656 if (NULL == nd) {
657 nd = PMIX_NEW(pmix_pnet_node_t);
658 nd->name = strdup(nodes[n]);
659 pmix_list_append(&pmix_pnet_globals.nodes, &nd->super);
660
661 PMIX_RETAIN(nd);
662 nd->index = pmix_pointer_array_add(&job->nodes, nd);
663 }
664 } else {
665 nd = PMIX_NEW(pmix_pnet_node_t);
666 nd->name = strdup(nodes[n]);
667 pmix_list_append(&pmix_pnet_globals.nodes, &nd->super);
668
669 PMIX_RETAIN(nd);
670 nd->index = pmix_pointer_array_add(&job->nodes, nd);
671 }
672
673 PMIX_LIST_FOREACH(lp, &nd->local_jobs, pmix_pnet_local_procs_t) {
674 if (0 == strcmp(nspace, lp->nspace)) {
675
676
677 pmix_list_remove_item(&nd->local_jobs, &lp->super);
678 PMIX_RELEASE(lp);
679 break;
680 }
681 }
682
683 lp = PMIX_NEW(pmix_pnet_local_procs_t);
684 lp->nspace = strdup(nspace);
685
686
687 ranks = pmix_argv_split(procs[n], ',');
688 lp->np = pmix_argv_count(ranks);
689 lp->ranks = (pmix_rank_t*)malloc(lp->np * sizeof(pmix_rank_t));
690 for (m=0; m < lp->np; m++) {
691 lp->ranks[m] = strtoul(ranks[m], NULL, 10);
692 }
693 pmix_list_append(&nd->local_jobs, &lp->super);
694 pmix_argv_free(ranks);
695 }
696
697 pmix_argv_free(nodes);
698 pmix_argv_free(procs);
699
700 PMIX_RELEASE_THREAD(&pmix_pnet_globals.lock);
701 return PMIX_SUCCESS;
702 }