1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2011 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
13 * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
14 * All rights reserved.
15 * Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
16 * Copyright (c) 2017 IBM Corporation. All rights reserved.
17 * Copyright (c) 2017 Mellanox Technologies. All rights reserved.
18 * Copyright (c) 2018 Research Organization for Information Science
19 * and Technology (RIST). All rights reserved.
20 * $COPYRIGHT$
21 *
22 * Additional copyrights may follow
23 *
24 * $HEADER$
25 */
26 /**
27 * @file
28 *
29 * I/O Forwarding Service
30 */
31
32 #ifndef MCA_IOF_BASE_H
33 #define MCA_IOF_BASE_H
34
35 #include "orte_config.h"
36 #ifdef HAVE_SYS_TYPES_H
37 #include <sys/types.h>
38 #endif
39 #ifdef HAVE_SYS_UIO_H
40 #include <sys/uio.h>
41 #endif
42 #ifdef HAVE_NET_UIO_H
43 #include <net/uio.h>
44 #endif
45 #ifdef HAVE_UNISTD_H
46 #include <unistd.h>
47 #endif
48 #include <signal.h>
49
50 #include "opal/class/opal_list.h"
51 #include "opal/class/opal_bitmap.h"
52 #include "orte/mca/mca.h"
53 #include "opal/mca/event/event.h"
54 #include "opal/util/fd.h"
55
56 #include "orte/mca/iof/iof.h"
57 #include "orte/runtime/orte_globals.h"
58 #include "orte/mca/rml/rml_types.h"
59 #include "orte/util/threads.h"
60 #include "orte/mca/errmgr/errmgr.h"
61
62 BEGIN_C_DECLS
63
64 /*
65 * MCA framework
66 */
67 ORTE_DECLSPEC extern mca_base_framework_t orte_iof_base_framework;
68 /*
69 * Select an available component.
70 */
71 ORTE_DECLSPEC int orte_iof_base_select(void);
72
73 /* track xon/xoff of processes */
74 typedef struct {
75 opal_object_t super;
76 orte_job_t *jdata;
77 opal_bitmap_t xoff;
78 } orte_iof_job_t;
79 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_job_t);
80
/*
 * Size limits used by the IOF framework.
 *
 * NOTE(review): within this header only ORTE_IOF_BASE_TAGGED_OUT_MAX is
 * referenced (it sizes the data buffer of orte_iof_write_output_t); the
 * purpose of the remaining limits should be confirmed at their use sites.
 */
/* Maximum size of a single message */
#define ORTE_IOF_BASE_MSG_MAX           4096
#define ORTE_IOF_BASE_TAG_MAX             50
/* Capacity, in bytes, of one queued write-output buffer */
#define ORTE_IOF_BASE_TAGGED_OUT_MAX    8192
#define ORTE_IOF_MAX_INPUT_BUFFERS        50
88
89 typedef struct {
90 opal_list_item_t super;
91 bool pending;
92 bool always_writable;
93 opal_event_t *ev;
94 struct timeval tv;
95 int fd;
96 opal_list_t outputs;
97 } orte_iof_write_event_t;
98 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_event_t);
99
100 typedef struct {
101 opal_list_item_t super;
102 orte_process_name_t name;
103 orte_process_name_t daemon;
104 orte_iof_tag_t tag;
105 orte_iof_write_event_t *wev;
106 bool xoff;
107 bool exclusive;
108 bool closed;
109 } orte_iof_sink_t;
110 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
111
112 struct orte_iof_proc_t;
113 typedef struct {
114 opal_object_t super;
115 struct orte_iof_proc_t *proc;
116 opal_event_t *ev;
117 struct timeval tv;
118 int fd;
119 orte_iof_tag_t tag;
120 bool active;
121 bool always_readable;
122 orte_iof_sink_t *sink;
123 } orte_iof_read_event_t;
124 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
125
126 typedef struct {
127 opal_list_item_t super;
128 orte_process_name_t name;
129 orte_iof_sink_t *stdinev;
130 orte_iof_read_event_t *revstdout;
131 orte_iof_read_event_t *revstderr;
132 #if OPAL_PMIX_V1
133 orte_iof_read_event_t *revstddiag;
134 #endif
135 opal_list_t *subscribers;
136 bool copy;
137 } orte_iof_proc_t;
138 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
139
140 typedef struct {
141 opal_list_item_t super;
142 char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
143 int numbytes;
144 } orte_iof_write_output_t;
145 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
146
147 /* the iof globals struct */
148 struct orte_iof_base_t {
149 size_t output_limit;
150 orte_iof_sink_t *iof_write_stdout;
151 orte_iof_sink_t *iof_write_stderr;
152 bool redirect_app_stderr_to_stdout;
153 };
154 typedef struct orte_iof_base_t orte_iof_base_t;
155
/* Write event macros */
157
/**
 * Tell whether a file descriptor is of a kind that is treated as always
 * ready, so that the macros in this header drive it with a timer event
 * instead of an fd-readiness event.
 *
 * @param fd  file descriptor to classify
 * @return true when opal_fd_is_regular() holds, or opal_fd_is_chardev()
 *         holds and the descriptor is not a tty, or opal_fd_is_blkdev()
 *         holds; false otherwise.
 */
