This source file includes the following definitions.
- init
- orted_push
- orted_pull
- orted_close
- orted_complete
- finalize
- orted_ft_event
- stdin_write_handler
- orted_output
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "orte_config.h"
27 #include "opal/util/output.h"
28 #include "orte/constants.h"
29
30 #include <errno.h>
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #include <string.h>
35
36 #ifdef HAVE_FCNTL_H
37 #include <fcntl.h>
38 #else
39 #ifdef HAVE_SYS_FCNTL_H
40 #include <sys/fcntl.h>
41 #endif
42 #endif
43
44 #include "opal/util/os_dirpath.h"
45
46 #include "orte/mca/errmgr/errmgr.h"
47 #include "orte/util/name_fns.h"
48 #include "orte/util/threads.h"
49 #include "orte/runtime/orte_globals.h"
50 #include "orte/mca/odls/odls_types.h"
51 #include "orte/mca/rml/rml.h"
52
53 #include "orte/mca/iof/iof.h"
54 #include "orte/mca/iof/base/base.h"
55 #include "orte/mca/iof/base/iof_base_setup.h"
56
57 #include "iof_orted.h"
58
59
60
61 static void stdin_write_handler(int fd, short event, void *cbdata);
62
63
64
65 static int init(void);
66
67 static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
68
69 static int orted_pull(const orte_process_name_t* src_name,
70 orte_iof_tag_t src_tag,
71 int fd);
72
73 static int orted_close(const orte_process_name_t* peer,
74 orte_iof_tag_t source_tag);
75
76 static int orted_output(const orte_process_name_t* peer,
77 orte_iof_tag_t source_tag,
78 const char *msg);
79
80 static void orted_complete(const orte_job_t *jdata);
81
82 static int finalize(void);
83
84 static int orted_ft_event(int state);
85
86
87
88
89
90
91
92
93
94 orte_iof_base_module_t orte_iof_orted_module = {
95 .init = init,
96 .push = orted_push,
97 .pull = orted_pull,
98 .close = orted_close,
99 .output = orted_output,
100 .complete = orted_complete,
101 .finalize = finalize,
102 .ft_event = orted_ft_event
103 };
104
105 static int init(void)
106 {
107
108
109 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
110 ORTE_RML_TAG_IOF_PROXY,
111 ORTE_RML_PERSISTENT,
112 orte_iof_orted_recv,
113 NULL);
114
115
116 OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
117 mca_iof_orted_component.xoff = false;
118
119 return ORTE_SUCCESS;
120 }
121
122
123
124
125
126
127 static int orted_push(const orte_process_name_t* dst_name,
128 orte_iof_tag_t src_tag, int fd)
129 {
130 int flags;
131 orte_iof_proc_t *proct;
132 int rc;
133 orte_job_t *jobdat=NULL;
134 orte_ns_cmp_bitmask_t mask;
135
136 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
137 "%s iof:orted pushing fd %d for process %s",
138 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
139 fd, ORTE_NAME_PRINT(dst_name)));
140
141
142
143
144 if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
145 opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
146 __FILE__, __LINE__, errno);
147 } else {
148 flags |= O_NONBLOCK;
149 fcntl(fd, F_SETFL, flags);
150 }
151
152
153 OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
154 mask = ORTE_NS_CMP_ALL;
155
156 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
157
158 goto SETUP;
159 }
160 }
161
162 proct = OBJ_NEW(orte_iof_proc_t);
163 proct->name.jobid = dst_name->jobid;
164 proct->name.vpid = dst_name->vpid;
165 opal_list_append(&mca_iof_orted_component.procs, &proct->super);
166
167 SETUP:
168
169 if (NULL == (jobdat = orte_get_job_data_object(proct->name.jobid))) {
170 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
171 return ORTE_ERR_NOT_FOUND;
172 }
173
174 if (src_tag & ORTE_IOF_STDOUT) {
175 ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
176 orte_iof_orted_read_handler, false);
177 } else if (src_tag & ORTE_IOF_STDERR) {
178 ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
179 orte_iof_orted_read_handler, false);
180 #if OPAL_PMIX_V1
181 } else if (src_tag & ORTE_IOF_STDDIAG) {
182 ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
183 orte_iof_orted_read_handler, false);
184 #endif
185 }
186
187 if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct))) {
188 ORTE_ERROR_LOG(rc);
189 return rc;
190 }
191
192
193
194
195
196
197 if (NULL != proct->revstdout &&
198 #if OPAL_PMIX_V1
199 NULL != proct->revstddiag &&
200 #endif
201 (orte_iof_base.redirect_app_stderr_to_stdout || NULL != proct->revstderr)) {
202 ORTE_IOF_READ_ACTIVATE(proct->revstdout);
203 if (!orte_iof_base.redirect_app_stderr_to_stdout) {
204 ORTE_IOF_READ_ACTIVATE(proct->revstderr);
205 }
206 #if OPAL_PMIX_V1
207 if (NULL != proct->revstddiag) {
208 ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
209 }
210 #endif
211 }
212 return ORTE_SUCCESS;
213 }
214
215
216
217
218
219
220
221
222
223
224
225 static int orted_pull(const orte_process_name_t* dst_name,
226 orte_iof_tag_t src_tag,
227 int fd)
228 {
229 orte_iof_proc_t *proct;
230 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
231 int flags;
232
233
234 if (ORTE_IOF_STDIN != src_tag) {
235 return ORTE_ERR_NOT_SUPPORTED;
236 }
237
238 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
239 "%s iof:orted pulling fd %d for process %s",
240 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
241 fd, ORTE_NAME_PRINT(dst_name)));
242
243
244
245
246 if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
247 opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
248 __FILE__, __LINE__, errno);
249 } else {
250 flags |= O_NONBLOCK;
251 fcntl(fd, F_SETFL, flags);
252 }
253
254
255 OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
256 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
257
258 goto SETUP;
259 }
260 }
261
262 proct = OBJ_NEW(orte_iof_proc_t);
263 proct->name.jobid = dst_name->jobid;
264 proct->name.vpid = dst_name->vpid;
265 opal_list_append(&mca_iof_orted_component.procs, &proct->super);
266
267 SETUP:
268 ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
269 stdin_write_handler);
270
271 return ORTE_SUCCESS;
272 }
273
274
275
276
277
278
279
280 static int orted_close(const orte_process_name_t* peer,
281 orte_iof_tag_t source_tag)
282 {
283 orte_iof_proc_t* proct;
284 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
285
286 OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
287 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
288 if (ORTE_IOF_STDIN & source_tag) {
289 if (NULL != proct->stdinev) {
290 OBJ_RELEASE(proct->stdinev);
291 }
292 proct->stdinev = NULL;
293 }
294 if ((ORTE_IOF_STDOUT & source_tag) ||
295 (ORTE_IOF_STDMERGE & source_tag)) {
296 if (NULL != proct->revstdout) {
297 orte_iof_base_static_dump_output(proct->revstdout);
298 OBJ_RELEASE(proct->revstdout);
299 }
300 proct->revstdout = NULL;
301 }
302 if (ORTE_IOF_STDERR & source_tag) {
303 if (NULL != proct->revstderr) {
304 orte_iof_base_static_dump_output(proct->revstderr);
305 OBJ_RELEASE(proct->revstderr);
306 }
307 proct->revstderr = NULL;
308 }
309 #if OPAL_PMIX_V1
310 if (ORTE_IOF_STDDIAG & source_tag) {
311 if (NULL != proct->revstddiag) {
312 orte_iof_base_static_dump_output(proct->revstddiag);
313 OBJ_RELEASE(proct->revstddiag);
314 }
315 proct->revstddiag = NULL;
316 }
317 #endif
318
319 if (NULL == proct->stdinev &&
320 NULL == proct->revstdout &&
321 #if OPAL_PMIX_V1
322 NULL == proct->revstddiag &&
323 #endif
324 NULL == proct->revstderr) {
325 opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
326 OBJ_RELEASE(proct);
327 }
328 break;
329 }
330 }
331
332 return ORTE_SUCCESS;
333 }
334
335 static void orted_complete(const orte_job_t *jdata)
336 {
337 orte_iof_proc_t *proct, *next;
338
339
340 OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_orted_component.procs, orte_iof_proc_t) {
341 if (jdata->jobid == proct->name.jobid) {
342 opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
343 OBJ_RELEASE(proct);
344 }
345 }
346 }
347
348 static int finalize(void)
349 {
350 orte_iof_proc_t *proct;
351
352
353
354 while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_orted_component.procs))) {
355 if (NULL != proct->revstdout) {
356 orte_iof_base_static_dump_output(proct->revstdout);
357 }
358 if (NULL != proct->revstderr) {
359 orte_iof_base_static_dump_output(proct->revstderr);
360 }
361 #if OPAL_PMIX_V1
362 if (NULL != proct->revstddiag) {
363 orte_iof_base_static_dump_output(proct->revstddiag);
364 }
365 #endif
366 OBJ_RELEASE(proct);
367 }
368 OBJ_DESTRUCT(&mca_iof_orted_component.procs);
369
370
371 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
372 return ORTE_SUCCESS;
373 }
374
375
376
377
378
379 static int orted_ft_event(int state)
380 {
381 return ORTE_ERR_NOT_IMPLEMENTED;
382 }
383
384 static void stdin_write_handler(int _fd, short event, void *cbdata)
385 {
386 orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
387 orte_iof_write_event_t *wev = sink->wev;
388 opal_list_item_t *item;
389 orte_iof_write_output_t *output;
390 int num_written;
391
392 ORTE_ACQUIRE_OBJECT(sink);
393
394 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
395 "%s orted:stdin:write:handler writing data to %d",
396 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
397 wev->fd));
398
399 wev->pending = false;
400
401 while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
402 output = (orte_iof_write_output_t*)item;
403 if (0 == output->numbytes) {
404
405
406
407 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
408 "%s iof:orted closing fd %d on write event due to zero bytes output",
409 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
410 OBJ_RELEASE(wev);
411 sink->wev = NULL;
412 return;
413 }
414 num_written = write(wev->fd, output->data, output->numbytes);
415 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
416 "%s orted:stdin:write:handler wrote %d bytes",
417 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
418 num_written));
419 if (num_written < 0) {
420 if (EAGAIN == errno || EINTR == errno) {
421
422 opal_list_prepend(&wev->outputs, item);
423
424
425
426 ORTE_IOF_SINK_ACTIVATE(wev);
427 goto CHECK;
428 }
429
430
431
432 OBJ_RELEASE(output);
433 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
434 "%s iof:orted closing fd %d on write event due to negative bytes written",
435 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
436 OBJ_RELEASE(wev);
437 sink->wev = NULL;
438
439 if (!mca_iof_orted_component.xoff) {
440 mca_iof_orted_component.xoff = true;
441 orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
442 }
443 return;
444 } else if (num_written < output->numbytes) {
445 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
446 "%s orted:stdin:write:handler incomplete write %d - adjusting data",
447 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
448
449 memmove(output->data, &output->data[num_written], output->numbytes - num_written);
450
451 opal_list_prepend(&wev->outputs, item);
452
453
454
455 ORTE_IOF_SINK_ACTIVATE(wev);
456 goto CHECK;
457 }
458 OBJ_RELEASE(output);
459 }
460
461 CHECK:
462 if (mca_iof_orted_component.xoff) {
463
464
465
466
467
468
469
470
471
472 if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
473
474 mca_iof_orted_component.xoff = false;
475 orte_iof_orted_send_xonxoff(ORTE_IOF_XON);
476 }
477 }
478 }
479
480 static int orted_output(const orte_process_name_t* peer,
481 orte_iof_tag_t source_tag,
482 const char *msg)
483 {
484 opal_buffer_t *buf;
485 int rc;
486
487
488 buf = OBJ_NEW(opal_buffer_t);
489
490
491
492
493 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &source_tag, 1, ORTE_IOF_TAG))) {
494 ORTE_ERROR_LOG(rc);
495 return rc;
496 }
497
498
499 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, peer, 1, ORTE_NAME))) {
500 ORTE_ERROR_LOG(rc);
501 return rc;
502 }
503
504
505
506 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, msg, strlen(msg)+1, OPAL_BYTE))) {
507 ORTE_ERROR_LOG(rc);
508 return rc;
509 }
510
511
512 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
513 "%s iof:orted:output sending %d bytes to HNP",
514 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)strlen(msg)+1));
515
516 orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
517 orte_rml_send_callback, NULL);
518
519 return ORTE_SUCCESS;
520 }