This source file includes following definitions.
- orte_rmaps_resilient_map
- resilient_assign
- orte_getline
- construct_ftgrps
- get_ftgrp_target
- get_new_node
- flag_nodes
- map_to_ftgrps
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 #include "orte_config.h"
18 #include "orte/constants.h"
19 #include "orte/types.h"
20
21 #include <errno.h>
22 #ifdef HAVE_UNISTD_H
23 #include <unistd.h>
24 #endif
25 #include <string.h>
26 #include <stdio.h>
27
28 #include "opal/util/argv.h"
29 #include "opal/class/opal_pointer_array.h"
30
31 #include "orte/util/error_strings.h"
32 #include "orte/util/show_help.h"
33 #include "orte/mca/errmgr/errmgr.h"
34
35 #include "orte/mca/rmaps/base/rmaps_private.h"
36 #include "orte/mca/rmaps/base/base.h"
37 #include "rmaps_resilient.h"
38
39 static int orte_rmaps_resilient_map(orte_job_t *jdata);
40 static int resilient_assign(orte_job_t *jdata);
41
42 orte_rmaps_base_module_t orte_rmaps_resilient_module = {
43 .map_job = orte_rmaps_resilient_map,
44 .assign_locations = resilient_assign
45 };
46
47
48
49
50
51 static char *orte_getline(FILE *fp);
52 static bool have_ftgrps=false, made_ftgrps=false;
53
54 static int construct_ftgrps(void);
55 static int get_ftgrp_target(orte_proc_t *proc,
56 orte_rmaps_res_ftgrp_t **target,
57 orte_node_t **nd);
58 static int get_new_node(orte_proc_t *proc,
59 orte_app_context_t *app,
60 orte_job_map_t *map,
61 orte_node_t **ndret);
62 static int map_to_ftgrps(orte_job_t *jdata);
63
64
65
66
67 static int orte_rmaps_resilient_map(orte_job_t *jdata)
68 {
69 orte_app_context_t *app;
70 int i, j;
71 int rc = ORTE_SUCCESS;
72 orte_node_t *nd=NULL, *oldnode, *node, *nptr;
73 orte_rmaps_res_ftgrp_t *target = NULL;
74 orte_proc_t *proc;
75 orte_vpid_t totprocs;
76 opal_list_t node_list;
77 orte_std_cntr_t num_slots;
78 opal_list_item_t *item;
79 mca_base_component_t *c = &mca_rmaps_resilient_component.super.base_version;
80 bool found;
81
82 if (!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RESTART)) {
83 if (NULL != jdata->map->req_mapper &&
84 0 != strcasecmp(jdata->map->req_mapper, c->mca_component_name)) {
85
86 opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
87 "mca:rmaps:resilient: job %s not using resilient mapper",
88 ORTE_JOBID_PRINT(jdata->jobid));
89 return ORTE_ERR_TAKE_NEXT_OPTION;
90 }
91 if (NULL == mca_rmaps_resilient_component.fault_group_file) {
92 opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
93 "mca:rmaps:resilient: cannot perform initial map of job %s - no fault groups",
94 ORTE_JOBID_PRINT(jdata->jobid));
95 return ORTE_ERR_TAKE_NEXT_OPTION;
96 }
97 } else if (!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_PROCS_MIGRATING)) {
98 opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
99 "mca:rmaps:resilient: cannot map job %s - not in restart or migrating",
100 ORTE_JOBID_PRINT(jdata->jobid));
101 return ORTE_ERR_TAKE_NEXT_OPTION;
102 }
103
104 opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
105 "mca:rmaps:resilient: mapping job %s",
106 ORTE_JOBID_PRINT(jdata->jobid));
107
108
109 if (NULL != jdata->map->last_mapper) {
110 free(jdata->map->last_mapper);
111 }
112 jdata->map->last_mapper = strdup(c->mca_component_name);
113
114
115 if (!made_ftgrps) {
116 construct_ftgrps();
117 }
118
119 if (ORTE_JOB_STATE_INIT == jdata->state) {
120
121
122
123 return map_to_ftgrps(jdata);
124 }
125
126
127
128
129
130 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
131 "%s rmaps:resilient: remapping job %s",
132 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
133 ORTE_JOBID_PRINT(jdata->jobid)));
134
135
136 for (i=0; i < jdata->procs->size; i++) {
137
138 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
139 continue;
140 }
141 OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
142 "%s PROC %s STATE %s",
143 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
144 ORTE_NAME_PRINT(&proc->name),
145 orte_proc_state_to_str(proc->state)));
146
147 if (proc->state != ORTE_PROC_STATE_RESTART) {
148 continue;
149 }
150
151 oldnode = proc->node;
152
153 app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proc->app_idx);
154 if( NULL == app ) {
155 ORTE_ERROR_LOG(ORTE_ERR_FAILED_TO_MAP);
156 rc = ORTE_ERR_FAILED_TO_MAP;
157 goto error;
158 }
159
160 if (NULL == oldnode) {
161 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
162 "%s rmaps:resilient: proc %s is to be started",
163 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
164 ORTE_NAME_PRINT(&proc->name)));
165 } else {
166 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
167 "%s rmaps:resilient: proc %s from node %s[%s] is to be restarted",
168 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
169 ORTE_NAME_PRINT(&proc->name),
170 (NULL == oldnode->name) ? "NULL" : oldnode->name,
171 (NULL == oldnode->daemon) ? "--" : ORTE_VPID_PRINT(oldnode->daemon->name.vpid)));
172 }
173
174 if (NULL == oldnode) {
175
176
177
178
179 OBJ_CONSTRUCT(&node_list, opal_list_t);
180 if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list,
181 &num_slots,
182 app,
183 jdata->map->mapping,
184 false, false))) {
185 ORTE_ERROR_LOG(rc);
186 while (NULL != (item = opal_list_remove_first(&node_list))) {
187 OBJ_RELEASE(item);
188 }
189 OBJ_DESTRUCT(&node_list);
190 goto error;
191 }
192 if (opal_list_is_empty(&node_list)) {
193
194 OBJ_DESTRUCT(&node_list);
195 proc->state = ORTE_PROC_STATE_MIGRATING;
196 rc = ORTE_ERR_OUT_OF_RESOURCE;
197 goto error;
198 }
199 totprocs = 1000000;
200 nd = NULL;
201 while (NULL != (item = opal_list_remove_first(&node_list))) {
202 node = (orte_node_t*)item;
203 if (node->num_procs < totprocs) {
204 nd = node;
205 totprocs = node->num_procs;
206 }
207 OBJ_RELEASE(item);
208 }
209 OBJ_DESTRUCT(&node_list);
210
211
212
213 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
214 "%s rmaps:resilient: Placing new process on node %s[%s] (no ftgrp)",
215 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
216 nd->name,
217 (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
218 } else {
219
220 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
221 "%s rmaps:resilient: proc %s from node %s is to be restarted",
222 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
223 ORTE_NAME_PRINT(&proc->name),
224 (NULL == proc->node) ? "NULL" : proc->node->name));
225
226
227 if (have_ftgrps) {
228 if (ORTE_SUCCESS != (rc = get_ftgrp_target(proc, &target, &nd))) {
229 ORTE_ERROR_LOG(rc);
230 goto error;
231 }
232 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
233 "%s rmaps:resilient: placing proc %s into fault group %d node %s",
234 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
235 ORTE_NAME_PRINT(&proc->name), target->ftgrp, nd->name));
236 } else {
237 if (ORTE_SUCCESS != (rc = get_new_node(proc, app, jdata->map, &nd))) {
238 ORTE_ERROR_LOG(rc);
239 return rc;
240 }
241 }
242 }
243
244
245
246 found = false;
247 for (j=0; j < jdata->map->nodes->size; j++) {
248 if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, j))) {
249 continue;
250 }
251 if (nptr == nd) {
252 found = true;
253 break;
254 }
255 }
256 if (!found) {
257 OBJ_RETAIN(nd);
258 opal_pointer_array_add(jdata->map->nodes, nd);
259 ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_MAPPED);
260 }
261 OBJ_RETAIN(nd);
262 proc->node = nd;
263 nd->num_procs++;
264 opal_pointer_array_add(nd->procs, (void*)proc);
265
266 OBJ_RETAIN(proc);
267
268
269 proc->state = ORTE_PROC_STATE_INIT;
270
271
272
273
274 orte_rmaps_base_update_local_ranks(jdata, oldnode, nd, proc);
275 }
276
277 error:
278 return rc;
279 }
280
281 static int resilient_assign(orte_job_t *jdata)
282 {
283 mca_base_component_t *c = &mca_rmaps_resilient_component.super.base_version;
284
285 if (NULL == jdata->map->last_mapper ||
286 0 != strcasecmp(jdata->map->last_mapper, c->mca_component_name)) {
287
288 opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
289 "mca:rmaps:resilient: job %s not using resilient assign: %s",
290 ORTE_JOBID_PRINT(jdata->jobid),
291 (NULL == jdata->map->last_mapper) ? "NULL" : jdata->map->last_mapper);
292 return ORTE_ERR_TAKE_NEXT_OPTION;
293 }
294
295 return ORTE_ERR_NOT_IMPLEMENTED;
296 }
297
298 static char *orte_getline(FILE *fp)
299 {
300 char *ret, *buff;
301 char input[1024];
302
303 ret = fgets(input, 1024, fp);
304 if (NULL != ret) {
305 input[strlen(input)-1] = '\0';
306 buff = strdup(input);
307 return buff;
308 }
309
310 return NULL;
311 }
312
313
314 static int construct_ftgrps(void)
315 {
316 orte_rmaps_res_ftgrp_t *ftgrp;
317 orte_node_t *node;
318 FILE *fp;
319 char *ftinput;
320 int grp;
321 char **nodes;
322 bool found;
323 int i, k;
324
325
326 made_ftgrps = true;
327
328 if (NULL == mca_rmaps_resilient_component.fault_group_file) {
329
330 return ORTE_SUCCESS;
331 }
332
333
334 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
335 "%s rmaps:resilient: constructing fault groups",
336 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
337 fp = fopen(mca_rmaps_resilient_component.fault_group_file, "r");
338 if (NULL == fp) {
339 orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:file-not-found",
340 true, mca_rmaps_resilient_component.fault_group_file);
341 return ORTE_ERR_FAILED_TO_MAP;
342 }
343
344
345 grp = 0;
346 while (NULL != (ftinput = orte_getline(fp))) {
347 ftgrp = OBJ_NEW(orte_rmaps_res_ftgrp_t);
348 ftgrp->ftgrp = grp++;
349 nodes = opal_argv_split(ftinput, ',');
350
351 for (k=0; k < opal_argv_count(nodes); k++) {
352 found = false;
353 for (i=0; i < orte_node_pool->size && !found; i++) {
354 if (NULL == (node = opal_pointer_array_get_item(orte_node_pool, i))) {
355 continue;
356 }
357 if (0 == strcmp(node->name, nodes[k])) {
358 OBJ_RETAIN(node);
359 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
360 "%s rmaps:resilient: adding node %s to fault group %d",
361 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
362 node->name, ftgrp->ftgrp));
363 opal_pointer_array_add(&ftgrp->nodes, node);
364 found = true;
365 break;
366 }
367 }
368 }
369 opal_list_append(&mca_rmaps_resilient_component.fault_grps, &ftgrp->super);
370 opal_argv_free(nodes);
371 free(ftinput);
372 }
373 fclose(fp);
374
375
376 have_ftgrps = true;
377 return ORTE_SUCCESS;
378 }
379
380 static int get_ftgrp_target(orte_proc_t *proc,
381 orte_rmaps_res_ftgrp_t **tgt,
382 orte_node_t **ndret)
383 {
384 opal_list_item_t *item;
385 int k, totnodes;
386 orte_node_t *node, *nd;
387 orte_rmaps_res_ftgrp_t *target, *ftgrp;
388 float avgload, minload;
389 orte_vpid_t totprocs, lowprocs;
390
391
392 *tgt = NULL;
393 *ndret = NULL;
394
395
396
397
398 minload = 1000000.0;
399 target = NULL;
400 for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
401 item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
402 item = opal_list_get_next(item)) {
403 ftgrp = (orte_rmaps_res_ftgrp_t*)item;
404
405 ftgrp->included = true;
406 ftgrp->used = false;
407 for (k=0; k < ftgrp->nodes.size; k++) {
408 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
409 continue;
410 }
411 if (NULL != proc->node && 0 == strcmp(node->name, proc->node->name)) {
412
413 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
414 "%s rmaps:resilient: node %s is in fault group %d, which will be excluded",
415 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
416 proc->node->name, ftgrp->ftgrp));
417 ftgrp->included = false;
418 break;
419 }
420 }
421
422 if (!ftgrp->included) {
423 continue;
424 }
425
426 totprocs = 0;
427 totnodes = 0;
428 for (k=0; k < ftgrp->nodes.size; k++) {
429 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
430 continue;
431 }
432 totnodes++;
433 totprocs += node->num_procs;
434 }
435 avgload = (float)totprocs / (float)totnodes;
436
437 if (avgload < minload) {
438 minload = avgload;
439 target = ftgrp;
440 OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
441 "%s rmaps:resilient: found new min load ftgrp %d",
442 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
443 ftgrp->ftgrp));
444 }
445 }
446
447 if (NULL == target) {
448
449 return ORTE_ERR_NOT_FOUND;
450 }
451
452
453
454
455 lowprocs = 1000000;
456 nd = NULL;
457 for (k=0; k < target->nodes.size; k++) {
458 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
459 continue;
460 }
461 if (node->num_procs < lowprocs) {
462 lowprocs = node->num_procs;
463 nd = node;
464 }
465 }
466
467
468 *tgt = target;
469 *ndret = nd;
470
471 return ORTE_SUCCESS;
472 }
473
474 static int get_new_node(orte_proc_t *proc,
475 orte_app_context_t *app,
476 orte_job_map_t *map,
477 orte_node_t **ndret)
478 {
479 orte_node_t *nd, *oldnode, *node;
480 orte_proc_t *pptr;
481 int rc, j;
482 opal_list_t node_list, candidates;
483 opal_list_item_t *item, *next;
484 orte_std_cntr_t num_slots;
485 bool found;
486
487
488 *ndret = NULL;
489 nd = NULL;
490 oldnode = NULL;
491 orte_get_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, (void**)&oldnode, OPAL_PTR);
492
493
494
495
496 OBJ_CONSTRUCT(&node_list, opal_list_t);
497 if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list,
498 &num_slots,
499 app,
500 map->mapping,
501 false, false))) {
502 ORTE_ERROR_LOG(rc);
503 goto release;
504 }
505 if (opal_list_is_empty(&node_list)) {
506 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
507 rc = ORTE_ERR_OUT_OF_RESOURCE;
508 goto release;
509 }
510
511 if (1 == opal_list_get_size(&node_list)) {
512
513
514
515
516 nd = (orte_node_t*)opal_list_get_first(&node_list);
517 orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, oldnode, OPAL_PTR);
518 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
519 "%s rmaps:resilient: Placing process %s on node %s[%s] (only one avail node)",
520 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
521 ORTE_NAME_PRINT(&proc->name),
522 nd->name,
523 (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
524 goto release;
525 }
526
527
528
529
530
531
532
533 OBJ_CONSTRUCT(&candidates, opal_list_t);
534 while (NULL != (item = opal_list_remove_first(&node_list))) {
535 node = (orte_node_t*)item;
536
537 if (node == oldnode) {
538 OBJ_RELEASE(item);
539 continue;
540 }
541 if (0 == node->num_procs) {
542 OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
543 "%s PREPENDING EMPTY NODE %s[%s] TO CANDIDATES",
544 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
545 (NULL == node->name) ? "NULL" : node->name,
546 (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
547 opal_list_prepend(&candidates, item);
548 } else {
549 OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
550 "%s APPENDING NON-EMPTY NODE %s[%s] TO CANDIDATES",
551 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
552 (NULL == node->name) ? "NULL" : node->name,
553 (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
554 opal_list_append(&candidates, item);
555 }
556 }
557
558
559
560
561
562
563
564
565
566 nd = NULL;
567 item = opal_list_get_first(&candidates);
568 while (item != opal_list_get_end(&candidates)) {
569 node = (orte_node_t*)item;
570 next = opal_list_get_next(item);
571
572
573
574 if (NULL != oldnode && node == oldnode) {
575 OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
576 "%s REMOVING PRIOR NODE %s[%s] FROM CANDIDATES",
577 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
578 (NULL == node->name) ? "NULL" : node->name,
579 (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
580 opal_list_remove_item(&candidates, item);
581 OBJ_RELEASE(item);
582 item = next;
583 continue;
584 }
585
586 if (0 == node->num_procs) {
587 nd = node;
588 orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, oldnode, OPAL_PTR);
589 break;
590 }
591
592
593
594 found = false;
595 for (j=0; j < node->procs->size; j++) {
596 if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(node->procs, j))) {
597 continue;
598 }
599 if (pptr->name.jobid == proc->name.jobid) {
600 OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
601 "%s FOUND PEER %s ON NODE %s[%s]",
602 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
603 ORTE_NAME_PRINT(&pptr->name),
604 (NULL == node->name) ? "NULL" : node->name,
605 (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
606 found = true;
607 break;
608 }
609 }
610 if (found) {
611 item = next;
612 continue;
613 }
614
615 nd = node;
616 orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, oldnode, OPAL_PTR);
617 break;
618 }
619 if (NULL == nd) {
620
621 if (NULL != oldnode) {
622 nd = oldnode;
623 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
624 "%s rmaps:resilient: Placing process %s on prior node %s[%s] (no ftgrp)",
625 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
626 ORTE_NAME_PRINT(&proc->name),
627 (NULL == nd->name) ? "NULL" : nd->name,
628 (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
629 } else {
630 nd = proc->node;
631 orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, nd, OPAL_PTR);
632 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
633 "%s rmaps:resilient: Placing process %s back on same node %s[%s] (no ftgrp)",
634 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
635 ORTE_NAME_PRINT(&proc->name),
636 (NULL == nd->name) ? "NULL" : nd->name,
637 (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
638 }
639
640 }
641
642 while (NULL != (item = opal_list_remove_first(&candidates))) {
643 OBJ_RELEASE(item);
644 }
645 OBJ_DESTRUCT(&candidates);
646
647 release:
648 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
649 "%s rmaps:resilient: Placing process on node %s[%s] (no ftgrp)",
650 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
651 (NULL == nd->name) ? "NULL" : nd->name,
652 (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
653
654 while (NULL != (item = opal_list_remove_first(&node_list))) {
655 OBJ_RELEASE(item);
656 }
657 OBJ_DESTRUCT(&node_list);
658
659 *ndret = nd;
660 return rc;
661 }
662
663 static void flag_nodes(opal_list_t *node_list)
664 {
665 opal_list_item_t *item, *nitem;
666 orte_node_t *node, *nd;
667 orte_rmaps_res_ftgrp_t *ftgrp;
668 int k;
669
670 for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
671 item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
672 item = opal_list_get_next(item)) {
673 ftgrp = (orte_rmaps_res_ftgrp_t*)item;
674
675 ftgrp->used = false;
676 ftgrp->included = false;
677
678
679
680 for (nitem = opal_list_get_first(node_list);
681 !ftgrp->included && nitem != opal_list_get_end(node_list);
682 nitem = opal_list_get_next(nitem)) {
683 node = (orte_node_t*)nitem;
684 for (k=0; k < ftgrp->nodes.size; k++) {
685 if (NULL == (nd = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
686 continue;
687 }
688 if (0 == strcmp(nd->name, node->name)) {
689 ftgrp->included = true;
690 break;
691 }
692 }
693 }
694 }
695 }
696
697 static int map_to_ftgrps(orte_job_t *jdata)
698 {
699 orte_job_map_t *map;
700 orte_app_context_t *app;
701 int i, j, k, totnodes;
702 opal_list_t node_list;
703 opal_list_item_t *item, *next, *curitem;
704 orte_std_cntr_t num_slots;
705 int rc = ORTE_SUCCESS;
706 float avgload, minload;
707 orte_node_t *node, *nd=NULL;
708 orte_rmaps_res_ftgrp_t *ftgrp, *target = NULL;
709 orte_vpid_t totprocs, num_assigned;
710 bool initial_map=true;
711
712 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
713 "%s rmaps:resilient: creating initial map for job %s",
714 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
715 ORTE_JOBID_PRINT(jdata->jobid)));
716
717
718 jdata->num_procs = 0;
719 map = jdata->map;
720
721 for (i=0; i < jdata->apps->size; i++) {
722
723 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
724 continue;
725 }
726
727
728
729 if (0 == app->num_procs) {
730 orte_show_help("help-orte-rmaps-resilient.txt",
731 "orte-rmaps-resilient:num-procs",
732 true);
733 return ORTE_ERR_SILENT;
734 }
735 num_assigned = 0;
736
737
738
739
740 OBJ_CONSTRUCT(&node_list, opal_list_t);
741 if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
742 map->mapping, initial_map, false))) {
743 ORTE_ERROR_LOG(rc);
744 return rc;
745 }
746
747 initial_map = false;
748
749
750 item = opal_list_get_first(&node_list);
751 while (item != opal_list_get_end(&node_list)) {
752 next = opal_list_get_next(item);
753 node = (orte_node_t*)item;
754 if (ORTE_NODE_STATE_UP != node->state ||
755 NULL == node->daemon ||
756 ORTE_PROC_STATE_RUNNING != node->daemon->state) {
757 opal_list_remove_item(&node_list, item);
758 OBJ_RELEASE(item);
759 }
760 item = next;
761 }
762 curitem = opal_list_get_first(&node_list);
763
764
765 flag_nodes(&node_list);
766
767
768
769 for (j=0; j < app->num_procs; j++) {
770
771
772
773 target = NULL;
774 minload = 1000000000.0;
775 for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
776 item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
777 item = opal_list_get_next(item)) {
778 ftgrp = (orte_rmaps_res_ftgrp_t*)item;
779 OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
780 "%s rmaps:resilient: fault group %d used: %s included %s",
781 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
782 ftgrp->ftgrp,
783 ftgrp->used ? "YES" : "NO",
784 ftgrp->included ? "YES" : "NO" ));
785
786
787
788 if (ftgrp->used || !ftgrp->included) {
789 continue;
790 }
791
792 totprocs = 0;
793 totnodes = 0;
794 for (k=0; k < ftgrp->nodes.size; k++) {
795 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
796 continue;
797 }
798 totnodes++;
799 totprocs += node->num_procs;
800 }
801 avgload = (float)totprocs / (float)totnodes;
802 if (avgload < minload) {
803 minload = avgload;
804 target = ftgrp;
805 OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
806 "%s rmaps:resilient: found new min load ftgrp %d",
807 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
808 ftgrp->ftgrp));
809 }
810 }
811
812
813
814
815
816 if (NULL == target) {
817 OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
818 "%s rmaps:resilient: more procs than fault groups - mapping excess rr",
819 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
820 nd = (orte_node_t*)curitem;
821 curitem = opal_list_get_next(curitem);
822 if (curitem == opal_list_get_end(&node_list)) {
823 curitem = opal_list_get_first(&node_list);
824 }
825 } else {
826
827 totprocs = 1000000;
828 for (k=0; k < target->nodes.size; k++) {
829 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
830 continue;
831 }
832 if (node->num_procs < totprocs) {
833 totprocs = node->num_procs;
834 nd = node;
835 }
836 }
837 }
838 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
839 "%s rmaps:resilient: placing proc into fault group %d node %s",
840 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
841 (NULL == target) ? -1 : target->ftgrp, nd->name));
842
843 if (!ORTE_FLAG_TEST(nd, ORTE_NODE_FLAG_MAPPED)) {
844 OBJ_RETAIN(nd);
845 opal_pointer_array_add(map->nodes, nd);
846 ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_MAPPED);
847 }
848 if (NULL == orte_rmaps_base_setup_proc(jdata, nd, app->idx)) {
849 ORTE_ERROR_LOG(ORTE_ERROR);
850 return ORTE_ERROR;
851 }
852 if ((nd->slots < (int)nd->num_procs) ||
853 (0 < nd->slots_max && nd->slots_max < (int)nd->num_procs)) {
854 if (ORTE_MAPPING_NO_OVERSUBSCRIBE & ORTE_GET_MAPPING_DIRECTIVE(jdata->map->mapping)) {
855 orte_show_help("help-orte-rmaps-base.txt", "orte-rmaps-base:alloc-error",
856 true, nd->num_procs, app->app);
857 ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
858 return ORTE_ERR_SILENT;
859 }
860
861
862
863 ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_OVERSUBSCRIBED);
864 ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_OVERSUBSCRIBED);
865 }
866
867
868 num_assigned++;
869
870
871 if (NULL != target) {
872 target->used = true;
873 }
874 }
875
876
877 jdata->num_procs += app->num_procs;
878
879
880
881
882 while (NULL != (item = opal_list_remove_first(&node_list))) {
883 OBJ_RELEASE(item);
884 }
885 OBJ_DESTRUCT(&node_list);
886 }
887
888 return ORTE_SUCCESS;
889 }