This source file includes the following definitions.
- init
- tool_push
- send_cb
- tool_pull
- tool_close
- finalize
- tool_output
- tool_ft_event
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "orte_config.h"
24 #include "orte/constants.h"
25
26 #include <errno.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30 #include <string.h>
31
32 #include "orte/mca/rml/rml.h"
33 #include "orte/mca/rml/rml_types.h"
34 #include "orte/mca/errmgr/errmgr.h"
35 #include "orte/util/name_fns.h"
36 #include "orte/runtime/orte_globals.h"
37 #include "orte/runtime/orte_wait.h"
38
39 #include "orte/mca/iof/iof.h"
40 #include "orte/mca/iof/base/base.h"
41
42 #include "iof_tool.h"
43
44 static int init(void);
45
46 static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
47
48 static int tool_pull(const orte_process_name_t* src_name,
49 orte_iof_tag_t src_tag,
50 int fd);
51
52 static int tool_close(const orte_process_name_t* peer,
53 orte_iof_tag_t source_tag);
54
55 static int tool_output(const orte_process_name_t* peer,
56 orte_iof_tag_t source_tag,
57 const char *msg);
58
59 static int finalize(void);
60
61 static int tool_ft_event(int state);
62
63 orte_iof_base_module_t orte_iof_tool_module = {
64 .init = init,
65 .push = tool_push,
66 .pull = tool_pull,
67 .close = tool_close,
68 .output = tool_output,
69 .finalize = finalize,
70 .ft_event = tool_ft_event
71 };
72
73
74 static int init(void)
75 {
76
77
78 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
79 ORTE_RML_TAG_IOF_PROXY,
80 ORTE_RML_PERSISTENT,
81 orte_iof_tool_recv,
82 NULL);
83
84 mca_iof_tool_component.closed = false;
85
86 return ORTE_SUCCESS;
87 }
88
89
90
91
92
93
94 static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
95 {
96
97
98
99
100
101
102 return ORTE_ERR_NOT_SUPPORTED;
103 }
104
105
106
107
108
109 static void send_cb(int status, orte_process_name_t *peer,
110 opal_buffer_t *buf, orte_rml_tag_t tag,
111 void *cbdata)
112 {
113
114 OBJ_RELEASE(buf);
115 }
116
117
118
119
120
121
122 static int tool_pull(const orte_process_name_t* src_name,
123 orte_iof_tag_t src_tag,
124 int fd)
125 {
126
127
128
129
130
131
132
133
134
135 opal_buffer_t *buf;
136 orte_iof_tag_t tag;
137 orte_process_name_t hnp;
138 int rc;
139
140 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
141 "%s pulling output for proc %s",
142 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
143 ORTE_NAME_PRINT(src_name)));
144
145 buf = OBJ_NEW(opal_buffer_t);
146
147
148 tag = src_tag | ORTE_IOF_PULL;
149
150
151
152
153 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
154 ORTE_ERROR_LOG(rc);
155 OBJ_RELEASE(buf);
156 return rc;
157 }
158
159 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
160 ORTE_ERROR_LOG(rc);
161 OBJ_RELEASE(buf);
162 return rc;
163 }
164
165 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
166 ORTE_ERROR_LOG(rc);
167 OBJ_RELEASE(buf);
168 return rc;
169 }
170
171
172 ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
173 orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
174 send_cb, NULL);
175
176 return ORTE_SUCCESS;
177 }
178
179
180 static int tool_close(const orte_process_name_t* src_name,
181 orte_iof_tag_t src_tag)
182 {
183
184
185
186
187 opal_buffer_t *buf;
188 orte_iof_tag_t tag;
189 orte_process_name_t hnp;
190 int rc;
191
192 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
193 "%s closing output for proc %s",
194 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
195 ORTE_NAME_PRINT(src_name)));
196
197 buf = OBJ_NEW(opal_buffer_t);
198
199
200 tag = src_tag | ORTE_IOF_CLOSE;
201
202
203
204
205 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
206 ORTE_ERROR_LOG(rc);
207 OBJ_RELEASE(buf);
208 return rc;
209 }
210
211 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
212 ORTE_ERROR_LOG(rc);
213 OBJ_RELEASE(buf);
214 return rc;
215 }
216
217
218 mca_iof_tool_component.closed = false;
219
220
221 ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
222 orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
223 send_cb, NULL);
224
225 return ORTE_SUCCESS;
226 }
227
228 static int finalize(void)
229 {
230 opal_list_item_t* item;
231 orte_iof_write_output_t *output;
232 orte_iof_write_event_t *wev;
233 int num_written;
234 bool dump;
235
236
237 wev = orte_iof_base.iof_write_stdout->wev;
238 if (!opal_list_is_empty(&wev->outputs)) {
239 dump = false;
240
241 while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
242 output = (orte_iof_write_output_t*)item;
243 if (!dump) {
244 num_written = write(wev->fd, output->data, output->numbytes);
245 if (num_written < output->numbytes) {
246
247 dump = true;
248 }
249 }
250 OBJ_RELEASE(output);
251 }
252 }
253 OBJ_RELEASE(orte_iof_base.iof_write_stdout);
254 if (!orte_xml_output) {
255
256 wev = orte_iof_base.iof_write_stderr->wev;
257 if (!opal_list_is_empty(&wev->outputs)) {
258 dump = false;
259
260 while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
261 output = (orte_iof_write_output_t*)item;
262 if (!dump) {
263 num_written = write(wev->fd, output->data, output->numbytes);
264 if (num_written < output->numbytes) {
265
266 dump = true;
267 }
268 }
269 OBJ_RELEASE(output);
270 }
271 }
272 OBJ_RELEASE(orte_iof_base.iof_write_stderr);
273 }
274
275
276 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
277
278 return ORTE_SUCCESS;
279 }
280
281 static int tool_output(const orte_process_name_t* peer,
282 orte_iof_tag_t source_tag,
283 const char *msg)
284 {
285
286 if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
287 orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
288 } else {
289 orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
290 }
291
292 return ORTE_SUCCESS;
293 }
294
295
296
297
298
299 static int tool_ft_event(int state)
300 {
301 return ORTE_ERR_NOT_IMPLEMENTED;
302 }