root/orte/mca/rmaps/resilient/rmaps_resilient.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_rmaps_resilient_map
  2. resilient_assign
  3. orte_getline
  4. construct_ftgrps
  5. get_ftgrp_target
  6. get_new_node
  7. flag_nodes
  8. map_to_ftgrps

   1 /*
   2  * Copyright (c) 2009-2011 Cisco Systems, Inc.  All rights reserved.
   3  * Copyright (c) 2009-2010 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2011-2012 Los Alamos National Security, LLC.
   7  *                         All rights reserved.
   8  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
   9  *
  10  * $COPYRIGHT$
  11  *
  12  * Additional copyrights may follow
  13  *
  14  * $HEADER$
  15  */
  16 
  17 #include "orte_config.h"
  18 #include "orte/constants.h"
  19 #include "orte/types.h"
  20 
  21 #include <errno.h>
  22 #ifdef HAVE_UNISTD_H
  23 #include <unistd.h>
  24 #endif  /* HAVE_UNISTD_H */
  25 #include <string.h>
  26 #include <stdio.h>
  27 
  28 #include "opal/util/argv.h"
  29 #include "opal/class/opal_pointer_array.h"
  30 
  31 #include "orte/util/error_strings.h"
  32 #include "orte/util/show_help.h"
  33 #include "orte/mca/errmgr/errmgr.h"
  34 
  35 #include "orte/mca/rmaps/base/rmaps_private.h"
  36 #include "orte/mca/rmaps/base/base.h"
  37 #include "rmaps_resilient.h"
  38 
/* Entry points of this mapper; both are defined later in this file. */
static int orte_rmaps_resilient_map(orte_job_t *jdata);
static int resilient_assign(orte_job_t *jdata);

/*
 * Function table exported by the resilient mapper component:
 * map_job is used to map (or re-map) a job, assign_locations to
 * assign locations for a job this mapper has mapped.
 */
orte_rmaps_base_module_t orte_rmaps_resilient_module = {
    .map_job = orte_rmaps_resilient_map,
    .assign_locations = resilient_assign
};
  46 
  47 
/*
 * Local variables and forward declarations of file-local helpers
 */
static char *orte_getline(FILE *fp);
/* made_ftgrps: set by construct_ftgrps() as soon as it has been called,
 *              so the fault group file is only processed once.
 * have_ftgrps: set by construct_ftgrps() after the fault group file has
 *              been read; selects fault-group based re-mapping.
 */
static bool have_ftgrps=false, made_ftgrps=false;

static int construct_ftgrps(void);
static int get_ftgrp_target(orte_proc_t *proc,
                            orte_rmaps_res_ftgrp_t **target,
                            orte_node_t **nd);
static int get_new_node(orte_proc_t *proc,
                        orte_app_context_t *app,
                        orte_job_map_t *map,
                        orte_node_t **ndret);
