root/orte/mca/iof/orted/iof_orted.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. init
  2. orted_push
  3. orted_pull
  4. orted_close
  5. orted_complete
  6. finalize
  7. orted_ft_event
  8. stdin_write_handler
  9. orted_output

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2016-2019 Intel, Inc.  All rights reserved.
  16  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
  17  * Copyright (c) 2017      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  */
  25 
  26 #include "orte_config.h"
  27 #include "opal/util/output.h"
  28 #include "orte/constants.h"
  29 
  30 #include <errno.h>
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif  /* HAVE_UNISTD_H */
  34 #include <string.h>
  35 
  36 #ifdef HAVE_FCNTL_H
  37 #include <fcntl.h>
  38 #else
  39 #ifdef HAVE_SYS_FCNTL_H
  40 #include <sys/fcntl.h>
  41 #endif
  42 #endif
  43 
  44 #include "opal/util/os_dirpath.h"
  45 
  46 #include "orte/mca/errmgr/errmgr.h"
  47 #include "orte/util/name_fns.h"
  48 #include "orte/util/threads.h"
  49 #include "orte/runtime/orte_globals.h"
  50 #include "orte/mca/odls/odls_types.h"
  51 #include "orte/mca/rml/rml.h"
  52 
  53 #include "orte/mca/iof/iof.h"
  54 #include "orte/mca/iof/base/base.h"
  55 #include "orte/mca/iof/base/iof_base_setup.h"
  56 
  57 #include "iof_orted.h"
  58 
  59 
  60 /* LOCAL FUNCTIONS */
  61 static void stdin_write_handler(int fd, short event, void *cbdata);
  62 
  63 
  64 /* API FUNCTIONS */
  65 static int init(void);
  66 
  67 static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
  68 
  69 static int orted_pull(const orte_process_name_t* src_name,
  70                       orte_iof_tag_t src_tag,
  71                       int fd);
  72 
  73 static int orted_close(const orte_process_name_t* peer,
  74                        orte_iof_tag_t source_tag);
  75 
  76 static int orted_output(const orte_process_name_t* peer,
  77                         orte_iof_tag_t source_tag,
  78                         const char *msg);
  79 
  80 static void orted_complete(const orte_job_t *jdata);
  81 
  82 static int finalize(void);
  83 
  84 static int orted_ft_event(int state);
  85 
  86 /* The API's in this module are solely used to support LOCAL
  87  * procs - i.e., procs that are co-located to the daemon. Output
  88  * from local procs is automatically sent to the HNP for output
  89  * and possible forwarding to other requestors. The HNP automatically
  90  * determines and wires up the stdin configuration, so we don't
  91  * have to do anything here.
  92  */
  93 
  94 orte_iof_base_module_t orte_iof_orted_module = {
  95     .init = init,
  96     .push = orted_push,
  97     .pull = orted_pull,
  98     .close = orted_close,
  99     .output = orted_output,
 100     .complete = orted_complete,
 101     .finalize = finalize,
 102     .ft_event = orted_ft_event
 103 };
 104 
 105 static int init(void)
 106 {
 107     /* post a non-blocking RML receive to get messages
 108      from the HNP IOF component */
 109     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 110                             ORTE_RML_TAG_IOF_PROXY,
 111                             ORTE_RML_PERSISTENT,
 112                             orte_iof_orted_recv,
 113                             NULL);
 114 
 115     /* setup the local global variables */
 116     OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
 117     mca_iof_orted_component.xoff = false;
 118 
 119     return ORTE_SUCCESS;
 120 }
 121 
 122 /**
 123  * Push data from the specified file descriptor
 124  * to the HNP
 125  */
 126 
 127 static int orted_push(const orte_process_name_t* dst_name,
 128                       orte_iof_tag_t src_tag, int fd)
 129 {
 130     int flags;
 131     orte_iof_proc_t *proct;
 132     int rc;
 133     orte_job_t *jobdat=NULL;
 134     orte_ns_cmp_bitmask_t mask;
 135 
 136    OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 137                          "%s iof:orted pushing fd %d for process %s",
 138                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 139                          fd, ORTE_NAME_PRINT(dst_name)));
 140 
 141     /* set the file descriptor to non-blocking - do this before we setup
 142      * and activate the read event in case it fires right away
 143      */
 144     if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 145         opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 146                     __FILE__, __LINE__, errno);
 147     } else {
 148         flags |= O_NONBLOCK;
 149         fcntl(fd, F_SETFL, flags);
 150     }
 151 
 152     /* do we already have this process in our list? */
 153     OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
 154         mask = ORTE_NS_CMP_ALL;
 155 
 156         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
 157             /* found it */
 158             goto SETUP;
 159         }
 160     }
 161     /* if we get here, then we don't yet have this proc in our list */
 162     proct = OBJ_NEW(orte_iof_proc_t);
 163     proct->name.jobid = dst_name->jobid;
 164     proct->name.vpid = dst_name->vpid;
 165     opal_list_append(&mca_iof_orted_component.procs, &proct->super);
 166 
 167   SETUP:
 168     /* get the local jobdata for this proc */
 169     if (NULL == (jobdat = orte_get_job_data_object(proct->name.jobid))) {
 170         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 171         return ORTE_ERR_NOT_FOUND;
 172     }
 173     /* define a read event and activate it */
 174     if (src_tag & ORTE_IOF_STDOUT) {
 175         ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
 176                             orte_iof_orted_read_handler, false);
 177     } else if (src_tag & ORTE_IOF_STDERR) {
 178         ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
 179                             orte_iof_orted_read_handler, false);
 180 #if OPAL_PMIX_V1
 181     } else if (src_tag & ORTE_IOF_STDDIAG) {
 182         ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
 183                             orte_iof_orted_read_handler, false);
 184 #endif
 185     }
 186     /* setup any requested output files */
 187     if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct))) {
 188         ORTE_ERROR_LOG(rc);
 189         return rc;
 190     }
 191 
 192     /* if -all- of the readevents for this proc have been defined, then
 193      * activate them. Otherwise, we can think that the proc is complete
 194      * because one of the readevents fires -prior- to all of them having
 195      * been defined!
 196      */
 197     if (NULL != proct->revstdout &&
 198 #if OPAL_PMIX_V1
 199         NULL != proct->revstddiag &&
 200 #endif
 201         (orte_iof_base.redirect_app_stderr_to_stdout || NULL != proct->revstderr)) {
 202         ORTE_IOF_READ_ACTIVATE(proct->revstdout);
 203         if (!orte_iof_base.redirect_app_stderr_to_stdout) {
 204             ORTE_IOF_READ_ACTIVATE(proct->revstderr);
 205         }
 206 #if OPAL_PMIX_V1
 207         if (NULL != proct->revstddiag) {
 208             ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
 209         }
 210 #endif
 211     }
 212     return ORTE_SUCCESS;
 213 }
 214 
 215 
 216 /**
 217  * Pull for a daemon tells
 218  * us that any info we receive from the HNP that is targeted
 219  * for stdin of the specified process should be fed down the
 220  * indicated file descriptor. Thus, all we need to do here
 221  * is define a local endpoint so we know where to feed anything
 222  * that comes to us
 223  */
 224 
 225 static int orted_pull(const orte_process_name_t* dst_name,
 226                       orte_iof_tag_t src_tag,
 227                       int fd)
 228 {
 229     orte_iof_proc_t *proct;
 230     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
 231     int flags;
 232 
 233     /* this is a local call - only stdin is supported */
 234     if (ORTE_IOF_STDIN != src_tag) {
 235         return ORTE_ERR_NOT_SUPPORTED;
 236     }
 237 
 238     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 239                          "%s iof:orted pulling fd %d for process %s",
 240                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 241                          fd, ORTE_NAME_PRINT(dst_name)));
 242 
 243     /* set the file descriptor to non-blocking - do this before we setup
 244      * the sink in case it fires right away
 245      */
 246     if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 247         opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 248                     __FILE__, __LINE__, errno);
 249     } else {
 250         flags |= O_NONBLOCK;
 251         fcntl(fd, F_SETFL, flags);
 252     }
 253 
 254     /* do we already have this process in our list? */
 255     OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
 256         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
 257             /* found it */
 258             goto SETUP;
 259         }
 260     }
 261     /* if we get here, then we don't yet have this proc in our list */
 262     proct = OBJ_NEW(orte_iof_proc_t);
 263     proct->name.jobid = dst_name->jobid;
 264     proct->name.vpid = dst_name->vpid;
 265     opal_list_append(&mca_iof_orted_component.procs, &proct->super);
 266 
 267   SETUP:
 268     ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
 269                          stdin_write_handler);
 270 
 271     return ORTE_SUCCESS;
 272 }
 273 
 274 
 275 /*
 276  * One of our local procs wants us to close the specifed
 277  * stream(s), thus terminating any potential io to/from it.
 278  * For the orted, this just means closing the local fd
 279  */
 280 static int orted_close(const orte_process_name_t* peer,
 281                        orte_iof_tag_t source_tag)
 282 {
 283     orte_iof_proc_t* proct;
 284     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
 285 
 286     OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
 287         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
 288             if (ORTE_IOF_STDIN & source_tag) {
 289                 if (NULL != proct->stdinev) {
 290                     OBJ_RELEASE(proct->stdinev);
 291                 }
 292                 proct->stdinev = NULL;
 293             }
 294             if ((ORTE_IOF_STDOUT & source_tag) ||
 295                 (ORTE_IOF_STDMERGE & source_tag)) {
 296                 if (NULL != proct->revstdout) {
 297                     orte_iof_base_static_dump_output(proct->revstdout);
 298                     OBJ_RELEASE(proct->revstdout);
 299                 }
 300                 proct->revstdout = NULL;
 301             }
 302             if (ORTE_IOF_STDERR & source_tag) {
 303                 if (NULL != proct->revstderr) {
 304                     orte_iof_base_static_dump_output(proct->revstderr);
 305                     OBJ_RELEASE(proct->revstderr);
 306                 }
 307                 proct->revstderr = NULL;
 308             }
 309 #if OPAL_PMIX_V1
 310             if (ORTE_IOF_STDDIAG & source_tag) {
 311                 if (NULL != proct->revstddiag) {
 312                     orte_iof_base_static_dump_output(proct->revstddiag);
 313                     OBJ_RELEASE(proct->revstddiag);
 314                 }
 315                 proct->revstddiag = NULL;
 316             }
 317 #endif
 318             /* if we closed them all, then remove this proc */
 319             if (NULL == proct->stdinev &&
 320                 NULL == proct->revstdout &&
 321 #if OPAL_PMIX_V1
 322                 NULL == proct->revstddiag &&
 323 #endif
 324                 NULL == proct->revstderr) {
 325                 opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
 326                 OBJ_RELEASE(proct);
 327             }
 328             break;
 329         }
 330     }
 331 
 332     return ORTE_SUCCESS;
 333 }
 334 
 335 static void orted_complete(const orte_job_t *jdata)
 336 {
 337     orte_iof_proc_t *proct, *next;
 338 
 339     /* cleanout any lingering sinks */
 340     OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_orted_component.procs, orte_iof_proc_t) {
 341         if (jdata->jobid == proct->name.jobid) {
 342             opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
 343             OBJ_RELEASE(proct);
 344         }
 345     }
 346 }
 347 
 348 static int finalize(void)
 349 {
 350     orte_iof_proc_t *proct;
 351 
 352     /* cycle thru the procs and ensure all their output was delivered
 353      * if they were writing to files */
 354     while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_orted_component.procs))) {
 355         if (NULL != proct->revstdout) {
 356             orte_iof_base_static_dump_output(proct->revstdout);
 357         }
 358         if (NULL != proct->revstderr) {
 359             orte_iof_base_static_dump_output(proct->revstderr);
 360         }
 361 #if OPAL_PMIX_V1
 362         if (NULL != proct->revstddiag) {
 363             orte_iof_base_static_dump_output(proct->revstddiag);
 364         }
 365 #endif
 366         OBJ_RELEASE(proct);
 367     }
 368     OBJ_DESTRUCT(&mca_iof_orted_component.procs);
 369 
 370     /* Cancel the RML receive */
 371     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
 372     return ORTE_SUCCESS;
 373 }
 374 
 375 /*
 376  * FT event
 377  */
 378 
 379 static int orted_ft_event(int state)
 380 {
 381     return ORTE_ERR_NOT_IMPLEMENTED;
 382 }
 383 
 384 static void stdin_write_handler(int _fd, short event, void *cbdata)
 385 {
 386     orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
 387     orte_iof_write_event_t *wev = sink->wev;
 388     opal_list_item_t *item;
 389     orte_iof_write_output_t *output;
 390     int num_written;
 391 
 392     ORTE_ACQUIRE_OBJECT(sink);
 393 
 394     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 395                          "%s orted:stdin:write:handler writing data to %d",
 396                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 397                          wev->fd));
 398 
 399     wev->pending = false;
 400 
 401     while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 402         output = (orte_iof_write_output_t*)item;
 403         if (0 == output->numbytes) {
 404             /* this indicates we are to close the fd - there is
 405              * nothing to write
 406              */
 407             OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
 408                                  "%s iof:orted closing fd %d on write event due to zero bytes output",
 409                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
 410             OBJ_RELEASE(wev);
 411             sink->wev = NULL;
 412             return;
 413         }
 414         num_written = write(wev->fd, output->data, output->numbytes);
 415         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 416                              "%s orted:stdin:write:handler wrote %d bytes",
 417                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 418                              num_written));
 419         if (num_written < 0) {
 420             if (EAGAIN == errno || EINTR == errno) {
 421                 /* push this item back on the front of the list */
 422                 opal_list_prepend(&wev->outputs, item);
 423                 /* leave the write event running so it will call us again
 424                  * when the fd is ready.
 425                  */
 426                 ORTE_IOF_SINK_ACTIVATE(wev);
 427                 goto CHECK;
 428             }
 429             /* otherwise, something bad happened so all we can do is declare an
 430              * error and abort
 431              */
 432             OBJ_RELEASE(output);
 433             OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
 434                                  "%s iof:orted closing fd %d on write event due to negative bytes written",
 435                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
 436             OBJ_RELEASE(wev);
 437             sink->wev = NULL;
 438             /* tell the HNP to stop sending us stuff */
 439             if (!mca_iof_orted_component.xoff) {
 440                 mca_iof_orted_component.xoff = true;
 441                 orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
 442             }
 443             return;
 444         } else if (num_written < output->numbytes) {
 445             OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 446                                  "%s orted:stdin:write:handler incomplete write %d - adjusting data",
 447                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
 448             /* incomplete write - adjust data to avoid duplicate output */
 449             memmove(output->data, &output->data[num_written], output->numbytes - num_written);
 450             /* push this item back on the front of the list */
 451             opal_list_prepend(&wev->outputs, item);
 452             /* leave the write event running so it will call us again
 453              * when the fd is ready.
 454              */
 455             ORTE_IOF_SINK_ACTIVATE(wev);
 456             goto CHECK;
 457         }
 458         OBJ_RELEASE(output);
 459     }
 460 
 461 CHECK:
 462     if (mca_iof_orted_component.xoff) {
 463         /* if we have told the HNP to stop reading stdin, see if
 464          * the proc has absorbed enough to justify restart
 465          *
 466          * RHC: Note that when multiple procs want stdin, we
 467          * can get into a fight between a proc turnin stdin
 468          * back "on" and other procs turning it "off". There
 469          * is no clear way to resolve this as different procs
 470          * may take input at different rates.
 471          */
 472         if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
 473             /* restart the read */
 474             mca_iof_orted_component.xoff = false;
 475             orte_iof_orted_send_xonxoff(ORTE_IOF_XON);
 476         }
 477     }
 478 }
 479 
 480 static int orted_output(const orte_process_name_t* peer,
 481                         orte_iof_tag_t source_tag,
 482                         const char *msg)
 483 {
 484     opal_buffer_t *buf;
 485     int rc;
 486 
 487     /* prep the buffer */
 488     buf = OBJ_NEW(opal_buffer_t);
 489 
 490     /* pack the stream first - we do this so that flow control messages can
 491      * consist solely of the tag
 492      */
 493     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &source_tag, 1, ORTE_IOF_TAG))) {
 494         ORTE_ERROR_LOG(rc);
 495         return rc;
 496     }
 497 
 498     /* pack name of process that gave us this data */
 499     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, peer, 1, ORTE_NAME))) {
 500         ORTE_ERROR_LOG(rc);
 501         return rc;
 502     }
 503 
 504     /* pack the data - for compatibility, we have to pack this as OPAL_BYTE,
 505      * so ensure we include the NULL string terminator */
 506     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, msg, strlen(msg)+1, OPAL_BYTE))) {
 507         ORTE_ERROR_LOG(rc);
 508         return rc;
 509     }
 510 
 511     /* start non-blocking RML call to forward received data */
 512     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 513                          "%s iof:orted:output sending %d bytes to HNP",
 514                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)strlen(msg)+1));
 515 
 516     orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
 517                             orte_rml_send_callback, NULL);
 518 
 519     return ORTE_SUCCESS;
 520 }

/* [<][>][^][v][top][bottom][index][help] */