This source file includes the following definitions:
- orte_iof_base_fd_always_ready
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 #ifndef MCA_IOF_BASE_H
33 #define MCA_IOF_BASE_H
34
35 #include "orte_config.h"
36 #ifdef HAVE_SYS_TYPES_H
37 #include <sys/types.h>
38 #endif
39 #ifdef HAVE_SYS_UIO_H
40 #include <sys/uio.h>
41 #endif
42 #ifdef HAVE_NET_UIO_H
43 #include <net/uio.h>
44 #endif
45 #ifdef HAVE_UNISTD_H
46 #include <unistd.h>
47 #endif
48 #include <signal.h>
49
50 #include "opal/class/opal_list.h"
51 #include "opal/class/opal_bitmap.h"
52 #include "orte/mca/mca.h"
53 #include "opal/mca/event/event.h"
54 #include "opal/util/fd.h"
55
56 #include "orte/mca/iof/iof.h"
57 #include "orte/runtime/orte_globals.h"
58 #include "orte/mca/rml/rml_types.h"
59 #include "orte/util/threads.h"
60 #include "orte/mca/errmgr/errmgr.h"
61
62 BEGIN_C_DECLS
63
64
65
66
67 ORTE_DECLSPEC extern mca_base_framework_t orte_iof_base_framework;
68
69
70
71 ORTE_DECLSPEC int orte_iof_base_select(void);
72
73
74 typedef struct {
75 opal_object_t super;
76 orte_job_t *jdata;
77 opal_bitmap_t xoff;
78 } orte_iof_job_t;
79 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_job_t);
80
81
82
83
/*
 * Size limits used by the IOF framework.
 *
 * ORTE_IOF_BASE_TAGGED_OUT_MAX is the size of the data buffer embedded
 * in orte_iof_write_output_t.  The remaining constants are not
 * referenced elsewhere in this header.
 */
#define ORTE_IOF_BASE_MSG_MAX           4096
#define ORTE_IOF_BASE_TAG_MAX             50
#define ORTE_IOF_BASE_TAGGED_OUT_MAX    8192
#define ORTE_IOF_MAX_INPUT_BUFFERS        50
88
89 typedef struct {
90 opal_list_item_t super;
91 bool pending;
92 bool always_writable;
93 opal_event_t *ev;
94 struct timeval tv;
95 int fd;
96 opal_list_t outputs;
97 } orte_iof_write_event_t;
98 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_event_t);
99
100 typedef struct {
101 opal_list_item_t super;
102 orte_process_name_t name;
103 orte_process_name_t daemon;
104 orte_iof_tag_t tag;
105 orte_iof_write_event_t *wev;
106 bool xoff;
107 bool exclusive;
108 bool closed;
109 } orte_iof_sink_t;
110 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_sink_t);
111
112 struct orte_iof_proc_t;
113 typedef struct {
114 opal_object_t super;
115 struct orte_iof_proc_t *proc;
116 opal_event_t *ev;
117 struct timeval tv;
118 int fd;
119 orte_iof_tag_t tag;
120 bool active;
121 bool always_readable;
122 orte_iof_sink_t *sink;
123 } orte_iof_read_event_t;
124 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
125
126 typedef struct {
127 opal_list_item_t super;
128 orte_process_name_t name;
129 orte_iof_sink_t *stdinev;
130 orte_iof_read_event_t *revstdout;
131 orte_iof_read_event_t *revstderr;
132 #if OPAL_PMIX_V1
133 orte_iof_read_event_t *revstddiag;
134 #endif
135 opal_list_t *subscribers;
136 bool copy;
137 } orte_iof_proc_t;
138 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_proc_t);
139
140 typedef struct {
141 opal_list_item_t super;
142 char data[ORTE_IOF_BASE_TAGGED_OUT_MAX];
143 int numbytes;
144 } orte_iof_write_output_t;
145 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_write_output_t);
146
147
148 struct orte_iof_base_t {
149 size_t output_limit;
150 orte_iof_sink_t *iof_write_stdout;
151 orte_iof_sink_t *iof_write_stderr;
152 bool redirect_app_stderr_to_stdout;
153 };
154 typedef struct orte_iof_base_t orte_iof_base_t;
155
156
157
/*
 * Tell whether a descriptor should be treated as "always ready".
 *
 * Returns true when opal_fd_is_regular(fd) holds, or when
 * opal_fd_is_chardev(fd) holds and isatty(fd) does not, or when
 * opal_fd_is_blkdev(fd) holds; false otherwise.  The checks are made in
 * that order and stop at the first one that succeeds.
 *
 * The macros in this header use the result to drive such descriptors
 * with a timer event rather than a read/write-readiness event.
 */
