root/orte/mca/iof/tool/iof_tool.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. init
  2. tool_push
  3. send_cb
  4. tool_pull
  5. tool_close
  6. finalize
  7. tool_output
  8. tool_ft_event

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2005 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "orte_config.h"
  24 #include "orte/constants.h"
  25 
  26 #include <errno.h>
  27 #ifdef HAVE_UNISTD_H
  28 #include <unistd.h>
  29 #endif  /* HAVE_UNISTD_H */
  30 #include <string.h>
  31 
  32 #include "orte/mca/rml/rml.h"
  33 #include "orte/mca/rml/rml_types.h"
  34 #include "orte/mca/errmgr/errmgr.h"
  35 #include "orte/util/name_fns.h"
  36 #include "orte/runtime/orte_globals.h"
  37 #include "orte/runtime/orte_wait.h"
  38 
  39 #include "orte/mca/iof/iof.h"
  40 #include "orte/mca/iof/base/base.h"
  41 
  42 #include "iof_tool.h"
  43 
  44 static int init(void);
  45 
  46 static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
  47 
  48 static int tool_pull(const orte_process_name_t* src_name,
  49                       orte_iof_tag_t src_tag,
  50                       int fd);
  51 
  52 static int tool_close(const orte_process_name_t* peer,
  53                        orte_iof_tag_t source_tag);
  54 
  55 static int tool_output(const orte_process_name_t* peer,
  56                        orte_iof_tag_t source_tag,
  57                        const char *msg);
  58 
  59 static int finalize(void);
  60 
  61 static int tool_ft_event(int state);
  62 
  63 orte_iof_base_module_t orte_iof_tool_module = {
  64     .init = init,
  65     .push = tool_push,
  66     .pull = tool_pull,
  67     .close = tool_close,
  68     .output = tool_output,
  69     .finalize = finalize,
  70     .ft_event = tool_ft_event
  71 };
  72 
  73 
  74 static int init(void)
  75 {
  76     /* post a non-blocking RML receive to get messages
  77      from the HNP IOF component */
  78     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  79                             ORTE_RML_TAG_IOF_PROXY,
  80                             ORTE_RML_PERSISTENT,
  81                             orte_iof_tool_recv,
  82                             NULL);
  83 
  84     mca_iof_tool_component.closed = false;
  85 
  86     return ORTE_SUCCESS;
  87 }
  88 
  89 /**
  90  * Push data from the specified file descriptor
  91  * to the indicated SINK set of peers.
  92  */
  93 
  94 static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
  95 {
  96     /* at this time, we do not allow tools to push data into the
  97      * stdin of a job. This is due to potential confusion over which
  98      * stdin is being read/used, and the impossibility of resolving
  99      * potential interleaving of the data
 100      */
 101 
 102     return ORTE_ERR_NOT_SUPPORTED;
 103 }
 104 
 105 
 106 /*
 107  * Callback when non-blocking RML send completes.
 108  */
 109 static void send_cb(int status, orte_process_name_t *peer,
 110                     opal_buffer_t *buf, orte_rml_tag_t tag,
 111                     void *cbdata)
 112 {
 113     /* nothing to do here - just release buffer and return */
 114     OBJ_RELEASE(buf);
 115 }
 116 
 117 /**
 118  * Pull data from the specified set of SOURCE peers and
 119  * dump to the indicated file descriptor.
 120  */
 121 
 122 static int tool_pull(const orte_process_name_t* src_name,
 123                      orte_iof_tag_t src_tag,
 124                      int fd)
 125 {
 126     /* if we are a tool, then we need to request the HNP to please
 127      * forward the data from the specified process to us. Note that
 128      * the HNP will return an error if the specified stream of any
 129      * intended recipient is not open. By default, stdout/err/diag
 130      * are all left open. However, the user can also direct us to
 131      * close any or all of those streams, so the success of this call
 132      * will depend upon how the user executed the application
 133      */
 134 
 135     opal_buffer_t *buf;
 136     orte_iof_tag_t tag;
 137     orte_process_name_t hnp;
 138     int rc;
 139 
 140     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 141                          "%s pulling output for proc %s",
 142                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 143                          ORTE_NAME_PRINT(src_name)));
 144 
 145     buf = OBJ_NEW(opal_buffer_t);
 146 
 147     /* setup the tag to pull from HNP */
 148     tag = src_tag | ORTE_IOF_PULL;
 149 
 150     /* pack the tag - we do this first so that flow control messages can
 151      * consist solely of the tag
 152      */
 153     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
 154         ORTE_ERROR_LOG(rc);
 155         OBJ_RELEASE(buf);
 156         return rc;
 157     }
 158     /* pack the name of the source */
 159     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
 160         ORTE_ERROR_LOG(rc);
 161         OBJ_RELEASE(buf);
 162         return rc;
 163     }
 164     /* pack our name as the sink */
 165     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
 166         ORTE_ERROR_LOG(rc);
 167         OBJ_RELEASE(buf);
 168         return rc;
 169     }
 170 
 171     /* send the buffer to the correct HNP */
 172     ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
 173     orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
 174                             send_cb, NULL);
 175 
 176     return ORTE_SUCCESS;
 177 }
 178 
 179 
 180 static int tool_close(const orte_process_name_t* src_name,
 181                       orte_iof_tag_t src_tag)
 182 {
 183     /* if we are a tool, then we need to request the HNP to stop
 184      * forwarding data from this process/stream
 185      */
 186 
 187     opal_buffer_t *buf;
 188     orte_iof_tag_t tag;
 189     orte_process_name_t hnp;
 190     int rc;
 191 
 192     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 193                          "%s closing output for proc %s",
 194                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 195                          ORTE_NAME_PRINT(src_name)));
 196 
 197     buf = OBJ_NEW(opal_buffer_t);
 198 
 199     /* setup the tag to stop the copy */
 200     tag = src_tag | ORTE_IOF_CLOSE;
 201 
 202     /* pack the tag - we do this first so that flow control messages can
 203      * consist solely of the tag
 204      */
 205     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
 206         ORTE_ERROR_LOG(rc);
 207         OBJ_RELEASE(buf);
 208         return rc;
 209     }
 210     /* pack the name of the source */
 211     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
 212         ORTE_ERROR_LOG(rc);
 213         OBJ_RELEASE(buf);
 214         return rc;
 215     }
 216 
 217     /* flag that the close is incomplete */
 218     mca_iof_tool_component.closed = false;
 219 
 220     /* send the buffer to the correct HNP */
 221     ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
 222     orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
 223                             send_cb, NULL);
 224 
 225     return ORTE_SUCCESS;
 226 }
 227 
 228 static int finalize(void)
 229 {
 230     opal_list_item_t* item;
 231     orte_iof_write_output_t *output;
 232     orte_iof_write_event_t *wev;
 233     int num_written;
 234     bool dump;
 235 
 236     /* check if anything is still trying to be written out */
 237     wev = orte_iof_base.iof_write_stdout->wev;
 238     if (!opal_list_is_empty(&wev->outputs)) {
 239         dump = false;
 240         /* make one last attempt to write this out */
 241         while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 242             output = (orte_iof_write_output_t*)item;
 243             if (!dump) {
 244                 num_written = write(wev->fd, output->data, output->numbytes);
 245                 if (num_written < output->numbytes) {
 246                     /* don't retry - just cleanout the list and dump it */
 247                     dump = true;
 248                 }
 249             }
 250             OBJ_RELEASE(output);
 251         }
 252     }
 253     OBJ_RELEASE(orte_iof_base.iof_write_stdout);
 254     if (!orte_xml_output) {
 255         /* we only opened stderr channel if we are NOT doing xml output */
 256         wev = orte_iof_base.iof_write_stderr->wev;
 257         if (!opal_list_is_empty(&wev->outputs)) {
 258             dump = false;
 259             /* make one last attempt to write this out */
 260             while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 261                 output = (orte_iof_write_output_t*)item;
 262                 if (!dump) {
 263                     num_written = write(wev->fd, output->data, output->numbytes);
 264                     if (num_written < output->numbytes) {
 265                         /* don't retry - just cleanout the list and dump it */
 266                         dump = true;
 267                     }
 268                 }
 269                 OBJ_RELEASE(output);
 270             }
 271         }
 272         OBJ_RELEASE(orte_iof_base.iof_write_stderr);
 273     }
 274 
 275     /* Cancel the RML receive */
 276     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
 277 
 278     return ORTE_SUCCESS;
 279 }
 280 
 281 static int tool_output(const orte_process_name_t* peer,
 282                        orte_iof_tag_t source_tag,
 283                        const char *msg)
 284 {
 285     /* output this to our local output */
 286     if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
 287         orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
 288     } else {
 289         orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
 290     }
 291 
 292     return ORTE_SUCCESS;
 293 }
 294 
 295 /*
 296  * FT event
 297  */
 298 
 299 static int tool_ft_event(int state)
 300 {
 301     return ORTE_ERR_NOT_IMPLEMENTED;
 302 }

/* [<][>][^][v][top][bottom][index][help] */