This source file includes following definitions.
- orte_iof_base_register
- orte_iof_base_close
- orte_iof_base_open
- orte_iof_job_construct
- orte_iof_job_destruct
- orte_iof_base_proc_construct
- orte_iof_base_proc_destruct
- orte_iof_base_sink_construct
- orte_iof_base_sink_destruct
- orte_iof_base_read_event_construct
- orte_iof_base_read_event_destruct
- orte_iof_base_write_event_construct
- orte_iof_base_write_event_destruct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "orte_config.h"
28 #include "orte/constants.h"
29
30 #include <string.h>
31 #include <stdio.h>
32
33 #include "orte/mca/mca.h"
34 #include "opal/mca/base/base.h"
35 #include "opal/util/os_dirpath.h"
36 #include "opal/util/output.h"
37 #include "opal/util/basename.h"
38
39 #include "orte/util/proc_info.h"
40 #include "orte/runtime/orte_globals.h"
41 #include "orte/util/name_fns.h"
42 #include "orte/mca/rml/rml.h"
43
44 #include "orte/mca/iof/iof.h"
45 #include "orte/mca/iof/base/base.h"
46
47
48
49
50
51
52
53 #include "orte/mca/iof/base/static-components.h"
54
55 orte_iof_base_module_t orte_iof = {0};
56
57
58
59
60
61
62 orte_iof_base_t orte_iof_base = {0};
63
64 static int orte_iof_base_register(mca_base_register_flag_t flags)
65 {
66
67 orte_iof_base.output_limit = (size_t) INT_MAX;
68 (void) mca_base_var_register("orte", "iof", "base", "output_limit",
69 "Maximum backlog of output messages [default: unlimited]",
70 MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
71 OPAL_INFO_LVL_9,
72 MCA_BASE_VAR_SCOPE_READONLY,
73 &orte_iof_base.output_limit);
74
75
76 orte_iof_base.redirect_app_stderr_to_stdout = false;
77 (void) mca_base_var_register("orte", "iof","base", "redirect_app_stderr_to_stdout",
78 "Redirect application stderr to stdout at source (default: false)",
79 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
80 OPAL_INFO_LVL_9,
81 MCA_BASE_VAR_SCOPE_READONLY,
82 &orte_iof_base.redirect_app_stderr_to_stdout);
83
84 return ORTE_SUCCESS;
85 }
86
87 static int orte_iof_base_close(void)
88 {
89
90 if (NULL != orte_iof.finalize) {
91 orte_iof.finalize();
92 }
93
94 if (!ORTE_PROC_IS_DAEMON) {
95 if (NULL != orte_iof_base.iof_write_stdout) {
96 OBJ_RELEASE(orte_iof_base.iof_write_stdout);
97 }
98 if (!orte_xml_output && NULL != orte_iof_base.iof_write_stderr) {
99 OBJ_RELEASE(orte_iof_base.iof_write_stderr);
100 }
101 }
102 return mca_base_framework_components_close(&orte_iof_base_framework, NULL);
103 }
104
105
106
107
108
109
110 static int orte_iof_base_open(mca_base_open_flag_t flags)
111 {
112 int xmlfd;
113
114
115 if (!ORTE_PROC_IS_DAEMON) {
116 if (orte_xml_output) {
117 if (NULL != orte_xml_fp) {
118
119 xmlfd = fileno(orte_xml_fp);
120 } else {
121 xmlfd = 1;
122 }
123
124 ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
125 xmlfd, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
126
127
128
129 } else {
130
131 ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stdout, ORTE_PROC_MY_NAME,
132 1, ORTE_IOF_STDOUT, orte_iof_base_write_handler);
133
134 ORTE_IOF_SINK_DEFINE(&orte_iof_base.iof_write_stderr, ORTE_PROC_MY_NAME,
135 2, ORTE_IOF_STDERR, orte_iof_base_write_handler);
136 }
137
138
139
140
141
142
143
144
145
146
147
148 }
149
150
151 return mca_base_framework_components_open(&orte_iof_base_framework, flags);
152 }
153
154 MCA_BASE_FRAMEWORK_DECLARE(orte, iof, "ORTE I/O Forwarding",
155 orte_iof_base_register, orte_iof_base_open, orte_iof_base_close,
156 mca_iof_base_static_components, 0);
157
158
159
160 static void orte_iof_job_construct(orte_iof_job_t *ptr)
161 {
162 ptr->jdata = NULL;
163 OBJ_CONSTRUCT(&ptr->xoff, opal_bitmap_t);
164 }
165 static void orte_iof_job_destruct(orte_iof_job_t *ptr)
166 {
167 if (NULL != ptr->jdata) {
168 OBJ_RELEASE(ptr->jdata);
169 }
170 OBJ_DESTRUCT(&ptr->xoff);
171 }
172 OBJ_CLASS_INSTANCE(orte_iof_job_t,
173 opal_object_t,
174 orte_iof_job_construct,
175 orte_iof_job_destruct);
176
177 static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
178 {
179 ptr->stdinev = NULL;
180 ptr->revstdout = NULL;
181 ptr->revstderr = NULL;
182 #if OPAL_PMIX_V1
183 ptr->revstddiag = NULL;
184 #endif
185 ptr->subscribers = NULL;
186 ptr->copy = true;
187 }
188 static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
189 {
190 if (NULL != ptr->stdinev) {
191 OBJ_RELEASE(ptr->stdinev);
192 }
193 if (NULL != ptr->revstdout) {
194 OBJ_RELEASE(ptr->revstdout);
195 }
196 if (NULL != ptr->revstderr) {
197 OBJ_RELEASE(ptr->revstderr);
198 }
199 #if OPAL_PMIX_V1
200 if (NULL != ptr->revstddiag) {
201 OBJ_RELEASE(ptr->revstddiag);
202 }
203 #endif
204 if (NULL != ptr->subscribers) {
205 OPAL_LIST_RELEASE(ptr->subscribers);
206 }
207 }
208 OBJ_CLASS_INSTANCE(orte_iof_proc_t,
209 opal_list_item_t,
210 orte_iof_base_proc_construct,
211 orte_iof_base_proc_destruct);
212
213
214 static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
215 {
216 ptr->daemon.jobid = ORTE_JOBID_INVALID;
217 ptr->daemon.vpid = ORTE_VPID_INVALID;
218 ptr->wev = OBJ_NEW(orte_iof_write_event_t);
219 ptr->xoff = false;
220 ptr->exclusive = false;
221 ptr->closed = false;
222 }
223 static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr)
224 {
225 if (NULL != ptr->wev && 0 <= ptr->wev->fd) {
226 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
227 "%s iof: closing sink for process %s on fd %d",
228 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
229 ORTE_NAME_PRINT(&ptr->name), ptr->wev->fd));
230 OBJ_RELEASE(ptr->wev);
231 }
232 }
233 OBJ_CLASS_INSTANCE(orte_iof_sink_t,
234 opal_list_item_t,
235 orte_iof_base_sink_construct,
236 orte_iof_base_sink_destruct);
237
238
239 static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
240 {
241 rev->proc = NULL;
242 rev->fd = -1;
243 rev->active = false;
244 rev->ev = opal_event_alloc();
245 rev->sink = NULL;
246 rev->tv.tv_sec = 0;
247 rev->tv.tv_usec = 0;
248 }
249 static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
250 {
251 orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
252
253 opal_event_free(rev->ev);
254 if (0 <= rev->fd) {
255 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
256 "%s iof: closing fd %d for process %s",
257 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rev->fd,
258 (NULL == proct) ? "UNKNOWN" : ORTE_NAME_PRINT(&proct->name)));
259 close(rev->fd);
260 rev->fd = -1;
261 }
262 if (NULL != rev->sink) {
263 OBJ_RELEASE(rev->sink);
264 }
265 if (NULL != proct) {
266 OBJ_RELEASE(proct);
267 }
268 }
269 OBJ_CLASS_INSTANCE(orte_iof_read_event_t,
270 opal_object_t,
271 orte_iof_base_read_event_construct,
272 orte_iof_base_read_event_destruct);
273
274 static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
275 {
276 wev->pending = false;
277 wev->always_writable = false;
278 wev->fd = -1;
279 OBJ_CONSTRUCT(&wev->outputs, opal_list_t);
280 wev->ev = opal_event_alloc();
281 wev->tv.tv_sec = 0;
282 wev->tv.tv_usec = 0;
283 }
284 static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
285 {
286 opal_event_free(wev->ev);
287 if (ORTE_PROC_IS_HNP && NULL != orte_xml_fp) {
288 int xmlfd = fileno(orte_xml_fp);
289 if (xmlfd == wev->fd) {
290
291 OBJ_DESTRUCT(&wev->outputs);
292 return;
293 }
294 }
295 if (2 < wev->fd) {
296 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
297 "%s iof: closing fd %d for write event",
298 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
299 close(wev->fd);
300 }
301 OBJ_DESTRUCT(&wev->outputs);
302 }
303 OBJ_CLASS_INSTANCE(orte_iof_write_event_t,
304 opal_list_item_t,
305 orte_iof_base_write_event_construct,
306 orte_iof_base_write_event_destruct);
307
308 OBJ_CLASS_INSTANCE(orte_iof_write_output_t,
309 opal_list_item_t,
310 NULL, NULL);