static inline bool
orte_iof_base_fd_always_ready(int fd)
{
    if (opal_fd_is_regular(fd)) {
        return true;
    }
    /* character devices count unless they are terminals */
    if (opal_fd_is_chardev(fd) && !isatty(fd)) {
        return true;
    }
    return opal_fd_is_blkdev(fd);
}
165
/* Sink block size.
 * NOTE(review): not referenced in this header; presumably the chunk size
 * used when writing to a sink -- confirm at the use sites. */
#define ORTE_IOF_SINK_BLOCKSIZE (1024)
167
/**
 * Arm a write event so that its handler gets called.
 *
 * Marks the write event as pending and adds its event via
 * opal_event_add().  When the write event is flagged always_writable its
 * tv member is passed as the timeout; otherwise no timeout is given.  A
 * non-zero return from opal_event_add() is reported through
 * ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM).
 *
 * @param wev  pointer to the orte_iof_write_event_t to activate.  It may
 *             be any pointer expression and is evaluated exactly once.
 *
 * The locals carry a macro-specific prefix so they cannot capture
 * identifiers appearing in the caller's argument.
 *
 * NOTE: the expansion keeps its historical trailing ';' on purpose,
 * because call sites may invoke the macro without one.
 */
#define ORTE_IOF_SINK_ACTIVATE(wev) \
    do { \
        orte_iof_write_event_t *orte_iof_sa_wev_ = (wev); \
        struct timeval *orte_iof_sa_tv_ = NULL; \
        orte_iof_sa_wev_->pending = true; \
        ORTE_POST_OBJECT(orte_iof_sa_wev_); \
        if (orte_iof_sa_wev_->always_writable) { \
            /* Regular is always write ready. Use timer to activate */ \
            orte_iof_sa_tv_ = &orte_iof_sa_wev_->tv; \
        } \
        if (opal_event_add(orte_iof_sa_wev_->ev, orte_iof_sa_tv_)) { \
            ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
        } \
    } while(0);
