root/orte/mca/iof/hnp/iof_hnp_read.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. restart_stdin
  2. orte_iof_hnp_stdin_check
  3. orte_iof_hnp_stdin_cb
  4. orte_iof_hnp_read_local_handler

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2018 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007-2012 Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
  16  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
  17  * Copyright (c) 2018      Research Organization for Information Science
  18  *                         and Technology (RIST).  All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  */
  25 
  26 #include "orte_config.h"
  27 #include "orte/constants.h"
  28 
  29 #include <errno.h>
  30 #ifdef HAVE_UNISTD_H
  31 #include <unistd.h>
  32 #endif  /* HAVE_UNISTD_H */
  33 #include <string.h>
  34 
  35 #include "opal/dss/dss.h"
  36 #include "opal/mca/pmix/pmix.h"
  37 
  38 #include "orte/mca/rml/rml.h"
  39 #include "orte/mca/errmgr/errmgr.h"
  40 #include "orte/mca/odls/odls_types.h"
  41 #include "orte/util/name_fns.h"
  42 #include "orte/util/threads.h"
  43 #include "orte/mca/state/state.h"
  44 #include "orte/runtime/orte_globals.h"
  45 #include "orte/runtime/orte_wait.h"
  46 
  47 #include "orte/mca/iof/iof.h"
  48 #include "orte/mca/iof/base/base.h"
  49 
  50 #include "iof_hnp.h"
  51 
  52 static void restart_stdin(int fd, short event, void *cbdata)
  53 {
  54     orte_timer_t *tm = (orte_timer_t*)cbdata;
  55 
  56     ORTE_ACQUIRE_OBJECT(tm);
  57 
  58     if (NULL != mca_iof_hnp_component.stdinev &&
  59         !orte_job_term_ordered &&
  60         !mca_iof_hnp_component.stdinev->active) {
  61         ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
  62     }
  63 
  64     /* if this was a timer callback, then release the timer */
  65     if (NULL != tm) {
  66         OBJ_RELEASE(tm);
  67     }
  68 }
  69 
  70 /* return true if we should read stdin from fd, false otherwise */
  71 bool orte_iof_hnp_stdin_check(int fd)
  72 {
  73 #if defined(HAVE_TCGETPGRP)
  74     if( isatty(fd) && (getpgrp() != tcgetpgrp(fd)) ) {
  75         return false;
  76     }
  77 #endif
  78     return true;
  79 }
  80 
  81 void orte_iof_hnp_stdin_cb(int fd, short event, void *cbdata)
  82 {
  83     bool should_process;
  84 
  85     ORTE_ACQUIRE_OBJECT(mca_iof_hnp_component.stdinev);
  86 
  87     should_process = orte_iof_hnp_stdin_check(0);
  88 
  89     if (should_process) {
  90         ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
  91     } else {
  92 
  93         opal_event_del(mca_iof_hnp_component.stdinev->ev);
  94         mca_iof_hnp_component.stdinev->active = false;
  95     }
  96 }
  97 
  98 /* this is the read handler for my own child procs. In this case,
  99  * the data is going nowhere - I just output it myself
 100  */
 101 void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
 102 {
 103     orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
 104     unsigned char data[ORTE_IOF_BASE_MSG_MAX];
 105     int32_t numbytes;
 106     orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
 107     int rc;
 108     orte_ns_cmp_bitmask_t mask=ORTE_NS_CMP_ALL;
 109     bool exclusive;
 110     orte_iof_sink_t *sink;
 111 
 112     ORTE_ACQUIRE_OBJECT(rev);
 113 
 114     /* As we may use timer events, fd can be bogus (-1)
 115      * use the right one here
 116      */
 117     fd = rev->fd;
 118 
 119     /* read up to the fragment size */
 120     memset(data, 0, ORTE_IOF_BASE_MSG_MAX);
 121     numbytes = read(fd, data, sizeof(data));
 122 
 123     if (NULL == proct) {
 124         /* this is an error - nothing we can do */
 125         ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
 126         return;
 127     }
 128 
 129     if (numbytes < 0) {
 130         /* either we have a connection error or it was a non-blocking read */
 131 
 132         /* non-blocking, retry */
 133         if (EAGAIN == errno || EINTR == errno) {
 134             ORTE_IOF_READ_ACTIVATE(rev);
 135             return;
 136         }
 137 
 138         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 139                              "%s iof:hnp:read handler %s Error on connection:%d",
 140                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 141                              ORTE_NAME_PRINT(&proct->name), fd));
 142         /* Un-recoverable error. Allow the code to flow as usual in order to
 143          * to send the zero bytes message up the stream, and then close the
 144          * file descriptor and delete the event.
 145          */
 146         numbytes = 0;
 147     }
 148 
 149     /* is this read from our stdin? */
 150     if (ORTE_IOF_STDIN & rev->tag) {
 151         /* The event has fired, so it's no longer active until we
 152            re-add it */
 153         rev->active = false;
 154         if (NULL == proct->stdinev) {
 155             /* nothing further to do */
 156             return;
 157         }
 158 
 159         /* if job termination has been ordered, just ignore the
 160          * data and delete the read event
 161          */
 162         if (orte_job_term_ordered) {
 163             OBJ_RELEASE(rev);
 164             return;
 165         }
 166         /* if the daemon is me, then this is a local sink */
 167         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdinev->daemon)) {
 168             OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 169                                  "%s read %d bytes from stdin - writing to %s",
 170                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
 171                                  ORTE_NAME_PRINT(&proct->name)));
 172             /* send the bytes down the pipe - we even send 0 byte events
 173              * down the pipe so it forces out any preceding data before
 174              * closing the output stream
 175              */
 176             if (NULL != proct->stdinev->wev) {
 177                 if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdinev->wev)) {
 178                     /* getting too backed up - stop the read event for now if it is still active */
 179 
 180                     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 181                                          "buffer backed up - holding"));
 182                     return;
 183                 }
 184             }
 185         } else {
 186             OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 187                                  "%s sending %d bytes from stdinev to daemon %s",
 188                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
 189                                  ORTE_NAME_PRINT(&proct->stdinev->daemon)));
 190 
 191             /* send the data to the daemon so it can
 192              * write it to the proc's fd - in this case,
 193              * we pass sink->name to indicate who is to
 194              * receive the data. If the connection closed,
 195              * numbytes will be zero so zero bytes will be
 196              * sent - this will tell the daemon to close
 197              * the fd for stdin to that proc
 198              */
 199             if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, &proct->stdinev->name, ORTE_IOF_STDIN, data, numbytes))) {
 200                 /* if the addressee is unknown, remove the sink from the list */
 201                 if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
 202                     OBJ_RELEASE(rev->sink);
 203                 }
 204             }
 205         }
 206 
 207         /* if num_bytes was zero, or we read the last piece of the file, then we need to terminate the event */
 208         if (0 == numbytes) {
 209             if (0 != opal_list_get_size(&proct->stdinev->wev->outputs)) {
 210                 /* some stuff has yet to be written, so delay the release of proct->stdinev */
 211                 proct->stdinev->closed = true;
 212             } else {
 213                 /* this will also close our stdin file descriptor */
 214                 OBJ_RELEASE(proct->stdinev);
 215             }
 216         } else {
 217             /* if we are looking at a tty, then we just go ahead and restart the
 218              * read event assuming we are not backgrounded
 219              */
 220             if (orte_iof_hnp_stdin_check(fd)) {
 221                 restart_stdin(fd, 0, NULL);
 222             } else {
 223                 /* delay for awhile and then restart */
 224                 ORTE_TIMER_EVENT(0, 10000, restart_stdin, ORTE_INFO_PRI);
 225             }
 226         }
 227         /* nothing more to do */
 228         return;
 229     }
 230 
 231     /* this must be output from one of my local procs - see
 232      * if anyone else has requested a copy of this info. If
 233      * we were directed to put it into a file, then
 234      */
 235     exclusive = false;
 236     if (NULL != proct->subscribers) {
 237         OPAL_LIST_FOREACH(sink, proct->subscribers, orte_iof_sink_t) {
 238             /* if the target isn't set, then this sink is for another purpose - ignore it */
 239             if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
 240                 continue;
 241             }
 242             if ((sink->tag & rev->tag) &&
 243                 sink->name.jobid == proct->name.jobid &&
 244                 (ORTE_VPID_WILDCARD == sink->name.vpid || sink->name.vpid == proct->name.vpid)) {
 245                 /* need to send the data to the remote endpoint - if
 246                  * the connection closed, numbytes will be zero, so
 247                  * the remote endpoint will know to close its local fd.
 248                  * In this case, we pass rev->name to indicate who the
 249                  * data came from.
 250                  */
 251                 if (NULL != opal_pmix.server_iof_push) {
 252                     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 253                                          "%s sending data of size %d via PMIx to tool %s",
 254                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)numbytes,
 255                                          ORTE_NAME_PRINT(&sink->daemon)));
 256                     /* don't pass down zero byte blobs */
 257                     if (0 < numbytes) {
 258                         rc = opal_pmix.server_iof_push(&proct->name, rev->tag, data, numbytes);
 259                         if (ORTE_SUCCESS != rc) {
 260                             ORTE_ERROR_LOG(rc);
 261                         }
 262                     }
 263                 } else {
 264                     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 265                                          "%s sending data to tool %s",
 266                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 267                                          ORTE_NAME_PRINT(&sink->daemon));
 268                     orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &proct->name, rev->tag, data, numbytes));
 269                 }
 270                 if (sink->exclusive) {
 271                     exclusive = true;
 272                 }
 273             }
 274         }
 275     }
 276 
 277     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 278                          "%s read %d bytes from %s of %s",
 279                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
 280                          (ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
 281                          ORTE_NAME_PRINT(&proct->name)));
 282 
 283     if (0 == numbytes) {
 284         /* if we read 0 bytes from the stdout/err/diag, there is
 285          * nothing to output - release the appropriate event.
 286          * This will delete the read event and close the file descriptor */
 287         /* make sure we don't do recursive delete on the proct */
 288         OBJ_RETAIN(proct);
 289         if (rev->tag & ORTE_IOF_STDOUT) {
 290             orte_iof_base_static_dump_output(proct->revstdout);
 291             OBJ_RELEASE(proct->revstdout);
 292         } else if (rev->tag & ORTE_IOF_STDERR) {
 293             orte_iof_base_static_dump_output(proct->revstderr);
 294             OBJ_RELEASE(proct->revstderr);
 295 #if OPAL_PMIX_V1
 296         } else if (rev->tag & ORTE_IOF_STDDIAG) {
 297             orte_iof_base_static_dump_output(proct->revstddiag);
 298             OBJ_RELEASE(proct->revstddiag);
 299 #endif
 300         }
 301         /* check to see if they are all done */
 302         if (NULL == proct->revstdout &&
 303 #if OPAL_PMIX_V1
 304             NULL == proct->revstddiag &&
 305 #endif
 306             NULL == proct->revstderr) {
 307             /* this proc's iof is complete */
 308             ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
 309         }
 310         OBJ_RELEASE(proct);
 311         return;
 312     }
 313 
 314     if (proct->copy) {
 315         if (NULL != proct->subscribers) {
 316             if (!exclusive) {
 317                 /* output this to our local output */
 318                 if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
 319                     orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
 320                 } else {
 321                     orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
 322                 }
 323             }
 324         } else {
 325             /* output this to our local output */
 326             if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
 327                 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
 328             } else {
 329                 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
 330             }
 331         }
 332     }
 333     /* see if the user wanted the output directed to files */
 334     if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
 335         /* output to the corresponding file */
 336         orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
 337     }
 338 
 339     /* re-add the event */
 340     ORTE_IOF_READ_ACTIVATE(rev);
 341     return;
 342 }

/* [<][>][^][v][top][bottom][index][help] */