This source file includes the following definitions.
- restart_stdin
- orte_iof_hnp_stdin_check
- orte_iof_hnp_stdin_cb
- orte_iof_hnp_read_local_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "orte_config.h"
27 #include "orte/constants.h"
28
29 #include <errno.h>
30 #ifdef HAVE_UNISTD_H
31 #include <unistd.h>
32 #endif
33 #include <string.h>
34
35 #include "opal/dss/dss.h"
36 #include "opal/mca/pmix/pmix.h"
37
38 #include "orte/mca/rml/rml.h"
39 #include "orte/mca/errmgr/errmgr.h"
40 #include "orte/mca/odls/odls_types.h"
41 #include "orte/util/name_fns.h"
42 #include "orte/util/threads.h"
43 #include "orte/mca/state/state.h"
44 #include "orte/runtime/orte_globals.h"
45 #include "orte/runtime/orte_wait.h"
46
47 #include "orte/mca/iof/iof.h"
48 #include "orte/mca/iof/base/base.h"
49
50 #include "iof_hnp.h"
51
52 static void restart_stdin(int fd, short event, void *cbdata)
53 {
54 orte_timer_t *tm = (orte_timer_t*)cbdata;
55
56 ORTE_ACQUIRE_OBJECT(tm);
57
58 if (NULL != mca_iof_hnp_component.stdinev &&
59 !orte_job_term_ordered &&
60 !mca_iof_hnp_component.stdinev->active) {
61 ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
62 }
63
64
65 if (NULL != tm) {
66 OBJ_RELEASE(tm);
67 }
68 }
69
70
/*
 * Report whether the HNP should read from fd right now.
 *
 * When tcgetpgrp() is available (HAVE_TCGETPGRP) and fd is a terminal,
 * this returns false unless our process group is the terminal's
 * foreground process group.  Non-terminal descriptors, and builds without
 * tcgetpgrp(), always yield true.
 */