181
182
/**
 * Define an output "sink": create an orte_iof_sink_t, initialize it and
 * hand it back to the caller.
 *
 * The new sink gets the jobid/vpid of @p nm and the tag @p tg.  When
 * @p fid is non-negative the sink's write event is bound to it: the fd
 * is recorded, always_writable is computed with
 * orte_iof_base_fd_always_ready(), and the event is set up either as a
 * timer event (always writable) or as an OPAL_EV_WRITE event on the fd,
 * with @p wrthndlr as callback and the sink as callback data.  The event
 * priority is set to ORTE_MSG_PRI.  The event is only configured here,
 * not added.
 *
 * @param snk       address of an orte_iof_sink_t* that receives the sink
 * @param nm        pointer to the process name to copy (evaluated once)
 * @param fid       file descriptor, or a negative value for "none"
 *                  (evaluated once)
 * @param tg        IOF tag to store in the sink
 * @param wrthndlr  event callback for the write event
 *
 * The locals carry a macro-specific prefix so they cannot capture
 * identifiers appearing in the caller's arguments (for example a caller
 * variable that is itself called "ep").
 *
 * NOTE: the expansion keeps its historical trailing ';' on purpose,
 * because call sites may invoke the macro without one.
 */
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \
    do { \
        orte_iof_sink_t *orte_iof_sd_ep_; \
        const orte_process_name_t *orte_iof_sd_nm_ = (nm); \
        const int orte_iof_sd_fd_ = (fid); \
        OPAL_OUTPUT_VERBOSE((1, \
                            orte_iof_base_framework.framework_output, \
                            "defining endpt: file %s line %d fd %d", \
                            __FILE__, __LINE__, orte_iof_sd_fd_)); \
        orte_iof_sd_ep_ = OBJ_NEW(orte_iof_sink_t); \
        orte_iof_sd_ep_->name.jobid = orte_iof_sd_nm_->jobid; \
        orte_iof_sd_ep_->name.vpid = orte_iof_sd_nm_->vpid; \
        orte_iof_sd_ep_->tag = (tg); \
        if (0 <= orte_iof_sd_fd_) { \
            orte_iof_sd_ep_->wev->fd = orte_iof_sd_fd_; \
            orte_iof_sd_ep_->wev->always_writable = \
                orte_iof_base_fd_always_ready(orte_iof_sd_fd_); \
            if (orte_iof_sd_ep_->wev->always_writable) { \
                /* always-ready descriptors are driven by a timer event */ \
                opal_event_evtimer_set(orte_event_base, \
                                       orte_iof_sd_ep_->wev->ev, \
                                       (wrthndlr), orte_iof_sd_ep_); \
            } else { \
                opal_event_set(orte_event_base, \
                               orte_iof_sd_ep_->wev->ev, \
                               orte_iof_sd_ep_->wev->fd, \
                               OPAL_EV_WRITE, \
                               (wrthndlr), orte_iof_sd_ep_); \
            } \
            opal_event_set_priority(orte_iof_sd_ep_->wev->ev, \
                                    ORTE_MSG_PRI); \
        } \
        *(snk) = orte_iof_sd_ep_; \
        ORTE_POST_OBJECT(orte_iof_sd_ep_); \
    } while(0);
214
/* Read event macros */
/**
 * Add a read event's event via opal_event_add().
 *
 * When the read event is flagged always_readable its tv member is passed
 * as the timeout; otherwise no timeout is given.  A non-zero return from
 * opal_event_add() is reported through
 * ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM).
 *
 * @param rev  pointer to the orte_iof_read_event_t to add.  It may be
 *             any pointer expression and is evaluated exactly once.
 *
 * The locals carry a macro-specific prefix so they cannot capture
 * identifiers appearing in the caller's argument.
 *
 * NOTE: the expansion keeps its historical trailing ';' on purpose,
 * because call sites may invoke the macro without one.
 */
#define ORTE_IOF_READ_ADDEV(rev) \
    do { \
        orte_iof_read_event_t *orte_iof_rd_rev_ = (rev); \
        struct timeval *orte_iof_rd_tv_ = NULL; \
        if (orte_iof_rd_rev_->always_readable) { \
            orte_iof_rd_tv_ = &orte_iof_rd_rev_->tv; \
        } \
        if (opal_event_add(orte_iof_rd_rev_->ev, orte_iof_rd_tv_)) { \
            ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
        } \
    } while(0);
226
/**
 * Mark a read event active and add its event.
 *
 * Sets the active flag of the read event and then hands it to
 * ORTE_IOF_READ_ADDEV.
 *
 * @param rev  pointer to the orte_iof_read_event_t to activate.  It may
 *             be any pointer expression and is evaluated exactly once.
 *
 * NOTE: the expansion keeps its historical trailing ';' on purpose,
 * because call sites may invoke the macro without one.
 */
