root/orte/mca/iof/orted/iof_orted_read.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_iof_orted_read_handler

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2016-2019 Intel, Inc.  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "orte_config.h"
  24 #include "orte/constants.h"
  25 
  26 #include <errno.h>
  27 #ifdef HAVE_UNISTD_H
  28 #include <unistd.h>
  29 #endif  /* HAVE_UNISTD_H */
  30 #include <string.h>
  31 
  32 #include "opal/dss/dss.h"
  33 
  34 #include "orte/mca/rml/rml.h"
  35 #include "orte/mca/errmgr/errmgr.h"
  36 #include "orte/mca/odls/odls_types.h"
  37 #include "orte/util/name_fns.h"
  38 #include "orte/util/threads.h"
  39 #include "orte/mca/state/state.h"
  40 #include "orte/runtime/orte_globals.h"
  41 
  42 #include "orte/mca/iof/iof.h"
  43 #include "orte/mca/iof/base/base.h"
  44 
  45 #include "iof_orted.h"
  46 
  47 void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
  48 {
  49     orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
  50     unsigned char data[ORTE_IOF_BASE_MSG_MAX];
  51     opal_buffer_t *buf=NULL;
  52     int rc;
  53     int32_t numbytes;
  54     orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
  55 
  56     ORTE_ACQUIRE_OBJECT(rev);
  57 
  58     /* As we may use timer events, fd can be bogus (-1)
  59      * use the right one here
  60      */
  61     fd = rev->fd;
  62 
  63     /* read up to the fragment size */
  64 #if !defined(__WINDOWS__)
  65     numbytes = read(fd, data, sizeof(data));
  66 #else
  67     {
  68         DWORD readed;
  69         HANDLE handle = (HANDLE)_get_osfhandle(fd);
  70         ReadFile(handle, data, sizeof(data), &readed, NULL);
  71         numbytes = (int)readed;
  72     }
  73 #endif  /* !defined(__WINDOWS__) */
  74 
  75     if (NULL == proct) {
  76         /* nothing we can do */
  77         ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
  78         return;
  79     }
  80 
  81     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  82                          "%s iof:orted:read handler read %d bytes from %s, fd %d",
  83                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  84                          numbytes, ORTE_NAME_PRINT(&proct->name), fd));
  85 
  86     if (numbytes <= 0) {
  87         if (0 > numbytes) {
  88             /* either we have a connection error or it was a non-blocking read */
  89             if (EAGAIN == errno || EINTR == errno) {
  90                 /* non-blocking, retry */
  91                 ORTE_IOF_READ_ACTIVATE(rev);
  92                 return;
  93             }
  94 
  95             OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  96                                  "%s iof:orted:read handler %s Error on connection:%d",
  97                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  98                                  ORTE_NAME_PRINT(&proct->name), fd));
  99         }
 100         /* numbytes must have been zero, so go down and close the fd etc */
 101         goto CLEAN_RETURN;
 102     }
 103 
 104     /* see if the user wanted the output directed to files */
 105     if (NULL != rev->sink) {
 106         /* output to the corresponding file */
 107         orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
 108     }
 109     if (!proct->copy) {
 110         /* re-add the event */
 111         ORTE_IOF_READ_ACTIVATE(rev);
 112         return;
 113     }
 114 
 115     /* prep the buffer */
 116     buf = OBJ_NEW(opal_buffer_t);
 117 
 118     /* pack the stream first - we do this so that flow control messages can
 119      * consist solely of the tag
 120      */
 121     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->tag, 1, ORTE_IOF_TAG))) {
 122         ORTE_ERROR_LOG(rc);
 123         goto CLEAN_RETURN;
 124     }
 125 
 126     /* pack name of process that gave us this data */
 127     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &proct->name, 1, ORTE_NAME))) {
 128         ORTE_ERROR_LOG(rc);
 129         goto CLEAN_RETURN;
 130     }
 131 
 132     /* pack the data - only pack the #bytes we read! */
 133     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &data, numbytes, OPAL_BYTE))) {
 134         ORTE_ERROR_LOG(rc);
 135         goto CLEAN_RETURN;
 136     }
 137 
 138     /* start non-blocking RML call to forward received data */
 139     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 140                          "%s iof:orted:read handler sending %d bytes to HNP",
 141                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes));
 142 
 143     orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
 144                             orte_rml_send_callback, NULL);
 145 
 146     /* re-add the event */
 147     ORTE_IOF_READ_ACTIVATE(rev);
 148 
 149     return;
 150 
 151  CLEAN_RETURN:
 152     /* must be an error, or zero bytes were read indicating that the
 153      * proc terminated this IOF channel - either way, release the
 154      * corresponding event. This deletes the read event and closes
 155      * the file descriptor */
 156     if (rev->tag & ORTE_IOF_STDOUT) {
 157         if( NULL != proct->revstdout ) {
 158             orte_iof_base_static_dump_output(proct->revstdout);
 159             OBJ_RELEASE(proct->revstdout);
 160         }
 161     } else if (rev->tag & ORTE_IOF_STDERR) {
 162         if( NULL != proct->revstderr ) {
 163             orte_iof_base_static_dump_output(proct->revstderr);
 164             OBJ_RELEASE(proct->revstderr);
 165         }
 166 #if OPAL_PMIX_V1
 167     } else if (rev->tag & ORTE_IOF_STDDIAG) {
 168         if( NULL != proct->revstddiag ) {
 169             orte_iof_base_static_dump_output(proct->revstddiag);
 170             OBJ_RELEASE(proct->revstddiag);
 171         }
 172 #endif
 173     }
 174     /* check to see if they are all done */
 175     if (NULL == proct->revstdout &&
 176 #if OPAL_PMIX_V1
 177         NULL == proct->revstddiag &&
 178 #endif
 179         NULL == proct->revstderr) {
 180         /* this proc's iof is complete */
 181         ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
 182     }
 183     if (NULL != buf) {
 184         OBJ_RELEASE(buf);
 185     }
 186     return;
 187 }

/* [<][>][^][v][top][bottom][index][help] */