This source file includes the following definitions:
- ompi_dpm_init
- ompi_dpm_connect_accept
- construct_peers
- ompi_dpm_disconnect
- ompi_dpm_spawn
- ompi_dpm_open_port
- ompi_dpm_close_port
- ompi_dpm_dyn_init
- ompi_dpm_finalize
- ompi_dpm_dyn_finalize
- disconnect_init
- disconnect_waitall
- ompi_dpm_group_is_dyn
- ompi_dpm_mark_dyncomm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include "ompi_config.h"
30 #include "ompi/constants.h"
31
32 #include <string.h>
33 #include <stdio.h>
34 #include <ctype.h>
35 #include <time.h>
36 #if HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39
40 #include "opal/util/alfg.h"
41 #include "opal/util/argv.h"
42 #include "opal/util/opal_getcwd.h"
43 #include "opal/util/proc.h"
44 #include "opal/util/show_help.h"
45 #include "opal/util/printf.h"
46 #include "opal/dss/dss.h"
47 #include "opal/mca/hwloc/base/base.h"
48 #include "opal/mca/pmix/pmix.h"
49
50 #include "ompi/communicator/communicator.h"
51 #include "ompi/group/group.h"
52 #include "ompi/proc/proc.h"
53 #include "ompi/mca/pml/pml.h"
54 #include "ompi/mca/rte/rte.h"
55 #include "ompi/info/info.h"
56
57 #include "ompi/dpm/dpm.h"
58
59 static opal_rng_buff_t rnd;
60
61 typedef struct {
62 ompi_communicator_t *comm;
63 int size;
64 struct ompi_request_t **reqs;
65 int buf;
66 } ompi_dpm_disconnect_obj;
67 static int disconnect_waitall (int count, ompi_dpm_disconnect_obj **objs);
68 static ompi_dpm_disconnect_obj *disconnect_init(ompi_communicator_t *comm);
69
70 typedef struct {
71 opal_list_item_t super;
72 ompi_proc_t *p;
73 } ompi_dpm_proct_caddy_t;
74 static OBJ_CLASS_INSTANCE(ompi_dpm_proct_caddy_t,
75 opal_list_item_t,
76 NULL, NULL);
77
78
79
80
81 int ompi_dpm_init(void)
82 {
83 time_t now;
84
85
86 now = time(NULL);
87 if (!opal_srand(&rnd, now)) {
88 return OMPI_ERROR;
89 }
90 return OMPI_SUCCESS;
91 }
92
93 int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
94 const char *port_string, bool send_first,
95 ompi_communicator_t **newcomm)
96 {
97 int k, size, rsize, rank, rc, rportlen=0;
98 char **members = NULL, *nstring, *rport=NULL;
99 bool dense, isnew;
100 opal_process_name_t pname;
101 opal_list_t ilist, mlist, rlist;
102 opal_value_t info;
103 opal_pmix_pdata_t pdat;
104 opal_namelist_t *nm;
105 opal_jobid_t jobid;
106
107 ompi_communicator_t *newcomp=MPI_COMM_NULL;
108 ompi_proc_t *proc;
109 ompi_group_t *group=comm->c_local_group;
110 ompi_proc_t **proc_list=NULL, **new_proc_list = NULL;
111 int32_t i;
112 ompi_group_t *new_group_pointer;
113 ompi_dpm_proct_caddy_t *cd;
114
115 if (NULL == opal_pmix.publish || NULL == opal_pmix.connect ||
116 NULL == opal_pmix.unpublish ||
117 (NULL == opal_pmix.lookup && NULL == opal_pmix.lookup_nb)) {
118
119 opal_show_help("help-mpi-runtime.txt", "noconxcpt", true);
120 return OMPI_ERR_NOT_SUPPORTED;
121 }
122 if (!ompi_rte_connect_accept_support(port_string)) {
123
124 return OMPI_ERR_NOT_SUPPORTED;
125 }
126
127
128 *newcomm = MPI_COMM_NULL;
129
130 size = ompi_comm_size ( comm );
131 rank = ompi_comm_rank ( comm );
132
133
134
135
136
137
138
139
140
141 if (MPI_COMM_WORLD == comm) {
142 pname.jobid = OMPI_PROC_MY_NAME->jobid;
143 pname.vpid = OPAL_VPID_WILDCARD;
144 rc = opal_convert_process_name_to_string(&nstring, &pname);
145 if (OPAL_SUCCESS != rc) {
146 return OMPI_ERROR;
147 }
148 opal_argv_append_nosize(&members, nstring);
149 free(nstring);
150
151
152
153 if (NULL == (nstring = (char*)opal_pmix.get_nspace(OMPI_PROC_MY_NAME->jobid))) {
154 opal_argv_free(members);
155 return OMPI_ERR_NOT_SUPPORTED;
156 }
157 opal_argv_append_nosize(&members, nstring);
158 (void)opal_asprintf(&nstring, "%d", size);
159 opal_argv_append_nosize(&members, nstring);
160 free(nstring);
161 } else {
162 if (OMPI_GROUP_IS_DENSE(group)) {
163 proc_list = group->grp_proc_pointers;
164 dense = true;
165 } else {
166 proc_list = (ompi_proc_t**)calloc(group->grp_proc_count,
167 sizeof(ompi_proc_t *));
168 for (i=0 ; i<group->grp_proc_count ; i++) {
169 if (NULL == (proc_list[i] = ompi_group_peer_lookup(group,i))) {
170 OMPI_ERROR_LOG(OMPI_ERR_NOT_FOUND);
171 rc = OMPI_ERR_NOT_FOUND;
172 free(proc_list);
173 goto exit;
174 }
175 }
176 dense = false;
177 }
178 for (i=0; i < size; i++) {
179 opal_process_name_t proc_name;
180 if (ompi_proc_is_sentinel (proc_list[i])) {
181 proc_name = ompi_proc_sentinel_to_name ((uintptr_t) proc_list[i]);
182 } else {
183 proc_name = proc_list[i]->super.proc_name;
184 }
185 rc = opal_convert_process_name_to_string(&nstring, &proc_name);
186 if (OPAL_SUCCESS != rc) {
187 if (!dense) {
188 free(proc_list);
189 proc_list = NULL;
190 }
191 return OMPI_ERROR;
192 }
193 opal_argv_append_nosize(&members, nstring);
194 free(nstring);
195 if (NULL == (nstring = (char*)opal_pmix.get_nspace(proc_name.jobid))) {
196 opal_argv_free(members);
197 free (proc_list);
198 return OMPI_ERR_NOT_SUPPORTED;
199 }
200 opal_argv_append_nosize(&members, nstring);
201 }
202 if (!dense) {
203 free(proc_list);
204 proc_list = NULL;
205 }
206 }
207
208 if (rank == root) {
209
210 OBJ_CONSTRUCT(&info, opal_value_t);
211 OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);
212 if (send_first) {
213 (void)opal_asprintf(&info.key, "%s:connect", port_string);
214 (void)opal_asprintf(&pdat.value.key, "%s:accept", port_string);
215 } else {
216 (void)opal_asprintf(&info.key, "%s:accept", port_string);
217 (void)opal_asprintf(&pdat.value.key, "%s:connect", port_string);
218 }
219 info.type = OPAL_STRING;
220 info.data.string = opal_argv_join(members, ':');
221 pdat.value.type = OPAL_STRING;
222
223 OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600);
224 OBJ_DESTRUCT(&info);
225 if (OPAL_SUCCESS != rc) {
226 OBJ_DESTRUCT(&pdat);
227 return rc;
228 }
229
230
231 rport = strdup(pdat.value.data.string);
232 rportlen = strlen(rport) + 1;
233 OBJ_DESTRUCT(&pdat);
234 }
235
236
237
238
239
240
241 rc = comm->c_coll->coll_bcast(&rportlen, 1, MPI_INT, root, comm,
242 comm->c_coll->coll_bcast_module);
243 if (OMPI_SUCCESS != rc) {
244 free(rport);
245 goto exit;
246 }
247
248 if (rank != root) {
249
250 rport = (char*)malloc(rportlen);
251 if (NULL == rport) {
252 rc = OMPI_ERR_OUT_OF_RESOURCE;
253 goto exit;
254 }
255 }
256
257 rc = comm->c_coll->coll_bcast(rport, rportlen, MPI_BYTE, root, comm,
258 comm->c_coll->coll_bcast_module);
259 if (OMPI_SUCCESS != rc) {
260 free(rport);
261 goto exit;
262 }
263
264
265
266 OBJ_CONSTRUCT(&mlist, opal_list_t);
267 for (i=0; NULL != members[i]; i++) {
268 nm = OBJ_NEW(opal_namelist_t);
269 if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
270 OMPI_ERROR_LOG(rc);
271 opal_argv_free(members);
272 free(rport);
273 OPAL_LIST_DESTRUCT(&mlist);
274 goto exit;
275 }
276
277 ++i;
278 if (NULL == members[i]) {
279
280 OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
281 OPAL_LIST_DESTRUCT(&mlist);
282 opal_argv_free(members);
283 free(rport);
284 rc = OMPI_ERR_BAD_PARAM;
285 goto exit;
286 }
287
288
289 if (OPAL_VPID_WILDCARD == nm->name.vpid) {
290 jobid = nm->name.jobid;
291 OBJ_RELEASE(nm);
292 for (k=0; k < size; k++) {
293 nm = OBJ_NEW(opal_namelist_t);
294 nm->name.jobid = jobid;
295 nm->name.vpid = k;
296 opal_list_append(&mlist, &nm->super);
297 }
298
299 if (NULL == members[i+1]) {
300
301 OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
302 OPAL_LIST_DESTRUCT(&mlist);
303 opal_argv_free(members);
304 free(rport);
305 rc = OMPI_ERR_BAD_PARAM;
306 goto exit;
307 }
308 ++i;
309 } else {
310 opal_list_append(&mlist, &nm->super);
311 }
312 }
313 opal_argv_free(members);
314 members = NULL;
315
316
317
318
319 members = opal_argv_split(rport, ':');
320 free(rport);
321
322
323
324 OBJ_CONSTRUCT(&ilist, opal_list_t);
325 OBJ_CONSTRUCT(&rlist, opal_list_t);
326
327 for (i=0; NULL != members[i]; i++) {
328 nm = OBJ_NEW(opal_namelist_t);
329 if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
330 OMPI_ERROR_LOG(rc);
331 opal_argv_free(members);
332 OPAL_LIST_DESTRUCT(&ilist);
333 OPAL_LIST_DESTRUCT(&rlist);
334 goto exit;
335 }
336
337 ++i;
338 if (NULL == members[i]) {
339 OMPI_ERROR_LOG(OMPI_ERR_NOT_SUPPORTED);
340 opal_argv_free(members);
341 OPAL_LIST_DESTRUCT(&ilist);
342 OPAL_LIST_DESTRUCT(&rlist);
343 goto exit;
344 }
345 opal_pmix.register_jobid(nm->name.jobid, members[i]);
346 if (OPAL_VPID_WILDCARD == nm->name.vpid) {
347 jobid = nm->name.jobid;
348 OBJ_RELEASE(nm);
349
350
351
352 if (NULL == members[i+1]) {
353
354 OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
355 opal_argv_free(members);
356 OPAL_LIST_DESTRUCT(&ilist);
357 OPAL_LIST_DESTRUCT(&rlist);
358 rc = OMPI_ERR_BAD_PARAM;
359 goto exit;
360 }
361 rsize = strtoul(members[i+1], NULL, 10);
362 ++i;
363 for (k=0; k < rsize; k++) {
364 nm = OBJ_NEW(opal_namelist_t);
365 nm->name.jobid = jobid;
366 nm->name.vpid = k;
367 opal_list_append(&mlist, &nm->super);
368
369 proc = ompi_proc_find_and_add(&nm->name, &isnew);
370 if (isnew) {
371 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
372 cd->p = proc;
373 opal_list_append(&ilist, &cd->super);
374 }
375
376 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
377 cd->p = proc;
378 opal_list_append(&rlist, &cd->super);
379 }
380 } else {
381 opal_list_append(&mlist, &nm->super);
382
383 proc = ompi_proc_find_and_add(&nm->name, &isnew);
384 if (isnew) {
385 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
386 cd->p = proc;
387 opal_list_append(&ilist, &cd->super);
388 }
389
390 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
391 cd->p = proc;
392 opal_list_append(&rlist, &cd->super);
393 }
394 }
395 opal_argv_free(members);
396
397
398
399
400 rc = opal_pmix.connect(&mlist);
401 OPAL_LIST_DESTRUCT(&mlist);
402 if (OPAL_SUCCESS != rc) {
403 OMPI_ERROR_LOG(rc);
404 OPAL_LIST_DESTRUCT(&ilist);
405 OPAL_LIST_DESTRUCT(&rlist);
406 goto exit;
407 }
408 if (0 < opal_list_get_size(&ilist)) {
409
410 new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
411 sizeof(ompi_proc_t *));
412 i = 0;
413 OPAL_LIST_FOREACH(cd, &ilist, ompi_dpm_proct_caddy_t) {
414 opal_value_t *kv;
415 proc = cd->p;
416 new_proc_list[i] = proc ;
417
418
419
420 ompi_proc_complete_init_single(proc);
421
422 kv = OBJ_NEW(opal_value_t);
423 kv->key = strdup(OPAL_PMIX_LOCALITY);
424 kv->type = OPAL_UINT16;
425 kv->data.uint16 = proc->super.proc_flags;
426 opal_pmix.store_local(&proc->super.proc_name, kv);
427 OBJ_RELEASE(kv);
428 ++i;
429 }
430
431 rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
432 free(new_proc_list);
433 new_proc_list = NULL;
434 if (OMPI_SUCCESS != rc) {
435 OMPI_ERROR_LOG(rc);
436 OPAL_LIST_DESTRUCT(&ilist);
437 goto exit;
438 }
439 }
440 OPAL_LIST_DESTRUCT(&ilist);
441
442
443 rsize = opal_list_get_size(&rlist);
444 new_group_pointer=ompi_group_allocate(rsize);
445 if (NULL == new_group_pointer) {
446 rc = OMPI_ERR_OUT_OF_RESOURCE;
447 OPAL_LIST_DESTRUCT(&rlist);
448 goto exit;
449 }
450
451 i=0;
452 OPAL_LIST_FOREACH(cd, &rlist, ompi_dpm_proct_caddy_t) {
453 new_group_pointer->grp_proc_pointers[i++] = cd->p;
454
455 OBJ_RETAIN(cd->p);
456 }
457 OPAL_LIST_DESTRUCT(&rlist);
458
459
460 rc = ompi_comm_set ( &newcomp,
461 comm,
462 group->grp_proc_count,
463 NULL,
464 rsize,
465 NULL ,
466 NULL,
467 comm->error_handler,
468 NULL,
469 group,
470 new_group_pointer
471 );
472 if (OMPI_SUCCESS != rc) {
473 goto exit;
474 }
475
476 OBJ_RELEASE(new_group_pointer);
477 new_group_pointer = MPI_GROUP_NULL;
478
479
480 rc = ompi_comm_nextcid ( newcomp,
481 comm,
482 NULL,
483 &root,
484 (void*)port_string,
485 send_first,
486 OMPI_COMM_CID_INTRA_PMIX);
487 if (OMPI_SUCCESS != rc) {
488 goto exit;
489 }
490
491
492 rc = ompi_comm_activate ( &newcomp,
493 comm,
494 NULL,
495 &root,
496 (void*)port_string,
497 send_first,
498 OMPI_COMM_CID_INTRA_PMIX);
499 if (OMPI_SUCCESS != rc) {
500 goto exit;
501 }
502
503
504
505
506
507
508 exit:
509 if (OMPI_SUCCESS != rc) {
510 if (MPI_COMM_NULL != newcomp && NULL != newcomp) {
511 OBJ_RELEASE(newcomp);
512 newcomp = MPI_COMM_NULL;
513 }
514 }
515
516 *newcomm = newcomp;
517 return rc;
518 }
519
520 static int construct_peers(ompi_group_t *group, opal_list_t *peers)
521 {
522 int i;
523 opal_namelist_t *nm, *n2;
524 ompi_proc_t *proct;
525 opal_process_name_t proc_name;
526
527 for (i=0; i < group->grp_proc_count; i++) {
528 if (OMPI_GROUP_IS_DENSE(group)) {
529 proct = group->grp_proc_pointers[i];
530 } else {
531 proct = ompi_group_peer_lookup(group, i);
532 }
533 if (NULL == proct) {
534 OMPI_ERROR_LOG(OMPI_ERR_NOT_FOUND);
535 return OMPI_ERR_NOT_FOUND;
536 }
537 if (ompi_proc_is_sentinel (proct)) {
538 proc_name = ompi_proc_sentinel_to_name ((uintptr_t)proct);
539 } else {
540 proc_name = proct->super.proc_name;
541 }
542
543
544 nm = OBJ_NEW(opal_namelist_t);
545 nm->name = proc_name;
546
547
548 OPAL_LIST_FOREACH(n2, peers, opal_namelist_t) {
549 if (opal_compare_proc(nm->name, n2->name) < 0) {
550 opal_list_insert_pos(peers, &n2->super, &nm->super);
551 nm = NULL;
552 break;
553 }
554 }
555 if (NULL != nm) {
556
557 opal_list_append(peers, &nm->super);
558 }
559 }
560 return OMPI_SUCCESS;
561 }
562
563 int ompi_dpm_disconnect(ompi_communicator_t *comm)
564 {
565 int ret;
566 ompi_group_t *group;
567 opal_list_t coll;
568
569
570
571
572
573
574
575 OBJ_CONSTRUCT(&coll, opal_list_t);
576
577
578 group = comm->c_local_group;
579 if (OMPI_SUCCESS != (ret = construct_peers(group, &coll))) {
580 OMPI_ERROR_LOG(ret);
581 OPAL_LIST_DESTRUCT(&coll);
582 return ret;
583 }
584
585 group = comm->c_remote_group;
586 if (OMPI_SUCCESS != (ret = construct_peers(group, &coll))) {
587 OMPI_ERROR_LOG(ret);
588 OPAL_LIST_DESTRUCT(&coll);
589 return ret;
590 }
591
592
593
594 if (OMPI_SUCCESS != (ret = opal_pmix.fence(&coll, false))) {
595 OMPI_ERROR_LOG(ret);
596 OPAL_LIST_DESTRUCT(&coll);
597 return ret;
598 }
599 OPAL_LIST_DESTRUCT(&coll);
600
601 return ret;
602 }
603
604 int ompi_dpm_spawn(int count, const char *array_of_commands[],
605 char **array_of_argv[],
606 const int array_of_maxprocs[],
607 const MPI_Info array_of_info[],
608 const char *port_name)
609 {
610 int rc, i, j;
611 int have_wdir=0;
612 int flag=0;
613 char cwd[OPAL_PATH_MAX];
614 char host[OPAL_MAX_INFO_VAL];
615 char prefix[OPAL_MAX_INFO_VAL];
616 char stdin_target[OPAL_MAX_INFO_VAL];
617 char params[OPAL_MAX_INFO_VAL];
618 char mapper[OPAL_MAX_INFO_VAL];
619 char slot_list[OPAL_MAX_INFO_VAL];
620 uint32_t ui32;
621 bool personality = false;
622 opal_jobid_t jobid;
623
624 opal_list_t apps;
625 opal_list_t job_info;
626 opal_pmix_app_t *app;
627 opal_value_t *info;
628 bool local_spawn, non_mpi;
629 char **envars;
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668 OBJ_CONSTRUCT(&job_info, opal_list_t);
669 OBJ_CONSTRUCT(&apps, opal_list_t);
670
671
672 for (i = 0; i < count; ++i) {
673 app = OBJ_NEW(opal_pmix_app_t);
674 if (NULL == app) {
675 OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
676 OPAL_LIST_DESTRUCT(&apps);
677 opal_progress_event_users_decrement();
678 return OMPI_ERR_OUT_OF_RESOURCE;
679 }
680
681 opal_list_append(&apps, &app->super);
682
683
684 app->cmd = strdup(array_of_commands[i]);
685 opal_argv_append_nosize(&app->argv, app->cmd);
686
687
688 app->maxprocs = array_of_maxprocs[i];
689
690
691 if (MPI_ARGVS_NULL != array_of_argv &&
692 MPI_ARGV_NULL != array_of_argv[i]) {
693 for (j=0; NULL != array_of_argv[i][j]; j++) {
694 opal_argv_append_nosize(&app->argv, array_of_argv[i][j]);
695 }
696 }
697
698
699
700
701 opal_setenv("OMPI_PARENT_PORT", port_name, true, &app->env);
702 for (j = 0; NULL != environ[j]; ++j) {
703 if (0 == strncmp(OPAL_MCA_PREFIX, environ[j], strlen(OPAL_MCA_PREFIX))) {
704 opal_argv_append_nosize(&app->env, environ[j]);
705 }
706 }
707
708
709 have_wdir = 0;
710 if ( array_of_info != NULL && array_of_info[i] != MPI_INFO_NULL ) {
711
712
713 ompi_info_get (array_of_info[i], "personality", sizeof(host) - 1, host, &flag);
714 if ( flag ) {
715 personality = true;
716 info = OBJ_NEW(opal_value_t);
717 info->key = strdup(OPAL_PMIX_PERSONALITY);
718 opal_value_load(info, host, OPAL_STRING);
719 opal_list_append(&job_info, &info->super);
720 }
721
722
723 ompi_info_get (array_of_info[i], "host", sizeof(host) - 1, host, &flag);
724 if ( flag ) {
725 info = OBJ_NEW(opal_value_t);
726 info->key = strdup(OPAL_PMIX_HOST);
727 opal_value_load(info, host, OPAL_STRING);
728 opal_list_append(&app->info, &info->super);
729 }
730
731
732 ompi_info_get (array_of_info[i], "hostfile", sizeof(host) - 1, host, &flag);
733 if ( flag ) {
734 info = OBJ_NEW(opal_value_t);
735 info->key = strdup(OPAL_PMIX_HOSTFILE);
736 opal_value_load(info, host, OPAL_STRING);
737 opal_list_append(&app->info, &info->super);
738 }
739
740
741 ompi_info_get (array_of_info[i], "add-hostfile", sizeof(host) - 1, host, &flag);
742 if ( flag ) {
743 info = OBJ_NEW(opal_value_t);
744 info->key = strdup(OPAL_PMIX_ADD_HOSTFILE);
745 opal_value_load(info, host, OPAL_STRING);
746 opal_list_append(&app->info, &info->super);
747 }
748
749
750 ompi_info_get (array_of_info[i], "add-host", sizeof(host) - 1, host, &flag);
751 if ( flag ) {
752 info = OBJ_NEW(opal_value_t);
753 info->key = strdup(OPAL_PMIX_ADD_HOST);
754 opal_value_load(info, host, OPAL_STRING);
755 opal_list_append(&app->info, &info->super);
756 }
757
758
759 ompi_info_get (array_of_info[i], "env", sizeof(host)-1, host, &flag);
760 if ( flag ) {
761 envars = opal_argv_split(host, '\n');
762 for (j=0; NULL != envars[j]; j++) {
763 opal_argv_append_nosize(&app->env, envars[j]);
764 }
765 opal_argv_free(envars);
766 }
767
768
769
770
771
772
773
774
775 ompi_info_get (array_of_info[i], "ompi_prefix", sizeof(prefix) - 1, prefix, &flag);
776 if ( flag ) {
777 info = OBJ_NEW(opal_value_t);
778 info->key = strdup(OPAL_PMIX_PREFIX);
779 opal_value_load(info, prefix, OPAL_STRING);
780 opal_list_append(&job_info, &info->super);
781 }
782
783
784 ompi_info_get (array_of_info[i], "wdir", sizeof(cwd) - 1, cwd, &flag);
785 if ( flag ) {
786 info = OBJ_NEW(opal_value_t);
787 info->key = strdup(OPAL_PMIX_WDIR);
788 opal_value_load(info, cwd, OPAL_STRING);
789 opal_list_append(&app->info, &info->super);
790 have_wdir = 1;
791 }
792
793
794 ompi_info_get(array_of_info[i], "mapper", sizeof(mapper) - 1, mapper, &flag);
795 if ( flag ) {
796 info = OBJ_NEW(opal_value_t);
797 info->key = strdup(OPAL_PMIX_MAPPER);
798 opal_value_load(info, mapper, OPAL_STRING);
799 opal_list_append(&job_info, &info->super);
800 }
801
802
803 ompi_info_get_bool(array_of_info[i], "display_map", &local_spawn, &flag);
804 if ( flag ) {
805 info = OBJ_NEW(opal_value_t);
806 info->key = strdup(OPAL_PMIX_DISPLAY_MAP);
807 opal_value_load(info, &local_spawn, OPAL_BOOL);
808 opal_list_append(&job_info, &info->super);
809 }
810
811
812 ompi_info_get (array_of_info[i], "npernode", sizeof(slot_list) - 1, slot_list, &flag);
813 if ( flag ) {
814 info = OBJ_NEW(opal_value_t);
815 info->key = strdup(OPAL_PMIX_PPR);
816 info->type = OPAL_STRING;
817 (void)opal_asprintf(&(info->data.string), "%s:n", slot_list);
818 opal_list_append(&job_info, &info->super);
819 }
820 ompi_info_get (array_of_info[i], "pernode", sizeof(slot_list) - 1, slot_list, &flag);
821 if ( flag ) {
822 info = OBJ_NEW(opal_value_t);
823 info->key = strdup(OPAL_PMIX_PPR);
824 opal_value_load(info, "1:n", OPAL_STRING);
825 opal_list_append(&job_info, &info->super);
826 }
827 ompi_info_get (array_of_info[i], "ppr", sizeof(slot_list) - 1, slot_list, &flag);
828 if ( flag ) {
829 info = OBJ_NEW(opal_value_t);
830 info->key = strdup(OPAL_PMIX_PPR);
831 opal_value_load(info, slot_list, OPAL_STRING);
832 opal_list_append(&job_info, &info->super);
833 }
834
835
836 ompi_info_get(array_of_info[i], "map_by", sizeof(slot_list) - 1, slot_list, &flag);
837 if ( flag ) {
838 info = OBJ_NEW(opal_value_t);
839 info->key = strdup(OPAL_PMIX_MAPBY);
840 opal_value_load(info, slot_list, OPAL_STRING);
841 opal_list_append(&job_info, &info->super);
842 }
843
844
845 ompi_info_get(array_of_info[i], "rank_by", sizeof(slot_list) - 1, slot_list, &flag);
846 if ( flag ) {
847 info = OBJ_NEW(opal_value_t);
848 info->key = strdup(OPAL_PMIX_RANKBY);
849 opal_value_load(info, slot_list, OPAL_STRING);
850 opal_list_append(&job_info, &info->super);
851 }
852
853
854 ompi_info_get(array_of_info[i], "bind_to", sizeof(slot_list) - 1, slot_list, &flag);
855 if ( flag ) {
856 info = OBJ_NEW(opal_value_t);
857 info->key = strdup(OPAL_PMIX_BINDTO);
858 opal_value_load(info, slot_list, OPAL_STRING);
859 opal_list_append(&job_info, &info->super);
860 }
861
862
863 ompi_info_get_bool(array_of_info[i], "ompi_preload_binary", &local_spawn, &flag);
864 if ( flag ) {
865 info = OBJ_NEW(opal_value_t);
866 info->key = strdup(OPAL_PMIX_PRELOAD_BIN);
867 opal_value_load(info, &local_spawn, OPAL_BOOL);
868 opal_list_append(&job_info, &info->super);
869 }
870
871
872 ompi_info_get (array_of_info[i], "ompi_preload_files", sizeof(cwd) - 1, cwd, &flag);
873 if ( flag ) {
874 info = OBJ_NEW(opal_value_t);
875 info->key = strdup(OPAL_PMIX_PRELOAD_FILES);
876 opal_value_load(info, cwd, OPAL_STRING);
877 opal_list_append(&job_info, &info->super);
878 }
879
880
881
882
883 ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag);
884 if (flag && non_mpi) {
885 info = OBJ_NEW(opal_value_t);
886 info->key = strdup(OPAL_PMIX_NON_PMI);
887 opal_value_load(info, &non_mpi, OPAL_BOOL);
888 opal_list_append(&job_info, &info->super);
889 }
890
891
892 ompi_info_get (array_of_info[i], "ompi_param", sizeof(params) - 1, params, &flag);
893 if ( flag ) {
894 opal_argv_append_unique_nosize(&app->env, params, true);
895 }
896
897
898
899
900 ompi_info_get (array_of_info[i], "ompi_stdin_target", sizeof(stdin_target) - 1, stdin_target, &flag);
901 if ( flag ) {
902 if (0 == strcmp(stdin_target, "all")) {
903 ui32 = OPAL_VPID_WILDCARD;
904 } else if (0 == strcmp(stdin_target, "none")) {
905 ui32 = OPAL_VPID_INVALID;
906 } else {
907 ui32 = strtoul(stdin_target, NULL, 10);
908 }
909 info = OBJ_NEW(opal_value_t);
910 info->key = strdup(OPAL_PMIX_STDIN_TGT);
911 opal_value_load(info, &ui32, OPAL_UINT32);
912 opal_list_append(&job_info, &info->super);
913 }
914 }
915
916
917
918
919 if ( !have_wdir ) {
920 if (OMPI_SUCCESS != (rc = opal_getcwd(cwd, OPAL_PATH_MAX))) {
921 OMPI_ERROR_LOG(rc);
922 OPAL_LIST_DESTRUCT(&apps);
923 opal_progress_event_users_decrement();
924 return rc;
925 }
926 info = OBJ_NEW(opal_value_t);
927 info->key = strdup(OPAL_PMIX_WDIR);
928 opal_value_load(info, cwd, OPAL_STRING);
929 opal_list_append(&app->info, &info->super);
930 }
931
932
933
934
935 }
936
937
938 if (!personality) {
939 info = OBJ_NEW(opal_value_t);
940 info->key = strdup(OPAL_PMIX_PERSONALITY);
941 opal_value_load(info, "ompi", OPAL_STRING);
942 opal_list_append(&job_info, &info->super);
943 }
944
945
946 rc = opal_pmix.spawn(&job_info, &apps, &jobid);
947 OPAL_LIST_DESTRUCT(&job_info);
948 OPAL_LIST_DESTRUCT(&apps);
949
950 if (OPAL_SUCCESS != rc) {
951 opal_progress_event_users_decrement();
952 return MPI_ERR_SPAWN;
953 }
954
955 return OMPI_SUCCESS;
956 }
957
958
959 int ompi_dpm_open_port(char *port_name)
960 {
961 uint32_t r;
962 char *tmp;
963
964 r = opal_rand(&rnd);
965 opal_convert_process_name_to_string(&tmp, OMPI_PROC_MY_NAME);
966 snprintf(port_name, MPI_MAX_PORT_NAME-1, "%s:%u", tmp, r);
967 port_name[MPI_MAX_PORT_NAME - 1] = '\0';
968 free(tmp);
969 return OMPI_SUCCESS;
970 }
971
972 int ompi_dpm_close_port(const char *port_name)
973 {
974
975 return OMPI_SUCCESS;
976 }
977
978 int ompi_dpm_dyn_init(void)
979 {
980 int root=0, rc;
981 bool send_first = true;
982 ompi_communicator_t *newcomm=NULL;
983 char *port_name=NULL, *tmp, *ptr;
984
985
986 tmp = getenv("OMPI_PARENT_PORT");
987 if (NULL == tmp) {
988
989 return OMPI_SUCCESS;
990 }
991
992
993
994
995
996 if ('"' == tmp[0]) {
997
998 tmp[strlen(tmp)-1] = '\0';
999 ptr = &tmp[1];
1000 } else {
1001 ptr = &tmp[0];
1002 }
1003 port_name = strdup(ptr);
1004
1005 rc = ompi_dpm_connect_accept(MPI_COMM_WORLD, root, port_name, send_first, &newcomm);
1006 free(port_name);
1007 if (OMPI_SUCCESS != rc) {
1008 return rc;
1009 }
1010
1011
1012
1013
1014
1015 OBJ_RELEASE(ompi_mpi_comm_parent->c_local_group);
1016 OBJ_RELEASE(ompi_mpi_comm_parent->error_handler);
1017 OBJ_RELEASE(ompi_mpi_comm_parent);
1018
1019
1020 ompi_mpi_comm_parent = newcomm;
1021
1022
1023 snprintf(newcomm->c_name, MPI_MAX_OBJECT_NAME, "MPI_COMM_PARENT");
1024 newcomm->c_flags |= OMPI_COMM_NAMEISSET;
1025
1026 return OMPI_SUCCESS;
1027 }
1028
1029
1030
1031
1032
1033 int ompi_dpm_finalize(void)
1034 {
1035 return OMPI_SUCCESS;
1036 }
1037
1038
1039
1040
1041
1042
1043
1044 int ompi_dpm_dyn_finalize(void)
1045 {
1046 int i,j=0, max=0;
1047 ompi_dpm_disconnect_obj **objs=NULL;
1048 ompi_communicator_t *comm=NULL;
1049
1050 if (1 <ompi_comm_num_dyncomm) {
1051 objs = (ompi_dpm_disconnect_obj**)malloc(ompi_comm_num_dyncomm*
1052 sizeof(ompi_dpm_disconnect_obj*));
1053 if (NULL == objs) {
1054 return OMPI_ERR_OUT_OF_RESOURCE;
1055 }
1056
1057 max = opal_pointer_array_get_size(&ompi_mpi_communicators);
1058 for (i=3; i<max; i++) {
1059 comm = (ompi_communicator_t*)opal_pointer_array_get_item(&ompi_mpi_communicators,i);
1060 if (NULL != comm && OMPI_COMM_IS_DYNAMIC(comm)) {
1061 objs[j++] = disconnect_init(comm);
1062 }
1063 }
1064
1065 if (j != ompi_comm_num_dyncomm+1) {
1066 free(objs);
1067 return OMPI_ERROR;
1068 }
1069
1070 disconnect_waitall(ompi_comm_num_dyncomm, objs);
1071 free(objs);
1072 }
1073
1074 return OMPI_SUCCESS;
1075 }
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091 static ompi_dpm_disconnect_obj *disconnect_init(ompi_communicator_t *comm)
1092 {
1093 ompi_dpm_disconnect_obj *obj=NULL;
1094 int ret;
1095 int i;
1096
1097 obj = (ompi_dpm_disconnect_obj*)calloc(1,sizeof(ompi_dpm_disconnect_obj));
1098 if (NULL == obj) {
1099 opal_output(0, "Could not allocate disconnect object");
1100 return NULL;
1101 }
1102
1103 if (OMPI_COMM_IS_INTER(comm)) {
1104 obj->size = ompi_comm_remote_size(comm);
1105 } else {
1106 obj->size = ompi_comm_size(comm);
1107 }
1108
1109 obj->comm = comm;
1110 obj->reqs = (ompi_request_t**)malloc(2*obj->size*sizeof(ompi_request_t *));
1111 if (NULL == obj->reqs) {
1112 opal_output(0, "Could not allocate request array for disconnect object");
1113 free(obj);
1114 return NULL;
1115 }
1116
1117
1118
1119 for (i=0; i < obj->size; i++) {
1120 ret = MCA_PML_CALL(irecv(&(obj->buf), 0, MPI_INT, i,
1121 OMPI_COMM_BARRIER_TAG, comm,
1122 &(obj->reqs[2*i])));
1123
1124 if (OMPI_SUCCESS != ret) {
1125 opal_output(0, "dpm_disconnect_init: error %d in irecv to process %d", ret, i);
1126 free(obj->reqs);
1127 free(obj);
1128 return NULL;
1129 }
1130 ret = MCA_PML_CALL(isend(&(obj->buf), 0, MPI_INT, i,
1131 OMPI_COMM_BARRIER_TAG,
1132 MCA_PML_BASE_SEND_SYNCHRONOUS,
1133 comm, &(obj->reqs[2*i+1])));
1134
1135 if (OMPI_SUCCESS != ret) {
1136 opal_output(0, "dpm_disconnect_init: error %d in isend to process %d", ret, i);
1137 free(obj->reqs);
1138 free(obj);
1139 return NULL;
1140 }
1141 }
1142
1143
1144 return obj;
1145 }
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155 static int disconnect_waitall (int count, ompi_dpm_disconnect_obj **objs)
1156 {
1157
1158 ompi_request_t **reqs=NULL;
1159 char *treq=NULL;
1160 int totalcount = 0;
1161 int i;
1162 int ret;
1163
1164 for (i=0; i<count; i++) {
1165 if (NULL == objs[i]) {
1166 opal_output(0, "Error in comm_disconnect_waitall");
1167 return OMPI_ERROR;
1168 }
1169
1170 totalcount += objs[i]->size;
1171 }
1172
1173 reqs = (ompi_request_t**)malloc(2*totalcount*sizeof(ompi_request_t *));
1174 if (NULL == reqs) {
1175 opal_output(0, "ompi_comm_disconnect_waitall: error allocating memory");
1176 return OMPI_ERROR;
1177 }
1178
1179
1180 treq = (char *)reqs;
1181 for (i=0; i<count; i++) {
1182 memcpy(treq, objs[i]->reqs, 2*objs[i]->size * sizeof(ompi_request_t *));
1183 treq += 2*objs[i]->size * sizeof(ompi_request_t *);
1184 }
1185
1186
1187 ret = ompi_request_wait_all(2*totalcount, reqs, MPI_STATUSES_IGNORE);
1188
1189
1190 for (i=0; i< count; i++ ) {
1191 if (NULL != objs[i]->reqs ) {
1192 free(objs[i]->reqs );
1193 }
1194 free(objs[i]);
1195 }
1196
1197 free(reqs);
1198
1199 return ret;
1200 }
1201
1202
1203
1204
1205 static bool ompi_dpm_group_is_dyn (ompi_group_t *group, ompi_jobid_t thisjobid)
1206 {
1207 int size = group ? ompi_group_size (group) : 0;
1208
1209 for (int i = 0 ; i < size ; ++i) {
1210 opal_process_name_t name = ompi_group_get_proc_name (group, i);
1211
1212 if (thisjobid != ((ompi_process_name_t *) &name)->jobid) {
1213
1214 return true;
1215 }
1216 }
1217
1218 return false;
1219 }
1220
1221
1222
1223
1224
1225
1226 void ompi_dpm_mark_dyncomm(ompi_communicator_t *comm)
1227 {
1228 bool found;
1229 ompi_jobid_t thisjobid;
1230
1231
1232 if (comm == MPI_COMM_NULL) {
1233 return;
1234 }
1235
1236 thisjobid = ompi_group_get_proc_name (comm->c_local_group, 0).jobid;
1237
1238
1239
1240
1241 found = ompi_dpm_group_is_dyn (comm->c_local_group, thisjobid);
1242 if (!found) {
1243
1244
1245
1246 found = ompi_dpm_group_is_dyn (comm->c_remote_group, thisjobid);
1247 }
1248
1249
1250 if (found) {
1251 ompi_comm_num_dyncomm++;
1252 OMPI_COMM_SET_DYNAMIC(comm);
1253 }
1254 }