root/orte/mca/iof/base/base.h

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes the following definitions.
  1. orte_iof_base_fd_always_ready

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
  14  *                         All rights reserved.
  15  * Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
  16  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
  17  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
  18  * Copyright (c) 2018      Research Organization for Information Science
  19  *                         and Technology (RIST).  All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  */
  26 /**
  27  * @file
  28  *
  29  * I/O Forwarding Service
  30  */
  31 
  32 #ifndef MCA_IOF_BASE_H
  33 #define MCA_IOF_BASE_H
  34 
  35 #include "orte_config.h"
  36 #ifdef HAVE_SYS_TYPES_H
  37 #include <sys/types.h>
  38 #endif
  39 #ifdef HAVE_SYS_UIO_H
  40 #include <sys/uio.h>
  41 #endif
  42 #ifdef HAVE_NET_UIO_H
  43 #include <net/uio.h>
  44 #endif
  45 #ifdef HAVE_UNISTD_H
  46 #include <unistd.h>
  47 #endif
  48 #include <signal.h>
  49 
  50 #include "opal/class/opal_list.h"
  51 #include "opal/class/opal_bitmap.h"
  52 #include "orte/mca/mca.h"
  53 #include "opal/mca/event/event.h"
  54 #include "opal/util/fd.h"
  55 
  56 #include "orte/mca/iof/iof.h"
  57 #include "orte/runtime/orte_globals.h"
  58 #include "orte/mca/rml/rml_types.h"
  59 #include "orte/util/threads.h"
  60 #include "orte/mca/errmgr/errmgr.h"
  61 
  62 BEGIN_C_DECLS
  63 
  64 /*
  65  * MCA framework
  66  */
  67 ORTE_DECLSPEC extern mca_base_framework_t orte_iof_base_framework;
  68 /*
  69  * Select an available component.
  70  */
  71 ORTE_DECLSPEC int orte_iof_base_select(void);
  72 
  73 /* track xon/xoff of processes */
  74 typedef struct {
  75     opal_object_t super;
  76     orte_job_t *jdata;
  77     opal_bitmap_t xoff;
  78 } orte_iof_job_t;
  79 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_job_t);
  80 
  81 /*
  82  * Maximum size of single msg
  83  */
  84 #define ORTE_IOF_BASE_MSG_MAX           4096
  85 #define ORTE_IOF_BASE_TAG_MAX             50
  86 #define ORTE_IOF_BASE_TAGGED_OUT_MAX    8192
  87 #define ORTE_IOF_MAX_INPUT_BUFFERS        50
  88 
  89 typedef struct {
  90     opal_list_item_t super;
  91     bool pending;
  92     bool always_writable;
  93     opal_event_t *ev;
  94     struct timeval tv;
  95     int fd;
  96     opal_list_t outputs;
  97 } orte_iof_write_event_t;
  98 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_event_t);
  99 
 100 typedef struct {
 101     opal_list_item_t super;
 102     orte_process_name_t name;
 103     orte_process_name_t daemon;
 104     orte_iof_tag_t tag;
 105     orte_iof_write_event_t *wev;
 106     bool xoff;
 107     bool exclusive;
 108     bool closed;
 109 } orte_iof_sink_t;
 110 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
 111 
 112 struct orte_iof_proc_t;
 113 typedef struct {
 114     opal_object_t super;
 115     struct orte_iof_proc_t *proc;
 116     opal_event_t *ev;
 117     struct timeval tv;
 118     int fd;
 119     orte_iof_tag_t tag;
 120     bool active;
 121     bool always_readable;
 122     orte_iof_sink_t *sink;
 123 } orte_iof_read_event_t;
 124 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
 125 
 126 typedef struct {
 127     opal_list_item_t super;
 128     orte_process_name_t name;
 129     orte_iof_sink_t *stdinev;
 130     orte_iof_read_event_t *revstdout;
 131     orte_iof_read_event_t *revstderr;
 132 #if OPAL_PMIX_V1
 133     orte_iof_read_event_t *revstddiag;
 134 #endif
 135     opal_list_t *subscribers;
 136     bool copy;
 137 } orte_iof_proc_t;
 138 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
 139 
 140 typedef struct {
 141     opal_list_item_t super;
 142     char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
 143     int numbytes;
 144 } orte_iof_write_output_t;
 145 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
 146 
 147 /* the iof globals struct */
 148 struct orte_iof_base_t {
 149     size_t                  output_limit;
 150     orte_iof_sink_t         *iof_write_stdout;
 151     orte_iof_sink_t         *iof_write_stderr;
 152     bool                    redirect_app_stderr_to_stdout;
 153 };
 154 typedef struct orte_iof_base_t orte_iof_base_t;
 155 
 156 /* Write event macro's */
 157 
 158 static inline bool
 159 orte_iof_base_fd_always_ready(int fd)
 160 {
 161     return opal_fd_is_regular(fd) ||
 162            (opal_fd_is_chardev(fd) && !isatty(fd)) ||
 163            opal_fd_is_blkdev(fd);
 164 }
 165 
 166 #define ORTE_IOF_SINK_BLOCKSIZE (1024)
 167 
 168 #define ORTE_IOF_SINK_ACTIVATE(wev)                                     \
 169     do {                                                                \
 170         struct timeval *tv = NULL;                                      \
 171         wev->pending = true;                                            \
 172         ORTE_POST_OBJECT(wev);                                          \
 173         if (wev->always_writable) {                                     \
 174             /* Regular is always write ready. Use timer to activate */  \
 175             tv = &wev->tv;                                        \
 176         }                                                               \
 177         if (opal_event_add(wev->ev, tv)) {                              \
 178             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);                         \
 179         }                                                               \
 180     } while(0);
 181 
 182 
 183 /* define an output "sink", adding it to the provided
 184  * endpoint list for this proc */
 185 #define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                \
 186     do {                                                                \
 187         orte_iof_sink_t *ep;                                            \
 188         OPAL_OUTPUT_VERBOSE((1,                                         \
 189                             orte_iof_base_framework.framework_output,   \
 190                             "defining endpt: file %s line %d fd %d",    \
 191                             __FILE__, __LINE__, (fid)));                \
 192         ep = OBJ_NEW(orte_iof_sink_t);                                  \
 193         ep->name.jobid = (nm)->jobid;                                   \
 194         ep->name.vpid = (nm)->vpid;                                     \
 195         ep->tag = (tg);                                                 \
 196         if (0 <= (fid)) {                                               \
 197             ep->wev->fd = (fid);                                        \
 198             ep->wev->always_writable =                                  \
 199                     orte_iof_base_fd_always_ready(fid);                 \
 200             if(ep->wev->always_writable) {                              \
 201                 opal_event_evtimer_set(orte_event_base,                 \
 202                                        ep->wev->ev,  wrthndlr, ep);     \
 203             } else {                                                    \
 204                 opal_event_set(orte_event_base,                         \
 205                                ep->wev->ev, ep->wev->fd,                \
 206                                OPAL_EV_WRITE,                           \
 207                                wrthndlr, ep);                           \
 208             }                                                           \
 209             opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI);         \
 210         }                                                               \
 211         *(snk) = ep;                                                    \
 212         ORTE_POST_OBJECT(ep);                                           \
 213     } while(0);
 214 
 215 /* Read event macro's */
 216 #define ORTE_IOF_READ_ADDEV(rev)                                \
 217     do {                                                        \
 218         struct timeval *tv = NULL;                              \
 219         if (rev->always_readable) {                             \
 220             tv = &rev->tv;                                      \
 221         }                                                       \
 222         if (opal_event_add(rev->ev, tv)) {                      \
 223             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);                 \
 224         }                                                       \
 225     } while(0);
 226 
 227 #define ORTE_IOF_READ_ACTIVATE(rev)                             \
 228     do {                                                        \
 229         rev->active = true;                                     \
 230         ORTE_POST_OBJECT(rev);                                  \
 231         ORTE_IOF_READ_ADDEV(rev);                               \
 232     } while(0);
 233 
 234 
 235 /* add list of structs that has name of proc + orte_iof_tag_t - when
 236  * defining a read event, search list for proc, add flag to the tag.
 237  * when closing a read fd, find proc on list and zero out that flag
 238  * when all flags = 0, then iof is complete - set message event to
 239  * daemon processor indicating proc iof is terminated
 240  */
 241 #define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv)               \
 242     do {                                                                \
 243         orte_iof_read_event_t *rev;                                     \
 244         OPAL_OUTPUT_VERBOSE((1,                                         \
 245                             orte_iof_base_framework.framework_output,   \
 246                             "%s defining read event for %s: %s %d",     \
 247                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
 248                             ORTE_NAME_PRINT(&(p)->name),                \
 249                             __FILE__, __LINE__));                       \
 250         rev = OBJ_NEW(orte_iof_read_event_t);                           \
 251         OBJ_RETAIN((p));                                                \
 252         rev->proc = (struct orte_iof_proc_t*)(p);                       \
 253         rev->tag = (tg);                                                \
 254         rev->fd = (fid);                                                \
 255         rev->always_readable = orte_iof_base_fd_always_ready(fid);      \
 256         *(rv) = rev;                                                    \
 257         if(rev->always_readable) {                                      \
 258             opal_event_evtimer_set(orte_event_base,                     \
 259                                    rev->ev, (cbfunc), rev);             \
 260         } else {                                                        \
 261             opal_event_set(orte_event_base,                             \
 262                            rev->ev, (fid),                              \
 263                            OPAL_EV_READ,                                \
 264                            (cbfunc), rev);                              \
 265         }                                                               \
 266         opal_event_set_priority(rev->ev, ORTE_MSG_PRI);                 \
 267         if ((actv)) {                                                   \
 268             ORTE_IOF_READ_ACTIVATE(rev)                                 \
 269         }                                                               \
 270     } while(0);
 271 
 272 
 273 ORTE_DECLSPEC int orte_iof_base_flush(void);
 274 
 275 ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base;
 276 
 277 /* base functions */
 278 ORTE_DECLSPEC int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
 279                                              const unsigned char *data, int numbytes,
 280                                              orte_iof_write_event_t *channel);
 281 ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev);
 282 ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);
 283 
 284 END_C_DECLS
 285 
 286 #endif /* MCA_IOF_BASE_H */

/* [<][>][^][v][top][bottom][index][help] */