bool orte_iof_hnp_stdin_check(int fd)
{
#if defined(HAVE_TCGETPGRP)
    bool background_tty = isatty(fd) && (getpgrp() != tcgetpgrp(fd));

    return !background_tty;
#else
    return true;
#endif
}
80
81 void orte_iof_hnp_stdin_cb(int fd, short event, void *cbdata)
82 {
83 bool should_process;
84
85 ORTE_ACQUIRE_OBJECT(mca_iof_hnp_component.stdinev);
86
87 should_process = orte_iof_hnp_stdin_check(0);
88
89 if (should_process) {
90 ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
91 } else {
92
93 opal_event_del(mca_iof_hnp_component.stdinev->ev);
94 mca_iof_hnp_component.stdinev->active = false;
95 }
96 }
97
98
99
100
101 void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
102 {
103 orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
104 unsigned char data[ORTE_IOF_BASE_MSG_MAX];
105 int32_t numbytes;
106 orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
107 int rc;
108 orte_ns_cmp_bitmask_t mask=ORTE_NS_CMP_ALL;
109 bool exclusive;
110 orte_iof_sink_t *sink;
111
112 ORTE_ACQUIRE_OBJECT(rev);
113
114
115
116
117 fd = rev->fd;
118
119
120 memset(data, 0, ORTE_IOF_BASE_MSG_MAX);
121 numbytes = read(fd, data, sizeof(data));
122
123 if (NULL == proct) {
124
125 ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
126 return;
127 }
128
129 if (numbytes < 0) {
130
131
132
133 if (EAGAIN == errno || EINTR == errno) {
134 ORTE_IOF_READ_ACTIVATE(rev);
135 return;
136 }
137
138 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
139 "%s iof:hnp:read handler %s Error on connection:%d",
140 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
141 ORTE_NAME_PRINT(&proct->name), fd));
142
143
144
145
146 numbytes = 0;
147 }
148
149
150 if (ORTE_IOF_STDIN & rev->tag) {
151
152
153 rev->active = false;
154 if (NULL == proct->stdinev) {
155
156 return;
157 }
158
159
160
161
162 if (orte_job_term_ordered) {
163 OBJ_RELEASE(rev);
164 return;
165 }
166
167 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdinev->daemon)) {
168 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
169 "%s read %d bytes from stdin - writing to %s",
170 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
171 ORTE_NAME_PRINT(&proct->name)));
172
173
174
175
176 if (NULL != proct->stdinev->wev) {
177 if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdinev->wev)) {
178
179
180 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
181 "buffer backed up - holding"));
182 return;
183 }
184 }
185 } else {
186 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
187 "%s sending %d bytes from stdinev to daemon %s",
188 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
189 ORTE_NAME_PRINT(&proct->stdinev->daemon)));
190
191
192
193
194
195
196
197
198
199 if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, &proct->stdinev->name, ORTE_IOF_STDIN, data, numbytes))) {
200
201 if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
202 OBJ_RELEASE(rev->sink);
203 }
204 }
205 }
206
207
208 if (0 == numbytes) {
209 if (0 != opal_list_get_size(&proct->stdinev->wev->outputs)) {
210
211 proct->stdinev->closed = true;
212 } else {
213
214 OBJ_RELEASE(proct->stdinev);
215 }
216 } else {
217
218
219
220 if (orte_iof_hnp_stdin_check(fd)) {
221 restart_stdin(fd, 0, NULL);
222 } else {
223
224 ORTE_TIMER_EVENT(0, 10000, restart_stdin, ORTE_INFO_PRI);
225 }
226 }
227
228 return;
229 }
230
231
232
233
234
235 exclusive = false;
236 if (NULL != proct->subscribers) {
237 OPAL_LIST_FOREACH(sink, proct->subscribers, orte_iof_sink_t) {
238
239 if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
240 continue;
241 }
242 if ((sink->tag & rev->tag) &&
243 sink->name.jobid == proct->name.jobid &&
244 (ORTE_VPID_WILDCARD == sink->name.vpid || sink->name.vpid == proct->name.vpid)) {
245
246
247
248
249
250
251 if (NULL != opal_pmix.server_iof_push) {
252 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
253 "%s sending data of size %d via PMIx to tool %s",
254 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)numbytes,
255 ORTE_NAME_PRINT(&sink->daemon)));
256
257 if (0 < numbytes) {
258 rc = opal_pmix.server_iof_push(&proct->name, rev->tag, data, numbytes);
259 if (ORTE_SUCCESS != rc) {
260 ORTE_ERROR_LOG(rc);
261 }
262 }
263 } else {
264 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
265 "%s sending data to tool %s",
266 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
267 ORTE_NAME_PRINT(&sink->daemon));
268 orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &proct->name, rev->tag, data, numbytes));
269 }
270 if (sink->exclusive) {
271 exclusive = true;
272 }
273 }
274 }
275 }
276
277 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
278 "%s read %d bytes from %s of %s",
279 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
280 (ORTE_IOF_STDOUT & rev->tag) ? "stdout" : ((ORTE_IOF_STDERR & rev->tag) ? "stderr" : "stddiag"),
281 ORTE_NAME_PRINT(&proct->name)));
282
283 if (0 == numbytes) {
284
285
286
287
288 OBJ_RETAIN(proct);
289 if (rev->tag & ORTE_IOF_STDOUT) {
290 orte_iof_base_static_dump_output(proct->revstdout);
291 OBJ_RELEASE(proct->revstdout);
292 } else if (rev->tag & ORTE_IOF_STDERR) {
293 orte_iof_base_static_dump_output(proct->revstderr);
294 OBJ_RELEASE(proct->revstderr);
295 #if OPAL_PMIX_V1
296 } else if (rev->tag & ORTE_IOF_STDDIAG) {
297 orte_iof_base_static_dump_output(proct->revstddiag);
298 OBJ_RELEASE(proct->revstddiag);
299 #endif
300 }
301
302 if (NULL == proct->revstdout &&
303 #if OPAL_PMIX_V1
304 NULL == proct->revstddiag &&
305 #endif
306 NULL == proct->revstderr) {
307
308 ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
309 }
310 OBJ_RELEASE(proct);
311 return;
312 }
313
314 if (proct->copy) {
315 if (NULL != proct->subscribers) {
316 if (!exclusive) {
317
318 if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
319 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
320 } else {
321 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
322 }
323 }
324 } else {
325
326 if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
327 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
328 } else {
329 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
330 }
331 }
332 }
333
334 if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
335
336 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
337 }
338
339
340 ORTE_IOF_READ_ACTIVATE(rev);
341 return;
342 }