This source file includes the following definitions.
- dcon
- ddes
- tacon
- tades
- ttcon
- ttdes
- tcp_init
- tcp_finalize
- generate_key
- allocate
- setup_local_network
- setup_fork
- child_finalized
- local_app_finalized
- deregister_nspace
- collect_inventory
- process_request
- deliver_inventory
1
2
3
4
5
6
7
8
9
10
11
12
13 #include <src/include/pmix_config.h>
14
15 #include <string.h>
16 #ifdef HAVE_UNISTD_H
17 #include <unistd.h>
18 #endif
19 #ifdef HAVE_SYS_TYPES_H
20 #include <sys/types.h>
21 #endif
22 #ifdef HAVE_SYS_STAT_H
23 #include <sys/stat.h>
24 #endif
25 #ifdef HAVE_FCNTL_H
26 #include <fcntl.h>
27 #endif
28 #include <time.h>
29
30 #include <pmix_common.h>
31
32 #include "src/include/pmix_socket_errno.h"
33 #include "src/include/pmix_globals.h"
34 #include "src/class/pmix_list.h"
35 #include "src/util/alfg.h"
36 #include "src/util/argv.h"
37 #include "src/util/error.h"
38 #include "src/util/output.h"
39 #include "src/util/parse_options.h"
40 #include "src/util/pif.h"
41 #include "src/util/pmix_environ.h"
42 #include "src/mca/preg/preg.h"
43
44 #include "src/mca/pnet/base/base.h"
45 #include "pnet_tcp.h"
46
47 #define PMIX_TCP_SETUP_APP_KEY "pmix.tcp.setup.app.key"
48 #define PMIX_TCP_INVENTORY_KEY "pmix.tcp.inventory"
49
50 static pmix_status_t tcp_init(void);
51 static void tcp_finalize(void);
52 static pmix_status_t allocate(pmix_namespace_t *nptr,
53 pmix_info_t info[], size_t ninfo,
54 pmix_list_t *ilist);
55 static pmix_status_t setup_local_network(pmix_namespace_t *nptr,
56 pmix_info_t info[],
57 size_t ninfo);
58 static pmix_status_t setup_fork(pmix_namespace_t *nptr,
59 const pmix_proc_t *peer, char ***env);
60 static void child_finalized(pmix_proc_t *peer);
61 static void local_app_finalized(pmix_namespace_t *nptr);
62 static void deregister_nspace(pmix_namespace_t *nptr);
63 static pmix_status_t collect_inventory(pmix_info_t directives[], size_t ndirs,
64 pmix_inventory_cbfunc_t cbfunc, void *cbdata);
65 static pmix_status_t deliver_inventory(pmix_info_t info[], size_t ninfo,
66 pmix_info_t directives[], size_t ndirs,
67 pmix_op_cbfunc_t cbfunc, void *cbdata);
68
69 pmix_pnet_module_t pmix_tcp_module = {
70 .name = "tcp",
71 .init = tcp_init,
72 .finalize = tcp_finalize,
73 .allocate = allocate,
74 .setup_local_network = setup_local_network,
75 .setup_fork = setup_fork,
76 .child_finalized = child_finalized,
77 .local_app_finalized = local_app_finalized,
78 .deregister_nspace = deregister_nspace,
79 .collect_inventory = collect_inventory,
80 .deliver_inventory = deliver_inventory
81 };
82
83 typedef struct {
84 pmix_list_item_t super;
85 char *device;
86 char *address;
87 } tcp_device_t;
88
89
90 typedef struct {
91 pmix_list_item_t super;
92 pmix_list_t devices;
93 char *type;
94 char *plane;
95 char **ports;
96 size_t nports;
97 } tcp_available_ports_t;
98
99 typedef struct {
100 pmix_list_item_t super;
101 char *nspace;
102 char **ports;
103 tcp_available_ports_t *src;
104 } tcp_port_tracker_t;
105
106 static pmix_list_t allocations, available;
107 static pmix_status_t process_request(pmix_namespace_t *nptr,
108 char *idkey, int ports_per_node,
109 tcp_port_tracker_t *trk,
110 pmix_list_t *ilist);
111
112 static void dcon(tcp_device_t *p)
113 {
114 p->device = NULL;
115 p->address = NULL;
116 }
117 static void ddes(tcp_device_t *p)
118 {
119 if (NULL != p->device) {
120 free(p->device);
121 }
122 if (NULL != p->address) {
123 free(p->address);
124 }
125 }
126 static PMIX_CLASS_INSTANCE(tcp_device_t,
127 pmix_list_item_t,
128 dcon, ddes);
129
130 static void tacon(tcp_available_ports_t *p)
131 {
132 PMIX_CONSTRUCT(&p->devices, pmix_list_t);
133 p->type = NULL;
134 p->plane = NULL;
135 p->ports = NULL;
136 p->nports = 0;
137 }
138 static void tades(tcp_available_ports_t *p)
139 {
140 PMIX_LIST_DESTRUCT(&p->devices);
141 if (NULL != p->type) {
142 free(p->type);
143 }
144 if (NULL != p->plane) {
145 free(p->plane);
146 }
147 if (NULL != p->ports) {
148 pmix_argv_free(p->ports);
149 }
150 }
151 static PMIX_CLASS_INSTANCE(tcp_available_ports_t,
152 pmix_list_item_t,
153 tacon, tades);
154
155 static void ttcon(tcp_port_tracker_t *p)
156 {
157 p->nspace = NULL;
158 p->ports = NULL;
159 p->src = NULL;
160 }
161 static void ttdes(tcp_port_tracker_t *p)
162 {
163 size_t n, m, mstart;
164
165 if (NULL != p->nspace) {
166 free(p->nspace);
167 }
168 if (NULL != p->src) {
169 if (NULL != p->ports) {
170 mstart = 0;
171 for (n=0; NULL != p->ports[n]; n++) {
172
173 for (m=mstart; m < p->src->nports; m++) {
174 if (NULL == p->src->ports[m]) {
175 p->src->ports[m] = strdup(p->ports[n]);
176 mstart = m + 1;
177 break;
178 }
179 }
180 }
181 pmix_argv_free(p->ports);
182 }
183 PMIX_RELEASE(p->src);
184 } else if (NULL != p->ports) {
185 pmix_argv_free(p->ports);
186 }
187 }
188 static PMIX_CLASS_INSTANCE(tcp_port_tracker_t,
189 pmix_list_item_t,
190 ttcon, ttdes);
191
192 static pmix_status_t tcp_init(void)
193 {
194 tcp_available_ports_t *trk;
195 char *p, **grps;
196 size_t n;
197
198 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
199 "pnet: tcp init");
200
201
202
203 if (!PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
204 return PMIX_SUCCESS;
205 }
206
207 PMIX_CONSTRUCT(&allocations, pmix_list_t);
208 PMIX_CONSTRUCT(&available, pmix_list_t);
209
210
211
212
213
214
215
216 if (NULL == mca_pnet_tcp_component.static_ports) {
217 return PMIX_SUCCESS;
218 }
219
220
221 grps = pmix_argv_split(mca_pnet_tcp_component.static_ports, ';');
222 for (n=0; NULL != grps[n]; n++) {
223 trk = PMIX_NEW(tcp_available_ports_t);
224 if (NULL == trk) {
225 pmix_argv_free(grps);
226 return PMIX_ERR_NOMEM;
227 }
228
229 if (NULL == (p = strrchr(grps[n], ':'))) {
230 pmix_argv_free(grps);
231 return PMIX_ERR_BAD_PARAM;
232 }
233
234 *p = '\0';
235 ++p;
236 pmix_util_parse_range_options(p, &trk->ports);
237 trk->nports = pmix_argv_count(trk->ports);
238
239 if (NULL != (p = strchr(grps[n], ':'))) {
240
241 *p = '\0';
242 ++p;
243 trk->plane = strdup(p);
244 }
245
246 trk->type = strdup(grps[n]);
247 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
248 "TYPE: %s PLANE %s", trk->type,
249 (NULL == trk->plane) ? "NULL" : trk->plane);
250 pmix_list_append(&available, &trk->super);
251 }
252 pmix_argv_free(grps);
253
254 return PMIX_SUCCESS;
255 }
256
257 static void tcp_finalize(void)
258 {
259 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
260 "pnet: tcp finalize");
261 if (PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
262 PMIX_LIST_DESTRUCT(&allocations);
263 PMIX_LIST_DESTRUCT(&available);
264 }
265 }
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 static inline void generate_key(uint64_t* unique_key) {
282 pmix_rng_buff_t rng;
283 pmix_srand(&rng,(unsigned int)time(NULL));
284 unique_key[0] = pmix_rand(&rng);
285 unique_key[1] = pmix_rand(&rng);
286 }
287
288
289
290
291
292
293
294
295
296
297
298
299
300 static pmix_status_t allocate(pmix_namespace_t *nptr,
301 pmix_info_t info[], size_t ninfo,
302 pmix_list_t *ilist)
303 {
304 uint64_t unique_key[2];
305 size_t n, nreqs=0;
306 int ports_per_node=0;
307 pmix_kval_t *kv;
308 pmix_status_t rc;
309 pmix_info_t *requests = NULL;
310 char **reqs, *cptr;
311 bool allocated = false, seckey = false, envars = false;
312 tcp_port_tracker_t *trk;
313 tcp_available_ports_t *avail, *aptr;
314 pmix_list_t mylist;
315 pmix_buffer_t buf;
316 char *type = NULL, *plane = NULL, *idkey = NULL;
317
318 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
319 "pnet:tcp:allocate for nspace %s", nptr->nspace);
320
321
322
323 if (!PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
324 return PMIX_SUCCESS;
325 }
326
327 if (NULL == info) {
328 return PMIX_ERR_TAKE_NEXT_OPTION;
329 }
330
331
332
333 for (n=0; n < ninfo; n++) {
334 if (PMIX_CHECK_KEY(&info[n], PMIX_SETUP_APP_ENVARS) ||
335 PMIX_CHECK_KEY(&info[n], PMIX_SETUP_APP_ALL)) {
336 envars = PMIX_INFO_TRUE(&info[n]);
337 } else if (PMIX_CHECK_KEY(info, PMIX_ALLOC_NETWORK)) {
338
339
340
341 if (PMIX_DATA_ARRAY != info->value.type ||
342 NULL == info->value.data.darray ||
343 PMIX_INFO != info->value.data.darray->type ||
344 NULL == info->value.data.darray->array) {
345
346 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
347 return PMIX_ERR_BAD_PARAM;
348 }
349 requests = (pmix_info_t*)info->value.data.darray->array;
350 nreqs = info->value.data.darray->size;
351 }
352 }
353
354 if (envars) {
355 if (NULL != mca_pnet_tcp_component.include) {
356 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
357 "pnet: tcp harvesting envars %s excluding %s",
358 (NULL == mca_pnet_tcp_component.incparms) ? "NONE" : mca_pnet_tcp_component.incparms,
359 (NULL == mca_pnet_tcp_component.excparms) ? "NONE" : mca_pnet_tcp_component.excparms);
360 rc = pmix_pnet_base_harvest_envars(mca_pnet_tcp_component.include,
361 mca_pnet_tcp_component.exclude,
362 ilist);
363 if (PMIX_SUCCESS != rc) {
364 return rc;
365 }
366 }
367 }
368
369 if (NULL == requests) {
370 return PMIX_ERR_TAKE_NEXT_OPTION;
371 }
372
373 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
374 "pnet:tcp:allocate alloc_network for nspace %s",
375 nptr->nspace);
376
377
378
379 for (n=0; n < nreqs; n++) {
380 if (0 == strncasecmp(requests[n].key, PMIX_ALLOC_NETWORK_TYPE, PMIX_MAX_KEYLEN)) {
381
382 if (PMIX_STRING != requests[n].value.type ||
383 NULL == requests[n].value.data.string) {
384 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
385 return PMIX_ERR_BAD_PARAM;
386 }
387 type = requests[n].value.data.string;
388 } else if (0 == strncasecmp(requests[n].key, PMIX_ALLOC_NETWORK_PLANE, PMIX_MAX_KEYLEN)) {
389
390 if (PMIX_STRING != requests[n].value.type ||
391 NULL == requests[n].value.data.string) {
392 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
393 return PMIX_ERR_BAD_PARAM;
394 }
395 plane = requests[n].value.data.string;
396 } else if (0 == strncasecmp(requests[n].key, PMIX_ALLOC_NETWORK_ENDPTS, PMIX_MAX_KEYLEN)) {
397 PMIX_VALUE_GET_NUMBER(rc, &requests[n].value, ports_per_node, int);
398 if (PMIX_SUCCESS != rc) {
399 return rc;
400 }
401 } else if (0 == strncmp(requests[n].key, PMIX_ALLOC_NETWORK_ID, PMIX_MAX_KEYLEN)) {
402
403 if (PMIX_STRING != requests[n].value.type ||
404 NULL == requests[n].value.data.string) {
405 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
406 return PMIX_ERR_BAD_PARAM;
407 }
408 idkey = requests[n].value.data.string;
409 } else if (0 == strncasecmp(requests[n].key, PMIX_ALLOC_NETWORK_SEC_KEY, PMIX_MAX_KEYLEN)) {
410 seckey = PMIX_INFO_TRUE(&requests[n]);
411 }
412 }
413
414
415 if (NULL == idkey) {
416 return PMIX_ERR_BAD_PARAM;
417 }
418
419 PMIX_CONSTRUCT(&mylist, pmix_list_t);
420
421 kv = PMIX_NEW(pmix_kval_t);
422 if (NULL == kv) {
423 return PMIX_ERR_NOMEM;
424 }
425 kv->key = strdup(PMIX_ALLOC_NETWORK_ID);
426 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
427 if (NULL == kv->value) {
428 PMIX_RELEASE(kv);
429 return PMIX_ERR_NOMEM;
430 }
431 kv->value->type = PMIX_STRING;
432 kv->value->data.string = strdup(idkey);
433 pmix_list_append(&mylist, &kv->super);
434
435
436
437
438
439
440
441 if (NULL != type) {
442
443 if (0 == strcasecmp(type, "tcp")) {
444 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
445 "pnet:tcp:allocate allocating TCP ports for nspace %s",
446 nptr->nspace);
447
448 avail = NULL;
449 PMIX_LIST_FOREACH(aptr, &available, tcp_available_ports_t) {
450 if (0 == strcmp(aptr->type, "tcp")) {
451
452 if (NULL != plane && (NULL == aptr->plane || 0 != strcmp(aptr->plane, plane))) {
453 continue;
454 }
455 avail = aptr;
456 break;
457 }
458 }
459
460 if (NULL == avail) {
461 PMIX_LIST_DESTRUCT(&mylist);
462 return PMIX_ERR_NOT_AVAILABLE;
463 }
464
465 trk = PMIX_NEW(tcp_port_tracker_t);
466 if (NULL == trk) {
467 PMIX_LIST_DESTRUCT(&mylist);
468 return PMIX_ERR_NOMEM;
469 }
470 trk->nspace = strdup(nptr->nspace);
471 PMIX_RETAIN(avail);
472 trk->src = avail;
473 pmix_list_append(&allocations, &trk->super);
474 rc = process_request(nptr, idkey, ports_per_node, trk, &mylist);
475 if (PMIX_SUCCESS != rc) {
476
477 pmix_list_remove_item(&allocations, &trk->super);
478 PMIX_RELEASE(trk);
479 PMIX_LIST_DESTRUCT(&mylist);
480 return rc;
481 }
482 allocated = true;
483
484 } else if (0 == strcasecmp(requests[n].value.data.string, "udp")) {
485 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
486 "pnet:tcp:allocate allocating UDP ports for nspace %s",
487 nptr->nspace);
488
489 avail = NULL;
490 PMIX_LIST_FOREACH(aptr, &available, tcp_available_ports_t) {
491 if (0 == strcmp(aptr->type, "udp")) {
492
493 if (NULL != plane && (NULL == aptr->plane || 0 != strcmp(aptr->plane, plane))) {
494 continue;
495 }
496 avail = aptr;
497 break;
498 }
499 }
500
501 if (NULL == avail) {
502 PMIX_LIST_DESTRUCT(&mylist);
503 return PMIX_ERR_NOT_AVAILABLE;
504 }
505
506 trk = PMIX_NEW(tcp_port_tracker_t);
507 if (NULL == trk) {
508 PMIX_LIST_DESTRUCT(&mylist);
509 return PMIX_ERR_NOMEM;
510 }
511 trk->nspace = strdup(nptr->nspace);
512 PMIX_RETAIN(avail);
513 trk->src = avail;
514 pmix_list_append(&allocations, &trk->super);
515 rc = process_request(nptr, idkey, ports_per_node, trk, &mylist);
516 if (PMIX_SUCCESS != rc) {
517
518 pmix_list_remove_item(&allocations, &trk->super);
519 PMIX_RELEASE(trk);
520 PMIX_LIST_DESTRUCT(&mylist);
521 return rc;
522 }
523 allocated = true;
524 } else {
525
526 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
527 "pnet:tcp:allocate unsupported type %s for nspace %s",
528 type, nptr->nspace);
529 PMIX_LIST_DESTRUCT(&mylist);
530 return PMIX_ERR_TAKE_NEXT_OPTION;
531 }
532
533 } else {
534 if (NULL != plane) {
535
536
537 PMIX_LIST_FOREACH(aptr, &available, tcp_available_ports_t) {
538 if (0 != strcmp(aptr->plane, plane)) {
539 continue;
540 }
541
542 trk = PMIX_NEW(tcp_port_tracker_t);
543 if (NULL == trk) {
544 PMIX_LIST_DESTRUCT(&mylist);
545 return PMIX_ERR_NOMEM;
546 }
547 trk->nspace = strdup(nptr->nspace);
548 PMIX_RETAIN(aptr);
549 trk->src = aptr;
550 pmix_list_append(&allocations, &trk->super);
551 rc = process_request(nptr, idkey, ports_per_node, trk, &mylist);
552 if (PMIX_SUCCESS != rc) {
553
554 pmix_list_remove_item(&allocations, &trk->super);
555 PMIX_RELEASE(trk);
556 PMIX_LIST_DESTRUCT(&mylist);
557 return rc;
558 }
559 allocated = true;
560 break;
561 }
562 } else {
563
564
565
566 if (NULL != mca_pnet_tcp_component.default_request) {
567 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
568 "pnet:tcp:allocate allocating default ports %s for nspace %s",
569 mca_pnet_tcp_component.default_request, nptr->nspace);
570 reqs = pmix_argv_split(mca_pnet_tcp_component.default_request, ';');
571 for (n=0; NULL != reqs[n]; n++) {
572
573
574 type = NULL;
575 plane = NULL;
576 if (NULL == (cptr = strrchr(reqs[n], ':'))) {
577 avail = (tcp_available_ports_t*)pmix_list_get_first(&available);
578 } else {
579 *cptr = '\0';
580 ++cptr;
581 ports_per_node = strtoul(cptr, NULL, 10);
582
583 cptr -= 2;
584 if (NULL != (cptr = strrchr(cptr, ':'))) {
585 *cptr = '\0';
586 ++cptr;
587 plane = cptr;
588 }
589 type = reqs[n];
590 avail = NULL;
591 PMIX_LIST_FOREACH(aptr, &available, tcp_available_ports_t) {
592 if (0 == strcmp(aptr->type, type)) {
593
594 if (NULL != plane && (NULL == aptr->plane || 0 != strcmp(aptr->plane, plane))) {
595 continue;
596 }
597 avail = aptr;
598 break;
599 }
600 }
601
602 if (NULL == avail) {
603 continue;
604 }
605 }
606
607 trk = PMIX_NEW(tcp_port_tracker_t);
608 if (NULL == trk) {
609 pmix_argv_free(reqs);
610 PMIX_LIST_DESTRUCT(&mylist);
611 return PMIX_ERR_NOMEM;
612 }
613 trk->nspace = strdup(nptr->nspace);
614 PMIX_RETAIN(avail);
615 trk->src = avail;
616 pmix_list_append(&allocations, &trk->super);
617 rc = process_request(nptr, idkey, ports_per_node, trk, &mylist);
618 if (PMIX_SUCCESS != rc) {
619
620 pmix_list_remove_item(&allocations, &trk->super);
621 PMIX_RELEASE(trk);
622 PMIX_LIST_DESTRUCT(&mylist);
623 return rc;
624 }
625 allocated = true;
626 }
627 } else {
628 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
629 "pnet:tcp:allocate allocating %d ports/node for nspace %s",
630 ports_per_node, nptr->nspace);
631 if (0 == ports_per_node) {
632
633 PMIX_LIST_DESTRUCT(&mylist);
634 return PMIX_ERR_TAKE_NEXT_OPTION;
635 }
636 avail = (tcp_available_ports_t*)pmix_list_get_first(&available);
637 if (NULL != avail) {
638
639 trk = PMIX_NEW(tcp_port_tracker_t);
640 if (NULL == trk) {
641 PMIX_LIST_DESTRUCT(&mylist);
642 return PMIX_ERR_NOMEM;
643 }
644 trk->nspace = strdup(nptr->nspace);
645 PMIX_RETAIN(avail);
646 trk->src = avail;
647 pmix_list_append(&allocations, &trk->super);
648 rc = process_request(nptr, idkey, ports_per_node, trk, &mylist);
649 if (PMIX_SUCCESS != rc) {
650
651 pmix_list_remove_item(&allocations, &trk->super);
652 PMIX_RELEASE(trk);
653 } else {
654 allocated = true;
655 }
656 }
657 }
658 }
659 if (!allocated) {
660
661 PMIX_LIST_DESTRUCT(&mylist);
662 return PMIX_ERR_TAKE_NEXT_OPTION;
663 }
664 }
665
666 if (seckey) {
667 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
668 "pnet:tcp: generate seckey");
669 generate_key(unique_key);
670 kv = PMIX_NEW(pmix_kval_t);
671 if (NULL == kv) {
672 PMIX_LIST_DESTRUCT(&mylist);
673 return PMIX_ERR_NOMEM;
674 }
675 kv->key = strdup(PMIX_ALLOC_NETWORK_SEC_KEY);
676 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
677 if (NULL == kv->value) {
678 PMIX_RELEASE(kv);
679 PMIX_LIST_DESTRUCT(&mylist);
680 return PMIX_ERR_NOMEM;
681 }
682 kv->value->type = PMIX_BYTE_OBJECT;
683 kv->value->data.bo.bytes = (char*)malloc(2 * sizeof(uint64_t));
684 if (NULL == kv->value->data.bo.bytes) {
685 PMIX_RELEASE(kv);
686 PMIX_LIST_DESTRUCT(&mylist);
687 return PMIX_ERR_NOMEM;
688 }
689 memcpy(kv->value->data.bo.bytes, unique_key, 2 * sizeof(uint64_t));
690 kv->value->data.bo.size = 2 * sizeof(uint64_t);
691 pmix_list_append(&mylist, &kv->super);
692 }
693
694
695 n = pmix_list_get_size(&mylist);
696 if (0 < n) {
697 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
698
699 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, &n, 1, PMIX_SIZE);
700
701 while (NULL != (kv = (pmix_kval_t*)pmix_list_remove_first(&mylist))) {
702 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL);
703 PMIX_RELEASE(kv);
704 if (PMIX_SUCCESS != rc) {
705 PMIX_DESTRUCT(&buf);
706 PMIX_LIST_DESTRUCT(&mylist);
707 return rc;
708 }
709 }
710 PMIX_LIST_DESTRUCT(&mylist);
711 kv = PMIX_NEW(pmix_kval_t);
712 kv->key = strdup(PMIX_TCP_SETUP_APP_KEY);
713 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
714 if (NULL == kv->value) {
715 PMIX_RELEASE(kv);
716 PMIX_DESTRUCT(&buf);
717 return PMIX_ERR_NOMEM;
718 }
719 kv->value->type = PMIX_BYTE_OBJECT;
720 PMIX_UNLOAD_BUFFER(&buf, kv->value->data.bo.bytes, kv->value->data.bo.size);
721 PMIX_DESTRUCT(&buf);
722 pmix_list_append(ilist, &kv->super);
723 }
724
725
726
727 return PMIX_SUCCESS;
728 }
729
730
731
732
733 static pmix_status_t setup_local_network(pmix_namespace_t *nptr,
734 pmix_info_t info[],
735 size_t ninfo)
736 {
737 size_t n, m, nkvals;
738 pmix_buffer_t bkt;
739 int32_t cnt;
740 pmix_kval_t *kv;
741 pmix_status_t rc;
742 pmix_info_t *jinfo, stinfo;
743 char *idkey = NULL;
744
745 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
746 "pnet:tcp:setup_local_network");
747
748 if (NULL != info) {
749 for (n=0; n < ninfo; n++) {
750
751 if (0 == strncmp(info[n].key, PMIX_TCP_SETUP_APP_KEY, PMIX_MAX_KEYLEN)) {
752
753 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &bkt,
754 info[n].value.data.bo.bytes,
755 info[n].value.data.bo.size);
756
757 cnt = 1;
758 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
759 &bkt, &nkvals, &cnt, PMIX_SIZE);
760
761 PMIX_INFO_CONSTRUCT(&stinfo);
762 pmix_strncpy(stinfo.key, idkey, PMIX_MAX_KEYLEN);
763 stinfo.value.type = PMIX_DATA_ARRAY;
764 PMIX_DATA_ARRAY_CREATE(stinfo.value.data.darray, nkvals, PMIX_INFO);
765 jinfo = (pmix_info_t*)stinfo.value.data.darray->array;
766
767
768 kv = PMIX_NEW(pmix_kval_t);
769 cnt = 1;
770 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
771 &bkt, kv, &cnt, PMIX_KVAL);
772 m = 0;
773 while (PMIX_SUCCESS == rc) {
774 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
775 "recvd KEY %s %s", kv->key,
776 (PMIX_STRING == kv->value->type) ? kv->value->data.string : "NON-STRING");
777
778 pmix_strncpy(jinfo[m].key, kv->key, PMIX_MAX_KEYLEN);
779 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
780 &jinfo[m].value, kv->value);
781
782 if (NULL == idkey &&
783 0 == strncmp(kv->key, PMIX_ALLOC_NETWORK_ID, PMIX_MAX_KEYLEN)) {
784 idkey = strdup(kv->value->data.string);
785 }
786 ++m;
787 PMIX_RELEASE(kv);
788 kv = PMIX_NEW(pmix_kval_t);
789 cnt = 1;
790 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
791 &bkt, kv, &cnt, PMIX_KVAL);
792 }
793
794 info[n].value.data.bo.bytes = bkt.base_ptr;
795 info[n].value.data.bo.size = bkt.bytes_used;
796 bkt.base_ptr = NULL;
797 bkt.bytes_used = 0;
798
799
800 if (NULL == idkey) {
801 PMIX_INFO_FREE(jinfo, nkvals);
802 return PMIX_ERR_BAD_PARAM;
803 }
804
805
806 PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, nptr,
807 &stinfo, 1);
808 PMIX_INFO_DESTRUCT(&stinfo);
809 }
810 }
811 }
812 if (NULL != idkey) {
813 free(idkey);
814 }
815 return PMIX_SUCCESS;
816 }
817
818 static pmix_status_t setup_fork(pmix_namespace_t *nptr,
819 const pmix_proc_t *peer, char ***env)
820 {
821 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
822 "pnet:tcp:setup_fork");
823 return PMIX_SUCCESS;
824 }
825
826
827
828
829 static void child_finalized(pmix_proc_t *peer)
830 {
831 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
832 "pnet:tcp child finalized");
833 }
834
835
836
837
838
839 static void local_app_finalized(pmix_namespace_t *nptr)
840 {
841 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
842 "pnet:tcp app finalized");
843 }
844
845
846
847
848
849 static void deregister_nspace(pmix_namespace_t *nptr)
850 {
851 tcp_port_tracker_t *trk;
852
853 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
854 "pnet:tcp deregister nspace %s", nptr->nspace);
855
856
857
858 if (!PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
859 return;
860 }
861
862
863 PMIX_LIST_FOREACH(trk, &allocations, tcp_port_tracker_t) {
864 if (0 == strcmp(nptr->nspace, trk->nspace)) {
865 pmix_list_remove_item(&allocations, &trk->super);
866 PMIX_RELEASE(trk);
867 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
868 "pnet:tcp released tracker for nspace %s", nptr->nspace);
869 return;
870 }
871 }
872 }
873
874 static pmix_status_t collect_inventory(pmix_info_t directives[], size_t ndirs,
875 pmix_inventory_cbfunc_t cbfunc, void *cbdata)
876 {
877 pmix_inventory_rollup_t *cd = (pmix_inventory_rollup_t*)cbdata;
878 char *prefix, myhost[PMIX_MAXHOSTNAMELEN];
879 char myconnhost[PMIX_MAXHOSTNAMELEN];
880 char name[32], uri[2048];
881 struct sockaddr_storage my_ss;
882 char *foo;
883 pmix_buffer_t bucket, pbkt;
884 int i;
885 pmix_status_t rc;
886 bool found = false;
887 pmix_byte_object_t pbo;
888 pmix_kval_t *kv;
889
890 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
891 "pnet:tcp:collect_inventory");
892
893
894 PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
895
896 gethostname(myhost, sizeof(myhost));
897 foo = &myhost[0];
898 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket, &foo, 1, PMIX_STRING);
899 if (PMIX_SUCCESS != rc) {
900 PMIX_ERROR_LOG(rc);
901 PMIX_DESTRUCT(&bucket);
902 return rc;
903 }
904
905
906 for (i = pmix_ifbegin(); i >= 0; i = pmix_ifnext(i)) {
907 if (PMIX_SUCCESS != pmix_ifindextoaddr(i, (struct sockaddr*)&my_ss, sizeof(my_ss))) {
908 pmix_output (0, "ptl_tcp: problems getting address for index %i (kernel index %i)\n",
909 i, pmix_ifindextokindex(i));
910 continue;
911 }
912
913 if (AF_INET != my_ss.ss_family &&
914 AF_INET6 != my_ss.ss_family) {
915 continue;
916 }
917
918 pmix_ifindextoname(i, name, sizeof(name));
919
920
921 if (0 == strncmp(name, "vir", 3)) {
922 continue;
923 }
924
925 if (pmix_ifisloopback(i)) {
926 continue;
927 }
928 if (AF_INET == my_ss.ss_family) {
929 prefix = "tcp4://";
930 inet_ntop(AF_INET, &((struct sockaddr_in*) &my_ss)->sin_addr,
931 myconnhost, PMIX_MAXHOSTNAMELEN);
932 } else if (AF_INET6 == my_ss.ss_family) {
933 prefix = "tcp6://";
934 inet_ntop(AF_INET6, &((struct sockaddr_in6*) &my_ss)->sin6_addr,
935 myconnhost, PMIX_MAXHOSTNAMELEN);
936 } else {
937 continue;
938 }
939 (void)snprintf(uri, 2048, "%s%s", prefix, myconnhost);
940 pmix_output_verbose(2, pmix_pnet_base_framework. framework_output,
941 "TCP INVENTORY ADDING: %s %s", name, uri);
942 found = true;
943
944 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
945 foo = &name[0];
946 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, &foo, 1, PMIX_STRING);
947 if (PMIX_SUCCESS != rc) {
948 PMIX_ERROR_LOG(rc);
949 PMIX_DESTRUCT(&pbkt);
950 PMIX_DESTRUCT(&bucket);
951 return rc;
952 }
953
954 foo = &uri[0];
955 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, &foo, 1, PMIX_STRING);
956 if (PMIX_SUCCESS != rc) {
957 PMIX_ERROR_LOG(rc);
958 PMIX_DESTRUCT(&pbkt);
959 PMIX_DESTRUCT(&bucket);
960 return rc;
961 }
962
963 PMIX_UNLOAD_BUFFER(&pbkt, pbo.bytes, pbo.size);
964
965 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket, &pbo, 1, PMIX_BYTE_OBJECT);
966 if (PMIX_SUCCESS != rc) {
967 PMIX_ERROR_LOG(rc);
968 PMIX_BYTE_OBJECT_DESTRUCT(&pbo);
969 PMIX_DESTRUCT(&bucket);
970 return rc;
971 }
972 }
973
974 if (!found) {
975 PMIX_DESTRUCT(&bucket);
976 return PMIX_ERR_TAKE_NEXT_OPTION;
977 }
978
979 PMIX_UNLOAD_BUFFER(&bucket, pbo.bytes, pbo.size);
980 kv = PMIX_NEW(pmix_kval_t);
981 kv->key = strdup(PMIX_TCP_INVENTORY_KEY);
982 PMIX_VALUE_CREATE(kv->value, 1);
983 pmix_value_load(kv->value, &pbo, PMIX_BYTE_OBJECT);
984 PMIX_BYTE_OBJECT_DESTRUCT(&pbo);
985 pmix_list_append(&cd->payload, &kv->super);
986
987 return PMIX_SUCCESS;
988 }
989
990 static pmix_status_t process_request(pmix_namespace_t *nptr,
991 char *idkey, int ports_per_node,
992 tcp_port_tracker_t *trk,
993 pmix_list_t *ilist)
994 {
995 char **plist;
996 pmix_kval_t *kv;
997 size_t m;
998 int p, ppn;
999 tcp_available_ports_t *avail = trk->src;
1000
1001 kv = PMIX_NEW(pmix_kval_t);
1002 if (NULL == kv) {
1003 return PMIX_ERR_NOMEM;
1004 }
1005 kv->key = strdup(idkey);
1006 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1007 if (NULL == kv->value) {
1008 PMIX_RELEASE(kv);
1009 return PMIX_ERR_NOMEM;
1010 }
1011 kv->value->type = PMIX_STRING;
1012 kv->value->data.string = NULL;
1013 if (0 == ports_per_node) {
1014
1015
1016 return PMIX_ERR_NOT_SUPPORTED;
1017 } else {
1018 ppn = ports_per_node;
1019 }
1020
1021
1022 p = 0;
1023 plist = NULL;
1024 for (m=0; p < ppn && m < avail->nports; m++) {
1025 if (NULL != avail->ports[m]) {
1026 pmix_argv_append_nosize(&trk->ports, avail->ports[m]);
1027 pmix_argv_append_nosize(&plist, avail->ports[m]);
1028 free(avail->ports[m]);
1029 avail->ports[m] = NULL;
1030 ++p;
1031 }
1032 }
1033
1034 if (p < ppn) {
1035 PMIX_RELEASE(kv);
1036
1037
1038 return PMIX_ERR_OUT_OF_RESOURCE;
1039 }
1040
1041 kv->value->data.string = pmix_argv_join(plist, ',');
1042 pmix_argv_free(plist);
1043 pmix_list_append(ilist, &kv->super);
1044
1045
1046 kv = PMIX_NEW(pmix_kval_t);
1047 if (NULL == kv) {
1048 return PMIX_ERR_NOMEM;
1049 }
1050 kv->key = strdup(idkey);
1051 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1052 if (NULL == kv->value) {
1053 PMIX_RELEASE(kv);
1054 return PMIX_ERR_NOMEM;
1055 }
1056 kv->value->type = PMIX_STRING;
1057 kv->value->data.string = strdup(trk->src->type);
1058 pmix_list_append(ilist, &kv->super);
1059 if (NULL != trk->src->plane) {
1060 kv = PMIX_NEW(pmix_kval_t);
1061 if (NULL == kv) {
1062 return PMIX_ERR_NOMEM;
1063 }
1064 kv->key = strdup(idkey);
1065 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1066 if (NULL == kv->value) {
1067 PMIX_RELEASE(kv);
1068 return PMIX_ERR_NOMEM;
1069 }
1070 kv->value->type = PMIX_STRING;
1071 kv->value->data.string = strdup(trk->src->plane);
1072 pmix_list_append(ilist, &kv->super);
1073 }
1074 return PMIX_SUCCESS;
1075 }
1076
1077 static pmix_status_t deliver_inventory(pmix_info_t info[], size_t ninfo,
1078 pmix_info_t directives[], size_t ndirs,
1079 pmix_op_cbfunc_t cbfunc, void *cbdata)
1080 {
1081 pmix_buffer_t bkt, pbkt;
1082 size_t n;
1083 int32_t cnt;
1084 char *hostname, *device, *address;
1085 pmix_byte_object_t pbo;
1086 pmix_pnet_node_t *nd, *ndptr;
1087 pmix_pnet_resource_t *lt, *lst;
1088 tcp_available_ports_t *prts;
1089 tcp_device_t *res;
1090 pmix_status_t rc;
1091
1092 pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
1093 "pnet:tcp deliver inventory");
1094
1095 for (n=0; n < ninfo; n++) {
1096 if (0 == strncmp(info[n].key, PMIX_TCP_INVENTORY_KEY, PMIX_MAX_KEYLEN)) {
1097
1098 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &bkt,
1099 info[n].value.data.bo.bytes,
1100 info[n].value.data.bo.size);
1101
1102 cnt = 1;
1103 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
1104 &bkt, &hostname, &cnt, PMIX_STRING);
1105 if (PMIX_SUCCESS != rc) {
1106 PMIX_ERROR_LOG(rc);
1107
1108
1109 return rc;
1110 }
1111
1112 nd = NULL;
1113 PMIX_LIST_FOREACH(ndptr, &pmix_pnet_globals.nodes, pmix_pnet_node_t) {
1114 if (0 == strcmp(hostname, ndptr->name)) {
1115 nd = ndptr;
1116 break;
1117 }
1118 }
1119 if (NULL == nd) {
1120 nd = PMIX_NEW(pmix_pnet_node_t);
1121 nd->name = strdup(hostname);
1122 pmix_list_append(&pmix_pnet_globals.nodes, &nd->super);
1123 }
1124
1125 lst = NULL;
1126 PMIX_LIST_FOREACH(lt, &nd->resources, pmix_pnet_resource_t) {
1127 if (0 == strcmp(lt->name, "tcp")) {
1128 lst = lt;
1129 break;
1130 }
1131 }
1132 if (NULL == lst) {
1133 lst = PMIX_NEW(pmix_pnet_resource_t);
1134 lst->name = strdup("tcp");
1135 pmix_list_append(&nd->resources, &lst->super);
1136 }
1137
1138 prts = PMIX_NEW(tcp_available_ports_t);
1139 pmix_list_append(&lst->resources, &prts->super);
1140
1141 cnt = 1;
1142 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
1143 &bkt, &pbo, &cnt, PMIX_BYTE_OBJECT);
1144 while (PMIX_SUCCESS == rc) {
1145
1146 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &pbkt, pbo.bytes, pbo.size);
1147
1148 cnt = 1;
1149 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
1150 &pbkt, &device, &cnt, PMIX_STRING);
1151 if (PMIX_SUCCESS != rc) {
1152 PMIX_ERROR_LOG(rc);
1153 PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1154
1155
1156 return rc;
1157 }
1158
1159 cnt = 1;
1160 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
1161 &pbkt, &address, &cnt, PMIX_STRING);
1162 if (PMIX_SUCCESS != rc) {
1163 PMIX_ERROR_LOG(rc);
1164 PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1165
1166
1167 return rc;
1168 }
1169
1170 res = PMIX_NEW(tcp_device_t);
1171 res->device = device;
1172 res->address = address;
1173 pmix_list_append(&prts->devices, &res->super);
1174 PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1175 cnt = 1;
1176 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer,
1177 &bkt, &pbo, &cnt, PMIX_BYTE_OBJECT);
1178 }
1179 PMIX_DATA_BUFFER_DESTRUCT(&bkt);
1180 if (5 < pmix_output_get_verbosity(pmix_pnet_base_framework.framework_output)) {
1181
1182 pmix_output(0, "TCP resources for node: %s", nd->name);
1183 PMIX_LIST_FOREACH(lt, &nd->resources, pmix_pnet_resource_t) {
1184 if (0 == strcmp(lt->name, "tcp")) {
1185 PMIX_LIST_FOREACH(prts, <->resources, tcp_available_ports_t) {
1186 device = NULL;
1187 if (NULL != prts->ports) {
1188 device = pmix_argv_join(prts->ports, ',');
1189 }
1190 pmix_output(0, "\tPorts: %s", (NULL == device) ? "UNSPECIFIED" : device);
1191 if (NULL != device) {
1192 free(device);
1193 }
1194 PMIX_LIST_FOREACH(res, &prts->devices, tcp_device_t) {
1195 pmix_output(0, "\tDevice: %s", res->device);
1196 pmix_output(0, "\tAddress: %s", res->address);
1197 }
1198 }
1199 }
1200 }
1201 }
1202 }
1203 }
1204
1205 return PMIX_SUCCESS;
1206 }