This source file includes following definitions.
- orte_iof_base_write_output
- orte_iof_base_static_dump_output
- orte_iof_base_write_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include "orte_config.h"
29 #include "orte/constants.h"
30
31 #include <string.h>
32 #include <stdlib.h>
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <time.h>
37 #include <errno.h>
38
39 #include "opal/util/output.h"
40
41 #include "orte/util/name_fns.h"
42 #include "orte/util/threads.h"
43 #include "orte/runtime/orte_globals.h"
44 #include "orte/mca/errmgr/errmgr.h"
45 #include "orte/mca/state/state.h"
46
47 #include "orte/mca/iof/base/base.h"
48
49 int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
50 const unsigned char *data, int numbytes,
51 orte_iof_write_event_t *channel)
52 {
53 char starttag[ORTE_IOF_BASE_TAG_MAX], endtag[ORTE_IOF_BASE_TAG_MAX], *suffix;
54 orte_iof_write_output_t *output;
55 int i, j, k, starttaglen, endtaglen, num_buffered;
56 bool endtagged;
57 char qprint[10];
58
59 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
60 "%s write:output setting up to write %d bytes to %s for %s on fd %d",
61 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
62 (ORTE_IOF_STDIN & stream) ? "stdin" : ((ORTE_IOF_STDOUT & stream) ? "stdout" : ((ORTE_IOF_STDERR & stream) ? "stderr" : "stddiag")),
63 ORTE_NAME_PRINT(name),
64 (NULL == channel) ? -1 : channel->fd));
65
66
67 output = OBJ_NEW(orte_iof_write_output_t);
68
69
70 if (ORTE_IOF_STDIN & stream) {
71
72 if (0 < numbytes) {
73
74
75
76
77 memcpy(output->data, data, numbytes);
78 }
79 output->numbytes = numbytes;
80 goto process;
81 } else if (ORTE_IOF_STDOUT & stream) {
82
83 suffix = "stdout";
84 } else if (ORTE_IOF_STDERR & stream) {
85
86 suffix = "stderr";
87 } else if (ORTE_IOF_STDDIAG & stream) {
88
89 suffix = "stddiag";
90 } else {
91
92 ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
93 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
94 "%s stream %0x", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), stream));
95 return ORTE_ERR_VALUE_OUT_OF_BOUNDS;
96 }
97
98
99
100
101 if (orte_xml_output) {
102 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "<%s rank=\"%s\">", suffix, ORTE_VPID_PRINT(name->vpid));
103 snprintf(endtag, ORTE_IOF_BASE_TAG_MAX, "</%s>", suffix);
104 goto construct;
105 }
106
107
108 if (orte_timestamp_output) {
109 time_t mytime;
110 char *cptr;
111
112 time(&mytime);
113 cptr = ctime(&mytime);
114 cptr[strlen(cptr)-1] = '\0';
115
116 if (orte_tag_output) {
117
118 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s[%s,%s]<%s>:",
119 cptr, ORTE_LOCAL_JOBID_PRINT(name->jobid),
120 ORTE_VPID_PRINT(name->vpid), suffix);
121 } else {
122
123 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "%s<%s>:", cptr, suffix);
124 }
125
126 memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
127 goto construct;
128 }
129
130 if (orte_tag_output) {
131 snprintf(starttag, ORTE_IOF_BASE_TAG_MAX, "[%s,%s]<%s>:",
132 ORTE_LOCAL_JOBID_PRINT(name->jobid),
133 ORTE_VPID_PRINT(name->vpid), suffix);
134
135 memset(endtag, '\0', ORTE_IOF_BASE_TAG_MAX);
136 goto construct;
137 }
138
139
140
141
142 if (0 < numbytes) {
143
144
145
146
147 memcpy(output->data, data, numbytes);
148 }
149 output->numbytes = numbytes;
150 goto process;
151
152 construct:
153 starttaglen = strlen(starttag);
154 endtaglen = strlen(endtag);
155 endtagged = false;
156
157 for (j=0, k=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
158 output->data[k++] = starttag[j];
159 }
160
161
162
163 for (i=0; i < numbytes && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; i++) {
164 if (orte_xml_output) {
165 if ('&' == data[i]) {
166 if (k+5 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
167 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
168 goto process;
169 }
170 snprintf(qprint, 10, "&");
171 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
172 output->data[k++] = qprint[j];
173 }
174 } else if ('<' == data[i]) {
175 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
176 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
177 goto process;
178 }
179 snprintf(qprint, 10, "<");
180 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
181 output->data[k++] = qprint[j];
182 }
183 } else if ('>' == data[i]) {
184 if (k+4 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
185 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
186 goto process;
187 }
188 snprintf(qprint, 10, ">");
189 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
190 output->data[k++] = qprint[j];
191 }
192 } else if (data[i] < 32 || data[i] > 127) {
193
194 if (k+7 >= ORTE_IOF_BASE_TAGGED_OUT_MAX) {
195 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
196 goto process;
197 }
198 snprintf(qprint, 10, "&#%03d;", (int)data[i]);
199 for (j=0; j < (int)strlen(qprint) && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
200 output->data[k++] = qprint[j];
201 }
202
203 if ('\n' == data[i] && (k+endtaglen+1) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
204
205 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
206 output->data[k++] = endtag[j];
207 }
208
209 output->data[k++] = '\n';
210
211 if (i < numbytes-1 && (k+starttaglen) < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
212 for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
213 output->data[k++] = starttag[j];
214 endtagged = false;
215 }
216 } else {
217 endtagged = true;
218 }
219 }
220 } else {
221 output->data[k++] = data[i];
222 }
223 } else {
224 if ('\n' == data[i]) {
225
226 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
227 output->data[k++] = endtag[j];
228 }
229
230 output->data[k++] = '\n';
231
232 if (i < numbytes-1) {
233 for (j=0; j < starttaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX; j++) {
234 output->data[k++] = starttag[j];
235 endtagged = false;
236 }
237 } else {
238 endtagged = true;
239 }
240 } else {
241 output->data[k++] = data[i];
242 }
243 }
244 }
245 if (!endtagged && k < ORTE_IOF_BASE_TAGGED_OUT_MAX) {
246
247 for (j=0; j < endtaglen && k < ORTE_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
248 output->data[k++] = endtag[j];
249 }
250 output->data[k] = '\n';
251 }
252 output->numbytes = k;
253
254 process:
255
256 opal_list_append(&channel->outputs, &output->super);
257
258
259 num_buffered = opal_list_get_size(&channel->outputs);
260
261
262 if (!channel->pending) {
263
264 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
265 "%s write:output adding write event",
266 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
267 ORTE_IOF_SINK_ACTIVATE(channel);
268 }
269
270 return num_buffered;
271 }
272
273 void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
274 {
275 bool dump;
276 int num_written;
277 orte_iof_write_event_t *wev;
278 orte_iof_write_output_t *output;
279
280 if (NULL != rev->sink) {
281 wev = rev->sink->wev;
282 if (NULL != wev && !opal_list_is_empty(&wev->outputs)) {
283 dump = false;
284
285 while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
286 if (!dump) {
287 num_written = write(wev->fd, output->data, output->numbytes);
288 if (num_written < output->numbytes) {
289
290 dump = true;
291 }
292 }
293 OBJ_RELEASE(output);
294 }
295 }
296 }
297 }
298
299 void orte_iof_base_write_handler(int _fd, short event, void *cbdata)
300 {
301 orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
302 orte_iof_write_event_t *wev = sink->wev;
303 opal_list_item_t *item;
304 orte_iof_write_output_t *output;
305 int num_written, total_written = 0;
306
307 ORTE_ACQUIRE_OBJECT(sink);
308
309 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
310 "%s write:handler writing data to %d",
311 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
312 wev->fd));
313
314 while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
315 output = (orte_iof_write_output_t*)item;
316 if (0 == output->numbytes) {
317
318 OBJ_RELEASE(sink);
319 return;
320 }
321 num_written = write(wev->fd, output->data, output->numbytes);
322 if (num_written < 0) {
323 if (EAGAIN == errno || EINTR == errno) {
324
325 opal_list_prepend(&wev->outputs, item);
326
327 if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
328 opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
329 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
330 goto ABORT;
331 }
332
333
334
335 goto NEXT_CALL;
336 }
337
338
339
340 OBJ_RELEASE(output);
341 goto ABORT;
342 } else if (num_written < output->numbytes) {
343
344 memmove(output->data, &output->data[num_written], output->numbytes - num_written);
345
346 output->numbytes -= num_written;
347
348 opal_list_prepend(&wev->outputs, item);
349
350 if (orte_iof_base.output_limit < opal_list_get_size(&wev->outputs)) {
351 opal_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
352 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
353 goto ABORT;
354 }
355
356
357
358 goto NEXT_CALL;
359 }
360 OBJ_RELEASE(output);
361
362 total_written += num_written;
363 if(wev->always_writable && (ORTE_IOF_SINK_BLOCKSIZE <= total_written)){
364
365
366
367
368 goto NEXT_CALL;
369 }
370 }
371 ABORT:
372 wev->pending = false;
373 ORTE_POST_OBJECT(wev);
374 return;
375 NEXT_CALL:
376 ORTE_IOF_SINK_ACTIVATE(wev);
377 }