#define ORTE_IOF_READ_ACTIVATE(rev) \
    do { \
        orte_iof_read_event_t *orte_iof_ra_rev_ = (rev); \
        orte_iof_ra_rev_->active = true; \
        ORTE_POST_OBJECT(orte_iof_ra_rev_); \
        ORTE_IOF_READ_ADDEV(orte_iof_ra_rev_); \
    } while(0);
233
234
/**
 * Define a read event for a process: create an orte_iof_read_event_t,
 * initialize it, hand it back to the caller and optionally activate it.
 *
 * The process object @p p is retained and stored in the new read event
 * together with the tag @p tg and the descriptor @p fid.
 * always_readable is computed with orte_iof_base_fd_always_ready(), and
 * the event is set up either as a timer event (always readable) or as an
 * OPAL_EV_READ event on the fd, with @p cbfunc as callback and the read
 * event as callback data.  The event priority is set to ORTE_MSG_PRI.
 * When @p actv is true the read event is activated through
 * ORTE_IOF_READ_ACTIVATE.
 *
 * @param rv      address of an orte_iof_read_event_t* that receives the
 *                new read event
 * @param p       pointer to the owning process object (must have a
 *                "name" member)
 * @param fid     file descriptor to read from (evaluated once)
 * @param tg      IOF tag to store in the read event
 * @param cbfunc  event callback
 * @param actv    true to activate the event right away
 *
 * The locals carry a macro-specific prefix so they cannot capture
 * identifiers appearing in the caller's arguments (for example a caller
 * passing "&rev" as @p rv).
 *
 * Design note kept from the original author: add a list of structs that
 * holds the name of the proc plus an orte_iof_tag_t.  When defining a
 * read event, search the list for the proc and add the flag to the tag;
 * when closing a read fd, find the proc on the list and zero out that
 * flag.  When all flags are 0 the iof is complete -- send a message
 * event to the daemon processor indicating that the proc's iof has
 * terminated.
 *
 * NOTE: the expansion keeps its historical trailing ';' on purpose,
 * because call sites may invoke the macro without one.
 */
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv) \
    do { \
        orte_iof_read_event_t *orte_iof_re_rev_; \
        const int orte_iof_re_fd_ = (fid); \
        OPAL_OUTPUT_VERBOSE((1, \
                            orte_iof_base_framework.framework_output, \
                            "%s defining read event for %s: %s %d", \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
                            ORTE_NAME_PRINT(&(p)->name), \
                            __FILE__, __LINE__)); \
        orte_iof_re_rev_ = OBJ_NEW(orte_iof_read_event_t); \
        OBJ_RETAIN((p)); \
        orte_iof_re_rev_->proc = (struct orte_iof_proc_t*)(p); \
        orte_iof_re_rev_->tag = (tg); \
        orte_iof_re_rev_->fd = orte_iof_re_fd_; \
        orte_iof_re_rev_->always_readable = \
            orte_iof_base_fd_always_ready(orte_iof_re_fd_); \
        *(rv) = orte_iof_re_rev_; \
        if (orte_iof_re_rev_->always_readable) { \
            /* always-ready descriptors are driven by a timer event */ \
            opal_event_evtimer_set(orte_event_base, \
                                   orte_iof_re_rev_->ev, (cbfunc), \
                                   orte_iof_re_rev_); \
        } else { \
            opal_event_set(orte_event_base, \
                           orte_iof_re_rev_->ev, orte_iof_re_fd_, \
                           OPAL_EV_READ, \
                           (cbfunc), orte_iof_re_rev_); \
        } \
        opal_event_set_priority(orte_iof_re_rev_->ev, ORTE_MSG_PRI); \
        if ((actv)) { \
            /* explicit ';' so this does not depend on the callee's */ \
            /* trailing semicolon */ \
            ORTE_IOF_READ_ACTIVATE(orte_iof_re_rev_); \
        } \
    } while(0);
271
272
273 ORTE_DECLSPEC int orte_iof_base_flush(void);
274
275 ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base;
276
277 /* base functions */
278 ORTE_DECLSPEC int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
279 const unsigned char *data, int numbytes,
280 orte_iof_write_event_t *channel);
281 ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev);
282 ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);
283
284 END_C_DECLS
285
286 #endif /* MCA_IOF_BASE_H */