This source file includes the following definitions.
- orte_iof_orted_read_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "orte_config.h"
24 #include "orte/constants.h"
25
26 #include <errno.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30 #include <string.h>
31
32 #include "opal/dss/dss.h"
33
34 #include "orte/mca/rml/rml.h"
35 #include "orte/mca/errmgr/errmgr.h"
36 #include "orte/mca/odls/odls_types.h"
37 #include "orte/util/name_fns.h"
38 #include "orte/util/threads.h"
39 #include "orte/mca/state/state.h"
40 #include "orte/runtime/orte_globals.h"
41
42 #include "orte/mca/iof/iof.h"
43 #include "orte/mca/iof/base/base.h"
44
45 #include "iof_orted.h"
46
47 void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
48 {
49 orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
50 unsigned char data[ORTE_IOF_BASE_MSG_MAX];
51 opal_buffer_t *buf=NULL;
52 int rc;
53 int32_t numbytes;
54 orte_iof_proc_t *proct = (orte_iof_proc_t*)rev->proc;
55
56 ORTE_ACQUIRE_OBJECT(rev);
57
58
59
60
61 fd = rev->fd;
62
63
64 #if !defined(__WINDOWS__)
65 numbytes = read(fd, data, sizeof(data));
66 #else
67 {
68 DWORD readed;
69 HANDLE handle = (HANDLE)_get_osfhandle(fd);
70 ReadFile(handle, data, sizeof(data), &readed, NULL);
71 numbytes = (int)readed;
72 }
73 #endif
74
75 if (NULL == proct) {
76
77 ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
78 return;
79 }
80
81 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
82 "%s iof:orted:read handler read %d bytes from %s, fd %d",
83 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
84 numbytes, ORTE_NAME_PRINT(&proct->name), fd));
85
86 if (numbytes <= 0) {
87 if (0 > numbytes) {
88
89 if (EAGAIN == errno || EINTR == errno) {
90
91 ORTE_IOF_READ_ACTIVATE(rev);
92 return;
93 }
94
95 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
96 "%s iof:orted:read handler %s Error on connection:%d",
97 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
98 ORTE_NAME_PRINT(&proct->name), fd));
99 }
100
101 goto CLEAN_RETURN;
102 }
103
104
105 if (NULL != rev->sink) {
106
107 orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
108 }
109 if (!proct->copy) {
110
111 ORTE_IOF_READ_ACTIVATE(rev);
112 return;
113 }
114
115
116 buf = OBJ_NEW(opal_buffer_t);
117
118
119
120
121 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->tag, 1, ORTE_IOF_TAG))) {
122 ORTE_ERROR_LOG(rc);
123 goto CLEAN_RETURN;
124 }
125
126
127 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &proct->name, 1, ORTE_NAME))) {
128 ORTE_ERROR_LOG(rc);
129 goto CLEAN_RETURN;
130 }
131
132
133 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &data, numbytes, OPAL_BYTE))) {
134 ORTE_ERROR_LOG(rc);
135 goto CLEAN_RETURN;
136 }
137
138
139 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
140 "%s iof:orted:read handler sending %d bytes to HNP",
141 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes));
142
143 orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
144 orte_rml_send_callback, NULL);
145
146
147 ORTE_IOF_READ_ACTIVATE(rev);
148
149 return;
150
151 CLEAN_RETURN:
152
153
154
155
156 if (rev->tag & ORTE_IOF_STDOUT) {
157 if( NULL != proct->revstdout ) {
158 orte_iof_base_static_dump_output(proct->revstdout);
159 OBJ_RELEASE(proct->revstdout);
160 }
161 } else if (rev->tag & ORTE_IOF_STDERR) {
162 if( NULL != proct->revstderr ) {
163 orte_iof_base_static_dump_output(proct->revstderr);
164 OBJ_RELEASE(proct->revstderr);
165 }
166 #if OPAL_PMIX_V1
167 } else if (rev->tag & ORTE_IOF_STDDIAG) {
168 if( NULL != proct->revstddiag ) {
169 orte_iof_base_static_dump_output(proct->revstddiag);
170 OBJ_RELEASE(proct->revstddiag);
171 }
172 #endif
173 }
174
175 if (NULL == proct->revstdout &&
176 #if OPAL_PMIX_V1
177 NULL == proct->revstddiag &&
178 #endif
179 NULL == proct->revstderr) {
180
181 ORTE_ACTIVATE_PROC_STATE(&proct->name, ORTE_PROC_STATE_IOF_COMPLETE);
182 }
183 if (NULL != buf) {
184 OBJ_RELEASE(buf);
185 }
186 return;
187 }