static inline bool
orte_iof_base_fd_always_ready(int fd)
{
    if (opal_fd_is_regular(fd)) {
        return true;
    }
    if (opal_fd_is_chardev(fd) && !isatty(fd)) {
        return true;
    }
    return opal_fd_is_blkdev(fd);
}
165
/* Block size constant for sinks; not referenced elsewhere in this header. */
#define ORTE_IOF_SINK_BLOCKSIZE (1024)
167
/*
 * Mark a write event as pending and add it to the event library.
 *
 * wev: pointer to the orte_iof_write_event_t to arm.  The argument is
 *      evaluated exactly once, so arbitrary expressions (including ones
 *      with side effects) are safe.
 *
 * When wev->always_writable is set, the event is added with &wev->tv as
 * its timeout (ORTE_IOF_SINK_DEFINE sets such events up as timer
 * events); otherwise it is added with no timeout.  A non-zero return
 * from opal_event_add() is reported via
 * ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM).
 *
 * The trailing ';' after while(0) is retained for backward
 * compatibility with call sites that may omit their own semicolon.
 */
#define ORTE_IOF_SINK_ACTIVATE(wev)                                     \
    do {                                                                \
        orte_iof_write_event_t *iof_act_wev_ = (wev);                   \
        struct timeval *iof_act_tv_ = NULL;                             \
        iof_act_wev_->pending = true;                                   \
        ORTE_POST_OBJECT(iof_act_wev_);                                 \
        if (iof_act_wev_->always_writable) {                            \
            /* timer-driven event: pass the stored timeout */           \
            iof_act_tv_ = &iof_act_wev_->tv;                            \
        }                                                               \
        if (opal_event_add(iof_act_wev_->ev, iof_act_tv_)) {            \
            ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
181
182
183
184
/*
 * Create a new sink and, when a valid descriptor is supplied, prepare
 * (but do not add) its write event.
 *
 * snk:      address of an orte_iof_sink_t* that receives the new sink
 * nm:       pointer to the process name whose jobid/vpid are copied in
 * fid:      file descriptor; the event is only set up when fid >= 0
 * tg:       IOF tag stored in the sink
 * wrthndlr: event callback; it is registered with the sink as its
 *           callback argument
 *
 * Every argument is evaluated at most once, and the macro's local
 * variables use names that will not capture caller identifiers.
 *
 * For descriptors reported as always ready by
 * orte_iof_base_fd_always_ready() a timer event is configured;
 * otherwise an OPAL_EV_WRITE event on the descriptor is configured.  In
 * both cases the event priority is set to ORTE_MSG_PRI.
 *
 * The sink's `wev` member is dereferenced immediately after OBJ_NEW, so
 * the orte_iof_sink_t constructor is expected to have allocated it.
 *
 * The trailing ';' after while(0) is retained for backward
 * compatibility with call sites that may omit their own semicolon.
 */
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                \
    do {                                                                \
        const orte_process_name_t *iof_def_nm_ = (nm);                  \
        const int iof_def_fd_ = (fid);                                  \
        orte_iof_sink_t *iof_def_sink_;                                 \
        OPAL_OUTPUT_VERBOSE((1,                                         \
                            orte_iof_base_framework.framework_output,   \
                            "defining endpt: file %s line %d fd %d",    \
                            __FILE__, __LINE__, iof_def_fd_));          \
        iof_def_sink_ = OBJ_NEW(orte_iof_sink_t);                       \
        iof_def_sink_->name.jobid = iof_def_nm_->jobid;                 \
        iof_def_sink_->name.vpid = iof_def_nm_->vpid;                   \
        iof_def_sink_->tag = (tg);                                      \
        if (0 <= iof_def_fd_) {                                         \
            iof_def_sink_->wev->fd = iof_def_fd_;                       \
            iof_def_sink_->wev->always_writable =                       \
                orte_iof_base_fd_always_ready(iof_def_fd_);             \
            if (iof_def_sink_->wev->always_writable) {                  \
                opal_event_evtimer_set(orte_event_base,                 \
                                       iof_def_sink_->wev->ev,          \
                                       (wrthndlr), iof_def_sink_);      \
            } else {                                                    \
                opal_event_set(orte_event_base,                         \
                               iof_def_sink_->wev->ev,                  \
                               iof_def_sink_->wev->fd,                  \
                               OPAL_EV_WRITE,                           \
                               (wrthndlr), iof_def_sink_);              \
            }                                                           \
            opal_event_set_priority(iof_def_sink_->wev->ev,             \
                                    ORTE_MSG_PRI);                      \
        }                                                               \
        *(snk) = iof_def_sink_;                                         \
        ORTE_POST_OBJECT(iof_def_sink_);                                \
    } while(0);
214
215
/*
 * Add a read event to the event library.
 *
 * rev: pointer to the orte_iof_read_event_t whose event is to be added.
 *      The argument is evaluated exactly once.
 *
 * When rev->always_readable is set, the event is added with &rev->tv as
 * its timeout (ORTE_IOF_READ_EVENT sets such events up as timer
 * events); otherwise it is added with no timeout.  A non-zero return
 * from opal_event_add() is reported via
 * ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM).
 *
 * The trailing ';' after while(0) is retained for backward
 * compatibility with call sites that may omit their own semicolon.
 */
#define ORTE_IOF_READ_ADDEV(rev)                                        \
    do {                                                                \
        orte_iof_read_event_t *iof_add_rev_ = (rev);                    \
        struct timeval *iof_add_tv_ = NULL;                             \
        if (iof_add_rev_->always_readable) {                            \
            iof_add_tv_ = &iof_add_rev_->tv;                            \
        }                                                               \
        if (opal_event_add(iof_add_rev_->ev, iof_add_tv_)) {            \
            ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
226
/*
 * Flag a read event as active and add it to the event library.
 *
 * rev: pointer to the orte_iof_read_event_t to activate.  The argument
 *      is evaluated exactly once.
 *
 * Sets rev->active, issues ORTE_POST_OBJECT, then delegates to
 * ORTE_IOF_READ_ADDEV to add the event.
 *
 * The trailing ';' after while(0) is retained because
 * ORTE_IOF_READ_EVENT in this header has historically invoked this
 * macro without a semicolon of its own.
 */
#define ORTE_IOF_READ_ACTIVATE(rev)                                     \
    do {                                                                \
        orte_iof_read_event_t *iof_actv_rev_ = (rev);                   \
        iof_actv_rev_->active = true;                                   \
        ORTE_POST_OBJECT(iof_actv_rev_);                                \
        ORTE_IOF_READ_ADDEV(iof_actv_rev_);                             \
    } while(0);
233
234
235
236
237
238
239
240
/*
 * Create a read event for a process's descriptor and optionally
 * activate it.
 *
 * rv:     address of an orte_iof_read_event_t* that receives the new
 *         event
 * p:      pointer to the owning orte_iof_proc_t; it is retained
 *         (OBJ_RETAIN) and stored in the event's `proc` member
 * fid:    file descriptor to read from
 * tg:     IOF tag stored in the event
 * cbfunc: event callback; it is registered with the new read event as
 *         its callback argument
 * actv:   when non-zero, the event is activated right away via
 *         ORTE_IOF_READ_ACTIVATE
 *
 * `p` and `fid` are evaluated exactly once, and the macro's local
 * variables use names that will not capture caller identifiers (so
 * passing e.g. `&rev` as `rv` is safe).
 *
 * For descriptors reported as always ready by
 * orte_iof_base_fd_always_ready() a timer event is configured;
 * otherwise an OPAL_EV_READ event on the descriptor is configured.  In
 * both cases the event priority is set to ORTE_MSG_PRI.
 *
 * The trailing ';' after while(0) is retained for backward
 * compatibility with call sites that may omit their own semicolon.
 */
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv)               \
    do {                                                                \
        orte_iof_proc_t *iof_evt_proc_ = (p);                           \
        orte_iof_read_event_t *iof_evt_rev_;                            \
        OPAL_OUTPUT_VERBOSE((1,                                         \
                            orte_iof_base_framework.framework_output,   \
                            "%s defining read event for %s: %s %d",     \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            ORTE_NAME_PRINT(&iof_evt_proc_->name),      \
                            __FILE__, __LINE__));                       \
        iof_evt_rev_ = OBJ_NEW(orte_iof_read_event_t);                  \
        OBJ_RETAIN(iof_evt_proc_);                                      \
        iof_evt_rev_->proc = (struct orte_iof_proc_t*)iof_evt_proc_;    \
        iof_evt_rev_->tag = (tg);                                       \
        iof_evt_rev_->fd = (fid);                                       \
        iof_evt_rev_->always_readable =                                 \
            orte_iof_base_fd_always_ready(iof_evt_rev_->fd);            \
        *(rv) = iof_evt_rev_;                                           \
        if (iof_evt_rev_->always_readable) {                            \
            opal_event_evtimer_set(orte_event_base,                     \
                                   iof_evt_rev_->ev, (cbfunc),          \
                                   iof_evt_rev_);                       \
        } else {                                                        \
            opal_event_set(orte_event_base,                             \
                           iof_evt_rev_->ev, iof_evt_rev_->fd,          \
                           OPAL_EV_READ,                                \
                           (cbfunc), iof_evt_rev_);                     \
        }                                                               \
        opal_event_set_priority(iof_evt_rev_->ev, ORTE_MSG_PRI);        \
        if ((actv)) {                                                   \
            ORTE_IOF_READ_ACTIVATE(iof_evt_rev_);                       \
        }                                                               \
    } while(0);
271
272
273 ORTE_DECLSPEC int orte_iof_base_flush(void);
274
275 ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base;
276
277
278 ORTE_DECLSPEC int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
279 const unsigned char *data, int numbytes,
280 orte_iof_write_event_t *channel);
281 ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev);
282 ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);
283
284 END_C_DECLS
285
286 #endif