root/orte/mca/iof/hnp/iof_hnp.c


DEFINITIONS

This source file includes the following definitions:
  1. init
  2. hnp_push
  3. hnp_pull
  4. hnp_close
  5. hnp_complete
  6. finalize
  7. hnp_ft_event
  8. stdin_write_handler
  9. hnp_output

/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2011 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Sun Microsystems, Inc.  All rights reserved.
 * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2014-2018 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
 * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
#include "orte_config.h"
#include "opal/util/output.h"
#include "orte/constants.h"

#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif  /* HAVE_UNISTD_H */
#include <string.h>

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#else
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#endif

#include "opal/mca/event/event.h"

#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/util/threads.h"
#include "orte/mca/odls/odls_types.h"

#include "orte/mca/iof/base/base.h"
#include "orte/mca/iof/base/iof_base_setup.h"
#include "iof_hnp.h"

/* LOCAL FUNCTIONS */
static void stdin_write_handler(int fd, short event, void *cbdata);

/* API FUNCTIONS */
static int init(void);

static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);

static int hnp_pull(const orte_process_name_t* src_name,
                    orte_iof_tag_t src_tag,
                    int fd);

static int hnp_close(const orte_process_name_t* peer,
                     orte_iof_tag_t source_tag);

static int hnp_output(const orte_process_name_t* peer,
                      orte_iof_tag_t source_tag,
                      const char *msg);

static void hnp_complete(const orte_job_t *jdata);

static int finalize(void);

static int hnp_ft_event(int state);

/* The APIs in this module are used solely to support LOCAL
 * procs - i.e., procs that are co-located with the HNP. Remote
 * procs interact with the HNP's IOF via the HNP's receive function,
 * which operates independently and lives in iof_hnp_receive.c
 */

orte_iof_base_module_t orte_iof_hnp_module = {
    .init = init,
    .push = hnp_push,
    .pull = hnp_pull,
    .close = hnp_close,
    .output = hnp_output,
    .complete = hnp_complete,
    .finalize = finalize,
    .ft_event = hnp_ft_event
};

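/* Note: when this component is selected, the IOF framework exposes this
 * module through the framework-level orte_iof interface, so calls such as
 * orte_iof.push()/pull()/close() elsewhere in ORTE dispatch to the
 * functions defined below.
 */
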
/* Initialize the module */
static int init(void)
{
    /* post non-blocking recv to catch forwarded IO from
     * the orteds
     */
    orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                            ORTE_RML_TAG_IOF_HNP,
                            ORTE_RML_PERSISTENT,
                            orte_iof_hnp_recv,
                            NULL);

    OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
    mca_iof_hnp_component.stdinev = NULL;

    return ORTE_SUCCESS;
}

/* Set up to read local data. If the tag is other than STDIN,
 * then this is output being pushed from one of my child processes
 * and I'll write the data out myself. If the tag is STDIN,
 * then I need to set up to read from my stdin, and send anything
 * I get to the specified dst_name. The dst_name in this case tells
 * us which procs are to get stdin - only two options are supported:
 *
 * (a) a specific name, usually vpid=0; or
 *
 * (b) all procs, specified by vpid=ORTE_VPID_WILDCARD
 *
 * The orte_plm_base_launch_apps function calls iof.push after
 * the procs are launched and tells us how to distribute stdin. This
 * ensures that the procs are started -before- we begin reading stdin
 * and attempting to send it to remote procs
 */
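/* Illustrative sketch only (not part of the original code): after launch,
 * the caller is expected to hand us each local fd with something along
 * the lines of
 *
 *     orte_iof.push(&child_name, ORTE_IOF_STDOUT, stdout_pipe_fd);
 *     orte_iof.push(&child_name, ORTE_IOF_STDIN, STDIN_FILENO);
 *
 * where child_name and stdout_pipe_fd are hypothetical placeholders; the
 * real call sites live in the launch path (e.g., orte_plm_base_launch_apps
 * as noted above), not in this file.
 */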
static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
{
    orte_job_t *jdata;
    orte_proc_t *proc;
    orte_iof_proc_t *proct, *pptr;
    int flags, rc;
    orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;

    /* don't do this if the dst vpid is invalid or the fd is negative! */
    if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
        return ORTE_SUCCESS;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                         "%s iof:hnp pushing fd %d for process %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         fd, ORTE_NAME_PRINT(dst_name)));

    /* do we already have this process in our list? */
    OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
        if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
            /* found it */
            goto SETUP;
        }
    }
    /* if we get here, then we don't yet have this proc in our list */
    proct = OBJ_NEW(orte_iof_proc_t);
    proct->name.jobid = dst_name->jobid;
    proct->name.vpid = dst_name->vpid;
    opal_list_append(&mca_iof_hnp_component.procs, &proct->super);

  SETUP:
    if (!(src_tag & ORTE_IOF_STDIN)) {
        /* set the file descriptor to non-blocking - do this before we setup
         * and activate the read event in case it fires right away
         */
        if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
            opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
                        __FILE__, __LINE__, errno);
        } else {
            flags |= O_NONBLOCK;
            fcntl(fd, F_SETFL, flags);
        }
        /* get the local jobdata for this proc */
        if (NULL == (jdata = orte_get_job_data_object(proct->name.jobid))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_ERR_NOT_FOUND;
        }
        /* define a read event and activate it */
        if (src_tag & ORTE_IOF_STDOUT) {
            ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
                                orte_iof_hnp_read_local_handler, false);
        } else if (src_tag & ORTE_IOF_STDERR) {
            ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
                                orte_iof_hnp_read_local_handler, false);
#if OPAL_PMIX_V1
        } else if (src_tag & ORTE_IOF_STDDIAG) {
            ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
                                orte_iof_hnp_read_local_handler, false);
#endif
        }
        /* setup any requested output files */
        if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }

        /* if -all- of the readevents for this proc have been defined, then
         * activate them. Otherwise, we can think that the proc is complete
         * because one of the readevents fires -prior- to all of them having
         * been defined!
         */
        if (NULL != proct->revstdout &&
#if OPAL_PMIX_V1
            NULL != proct->revstddiag &&
#endif
            (orte_iof_base.redirect_app_stderr_to_stdout || NULL != proct->revstderr)) {
            if (proct->copy) {
                /* see if there are any wildcard subscribers out there that
                 * apply to us */
                OPAL_LIST_FOREACH(pptr, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
                    if (dst_name->jobid == pptr->name.jobid &&
                        ORTE_VPID_WILDCARD == pptr->name.vpid &&
                        NULL != pptr->subscribers) {
                        OBJ_RETAIN(pptr->subscribers);
                        proct->subscribers = pptr->subscribers;
                        break;
                    }
                }
            }
            ORTE_IOF_READ_ACTIVATE(proct->revstdout);
            if (!orte_iof_base.redirect_app_stderr_to_stdout) {
                ORTE_IOF_READ_ACTIVATE(proct->revstderr);
            }
#if OPAL_PMIX_V1
            if (NULL != proct->revstddiag) {
                ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
            }
#endif
        }
        return ORTE_SUCCESS;
    }

    /* if we are pushing stdin, this is happening only during launch - setup
     * a target for this destination if it is going somewhere other than me
     */
    if (ORTE_VPID_WILDCARD == dst_name->vpid) {
        /* if wildcard, define a sink with that info so it gets sent out */
        ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
                             stdin_write_handler);
        proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
        proct->stdinev->daemon.vpid = ORTE_VPID_WILDCARD;
    } else {
        /* no - look up the proc's daemon and set that into the sink */
        if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
            ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
            return ORTE_ERR_BAD_PARAM;
        }
        if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, dst_name->vpid))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_ERR_NOT_FOUND;
        }
        /* if it is me, then don't set this up - we'll get it on the pull */
        if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) {
            ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
                                 stdin_write_handler);
            proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
            proct->stdinev->daemon.vpid = proc->node->daemon->name.vpid;
        }
    }

    /* now setup the read - but check to only do this once */
    if (NULL == mca_iof_hnp_component.stdinev) {
        /* Since we are the HNP, we don't want to set nonblocking on our
         * stdio stream.  If we do so, we set the file descriptor to
         * non-blocking for everyone that has that file descriptor, which
         * includes everyone else in our shell pipeline chain.  (See
         * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
         * This causes things like "mpirun -np 1 big_app | cat" to lose
         * output, because cat's stdout is then ALSO non-blocking and cat
         * isn't built to deal with that case (same with almost all other
         * unix text utils).
         */
        if (0 != fd) {
            if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
                opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
                            __FILE__, __LINE__, errno);
            } else {
                flags |= O_NONBLOCK;
                fcntl(fd, F_SETFL, flags);
            }
        }
        if (isatty(fd)) {
            /* We should avoid trying to read from stdin if we
             * have a terminal, but are backgrounded.  Catch the
             * signals that are commonly used when we switch
             * between being backgrounded and not.  If the
             * filedescriptor is not a tty, don't worry about it
             * and always stay connected.
             */
            opal_event_signal_set(orte_event_base, &mca_iof_hnp_component.stdinsig,
                                  SIGCONT, orte_iof_hnp_stdin_cb,
                                  NULL);

            /* setup a read event to read stdin, but don't activate it yet. The
             * dst_name indicates who should receive the stdin. If that recipient
             * doesn't do a corresponding pull, however, then the stdin will
             * be dropped upon receipt at the local daemon
             */
            ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
                                proct, fd, ORTE_IOF_STDIN,
                                orte_iof_hnp_read_local_handler, false);

            /* check to see if we want the stdin read event to be
             * active - we will always at least define the event,
             * but may delay its activation
             */
            if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
                ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
            }
        } else {
            /* if we are not looking at a tty, just setup a read event
             * and activate it
             */
            ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
                                proct, fd, ORTE_IOF_STDIN,
                                orte_iof_hnp_read_local_handler, true);
        }
    }
    return ORTE_SUCCESS;
}


/*
 * Since we are the HNP, the only "pull" call comes from a local
 * process so we can record the file descriptor for its stdin.
 */

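/* Illustrative sketch only: a local launch helper would be expected to
 * call something like
 *
 *     orte_iof.pull(&child_name, ORTE_IOF_STDIN, stdin_pipe_write_fd);
 *
 * passing the write end of the pipe feeding the child's stdin, which then
 * becomes the sink fd defined below. The names above are hypothetical
 * placeholders, not symbols from this code base.
 */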
static int hnp_pull(const orte_process_name_t* dst_name,
                    orte_iof_tag_t src_tag,
                    int fd)
{
    orte_iof_proc_t *proct;
    orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
    int flags;

    /* this is a local call - only stdin is supported */
    if (ORTE_IOF_STDIN != src_tag) {
        return ORTE_ERR_NOT_SUPPORTED;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                         "%s iof:hnp pulling fd %d for process %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         fd, ORTE_NAME_PRINT(dst_name)));

    /* set the file descriptor to non-blocking - do this before we setup
     * the sink in case it fires right away
     */
    if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
        opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
                    __FILE__, __LINE__, errno);
    } else {
        flags |= O_NONBLOCK;
        fcntl(fd, F_SETFL, flags);
    }

    /* do we already have this process in our list? */
    OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
        if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
            /* found it */
            goto SETUP;
        }
    }
    /* if we get here, then we don't yet have this proc in our list */
    proct = OBJ_NEW(orte_iof_proc_t);
    proct->name.jobid = dst_name->jobid;
    proct->name.vpid = dst_name->vpid;
    opal_list_append(&mca_iof_hnp_component.procs, &proct->super);

  SETUP:
    ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
                         stdin_write_handler);
    proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
    proct->stdinev->daemon.vpid = ORTE_PROC_MY_NAME->vpid;

    return ORTE_SUCCESS;
}

/*
 * One of our local procs wants us to close the specified
 * stream(s), thus terminating any potential I/O to/from it.
 */
static int hnp_close(const orte_process_name_t* peer,
                     orte_iof_tag_t source_tag)
{
    orte_iof_proc_t* proct;
    orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;

    OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
        if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
            if (ORTE_IOF_STDIN & source_tag) {
                if (NULL != proct->stdinev) {
                    OBJ_RELEASE(proct->stdinev);
                }
                proct->stdinev = NULL;
            }
            if ((ORTE_IOF_STDOUT & source_tag) ||
                (ORTE_IOF_STDMERGE & source_tag)) {
                if (NULL != proct->revstdout) {
                    orte_iof_base_static_dump_output(proct->revstdout);
                    OBJ_RELEASE(proct->revstdout);
                }
                proct->revstdout = NULL;
            }
            if (ORTE_IOF_STDERR & source_tag) {
                if (NULL != proct->revstderr) {
                    orte_iof_base_static_dump_output(proct->revstderr);
                    OBJ_RELEASE(proct->revstderr);
                }
                proct->revstderr = NULL;
            }
#if OPAL_PMIX_V1
            if (ORTE_IOF_STDDIAG & source_tag) {
                if (NULL != proct->revstddiag) {
                    orte_iof_base_static_dump_output(proct->revstddiag);
                    OBJ_RELEASE(proct->revstddiag);
                }
                proct->revstddiag = NULL;
            }
#endif
            /* if we closed them all, then remove this proc */
            if (NULL == proct->stdinev &&
                NULL == proct->revstdout &&
#if OPAL_PMIX_V1
                NULL == proct->revstddiag &&
#endif
                NULL == proct->revstderr) {
                opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
                OBJ_RELEASE(proct);
            }
            break;
        }
    }
    return ORTE_SUCCESS;
}

static void hnp_complete(const orte_job_t *jdata)
{
    orte_iof_proc_t *proct, *next;

    /* clean out any lingering sinks */
    OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
        if (jdata->jobid == proct->name.jobid) {
            opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
            OBJ_RELEASE(proct);
        }
    }
}

static int finalize(void)
{
    orte_iof_write_event_t *wev;
    orte_iof_proc_t *proct;
    bool dump;
    orte_iof_write_output_t *output;
    int num_written;

    /* check if anything is still trying to be written out */
    wev = orte_iof_base.iof_write_stdout->wev;
    if (!opal_list_is_empty(&wev->outputs)) {
        dump = false;
        /* make one last attempt to write this out */
        while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
            if (!dump) {
                num_written = write(wev->fd, output->data, output->numbytes);
                if (num_written < output->numbytes) {
                    /* don't retry - just clean out the list and dump it */
                    dump = true;
                }
            }
            OBJ_RELEASE(output);
        }
    }
    if (!orte_xml_output) {
        /* we only opened the stderr channel if we are NOT doing xml output */
        wev = orte_iof_base.iof_write_stderr->wev;
        if (!opal_list_is_empty(&wev->outputs)) {
            dump = false;
            /* make one last attempt to write this out */
            while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
                if (!dump) {
                    num_written = write(wev->fd, output->data, output->numbytes);
                    if (num_written < output->numbytes) {
                        /* don't retry - just clean out the list and dump it */
                        dump = true;
                    }
                }
                OBJ_RELEASE(output);
            }
        }
    }

    /* cycle thru the procs and ensure all their output was delivered
     * if they were writing to files */
    while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_hnp_component.procs))) {
        if (NULL != proct->revstdout) {
            orte_iof_base_static_dump_output(proct->revstdout);
        }
        if (NULL != proct->revstderr) {
            orte_iof_base_static_dump_output(proct->revstderr);
        }
#if OPAL_PMIX_V1
        if (NULL != proct->revstddiag) {
            orte_iof_base_static_dump_output(proct->revstddiag);
        }
#endif
        OBJ_RELEASE(proct);
    }
    OBJ_DESTRUCT(&mca_iof_hnp_component.procs);

    return ORTE_SUCCESS;
}

static int hnp_ft_event(int state)
{
    /*
     * Replica doesn't need to do anything for a checkpoint
     */
    return ORTE_SUCCESS;
}


/* this function is called by the event library and thus
 * can access information global to the state machine
 */
static void stdin_write_handler(int fd, short event, void *cbdata)
{
    orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
    orte_iof_write_event_t *wev = sink->wev;
    opal_list_item_t *item;
    orte_iof_write_output_t *output;
    int num_written, total_written = 0;

    ORTE_ACQUIRE_OBJECT(sink);

    OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                         "%s hnp:stdin:write:handler writing data to %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         wev->fd));

    wev->pending = false;

    while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
        output = (orte_iof_write_output_t*)item;
        /* if an abnormal termination has occurred, just dump
         * this data as we are aborting
         */
        if (orte_abnormal_term_ordered) {
            OBJ_RELEASE(output);
            continue;
        }
        if (0 == output->numbytes) {
            /* this indicates we are to close the fd - there is
             * nothing to write
             */
            OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                                 "%s iof:hnp closing fd %d on write event due to zero bytes output",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
            goto finish;
        }
        num_written = write(wev->fd, output->data, output->numbytes);
        OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                             "%s hnp:stdin:write:handler wrote %d bytes",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             num_written));
        if (num_written < 0) {
            if (EAGAIN == errno || EINTR == errno) {
                /* push this item back on the front of the list */
                opal_list_prepend(&wev->outputs, item);
                /* leave the write event running so it will call us again
                 * when the fd is ready.
                 */
                goto re_enter;
            }
            /* otherwise, something bad happened so all we can do is declare an
             * error and abort
             */
            OBJ_RELEASE(output);
            OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
                                 "%s iof:hnp closing fd %d on write event due to negative bytes written",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
            goto finish;
        } else if (num_written < output->numbytes) {
            OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                                 "%s hnp:stdin:write:handler incomplete write %d - adjusting data",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
            /* incomplete write - adjust data to avoid duplicate output */
            memmove(output->data, &output->data[num_written], output->numbytes - num_written);
            /* push this item back on the front of the list */
            opal_list_prepend(&wev->outputs, item);
            /* leave the write event running so it will call us again
             * when the fd is ready.
             */
            goto re_enter;
        }
        OBJ_RELEASE(output);

        total_written += num_written;
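        /* if this fd never reports backpressure (always_writable - e.g., it
         * is backed by a file), stop after roughly one block so we return
         * to the event loop rather than monopolizing it */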
        if ((ORTE_IOF_SINK_BLOCKSIZE <= total_written) && wev->always_writable) {
            goto re_enter;
        }
    }
    goto check;
  re_enter:
    ORTE_IOF_SINK_ACTIVATE(wev);
  check:
    if (NULL != mca_iof_hnp_component.stdinev &&
        !orte_abnormal_term_ordered &&
        !mca_iof_hnp_component.stdinev->active) {
        OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                            "read event is off - checking if okay to restart"));
        /* if we have turned off the read event, check to
         * see if the output list has shrunk enough to
         * turn it back on
         *
         * RHC: Note that when multiple procs want stdin, we
         * can get into a fight between a proc turning stdin
         * back "on" and other procs turning it "off". There
         * is no clear way to resolve this as different procs
         * may take input at different rates.
         */
        if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
            /* restart the read */
            OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
                                 "restarting read event"));
            ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
        }
    }
    if (sink->closed && 0 == opal_list_get_size(&wev->outputs)) {
        /* the sink has already been closed and everything was written, time to release it */
        OBJ_RELEASE(sink);
    }
    return;
  finish:
    OBJ_RELEASE(wev);
    sink->wev = NULL;
    return;
}

static int hnp_output(const orte_process_name_t* peer,
                      orte_iof_tag_t source_tag,
                      const char *msg)
{
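    /* note: when xml output is selected the stderr write channel is never
     * opened (see the comment in finalize() above), so everything must be
     * funneled through the stdout channel */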
    /* output this to our local output */
    if ((ORTE_IOF_STDOUT & source_tag) || orte_xml_output) {
        orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
    } else {
        orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
    }

    return ORTE_SUCCESS;
}
