This source file includes the following definitions.
- init
- hnp_push
- hnp_pull
- hnp_close
- hnp_complete
- finalize
- hnp_ft_event
- stdin_write_handler
- hnp_output
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "orte_config.h"
27 #include "opal/util/output.h"
28 #include "orte/constants.h"
29
30 #include <errno.h>
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #include <string.h>
35
36 #ifdef HAVE_FCNTL_H
37 #include <fcntl.h>
38 #else
39 #ifdef HAVE_SYS_FCNTL_H
40 #include <sys/fcntl.h>
41 #endif
42 #endif
43
44 #include "opal/mca/event/event.h"
45
46 #include "orte/runtime/orte_globals.h"
47 #include "orte/mca/errmgr/errmgr.h"
48 #include "orte/mca/ess/ess.h"
49 #include "orte/mca/rml/rml.h"
50 #include "orte/util/name_fns.h"
51 #include "orte/util/threads.h"
52 #include "orte/mca/odls/odls_types.h"
53
54 #include "orte/mca/iof/base/base.h"
55 #include "orte/mca/iof/base/iof_base_setup.h"
56 #include "iof_hnp.h"
57
58
59 static void stdin_write_handler(int fd, short event, void *cbdata);
60
61
62 static int init(void);
63
64 static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
65
66 static int hnp_pull(const orte_process_name_t* src_name,
67 orte_iof_tag_t src_tag,
68 int fd);
69
70 static int hnp_close(const orte_process_name_t* peer,
71 orte_iof_tag_t source_tag);
72
73 static int hnp_output(const orte_process_name_t* peer,
74 orte_iof_tag_t source_tag,
75 const char *msg);
76
77 static void hnp_complete(const orte_job_t *jdata);
78
79 static int finalize(void);
80
81 static int hnp_ft_event(int state);
82
83
84
85
86
87
88
89 orte_iof_base_module_t orte_iof_hnp_module = {
90 .init = init,
91 .push = hnp_push,
92 .pull = hnp_pull,
93 .close = hnp_close,
94 .output = hnp_output,
95 .complete = hnp_complete,
96 .finalize = finalize,
97 .ft_event = hnp_ft_event
98 };
99
100
101 static int init(void)
102 {
103
104
105
106 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
107 ORTE_RML_TAG_IOF_HNP,
108 ORTE_RML_PERSISTENT,
109 orte_iof_hnp_recv,
110 NULL);
111
112 OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
113 mca_iof_hnp_component.stdinev = NULL;
114
115 return ORTE_SUCCESS;
116 }
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
135 {
136 orte_job_t *jdata;
137 orte_proc_t *proc;
138 orte_iof_proc_t *proct, *pptr;
139 int flags, rc;
140 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
141
142
143 if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
144 return ORTE_SUCCESS;
145 }
146
147 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
148 "%s iof:hnp pushing fd %d for process %s",
149 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
150 fd, ORTE_NAME_PRINT(dst_name)));
151
152
153 OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
154 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
155
156 goto SETUP;
157 }
158 }
159
160 proct = OBJ_NEW(orte_iof_proc_t);
161 proct->name.jobid = dst_name->jobid;
162 proct->name.vpid = dst_name->vpid;
163 opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
164
165 SETUP:
166 if (!(src_tag & ORTE_IOF_STDIN)) {
167
168
169
170 if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
171 opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
172 __FILE__, __LINE__, errno);
173 } else {
174 flags |= O_NONBLOCK;
175 fcntl(fd, F_SETFL, flags);
176 }
177
178 if (NULL == (jdata = orte_get_job_data_object(proct->name.jobid))) {
179 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
180 return ORTE_ERR_NOT_FOUND;
181 }
182
183 if (src_tag & ORTE_IOF_STDOUT) {
184 ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
185 orte_iof_hnp_read_local_handler, false);
186 } else if (src_tag & ORTE_IOF_STDERR) {
187 ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
188 orte_iof_hnp_read_local_handler, false);
189 #if OPAL_PMIX_V1
190 } else if (src_tag & ORTE_IOF_STDDIAG) {
191 ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
192 orte_iof_hnp_read_local_handler, false);
193 #endif
194 }
195
196 if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct))) {
197 ORTE_ERROR_LOG(rc);
198 return rc;
199 }
200
201
202
203
204
205
206 if (NULL != proct->revstdout &&
207 #if OPAL_PMIX_V1
208 NULL != proct->revstddiag &&
209 #endif
210 (orte_iof_base.redirect_app_stderr_to_stdout || NULL != proct->revstderr)) {
211 if (proct->copy) {
212
213
214 OPAL_LIST_FOREACH(pptr, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
215 if (dst_name->jobid == pptr->name.jobid &&
216 ORTE_VPID_WILDCARD == pptr->name.vpid &&
217 NULL != pptr->subscribers) {
218 OBJ_RETAIN(pptr->subscribers);
219 proct->subscribers = pptr->subscribers;
220 break;
221 }
222 }
223 }
224 ORTE_IOF_READ_ACTIVATE(proct->revstdout);
225 if (!orte_iof_base.redirect_app_stderr_to_stdout) {
226 ORTE_IOF_READ_ACTIVATE(proct->revstderr);
227 }
228 #if OPAL_PMIX_V1
229 if (NULL != proct->revstddiag) {
230 ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
231 }
232 #endif
233 }
234 return ORTE_SUCCESS;
235 }
236
237
238
239
240 if (ORTE_VPID_WILDCARD == dst_name->vpid) {
241
242 ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
243 stdin_write_handler);
244 proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
245 proct->stdinev->daemon.vpid = ORTE_VPID_WILDCARD;
246 } else {
247
248 if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
249 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
250 return ORTE_ERR_BAD_PARAM;
251 }
252 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, dst_name->vpid))) {
253 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
254 return ORTE_ERR_NOT_FOUND;
255 }
256
257 if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) {
258 ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
259 stdin_write_handler);
260 proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
261 proct->stdinev->daemon.vpid = proc->node->daemon->name.vpid;
262 }
263 }
264
265
266 if (NULL == mca_iof_hnp_component.stdinev) {
267
268
269
270
271
272
273
274
275
276
277 if (0 != fd) {
278 if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
279 opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
280 __FILE__, __LINE__, errno);
281 } else {
282 flags |= O_NONBLOCK;
283 fcntl(fd, F_SETFL, flags);
284 }
285 }
286 if (isatty(fd)) {
287
288
289
290
291
292
293
294 opal_event_signal_set(orte_event_base, &mca_iof_hnp_component.stdinsig,
295 SIGCONT, orte_iof_hnp_stdin_cb,
296 NULL);
297
298
299
300
301
302
303 ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
304 proct, fd, ORTE_IOF_STDIN,
305 orte_iof_hnp_read_local_handler, false);
306
307
308
309
310
311 if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
312 ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
313 }
314 } else {
315
316
317
318 ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
319 proct, fd, ORTE_IOF_STDIN,
320 orte_iof_hnp_read_local_handler, true);
321 }
322 }
323 return ORTE_SUCCESS;
324 }
325
326
327
328
329
330
331
332 static int hnp_pull(const orte_process_name_t* dst_name,
333 orte_iof_tag_t src_tag,
334 int fd)
335 {
336 orte_iof_proc_t *proct;
337 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
338 int flags;
339
340
341 if (ORTE_IOF_STDIN != src_tag) {
342 return ORTE_ERR_NOT_SUPPORTED;
343 }
344
345 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
346 "%s iof:hnp pulling fd %d for process %s",
347 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
348 fd, ORTE_NAME_PRINT(dst_name)));
349
350
351
352
353 if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
354 opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
355 __FILE__, __LINE__, errno);
356 } else {
357 flags |= O_NONBLOCK;
358 fcntl(fd, F_SETFL, flags);
359 }
360
361
362 OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
363 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
364
365 goto SETUP;
366 }
367 }
368
369 proct = OBJ_NEW(orte_iof_proc_t);
370 proct->name.jobid = dst_name->jobid;
371 proct->name.vpid = dst_name->vpid;
372 opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
373
374 SETUP:
375 ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
376 stdin_write_handler);
377 proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
378 proct->stdinev->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
379
380 return ORTE_SUCCESS;
381 }
382
383
384
385
386
387 static int hnp_close(const orte_process_name_t* peer,
388 orte_iof_tag_t source_tag)
389 {
390 orte_iof_proc_t* proct;
391 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
392
393 OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
394 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
395 if (ORTE_IOF_STDIN & source_tag) {
396 if (NULL != proct->stdinev) {
397 OBJ_RELEASE(proct->stdinev);
398 }
399 proct->stdinev = NULL;
400 }
401 if ((ORTE_IOF_STDOUT & source_tag) ||
402 (ORTE_IOF_STDMERGE & source_tag)) {
403 if (NULL != proct->revstdout) {
404 orte_iof_base_static_dump_output(proct->revstdout);
405 OBJ_RELEASE(proct->revstdout);
406 }
407 proct->revstdout = NULL;
408 }
409 if (ORTE_IOF_STDERR & source_tag) {
410 if (NULL != proct->revstderr) {
411 orte_iof_base_static_dump_output(proct->revstderr);
412 OBJ_RELEASE(proct->revstderr);
413 }
414 proct->revstderr = NULL;
415 }
416 #if OPAL_PMIX_V1
417 if (ORTE_IOF_STDDIAG & source_tag) {
418 if (NULL != proct->revstddiag) {
419 orte_iof_base_static_dump_output(proct->revstddiag);
420 OBJ_RELEASE(proct->revstddiag);
421 }
422 proct->revstddiag = NULL;
423 }
424 #endif
425
426 if (NULL == proct->stdinev &&
427 NULL == proct->revstdout &&
428 #if OPAL_PMIX_V1
429 NULL == proct->revstddiag &&
430 #endif
431 NULL == proct->revstderr) {
432 opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
433 OBJ_RELEASE(proct);
434 }
435 break;
436 }
437 }
438 return ORTE_SUCCESS;
439 }
440
441 static void hnp_complete(const orte_job_t *jdata)
442 {
443 orte_iof_proc_t *proct, *next;
444
445
446 OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
447 if (jdata->jobid == proct->name.jobid) {
448 opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
449 OBJ_RELEASE(proct);
450 }
451 }
452 }
453
454 static int finalize(void)
455 {
456 orte_iof_write_event_t *wev;
457 orte_iof_proc_t *proct;
458 bool dump;
459 orte_iof_write_output_t *output;
460 int num_written;
461
462
463 wev = orte_iof_base.iof_write_stdout->wev;
464 if (!opal_list_is_empty(&wev->outputs)) {
465 dump = false;
466
467 while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
468 if (!dump) {
469 num_written = write(wev->fd, output->data, output->numbytes);
470 if (num_written < output->numbytes) {
471
472 dump = true;
473 }
474 }
475 OBJ_RELEASE(output);
476 }
477 }
478 if (!orte_xml_output) {
479
480 wev = orte_iof_base.iof_write_stderr->wev;
481 if (!opal_list_is_empty(&wev->outputs)) {
482 dump = false;
483
484 while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
485 if (!dump) {
486 num_written = write(wev->fd, output->data, output->numbytes);
487 if (num_written < output->numbytes) {
488
489 dump = true;
490 }
491 }
492 OBJ_RELEASE(output);
493 }
494 }
495 }
496
497
498
499 while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_hnp_component.procs))) {
500 if (NULL != proct->revstdout) {
501 orte_iof_base_static_dump_output(proct->revstdout);
502 }
503 if (NULL != proct->revstderr) {
504 orte_iof_base_static_dump_output(proct->revstderr);
505 }
506 #if OPAL_PMIX_V1
507 if (NULL != proct->revstddiag) {
508 orte_iof_base_static_dump_output(proct->revstddiag);
509 }
510 #endif
511 OBJ_RELEASE(proct);
512 }
513 OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
514
515 return ORTE_SUCCESS;
516 }
517
518 int hnp_ft_event(int state) {
519
520
521
522 return ORTE_SUCCESS;
523 }
524
525
526
527
528
529 static void stdin_write_handler(int fd, short event, void *cbdata)
530 {
531 orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
532 orte_iof_write_event_t *wev = sink->wev;
533 opal_list_item_t *item;
534 orte_iof_write_output_t *output;
535 int num_written, total_written = 0;
536
537 ORTE_ACQUIRE_OBJECT(sink);
538
539 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
540 "%s hnp:stdin:write:handler writing data to %d",
541 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
542 wev->fd));
543
544 wev->pending = false;
545
546 while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
547 output = (orte_iof_write_output_t*)item;
548
549
550
551 if (orte_abnormal_term_ordered) {
552 OBJ_RELEASE(output);
553 continue;
554 }
555 if (0 == output->numbytes) {
556
557
558
559 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
560 "%s iof:hnp closing fd %d on write event due to zero bytes output",
561 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
562 goto finish;
563 }
564 num_written = write(wev->fd, output->data, output->numbytes);
565 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
566 "%s hnp:stdin:write:handler wrote %d bytes",
567 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
568 num_written));
569 if (num_written < 0) {
570 if (EAGAIN == errno || EINTR == errno) {
571
572 opal_list_prepend(&wev->outputs, item);
573
574
575
576 goto re_enter;
577 }
578
579
580
581 OBJ_RELEASE(output);
582 OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
583 "%s iof:hnp closing fd %d on write event due to negative bytes written",
584 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
585 goto finish;
586 } else if (num_written < output->numbytes) {
587 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
588 "%s hnp:stdin:write:handler incomplete write %d - adjusting data",
589 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
590
591 memmove(output->data, &output->data[num_written], output->numbytes - num_written);
592
593 opal_list_prepend(&wev->outputs, item);
594
595
596
597 goto re_enter;
598 }
599 OBJ_RELEASE(output);
600
601 total_written += num_written;
602 if ((ORTE_IOF_SINK_BLOCKSIZE <= total_written) && wev->always_writable) {
603 goto re_enter;
604 }
605 }
606 goto check;
607 re_enter:
608 ORTE_IOF_SINK_ACTIVATE(wev);
609 check:
610 if (NULL != mca_iof_hnp_component.stdinev &&
611 !orte_abnormal_term_ordered &&
612 !mca_iof_hnp_component.stdinev->active) {
613 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
614 "read event is off - checking if okay to restart"));
615
616
617
618
619
620
621
622
623
624
625 if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
626
627 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
628 "restarting read event"));
629 ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
630 }
631 }
632 if (sink->closed && 0 == opal_list_get_size(&wev->outputs)) {
633
634 OBJ_RELEASE(sink);
635 }
636 return;
637 finish:
638 OBJ_RELEASE(wev);
639 sink->wev = NULL;
640 return;
641 }
642
643 static int hnp_output(const orte_process_name_t* peer,
644 orte_iof_tag_t source_tag,
645 const char *msg)
646 {
647
648 if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
649 orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
650 } else {
651 orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
652 }
653
654 return ORTE_SUCCESS;
655 }