This source file includes following definitions.
- orte_iof_hnp_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 #include "orte_config.h"
25 #include "orte/constants.h"
26
27 #include <errno.h>
28 #ifdef HAVE_UNISTD_H
29 #include <unistd.h>
30 #endif
31 #include <string.h>
32 #ifdef HAVE_FCNTL_H
33 #include <fcntl.h>
34 #else
35 #ifdef HAVE_SYS_FCNTL_H
36 #include <sys/fcntl.h>
37 #endif
38 #endif
39
40 #include "opal/dss/dss.h"
41 #include "opal/mca/pmix/pmix.h"
42
43 #include "orte/mca/rml/rml.h"
44 #include "orte/mca/errmgr/errmgr.h"
45 #include "orte/util/name_fns.h"
46 #include "orte/util/threads.h"
47 #include "orte/runtime/orte_globals.h"
48
49 #include "orte/mca/iof/iof.h"
50 #include "orte/mca/iof/base/base.h"
51
52 #include "iof_hnp.h"
53
54
55 void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
56 opal_buffer_t* buffer, orte_rml_tag_t tag,
57 void* cbdata)
58 {
59 orte_process_name_t origin, requestor;
60 unsigned char data[ORTE_IOF_BASE_MSG_MAX];
61 orte_iof_tag_t stream;
62 int32_t count, numbytes;
63 orte_iof_sink_t *sink, *next;
64 int rc;
65 bool exclusive;
66 orte_iof_proc_t *proct;
67 orte_ns_cmp_bitmask_t mask=ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
68
69 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
70 "%s received IOF from proc %s",
71 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
72 ORTE_NAME_PRINT(sender)));
73
74
75 count = 1;
76 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
77 ORTE_ERROR_LOG(rc);
78 goto CLEAN_RETURN;
79 }
80
81 if (ORTE_IOF_XON & stream) {
82
83 if (NULL != mca_iof_hnp_component.stdinev &&
84 !orte_job_term_ordered &&
85 !mca_iof_hnp_component.stdinev->active) {
86 ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
87 }
88 goto CLEAN_RETURN;
89 } else if (ORTE_IOF_XOFF & stream) {
90
91 if (NULL != mca_iof_hnp_component.stdinev &&
92 !mca_iof_hnp_component.stdinev->active) {
93 opal_event_del(mca_iof_hnp_component.stdinev->ev);
94 mca_iof_hnp_component.stdinev->active = false;
95 }
96 goto CLEAN_RETURN;
97 }
98
99
100 count = 1;
101 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &origin, &count, ORTE_NAME))) {
102 ORTE_ERROR_LOG(rc);
103 goto CLEAN_RETURN;
104 }
105
106 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
107 "%s received IOF cmd for source %s",
108 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
109 ORTE_NAME_PRINT(&origin)));
110
111
112 if (ORTE_IOF_PULL & stream) {
113
114 count = 1;
115 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, ORTE_NAME))) {
116 ORTE_ERROR_LOG(rc);
117 goto CLEAN_RETURN;
118 }
119
120 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
121 "%s received pull cmd from remote tool %s for proc %s",
122 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
123 ORTE_NAME_PRINT(&requestor),
124 ORTE_NAME_PRINT(&origin)));
125
126 if (ORTE_IOF_EXCLUSIVE & stream) {
127 exclusive = true;
128 } else {
129 exclusive = false;
130 }
131
132 OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
133 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &origin)) {
134
135 goto PROCESS;
136 }
137 }
138
139 proct = OBJ_NEW(orte_iof_proc_t);
140 proct->name.jobid = origin.jobid;
141 proct->name.vpid = origin.vpid;
142 opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
143
144 PROCESS:
145
146
147
148 if (NULL == proct->subscribers) {
149 proct->subscribers = OBJ_NEW(opal_list_t);
150 }
151 if (ORTE_IOF_STDOUT & stream) {
152 ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT, NULL);
153 sink->daemon.jobid = requestor.jobid;
154 sink->daemon.vpid = requestor.vpid;
155 sink->exclusive = exclusive;
156 opal_list_append(proct->subscribers, &sink->super);
157 }
158 if (ORTE_IOF_STDERR & stream) {
159 ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR, NULL);
160 sink->daemon.jobid = requestor.jobid;
161 sink->daemon.vpid = requestor.vpid;
162 sink->exclusive = exclusive;
163 opal_list_append(proct->subscribers, &sink->super);
164 }
165 if (ORTE_IOF_STDDIAG & stream) {
166 ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG, NULL);
167 sink->daemon.jobid = requestor.jobid;
168 sink->daemon.vpid = requestor.vpid;
169 sink->exclusive = exclusive;
170 opal_list_append(proct->subscribers, &sink->super);
171 }
172 goto CLEAN_RETURN;
173 }
174
175 if (ORTE_IOF_CLOSE & stream) {
176 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
177 "%s received close cmd from remote tool %s for proc %s",
178 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
179 ORTE_NAME_PRINT(sender),
180 ORTE_NAME_PRINT(&origin)));
181
182
183
184 OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
185 if (OPAL_EQUAL != orte_util_compare_name_fields(mask, &proct->name, &origin)) {
186 continue;
187 }
188 OPAL_LIST_FOREACH_SAFE(sink, next, proct->subscribers, orte_iof_sink_t) {
189
190 if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
191 continue;
192 }
193
194 if ((stream & sink->tag) &&
195 sink->name.jobid == origin.jobid &&
196 (ORTE_VPID_WILDCARD == sink->name.vpid ||
197 ORTE_VPID_WILDCARD == origin.vpid ||
198 sink->name.vpid == origin.vpid)) {
199
200
201
202 orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, ORTE_IOF_CLOSE, NULL, 0);
203 opal_list_remove_item(proct->subscribers, &sink->super);
204 OBJ_RELEASE(sink);
205 }
206 }
207 }
208 goto CLEAN_RETURN;
209 }
210
211
212 numbytes=ORTE_IOF_BASE_MSG_MAX;
213 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
214 ORTE_ERROR_LOG(rc);
215 goto CLEAN_RETURN;
216 }
217
218
219 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
220 "%s unpacked %d bytes from remote proc %s",
221 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
222 ORTE_NAME_PRINT(&origin)));
223
224
225 OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
226 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &origin)) {
227
228 goto NSTEP;
229 }
230 }
231
232 proct = OBJ_NEW(orte_iof_proc_t);
233 proct->name.jobid = origin.jobid;
234 proct->name.vpid = origin.vpid;
235 opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
236
237 NSTEP:
238
239 exclusive = false;
240 if (NULL != proct->subscribers) {
241 OPAL_LIST_FOREACH(sink, proct->subscribers, orte_iof_sink_t) {
242
243 if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
244 continue;
245 }
246 if ((stream & sink->tag) &&
247 sink->name.jobid == origin.jobid &&
248 (ORTE_VPID_WILDCARD == sink->name.vpid ||
249 ORTE_VPID_WILDCARD == origin.vpid ||
250 sink->name.vpid == origin.vpid)) {
251
252 if (NULL != opal_pmix.server_iof_push) {
253
254 if (0 < numbytes) {
255 rc = opal_pmix.server_iof_push(&proct->name, stream, data, numbytes);
256 if (ORTE_SUCCESS != rc) {
257 ORTE_ERROR_LOG(rc);
258 }
259 }
260 } else {
261 orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, stream, data, numbytes);
262 }
263 if (sink->exclusive) {
264 exclusive = true;
265 }
266 }
267 }
268 }
269
270 if (!proct->copy) {
271 return;
272 }
273
274
275 if (!exclusive) {
276 if (ORTE_IOF_STDOUT & stream || orte_xml_output) {
277 orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev);
278 } else {
279 orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev);
280 }
281 }
282
283 CLEAN_RETURN:
284 return;
285 }