root/orte/mca/iof/base/iof_base_output.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. orte_iof_base_write_output
  2. orte_iof_base_static_dump_output
  3. orte_iof_base_write_handler

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2006 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2017      Intel, Inc. All rights reserved.
  14  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  *
  21  * These symbols are in a file by themselves to provide nice linker
  22  * semantics.  Since linkers generally pull in symbols by object
  23  * files, keeping these symbols as the only symbols in this file
  24  * prevents utility programs such as "ompi_info" from having to import
  25  * entire components just to query their version and parameters.
  26  */
  27 
  28 #include "orte_config.h"
  29 #include "orte/constants.h"
  30 
  31 #include <string.h>
  32 #include <stdlib.h>
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #include <time.h>
  37 #include <errno.h>
  38 
  39 #include "opal/util/output.h"
  40 
  41 #include "orte/util/name_fns.h"
  42 #include "orte/util/threads.h"
  43 #include "orte/runtime/orte_globals.h"
  44 #include "orte/mca/errmgr/errmgr.h"
  45 #include "orte/mca/state/state.h"
  46 
  47 #include "orte/mca/iof/base/base.h"
  48 
  49 int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
  50                                const unsigned char *data, int numbytes,
  51                                orte_iof_write_event_t *channel)
  52 {
  53     char starttag[ORTE_IOF_BASE_TAG_MAX], endtag[ORTE_IOF_BASE_TAG_MAX], *suffix;
  54     orte_iof_write_output_t *output;
  55     int i, j, k, starttaglen, endtaglen, num_buffered;
  56     bool endtagged;
  57     char qprint[10];
  58 
  59     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  60                          "%s write:output setting up to write %d bytes to %s for %s on fd %d",
  61                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
  62                          (ORTE_IOF_STDIN & stream) ? "stdin" : ((ORTE_IOF_STDOUT & stream) ? "stdout" : ((ORTE_IOF_STDERR & stream) ? "stderr" : "stddiag")),
  63                          ORTE_NAME_PRINT(name),
  64                          (NULL == channel) ? -1 : channel->fd));
  65 
  66     /* setup output object */
  67     output = OBJ_NEW(orte_iof_write_output_t);
  68 
  69     /* write output data to the corresponding tag */
  70     if (ORTE_IOF_STDIN & stream) {
  71         /* copy over the data to be written */
  72         if (0 < numbytes) {
  73             /* don't copy 0 bytes - we just need to pass
  74              * the zero bytes so the fd can be closed
  75              * after it writes everything out
  76              */
  77             memcpy(output->data, data, numbytes);
  78         }
  79         output->numbytes = numbytes;
  80         goto process;
  81     } else if (ORTE_IOF_STDOUT & stream) {
  82         /* write the bytes to stdout */
  83         suffix = "stdout";
  84     } else if (ORTE_IOF_STDERR & stream) {
  85         /* write the bytes to stderr */
  86         suffix = "stderr";
  87     } else if (ORTE_IOF_STDDIAG & stream) {
  88         /* write the bytes to stderr */
  89         suffix = "stddiag";
  90     } else {
  91         /* error - this should never happen */
  92         ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
  93         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  94                              "%s stream %0x", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), stream));
  95         return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
  96     }
  97 
  98     /* if this is to be xml tagged, create a tag with the correct syntax - we do not allow
  99      * timestamping of xml output
 100      */
 101     if (orte_xml_output) {
 102         snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "<%s rank=\"%s\">", suffix, ORTE_VPID_PRINT(name->vpid));
 103         snprintf(endtag, ORTE_IOF_BASE_TAG_MAX, "</%s>", suffix);
 104         goto construct;
 105     }
 106 
 107     /* if we are to timestamp output, start the tag with that */
 108     if (orte_timestamp_output) {
 109         time_t mytime;
 110         char *cptr;
 111         /* get the timestamp */
 112         time(&mytime);
 113         cptr = ctime(&mytime);
 114         cptr[strlen(cptr)-1] = '\0';  /* remove trailing newline */
 115 
 116         if (orte_tag_output) {
 117             /* if we want it tagged as well, use both */
 118             snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s[%s,%s]<%s>:",
 119                      cptr, ORTE_LOCAL_JOBID_PRINT(name->jobid),
 120                      ORTE_VPID_PRINT(name->vpid), suffix);
 121         } else {
 122             /* only use timestamp */
 123             snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s<%s>:", cptr, suffix);
 124         }
 125         /* no endtag for this option */
 126         memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
 127         goto construct;
 128     }
 129 
 130     if (orte_tag_output) {
 131         snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "[%s,%s]<%s>:",
 132                  ORTE_LOCAL_JOBID_PRINT(name->jobid),
 133                  ORTE_VPID_PRINT(name->vpid), suffix);
 134         /* no endtag for this option */
 135         memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
 136         goto construct;
 137     }
 138 
 139     /* if we get here, then the data is not to be tagged - just copy it
 140      * and move on to processing
 141      */
 142     if (0 < numbytes) {
 143         /* don't copy 0 bytes - we just need to pass
 144          * the zero bytes so the fd can be closed
 145          * after it writes everything out
 146          */
 147         memcpy(output->data, data, numbytes);
 148     }
 149     output->numbytes = numbytes;
 150     goto process;
 151 
 152   construct:
 153     starttaglen = strlen(starttag);
 154     endtaglen = strlen(endtag);
 155     endtagged = false;
 156     /* start with the tag */
 157     for (j=0, k=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 158         output->data[k++] = starttag[j];
 159     }
 160     /* cycle through the data looking for <cr>
 161      * and replace those with the tag
 162      */
 163     for (i=0; i < numbytes && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; i++) {
 164         if (orte_xml_output) {
 165             if ('&' == data[i]) {
 166                 if (k+5 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 167                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 168                     goto process;
 169                 }
 170                 snprintf(qprint, 10, "&amp;");
 171                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 172                     output->data[k++] = qprint[j];
 173                 }
 174             } else if ('<' == data[i]) {
 175                 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 176                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 177                     goto process;
 178                 }
 179                 snprintf(qprint, 10, "&lt;");
 180                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 181                     output->data[k++] = qprint[j];
 182                 }
 183             } else if ('>' == data[i]) {
 184                 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 185                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 186                     goto process;
 187                 }
 188                 snprintf(qprint, 10, "&gt;");
 189                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 190                     output->data[k++] = qprint[j];
 191                 }
 192             } else if (data[i] < 32 || data[i] > 127) {
 193                 /* this is a non-printable character, so escape it too */
 194                 if (k+7 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 195                     ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 196                     goto process;
 197                 }
 198                 snprintf(qprint, 10, "&#%03d;", (int)data[i]);
 199                 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 200                     output->data[k++] = qprint[j];
 201                 }
 202                 /* if this was a \n, then we also need to break the line with the end tag */
 203                 if ('\n' == data[i] && (k+endtaglen+1) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 204                     /* we need to break the line with the end tag */
 205                     for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
 206                         output->data[k++] = endtag[j];
 207                     }
 208                     /* move the <cr> over */
 209                     output->data[k++] = '\n';
 210                     /* if this isn't the end of the data buffer, add a new start tag */
 211                     if (i < numbytes-1 && (k+starttaglen) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 212                         for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 213                             output->data[k++] = starttag[j];
 214                             endtagged = false;
 215                         }
 216                     } else {
 217                         endtagged = true;
 218                     }
 219                 }
 220             } else {
 221                 output->data[k++] = data[i];
 222             }
 223         } else {
 224             if ('\n' == data[i]) {
 225                 /* we need to break the line with the end tag */
 226                 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
 227                     output->data[k++] = endtag[j];
 228                 }
 229                 /* move the <cr> over */
 230                 output->data[k++] = '\n';
 231                 /* if this isn't the end of the data buffer, add a new start tag */
 232                 if (i < numbytes-1) {
 233                     for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
 234                         output->data[k++] = starttag[j];
 235                         endtagged = false;
 236                     }
 237                 } else {
 238                     endtagged = true;
 239                 }
 240             } else {
 241                 output->data[k++] = data[i];
 242             }
 243         }
 244     }
 245     if (!endtagged && k < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
 246         /* need to add an endtag */
 247         for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
 248             output->data[k++] = endtag[j];
 249         }
 250         output->data[k] = '\n';
 251     }
 252     output->numbytes = k;
 253 
 254   process:
 255     /* add this data to the write list for this fd */
 256     opal_list_append(&channel->outputs, &output->super);
 257 
 258     /* record how big the buffer is */
 259     num_buffered = opal_list_get_size(&channel->outputs);
 260 
 261     /* is the write event issued? */
 262     if (!channel->pending) {
 263         /* issue it */
 264         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 265                              "%s write:output adding write event",
 266                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 267         ORTE_IOF_SINK_ACTIVATE(channel);
 268     }
 269 
 270     return num_buffered;
 271 }
 272 
 273 void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
 274 {
 275     bool dump;
 276     int num_written;
 277     orte_iof_write_event_t *wev;
 278     orte_iof_write_output_t *output;
 279 
 280     if (NULL != rev->sink) {
 281         wev = rev->sink->wev;
 282         if (NULL != wev && !opal_list_is_empty(&wev->outputs)) {
 283             dump = false;
 284             /* make one last attempt to write this out */
 285             while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
 286                 if (!dump) {
 287                     num_written = write(wev->fd, output->data, output->numbytes);
 288                     if (num_written < output->numbytes) {
 289                         /* don't retry - just cleanout the list and dump it */
 290                         dump = true;
 291                     }
 292                 }
 293                 OBJ_RELEASE(output);
 294             }
 295         }
 296     }
 297 }
 298 
 299 void orte_iof_base_write_handler(int _fd, short event, void *cbdata)
 300 {
 301     orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
 302     orte_iof_write_event_t *wev = sink->wev;
 303     opal_list_item_t *item;
 304     orte_iof_write_output_t *output;
 305     int num_written, total_written = 0;
 306 
 307     ORTE_ACQUIRE_OBJECT(sink);
 308 
 309     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 310                          "%s write:handler writing data to %d",
 311                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 312                          wev->fd));
 313 
 314     while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 315         output = (orte_iof_write_output_t*)item;
 316         if (0 == output->numbytes) {
 317             /* indicates we are to close this stream */
 318             OBJ_RELEASE(sink);
 319             return;
 320         }
 321         num_written = write(wev->fd, output->data, output->numbytes);
 322         if (num_written < 0) {
 323             if (EAGAIN == errno || EINTR == errno) {
 324                 /* push this item back on the front of the list */
 325                 opal_list_prepend(&wev->outputs, item);
 326                 /* if the list is getting too large, abort */
 327                 if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
 328                     opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
 329                     ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
 330                     goto ABORT;
 331                 }
 332                 /* leave the write event running so it will call us again
 333                  * when the fd is ready.
 334                  */
 335                 goto NEXT_CALL;
 336             }
 337             /* otherwise, something bad happened so all we can do is abort
 338              * this attempt
 339              */
 340             OBJ_RELEASE(output);
 341             goto ABORT;
 342         } else if (num_written < output->numbytes) {
 343             /* incomplete write - adjust data to avoid duplicate output */
 344             memmove(output->data, &output->data[num_written], output->numbytes - num_written);
 345             /* adjust the number of bytes remaining to be written */
 346             output->numbytes -= num_written;
 347             /* push this item back on the front of the list */
 348             opal_list_prepend(&wev->outputs, item);
 349             /* if the list is getting too large, abort */
 350             if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
 351                 opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
 352                 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
 353                 goto ABORT;
 354             }
 355             /* leave the write event running so it will call us again
 356              * when the fd is ready
 357              */
 358             goto NEXT_CALL;
 359         }
 360         OBJ_RELEASE(output);
 361 
 362         total_written += num_written;
 363         if(wev->always_writable && (ORTE_IOF_SINK_BLOCKSIZE <= total_written)){
 364             /* If this is a regular file it will never tell us it will block
 365              * Write no more than ORTE_IOF_REGULARF_BLOCK at a time allowing
 366              * other fds to progress
 367              */
 368             goto NEXT_CALL;
 369         }
 370     }
 371   ABORT:
 372     wev->pending = false;
 373     ORTE_POST_OBJECT(wev);
 374     return;
 375 NEXT_CALL:
 376     ORTE_IOF_SINK_ACTIVATE(wev);
 377 }

/* [<][>][^][v][top][bottom][index][help] */