static int map_to_ftgrps(orte_job_t *jdata);
  63 
  64 /*
  65  * Loadbalance the cluster
  66  */
  67 static int orte_rmaps_resilient_map(orte_job_t *jdata)
  68 {
  69     orte_app_context_t *app;
  70     int i, j;
  71     int rc = ORTE_SUCCESS;
  72     orte_node_t *nd=NULL, *oldnode, *node, *nptr;
  73     orte_rmaps_res_ftgrp_t *target = NULL;
  74     orte_proc_t *proc;
  75     orte_vpid_t totprocs;
  76     opal_list_t node_list;
  77     orte_std_cntr_t num_slots;
  78     opal_list_item_t *item;
  79     mca_base_component_t *c = &mca_rmaps_resilient_component.super.base_version;
  80     bool found;
  81 
  82     if (!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RESTART)) {
  83         if (NULL != jdata->map->req_mapper &&
  84             0 != strcasecmp(jdata->map->req_mapper, c->mca_component_name)) {
  85             /* a mapper has been specified, and it isn't me */
  86             opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
  87                                 "mca:rmaps:resilient: job %s not using resilient mapper",
  88                                 ORTE_JOBID_PRINT(jdata->jobid));
  89             return ORTE_ERR_TAKE_NEXT_OPTION;
  90         }
  91         if (NULL == mca_rmaps_resilient_component.fault_group_file) {
  92             opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
  93                                 "mca:rmaps:resilient: cannot perform initial map of job %s - no fault groups",
  94                                 ORTE_JOBID_PRINT(jdata->jobid));
  95             return ORTE_ERR_TAKE_NEXT_OPTION;
  96         }
  97     } else if (!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_PROCS_MIGRATING)) {
  98         opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
  99                             "mca:rmaps:resilient: cannot map job %s - not in restart or migrating",
 100                             ORTE_JOBID_PRINT(jdata->jobid));
 101         return ORTE_ERR_TAKE_NEXT_OPTION;
 102     }
 103 
 104     opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
 105                         "mca:rmaps:resilient: mapping job %s",
 106                         ORTE_JOBID_PRINT(jdata->jobid));
 107 
 108     /* flag that I did the mapping */
 109     if (NULL != jdata->map->last_mapper) {
 110         free(jdata->map->last_mapper);
 111     }
 112     jdata->map->last_mapper = strdup(c->mca_component_name);
 113 
 114     /* have we already constructed the fault group list? */
 115     if (!made_ftgrps) {
 116         construct_ftgrps();
 117     }
 118 
 119     if (ORTE_JOB_STATE_INIT == jdata->state) {
 120         /* this is an initial map - let the fault group mapper
 121          * handle it
 122          */
 123         return map_to_ftgrps(jdata);
 124     }
 125 
 126     /*
 127      * NOTE: if a proc is being ADDED to an existing job, then its
 128      * node field will be NULL.
 129      */
 130     OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 131                          "%s rmaps:resilient: remapping job %s",
 132                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 133                          ORTE_JOBID_PRINT(jdata->jobid)));
 134 
 135     /* cycle through all the procs in this job to find the one(s) that failed */
 136     for (i=0; i < jdata->procs->size; i++) {
 137         /* get the proc object */
 138         if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
 139             continue;
 140         }
 141         OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
 142                              "%s PROC %s STATE %s",
 143                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 144                              ORTE_NAME_PRINT(&proc->name),
 145                              orte_proc_state_to_str(proc->state)));
 146         /* is this proc to be restarted? */
 147         if (proc->state != ORTE_PROC_STATE_RESTART) {
 148             continue;
 149         }
 150         /* save the current node */
 151         oldnode = proc->node;
 152         /* point to the app */
 153         app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proc->app_idx);
 154         if( NULL == app ) {
 155             ORTE_ERROR_LOG(ORTE_ERR_FAILED_TO_MAP);
 156             rc = ORTE_ERR_FAILED_TO_MAP;
 157             goto error;
 158         }
 159 
 160         if (NULL == oldnode) {
 161             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 162                                  "%s rmaps:resilient: proc %s is to be started",
 163                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 164                                  ORTE_NAME_PRINT(&proc->name)));
 165         } else {
 166             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 167                                  "%s rmaps:resilient: proc %s from node %s[%s] is to be restarted",
 168                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 169                                  ORTE_NAME_PRINT(&proc->name),
 170                                  (NULL == oldnode->name) ? "NULL" : oldnode->name,
 171                                  (NULL == oldnode->daemon) ? "--" : ORTE_VPID_PRINT(oldnode->daemon->name.vpid)));
 172         }
 173 
 174         if (NULL == oldnode) {
 175             /* this proc was not previously running - likely it is being added
 176              * to the job. So place it on the node with the fewest procs to
 177              * balance the load
 178              */
 179             OBJ_CONSTRUCT(&node_list, opal_list_t);
 180             if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list,
 181                                                                        &num_slots,
 182                                                                        app,
 183                                                                        jdata->map->mapping,
 184                                                                        false, false))) {
 185                 ORTE_ERROR_LOG(rc);
 186                 while (NULL != (item = opal_list_remove_first(&node_list))) {
 187                     OBJ_RELEASE(item);
 188                 }
 189                 OBJ_DESTRUCT(&node_list);
 190                 goto error;
 191             }
 192             if (opal_list_is_empty(&node_list)) {
 193                 /* put the proc on "hold" until resources are available */
 194                 OBJ_DESTRUCT(&node_list);
 195                 proc->state = ORTE_PROC_STATE_MIGRATING;
 196                 rc = ORTE_ERR_OUT_OF_RESOURCE;
 197                 goto error;
 198             }
 199             totprocs = 1000000;
 200             nd = NULL;
 201             while (NULL != (item = opal_list_remove_first(&node_list))) {
 202                 node = (orte_node_t*)item;
 203                 if (node->num_procs < totprocs) {
 204                     nd = node;
 205                     totprocs = node->num_procs;
 206                 }
 207                 OBJ_RELEASE(item); /* maintain accounting */
 208             }
 209             OBJ_DESTRUCT(&node_list);
 210             /* we already checked to ensure there was at least one node,
 211              * so we couldn't have come out of the loop with nd=NULL
 212              */
 213             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 214                                  "%s rmaps:resilient: Placing new process on node %s[%s] (no ftgrp)",
 215                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 216                                  nd->name,
 217                                  (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
 218         } else {
 219 
 220             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 221                                  "%s rmaps:resilient: proc %s from node %s is to be restarted",
 222                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 223                                  ORTE_NAME_PRINT(&proc->name),
 224                                  (NULL == proc->node) ? "NULL" : proc->node->name));
 225 
 226             /* if we have fault groups, use them */
 227             if (have_ftgrps) {
 228                 if (ORTE_SUCCESS != (rc = get_ftgrp_target(proc, &target, &nd))) {
 229                     ORTE_ERROR_LOG(rc);
 230                     goto error;
 231                 }
 232                 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 233                                      "%s rmaps:resilient: placing proc %s into fault group %d node %s",
 234                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 235                                      ORTE_NAME_PRINT(&proc->name), target->ftgrp, nd->name));
 236             } else {
 237                 if (ORTE_SUCCESS != (rc = get_new_node(proc, app, jdata->map, &nd))) {
 238                     ORTE_ERROR_LOG(rc);
 239                     return rc;
 240                 }
 241             }
 242         }
 243         /* add node to map if necessary - nothing we can do here
 244          * but search for it
 245          */
 246         found = false;
 247         for (j=0; j < jdata->map->nodes->size; j++) {
 248             if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, j))) {
 249                 continue;
 250             }
 251             if (nptr == nd) {
 252                 found = true;
 253                 break;
 254             }
 255         }
 256         if (!found) {
 257             OBJ_RETAIN(nd);
 258             opal_pointer_array_add(jdata->map->nodes, nd);
 259             ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_MAPPED);
 260         }
 261         OBJ_RETAIN(nd);  /* maintain accounting on object */
 262         proc->node = nd;
 263         nd->num_procs++;
 264         opal_pointer_array_add(nd->procs, (void*)proc);
 265         /* retain the proc struct so that we correctly track its release */
 266         OBJ_RETAIN(proc);
 267 
 268         /* flag the proc state as non-launched so we'll know to launch it */
 269         proc->state = ORTE_PROC_STATE_INIT;
 270 
 271         /* update the node and local ranks so static ports can
 272          * be properly selected if active
 273          */
 274         orte_rmaps_base_update_local_ranks(jdata, oldnode, nd, proc);
 275     }
 276 
 277  error:
 278     return rc;
 279 }
 280 
 281 static int resilient_assign(orte_job_t *jdata)
 282 {
 283     mca_base_component_t *c = &mca_rmaps_resilient_component.super.base_version;
 284 
 285     if (NULL == jdata->map->last_mapper ||
 286         0 != strcasecmp(jdata->map->last_mapper, c->mca_component_name)) {
 287         /* a mapper has been specified, and it isn't me */
 288         opal_output_verbose(5, orte_rmaps_base_framework.framework_output,
 289                             "mca:rmaps:resilient: job %s not using resilient assign: %s",
 290                             ORTE_JOBID_PRINT(jdata->jobid),
 291                             (NULL == jdata->map->last_mapper) ? "NULL" : jdata->map->last_mapper);
 292         return ORTE_ERR_TAKE_NEXT_OPTION;
 293     }
 294 
 295     return ORTE_ERR_NOT_IMPLEMENTED;
 296 }
 297 
 298 static char *orte_getline(FILE *fp)
 299 {
 300     char *ret, *buff;
 301     char input[1024];
 302 
 303     ret = fgets(input, 1024, fp);
 304     if (NULL != ret) {
 305         input[strlen(input)-1] = '\0';  /* remove newline */
 306         buff = strdup(input);
 307         return buff;
 308     }
 309 
 310     return NULL;
 311 }
 312 
 313 
 314 static int construct_ftgrps(void)
 315 {
 316     orte_rmaps_res_ftgrp_t *ftgrp;
 317     orte_node_t *node;
 318     FILE *fp;
 319     char *ftinput;
 320     int grp;
 321     char **nodes;
 322     bool found;
 323     int i, k;
 324 
 325     /* flag that we did this */
 326     made_ftgrps = true;
 327 
 328     if (NULL == mca_rmaps_resilient_component.fault_group_file) {
 329         /* nothing to build */
 330         return ORTE_SUCCESS;
 331     }
 332 
 333     /* construct it */
 334     OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 335                          "%s rmaps:resilient: constructing fault groups",
 336                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 337     fp = fopen(mca_rmaps_resilient_component.fault_group_file, "r");
 338     if (NULL == fp) { /* not found */
 339         orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:file-not-found",
 340                        true, mca_rmaps_resilient_component.fault_group_file);
 341         return ORTE_ERR_FAILED_TO_MAP;
 342     }
 343 
 344     /* build list of fault groups */
 345     grp = 0;
 346     while (NULL != (ftinput = orte_getline(fp))) {
 347         ftgrp = OBJ_NEW(orte_rmaps_res_ftgrp_t);
 348         ftgrp->ftgrp = grp++;
 349         nodes = opal_argv_split(ftinput, ',');
 350         /* find the referenced nodes */
 351         for (k=0; k < opal_argv_count(nodes); k++) {
 352             found = false;
 353             for (i=0; i < orte_node_pool->size && !found; i++) {
 354                 if (NULL == (node = opal_pointer_array_get_item(orte_node_pool, i))) {
 355                     continue;
 356                 }
 357                 if (0 == strcmp(node->name, nodes[k])) {
 358                     OBJ_RETAIN(node);
 359                     OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 360                                          "%s rmaps:resilient: adding node %s to fault group %d",
 361                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 362                                          node->name, ftgrp->ftgrp));
 363                     opal_pointer_array_add(&ftgrp->nodes, node);
 364                     found = true;
 365                     break;
 366                 }
 367             }
 368         }
 369         opal_list_append(&mca_rmaps_resilient_component.fault_grps, &ftgrp->super);
 370         opal_argv_free(nodes);
 371         free(ftinput);
 372     }
 373     fclose(fp);
 374 
 375     /* flag that we have fault grps */
 376     have_ftgrps = true;
 377     return ORTE_SUCCESS;
 378 }
 379 
 380 static int get_ftgrp_target(orte_proc_t *proc,
 381                             orte_rmaps_res_ftgrp_t **tgt,
 382                             orte_node_t **ndret)
 383 {
 384     opal_list_item_t *item;
 385     int k, totnodes;
 386     orte_node_t *node, *nd;
 387     orte_rmaps_res_ftgrp_t *target, *ftgrp;
 388     float avgload, minload;
 389     orte_vpid_t totprocs, lowprocs;
 390 
 391     /* set defaults */
 392     *tgt = NULL;
 393     *ndret = NULL;
 394 
 395     /* flag all the fault groups that
 396      * include this node so we don't reuse them
 397      */
 398     minload = 1000000.0;
 399     target = NULL;
 400     for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
 401          item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
 402          item = opal_list_get_next(item)) {
 403         ftgrp = (orte_rmaps_res_ftgrp_t*)item;
 404         /* see if the node is in this fault group */
 405         ftgrp->included = true;
 406         ftgrp->used = false;
 407         for (k=0; k < ftgrp->nodes.size; k++) {
 408             if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
 409                 continue;
 410             }
 411             if (NULL != proc->node && 0 == strcmp(node->name, proc->node->name)) {
 412                 /* yes - mark it to not be included */
 413                 OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 414                                      "%s rmaps:resilient: node %s is in fault group %d, which will be excluded",
 415                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 416                                      proc->node->name, ftgrp->ftgrp));
 417                 ftgrp->included = false;
 418                 break;
 419             }
 420         }
 421         /* if this ftgrp is not included, then skip it */
 422         if (!ftgrp->included) {
 423             continue;
 424         }
 425         /* compute the load average on this fault group */
 426         totprocs = 0;
 427         totnodes = 0;
 428         for (k=0; k < ftgrp->nodes.size; k++) {
 429             if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
 430                 continue;
 431             }
 432             totnodes++;
 433             totprocs += node->num_procs;
 434         }
 435         avgload = (float)totprocs / (float)totnodes;
 436         /* now find the lightest loaded of the included fault groups */
 437         if (avgload < minload) {
 438             minload = avgload;
 439             target = ftgrp;
 440             OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
 441                                  "%s rmaps:resilient: found new min load ftgrp %d",
 442                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 443                                  ftgrp->ftgrp));
 444         }
 445     }
 446 
 447     if (NULL == target) {
 448         /* nothing found */
 449         return ORTE_ERR_NOT_FOUND;
 450     }
 451 
 452     /* if we did find a target, re-map the proc to the lightest loaded
 453      * node in that group
 454      */
 455     lowprocs = 1000000;
 456     nd = NULL;
 457     for (k=0; k < target->nodes.size; k++) {
 458         if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
 459             continue;
 460         }
 461         if (node->num_procs < lowprocs) {
 462             lowprocs = node->num_procs;
 463             nd = node;
 464         }
 465     }
 466 
 467     /* return the results */
 468     *tgt = target;
 469     *ndret = nd;
 470 
 471     return ORTE_SUCCESS;
 472 }
 473 
 474 static int get_new_node(orte_proc_t *proc,
 475                         orte_app_context_t *app,
 476                         orte_job_map_t *map,
 477                         orte_node_t **ndret)
 478 {
 479     orte_node_t *nd, *oldnode, *node;
 480     orte_proc_t *pptr;
 481     int rc, j;
 482     opal_list_t node_list, candidates;
 483     opal_list_item_t *item, *next;
 484     orte_std_cntr_t num_slots;
 485     bool found;
 486 
 487     /* set defaults */
 488     *ndret = NULL;
 489     nd = NULL;
 490     oldnode = NULL;
 491     orte_get_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, (void**)&oldnode, OPAL_PTR);
 492 
 493     /*
 494      * Get a list of all nodes
 495      */
 496     OBJ_CONSTRUCT(&node_list, opal_list_t);
 497     if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list,
 498                                                                &num_slots,
 499                                                                app,
 500                                                                map->mapping,
 501                                                                false, false))) {
 502         ORTE_ERROR_LOG(rc);
 503         goto release;
 504     }
 505     if (opal_list_is_empty(&node_list)) {
 506         ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 507         rc = ORTE_ERR_OUT_OF_RESOURCE;
 508         goto release;
 509     }
 510 
 511     if (1 == opal_list_get_size(&node_list)) {
 512         /* if we have only one node, all we can do is put the proc on that
 513          * node, even if it is the same one - better than not restarting at
 514          * all
 515          */
 516         nd = (orte_node_t*)opal_list_get_first(&node_list);
 517         orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, oldnode, OPAL_PTR);
 518         OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 519                              "%s rmaps:resilient: Placing process %s on node %s[%s] (only one avail node)",
 520                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 521                              ORTE_NAME_PRINT(&proc->name),
 522                              nd->name,
 523                              (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
 524         goto release;
 525     }
 526 
 527     /*
 528      * Cycle thru the list, transferring
 529      * all available nodes to the candidate list
 530      * so we can get them in the right order
 531      *
 532      */
 533     OBJ_CONSTRUCT(&candidates, opal_list_t);
 534     while (NULL != (item = opal_list_remove_first(&node_list))) {
 535         node = (orte_node_t*)item;
 536         /* don't put it back on current node */
 537         if (node == oldnode) {
 538             OBJ_RELEASE(item);
 539             continue;
 540         }
 541         if (0 == node->num_procs) {
 542             OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
 543                                  "%s PREPENDING EMPTY NODE %s[%s] TO CANDIDATES",
 544                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 545                                  (NULL == node->name) ? "NULL" : node->name,
 546                                  (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
 547             opal_list_prepend(&candidates, item);
 548         } else {
 549             OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
 550                                  "%s APPENDING NON-EMPTY NODE %s[%s] TO CANDIDATES",
 551                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 552                                  (NULL == node->name) ? "NULL" : node->name,
 553                                  (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
 554             opal_list_append(&candidates, item);
 555         }
 556     }
 557     /* search the candidates
 558      * try to use a semi-intelligent selection logic here that:
 559      *
 560      * (a) avoids putting the proc on a node where a peer is already
 561      *     located as this degrades our fault tolerance
 562      *
 563      * (b) avoids "ricochet effect" where a process would ping-pong
 564      *     between two nodes as it fails
 565      */
 566     nd = NULL;
 567     item = opal_list_get_first(&candidates);
 568     while (item != opal_list_get_end(&candidates)) {
 569         node = (orte_node_t*)item;
 570         next = opal_list_get_next(item);
 571         /* don't return to our prior location to avoid
 572          * "ricochet" effect
 573          */
 574         if (NULL != oldnode && node == oldnode) {
 575             OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
 576                                  "%s REMOVING PRIOR NODE %s[%s] FROM CANDIDATES",
 577                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 578                                  (NULL == node->name) ? "NULL" : node->name,
 579                                  (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
 580             opal_list_remove_item(&candidates, item);
 581             OBJ_RELEASE(item);  /* maintain acctg */
 582             item = next;
 583             continue;
 584         }
 585         /* if this node is empty, then it is the winner */
 586         if (0 == node->num_procs) {
 587             nd = node;
 588             orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, oldnode, OPAL_PTR);
 589             break;
 590         }
 591         /* if this node has someone from my job, then skip it
 592          * to avoid (a)
 593          */
 594         found = false;
 595         for (j=0; j < node->procs->size; j++) {
 596             if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(node->procs, j))) {
 597                 continue;
 598             }
 599             if (pptr->name.jobid == proc->name.jobid) {
 600                 OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output,
 601                                      "%s FOUND PEER %s ON NODE %s[%s]",
 602                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 603                                      ORTE_NAME_PRINT(&pptr->name),
 604                                      (NULL == node->name) ? "NULL" : node->name,
 605                                      (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid)));
 606                 found = true;
 607                 break;
 608             }
 609         }
 610         if (found) {
 611             item = next;
 612             continue;
 613         }
 614         /* get here if all tests pass - take this node */
 615         nd = node;
 616         orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, oldnode, OPAL_PTR);
 617         break;
 618     }
 619     if (NULL == nd) {
 620         /* didn't find anything */
 621         if (NULL != oldnode) {
 622             nd = oldnode;
 623             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 624                                  "%s rmaps:resilient: Placing process %s on prior node %s[%s] (no ftgrp)",
 625                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 626                                  ORTE_NAME_PRINT(&proc->name),
 627                                  (NULL == nd->name) ? "NULL" : nd->name,
 628                                  (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
 629         } else {
 630             nd = proc->node;
 631             orte_set_attribute(&proc->attributes, ORTE_PROC_PRIOR_NODE, ORTE_ATTR_LOCAL, nd, OPAL_PTR);
 632             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 633                                  "%s rmaps:resilient: Placing process %s back on same node %s[%s] (no ftgrp)",
 634                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 635                                  ORTE_NAME_PRINT(&proc->name),
 636                                  (NULL == nd->name) ? "NULL" : nd->name,
 637                                  (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
 638         }
 639 
 640     }
 641     /* cleanup candidate list */
 642     while (NULL != (item = opal_list_remove_first(&candidates))) {
 643         OBJ_RELEASE(item);
 644     }
 645     OBJ_DESTRUCT(&candidates);
 646 
 647  release:
 648     OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 649                          "%s rmaps:resilient: Placing process on node %s[%s] (no ftgrp)",
 650                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 651                          (NULL == nd->name) ? "NULL" : nd->name,
 652                          (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid)));
 653 
 654     while (NULL != (item = opal_list_remove_first(&node_list))) {
 655         OBJ_RELEASE(item);
 656     }
 657     OBJ_DESTRUCT(&node_list);
 658 
 659     *ndret = nd;
 660     return rc;
 661 }
 662 
 663 static void flag_nodes(opal_list_t *node_list)
 664 {
 665     opal_list_item_t *item, *nitem;
 666     orte_node_t *node, *nd;
 667     orte_rmaps_res_ftgrp_t *ftgrp;
 668     int k;
 669 
 670     for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
 671          item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
 672          item = opal_list_get_next(item)) {
 673         ftgrp = (orte_rmaps_res_ftgrp_t*)item;
 674         /* reset the flags */
 675         ftgrp->used = false;
 676         ftgrp->included = false;
 677         /* if at least one node in our list is included in this
 678          * ftgrp, then flag it as included
 679          */
 680         for (nitem = opal_list_get_first(node_list);
 681              !ftgrp->included && nitem != opal_list_get_end(node_list);
 682              nitem = opal_list_get_next(nitem)) {
 683             node = (orte_node_t*)nitem;
 684             for (k=0; k < ftgrp->nodes.size; k++) {
 685                 if (NULL == (nd = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
 686                     continue;
 687                 }
 688                 if (0 == strcmp(nd->name, node->name)) {
 689                     ftgrp->included = true;
 690                     break;
 691                 }
 692             }
 693         }
 694     }
 695 }
 696 
 697 static int map_to_ftgrps(orte_job_t *jdata)
 698 {
 699     orte_job_map_t *map;
 700     orte_app_context_t *app;
 701     int i, j, k, totnodes;
 702     opal_list_t node_list;
 703     opal_list_item_t *item, *next, *curitem;
 704     orte_std_cntr_t num_slots;
 705     int rc = ORTE_SUCCESS;
 706     float avgload, minload;
 707     orte_node_t *node, *nd=NULL;
 708     orte_rmaps_res_ftgrp_t *ftgrp, *target = NULL;
 709     orte_vpid_t totprocs, num_assigned;
 710     bool initial_map=true;
 711 
 712     OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 713                          "%s rmaps:resilient: creating initial map for job %s",
 714                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 715                          ORTE_JOBID_PRINT(jdata->jobid)));
 716 
 717     /* start at the beginning... */
 718     jdata->num_procs = 0;
 719     map = jdata->map;
 720 
 721     for (i=0; i < jdata->apps->size; i++) {
 722         /* get the app_context */
 723         if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
 724             continue;
 725         }
 726         /* you cannot use this mapper unless you specify the number of procs to
 727          * launch for each app
 728          */
 729         if (0 == app->num_procs) {
 730             orte_show_help("help-orte-rmaps-resilient.txt",
 731                            "orte-rmaps-resilient:num-procs",
 732                            true);
 733             return ORTE_ERR_SILENT;
 734         }
 735         num_assigned = 0;
 736         /* for each app_context, we have to get the list of nodes that it can
 737          * use since that can now be modified with a hostfile and/or -host
 738          * option
 739          */
 740         OBJ_CONSTRUCT(&node_list, opal_list_t);
 741         if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
 742                                                                    map->mapping, initial_map, false))) {
 743             ORTE_ERROR_LOG(rc);
 744             return rc;
 745         }
 746         /* flag that all subsequent requests should not reset the node->mapped flag */
 747         initial_map = false;
 748 
 749         /* remove all nodes that are not "up" or do not have a running daemon on them */
 750         item = opal_list_get_first(&node_list);
 751         while (item != opal_list_get_end(&node_list)) {
 752             next = opal_list_get_next(item);
 753             node = (orte_node_t*)item;
 754             if (ORTE_NODE_STATE_UP != node->state ||
 755                 NULL == node->daemon ||
 756                 ORTE_PROC_STATE_RUNNING != node->daemon->state) {
 757                 opal_list_remove_item(&node_list, item);
 758                 OBJ_RELEASE(item);
 759             }
 760             item = next;
 761         }
 762         curitem = opal_list_get_first(&node_list);
 763 
 764         /* flag the fault groups included by these nodes */
 765         flag_nodes(&node_list);
 766         /* map each copy to a different fault group - if more copies are
 767          * specified than fault groups, then overlap in a round-robin fashion
 768          */
 769         for (j=0; j < app->num_procs; j++) {
 770             /* find unused included fault group with lowest average load - if none
 771              * found, then break
 772              */
 773             target = NULL;
 774             minload = 1000000000.0;
 775             for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
 776                  item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
 777                  item = opal_list_get_next(item)) {
 778                 ftgrp = (orte_rmaps_res_ftgrp_t*)item;
 779                 OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
 780                                      "%s rmaps:resilient: fault group %d used: %s included %s",
 781                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 782                                      ftgrp->ftgrp,
 783                                      ftgrp->used ? "YES" : "NO",
 784                                      ftgrp->included ? "YES" : "NO" ));
 785                 /* if this ftgrp has already been used or is not included, then
 786                  * skip it
 787                  */
 788                 if (ftgrp->used || !ftgrp->included) {
 789                     continue;
 790                 }
 791                 /* compute the load average on this fault group */
 792                 totprocs = 0;
 793                 totnodes = 0;
 794                 for (k=0; k < ftgrp->nodes.size; k++) {
 795                     if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
 796                         continue;
 797                     }
 798                     totnodes++;
 799                     totprocs += node->num_procs;
 800                 }
 801                 avgload = (float)totprocs / (float)totnodes;
 802                 if (avgload < minload) {
 803                     minload = avgload;
 804                     target = ftgrp;
 805                     OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
 806                                          "%s rmaps:resilient: found new min load ftgrp %d",
 807                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 808                                          ftgrp->ftgrp));
 809                 }
 810             }
 811             /* if we have more procs than fault groups, then we simply
 812              * map the remaining procs on available nodes in a round-robin
 813              * fashion - it doesn't matter where they go as they will not
 814              * be contributing to fault tolerance by definition
 815              */
 816             if (NULL == target) {
 817                 OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output,
 818                                      "%s rmaps:resilient: more procs than fault groups - mapping excess rr",
 819                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 820                 nd = (orte_node_t*)curitem;
 821                 curitem = opal_list_get_next(curitem);
 822                 if (curitem == opal_list_get_end(&node_list)) {
 823                     curitem = opal_list_get_first(&node_list);
 824                 }
 825             } else {
 826                 /* pick node with lowest load from within that group */
 827                 totprocs = 1000000;
 828                 for (k=0; k < target->nodes.size; k++) {
 829                     if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
 830                         continue;
 831                     }
 832                     if (node->num_procs < totprocs) {
 833                         totprocs = node->num_procs;
 834                         nd = node;
 835                     }
 836                 }
 837             }
 838             OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output,
 839                                  "%s rmaps:resilient: placing proc into fault group %d node %s",
 840                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 841                                  (NULL == target) ? -1 : target->ftgrp, nd->name));
 842             /* if the node isn't in the map, add it */
 843             if (!ORTE_FLAG_TEST(nd, ORTE_NODE_FLAG_MAPPED)) {
 844                 OBJ_RETAIN(nd);
 845                 opal_pointer_array_add(map->nodes, nd);
 846                 ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_MAPPED);
 847             }
 848             if (NULL == orte_rmaps_base_setup_proc(jdata, nd, app->idx)) {
 849                 ORTE_ERROR_LOG(ORTE_ERROR);
 850                 return ORTE_ERROR;
 851             }
 852             if ((nd->slots < (int)nd->num_procs) ||
 853                 (0 < nd->slots_max && nd->slots_max < (int)nd->num_procs)) {
 854                 if (ORTE_MAPPING_NO_OVERSUBSCRIBE & ORTE_GET_MAPPING_DIRECTIVE(jdata->map->mapping)) {
 855                     orte_show_help("help-orte-rmaps-base.txt", "orte-rmaps-base:alloc-error",
 856                                    true, nd->num_procs, app->app);
 857                     ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
 858                     return ORTE_ERR_SILENT;
 859                 }
 860                 /* flag the node as oversubscribed so that sched-yield gets
 861                  * properly set
 862                  */
 863                 ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_OVERSUBSCRIBED);
 864                 ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_OVERSUBSCRIBED);
 865             }
 866 
 867             /* track number of procs mapped */
 868             num_assigned++;
 869 
 870             /* flag this fault group as used */
 871             if (NULL != target) {
 872                 target->used = true;
 873             }
 874         }
 875 
 876         /* track number of procs */
 877         jdata->num_procs += app->num_procs;
 878 
 879         /* cleanup the node list - it can differ from one app_context
 880          * to another, so we have to get it every time
 881          */
 882         while (NULL != (item = opal_list_remove_first(&node_list))) {
 883             OBJ_RELEASE(item);
 884         }
 885         OBJ_DESTRUCT(&node_list);
 886     }
 887 
 888     return ORTE_SUCCESS;
 889 }

/* [<][>][^][v][top][bottom][index][help] */