This source file includes the following definitions.
- msgcbfunc
- PMIx_IOF_pull
- stdincbfunc
- PMIx_IOF_push
- pmix_iof_write_output
- pmix_iof_static_dump_output
- pmix_iof_write_handler
- pmix_iof_stdin_check
- pmix_iof_stdin_cb
- iof_stdin_cbfunc
- pmix_iof_read_local_handler
- iof_sink_construct
- iof_sink_destruct
- iof_read_event_construct
- iof_read_event_destruct
- iof_write_event_construct
- iof_write_event_destruct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include <src/include/pmix_config.h>
16
17 #ifdef HAVE_FCNTL_H
18 #include <fcntl.h>
19 #else
20 #ifdef HAVE_SYS_FCNTL_H
21 #include <sys/fcntl.h>
22 #endif
23 #endif
24
25 #include <src/include/pmix_stdint.h>
26 #include <src/include/pmix_socket_errno.h>
27
28 #include <pmix.h>
29 #include <pmix_common.h>
30 #include <pmix_server.h>
31 #include <pmix_rename.h>
32
33 #include "src/threads/threads.h"
34 #include "src/util/argv.h"
35 #include "src/util/error.h"
36 #include "src/util/name_fns.h"
37 #include "src/util/output.h"
38 #include "src/mca/bfrops/bfrops.h"
39 #include "src/mca/ptl/ptl.h"
40
41 #include "src/client/pmix_client_ops.h"
42 #include "src/server/pmix_server_ops.h"
43 #include "src/include/pmix_globals.h"
44
45 static void msgcbfunc(struct pmix_peer_t *peer,
46 pmix_ptl_hdr_t *hdr,
47 pmix_buffer_t *buf, void *cbdata)
48 {
49 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
50 int32_t m;
51 pmix_status_t rc, status;
52
53
54 m=1;
55 PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &m, PMIX_STATUS);
56 if (PMIX_SUCCESS == rc && PMIX_SUCCESS == status) {
57
58
59 pmix_list_append(&pmix_globals.iof_requests, &cd->iofreq->super);
60 } else if (PMIX_SUCCESS != rc) {
61 status = rc;
62 PMIX_RELEASE(cd->iofreq);
63 }
64
65 pmix_output_verbose(2, pmix_client_globals.iof_output,
66 "pmix:iof_register returned status %s", PMIx_Error_string(status));
67
68 if (NULL != cd->cbfunc.opcbfn) {
69 cd->cbfunc.opcbfn(status, cd->cbdata);
70 }
71 PMIX_RELEASE(cd);
72 }
73
74 PMIX_EXPORT pmix_status_t PMIx_IOF_pull(const pmix_proc_t procs[], size_t nprocs,
75 const pmix_info_t directives[], size_t ndirs,
76 pmix_iof_channel_t channel, pmix_iof_cbfunc_t cbfunc,
77 pmix_hdlr_reg_cbfunc_t regcbfunc, void *regcbdata)
78 {
79 pmix_shift_caddy_t *cd;
80 pmix_cmd_t cmd = PMIX_IOF_PULL_CMD;
81 pmix_buffer_t *msg;
82 pmix_status_t rc;
83
84 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
85
86 pmix_output_verbose(2, pmix_client_globals.iof_output,
87 "pmix:iof_register");
88
89 if (pmix_globals.init_cntr <= 0) {
90 PMIX_RELEASE_THREAD(&pmix_global_lock);
91 return PMIX_ERR_INIT;
92 }
93
94
95 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
96 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
97 PMIX_RELEASE_THREAD(&pmix_global_lock);
98 return PMIX_ERR_NOT_SUPPORTED;
99 }
100
101
102 if (PMIX_FWD_STDIN_CHANNEL & channel) {
103 PMIX_RELEASE_THREAD(&pmix_global_lock);
104 return PMIX_ERR_NOT_SUPPORTED;
105 }
106
107
108 if (!pmix_globals.connected) {
109 PMIX_RELEASE_THREAD(&pmix_global_lock);
110 return PMIX_ERR_UNREACH;
111 }
112 PMIX_RELEASE_THREAD(&pmix_global_lock);
113
114
115 cd = PMIX_NEW(pmix_shift_caddy_t);
116 if (NULL == cd) {
117 return PMIX_ERR_NOMEM;
118 }
119 cd->cbfunc.hdlrregcbfn = regcbfunc;
120 cd->cbdata = regcbdata;
121
122 cd->iofreq = PMIX_NEW(pmix_iof_req_t);
123 if (NULL == cd->iofreq) {
124 PMIX_RELEASE(cd);
125 return PMIX_ERR_NOMEM;
126 }
127
128 cd->iofreq->channels = channel;
129 cd->iofreq->cbfunc = cbfunc;
130
131
132
133
134 msg = PMIX_NEW(pmix_buffer_t);
135 if (NULL == msg) {
136 PMIX_RELEASE(cd->iofreq);
137 PMIX_RELEASE(cd);
138 return PMIX_ERR_NOMEM;
139 }
140 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
141 msg, &cmd, 1, PMIX_COMMAND);
142 if (PMIX_SUCCESS != rc) {
143 PMIX_ERROR_LOG(rc);
144 goto cleanup;
145 }
146 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
147 msg, &nprocs, 1, PMIX_SIZE);
148 if (PMIX_SUCCESS != rc) {
149 PMIX_ERROR_LOG(rc);
150 goto cleanup;
151 }
152 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
153 msg, procs, nprocs, PMIX_PROC);
154 if (PMIX_SUCCESS != rc) {
155 PMIX_ERROR_LOG(rc);
156 goto cleanup;
157 }
158 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
159 msg, &ndirs, 1, PMIX_SIZE);
160 if (PMIX_SUCCESS != rc) {
161 PMIX_ERROR_LOG(rc);
162 goto cleanup;
163 }
164 if (0 < ndirs) {
165 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
166 msg, directives, ndirs, PMIX_INFO);
167 if (PMIX_SUCCESS != rc) {
168 PMIX_ERROR_LOG(rc);
169 goto cleanup;
170 }
171 }
172 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
173 msg, &channel, 1, PMIX_IOF_CHANNEL);
174 if (PMIX_SUCCESS != rc) {
175 PMIX_ERROR_LOG(rc);
176 goto cleanup;
177 }
178
179 pmix_output_verbose(2, pmix_client_globals.iof_output,
180 "pmix:iof_request sending to server");
181 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
182 msg, msgcbfunc, (void*)cd);
183
184 cleanup:
185 if (PMIX_SUCCESS != rc) {
186 PMIX_ERROR_LOG(rc);
187 PMIX_RELEASE(msg);
188 PMIX_RELEASE(cd->iofreq);
189 PMIX_RELEASE(cd);
190 }
191 return rc;
192 }
193
194 typedef struct {
195 pmix_op_cbfunc_t cbfunc;
196 void *cbdata;
197 } pmix_ltcaddy_t;
198
199 static pmix_event_t stdinsig_ev;
200 static pmix_iof_read_event_t *stdinev = NULL;
201
202 static void stdincbfunc(struct pmix_peer_t *peer,
203 pmix_ptl_hdr_t *hdr,
204 pmix_buffer_t *buf, void *cbdata)
205 {
206 pmix_ltcaddy_t *cd = (pmix_ltcaddy_t*)cbdata;
207 int cnt;
208 pmix_status_t rc, status;
209
210
211
212 if (PMIX_BUFFER_IS_EMPTY(buf)) {
213
214 if (NULL != cd->cbfunc) {
215 cd->cbfunc(PMIX_ERR_COMM_FAILURE, cd->cbdata);
216 }
217 free(cd);
218 return;
219 }
220
221
222 cnt = 1;
223 PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &cnt, PMIX_STATUS);
224 if (PMIX_SUCCESS != rc) {
225 status = rc;
226 }
227 if (NULL != cd->cbfunc) {
228 cd->cbfunc(status, cd->cbdata);
229 }
230 free(cd);
231 }
232
233 pmix_status_t PMIx_IOF_push(const pmix_proc_t targets[], size_t ntargets,
234 pmix_byte_object_t *bo,
235 const pmix_info_t directives[], size_t ndirs,
236 pmix_op_cbfunc_t cbfunc, void *cbdata)
237 {
238 pmix_buffer_t *msg;
239 pmix_cmd_t cmd = PMIX_IOF_PUSH_CMD;
240 pmix_status_t rc = PMIX_SUCCESS;
241 pmix_ltcaddy_t *cd;
242 size_t n;
243 bool begincollecting, stopcollecting;
244 int flags, fd = fileno(stdin);
245
246 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
247 if (pmix_globals.init_cntr <= 0) {
248 PMIX_RELEASE_THREAD(&pmix_global_lock);
249 return PMIX_ERR_INIT;
250 }
251 PMIX_RELEASE_THREAD(&pmix_global_lock);
252
253 if (NULL == bo) {
254
255 for (n=0; n < ndirs; n++) {
256 if (PMIX_CHECK_KEY(&directives[n], PMIX_IOF_PUSH_STDIN)) {
257
258
259 begincollecting = PMIX_INFO_TRUE(&directives[n]);
260 if (begincollecting) {
261
262 if (!pmix_globals.pushstdin) {
263
264 pmix_globals.pushstdin = true;
265
266
267
268
269
270
271
272
273
274
275 if (0 != fd) {
276 if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
277 pmix_output(pmix_client_globals.iof_output,
278 "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
279 __FILE__, __LINE__, errno);
280 } else {
281 flags |= O_NONBLOCK;
282 fcntl(fd, F_SETFL, flags);
283 }
284 }
285 if (isatty(fd)) {
286
287
288
289
290
291
292
293 pmix_event_signal_set(pmix_globals.evbase, &stdinsig_ev,
294 SIGCONT, pmix_iof_stdin_cb,
295 NULL);
296
297
298
299
300
301
302 PMIX_IOF_READ_EVENT(&stdinev,
303 targets, ntargets,
304 directives, ndirs, fd,
305 pmix_iof_read_local_handler, false);
306
307
308
309
310
311 if (pmix_iof_stdin_check(fd)) {
312 PMIX_IOF_READ_ACTIVATE(stdinev);
313 }
314 } else {
315
316
317
318 PMIX_IOF_READ_EVENT(&stdinev, targets, ntargets,
319 directives, ndirs, fd,
320 pmix_iof_read_local_handler, true);
321 }
322 }
323 } else {
324 if (pmix_globals.pushstdin) {
325
326
327
328
329
330
331 }
332 }
333 } else if (PMIX_CHECK_KEY(&directives[n], PMIX_IOF_COMPLETE)) {
334
335
336
337
338 stopcollecting = PMIX_INFO_TRUE(&directives[n]);
339 if (stopcollecting) {
340 if (pmix_globals.pushstdin) {
341
342
343
344 }
345 }
346 }
347 }
348 return PMIX_OPERATION_SUCCEEDED;
349 }
350
351
352
353 if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) ||
354 PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
355 msg = PMIX_NEW(pmix_buffer_t);
356 if (NULL == msg) {
357 return PMIX_ERR_NOMEM;
358 }
359 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
360 msg, &cmd, 1, PMIX_COMMAND);
361 if (PMIX_SUCCESS != rc) {
362 PMIX_ERROR_LOG(rc);
363 PMIX_RELEASE(msg);
364 return rc;
365 }
366 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
367 msg, &ntargets, 1, PMIX_SIZE);
368 if (PMIX_SUCCESS != rc) {
369 PMIX_ERROR_LOG(rc);
370 PMIX_RELEASE(msg);
371 return rc;
372 }
373 if (0 < ntargets) {
374 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
375 msg, targets, ntargets, PMIX_PROC);
376 if (PMIX_SUCCESS != rc) {
377 PMIX_ERROR_LOG(rc);
378 PMIX_RELEASE(msg);
379 return rc;
380 }
381 }
382 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
383 msg, &ndirs, 1, PMIX_SIZE);
384 if (PMIX_SUCCESS != rc) {
385 PMIX_ERROR_LOG(rc);
386 PMIX_RELEASE(msg);
387 return rc;
388 }
389 if (0 < ndirs) {
390 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
391 msg, directives, ndirs, PMIX_INFO);
392 if (PMIX_SUCCESS != rc) {
393 PMIX_ERROR_LOG(rc);
394 PMIX_RELEASE(msg);
395 return rc;
396 }
397 }
398 if (NULL != bo) {
399 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
400 msg, bo, 1, PMIX_BYTE_OBJECT);
401 if (PMIX_SUCCESS != rc) {
402 PMIX_ERROR_LOG(rc);
403 PMIX_RELEASE(msg);
404 return rc;
405 }
406 }
407
408 cd = (pmix_ltcaddy_t*)malloc(sizeof(pmix_ltcaddy_t));
409 if (NULL == cd) {
410 PMIX_RELEASE(msg);
411 rc = PMIX_ERR_NOMEM;
412 return rc;
413 }
414 cd->cbfunc = cbfunc;
415 cd->cbdata = cbdata;
416 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
417 msg, stdincbfunc, cd);
418 if (PMIX_SUCCESS != rc) {
419 PMIX_ERROR_LOG(rc);
420 PMIX_RELEASE(msg);
421 free(cd);
422 }
423 return rc;
424 }
425
426
427 if (NULL == pmix_host_server.push_stdin) {
428 return PMIX_ERR_NOT_SUPPORTED;
429 }
430 rc = pmix_host_server.push_stdin(&pmix_globals.myid,
431 targets, ntargets,
432 directives, ndirs,
433 bo, cbfunc, cbdata);
434 return rc;
435 }
436
437 pmix_status_t pmix_iof_write_output(const pmix_proc_t *name,
438 pmix_iof_channel_t stream,
439 const pmix_byte_object_t *bo,
440 pmix_iof_flags_t *flags)
441 {
442 char starttag[PMIX_IOF_BASE_TAG_MAX], endtag[PMIX_IOF_BASE_TAG_MAX], *suffix;
443 pmix_iof_write_output_t *output;
444 size_t i;
445 int j, k, starttaglen, endtaglen, num_buffered;
446 bool endtagged;
447 char qprint[10];
448 pmix_iof_write_event_t *channel;
449 pmix_iof_flags_t myflags;
450
451 if (PMIX_FWD_STDOUT_CHANNEL & stream) {
452 channel = &pmix_client_globals.iof_stdout.wev;
453 } else {
454 channel = &pmix_client_globals.iof_stderr.wev;
455 }
456 if (NULL == flags) {
457 myflags.xml = pmix_globals.xml_output;
458 if (pmix_globals.timestamp_output) {
459 time(&myflags.timestamp);
460 } else {
461 myflags.timestamp = 0;
462 }
463 myflags.tag = pmix_globals.tag_output;
464 } else {
465 myflags = *flags;
466 }
467
468 PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
469 "%s write:output setting up to write %lu bytes to %s for %s on fd %d",
470 PMIX_NAME_PRINT(&pmix_globals.myid),
471 (unsigned long)bo->size,
472 PMIx_IOF_channel_string(stream),
473 PMIX_NAME_PRINT(name),
474 (NULL == channel) ? -1 : channel->fd));
475
476
477 output = PMIX_NEW(pmix_iof_write_output_t);
478 memset(starttag, 0, PMIX_IOF_BASE_TAG_MAX);
479 memset(endtag, 0, PMIX_IOF_BASE_TAG_MAX);
480
481
482 if (PMIX_FWD_STDIN_CHANNEL & stream) {
483
484 if (0 < bo->size) {
485
486
487
488
489 memcpy(output->data, bo->bytes, bo->size);
490 }
491 output->numbytes = bo->size;
492 goto process;
493 } else if (PMIX_FWD_STDOUT_CHANNEL & stream) {
494
495 suffix = "stdout";
496 } else if (PMIX_FWD_STDERR_CHANNEL & stream) {
497
498 suffix = "stderr";
499 } else if (PMIX_FWD_STDDIAG_CHANNEL & stream) {
500
501 suffix = "stddiag";
502 } else {
503
504 PMIX_ERROR_LOG(PMIX_ERR_VALUE_OUT_OF_BOUNDS);
505 PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
506 "%s stream %0x", PMIX_NAME_PRINT(&pmix_globals.myid), stream));
507 return PMIX_ERR_VALUE_OUT_OF_BOUNDS;
508 }
509
510
511
512
513 if (myflags.xml) {
514 snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "<%s rank=\"%s\">", suffix, PMIX_RANK_PRINT(name->rank));
515 snprintf(endtag, PMIX_IOF_BASE_TAG_MAX, "</%s>", suffix);
516 goto construct;
517 }
518
519
520 if (0 < myflags.timestamp) {
521 char *cptr;
522
523 cptr = ctime(&myflags.timestamp);
524 cptr[strlen(cptr)-1] = '\0';
525
526 if (myflags.tag) {
527
528 snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "%s[%s]<%s>:",
529 cptr, PMIX_NAME_PRINT(name), suffix);
530 } else {
531
532 snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "%s<%s>:", cptr, suffix);
533 }
534
535 memset(endtag, '\0', PMIX_IOF_BASE_TAG_MAX);
536 goto construct;
537 }
538
539 if (myflags.tag) {
540 snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "[%s]<%s>:",
541 PMIX_NAME_PRINT(name), suffix);
542
543 memset(endtag, '\0', PMIX_IOF_BASE_TAG_MAX);
544 goto construct;
545 }
546
547
548
549
550 if (0 < bo->size) {
551
552
553
554
555 memcpy(output->data, bo->bytes, bo->size);
556 }
557 output->numbytes = bo->size;
558 goto process;
559
560 construct:
561 starttaglen = strlen(starttag);
562 endtaglen = strlen(endtag);
563 endtagged = false;
564
565 for (j=0, k=0; j < starttaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
566 output->data[k++] = starttag[j];
567 }
568
569
570
571 for (i=0; i < bo->size && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; i++) {
572 if (myflags.xml) {
573 if ('&' == bo->bytes[i]) {
574 if (k+5 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
575 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
576 goto process;
577 }
578 snprintf(qprint, 10, "&");
579 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
580 output->data[k++] = qprint[j];
581 }
582 } else if ('<' == bo->bytes[i]) {
583 if (k+4 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
584 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
585 goto process;
586 }
587 snprintf(qprint, 10, "<");
588 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
589 output->data[k++] = qprint[j];
590 }
591 } else if ('>' == bo->bytes[i]) {
592 if (k+4 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
593 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
594 goto process;
595 }
596 snprintf(qprint, 10, ">");
597 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
598 output->data[k++] = qprint[j];
599 }
600 } else if (bo->bytes[i] < 32 || bo->bytes[i] > 127) {
601
602 if (k+7 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
603 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
604 goto process;
605 }
606 snprintf(qprint, 10, "&#%03d;", (int)bo->bytes[i]);
607 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
608 output->data[k++] = qprint[j];
609 }
610
611 if ('\n' == bo->bytes[i] && (k+endtaglen+1) < PMIX_IOF_BASE_TAGGED_OUT_MAX) {
612
613 for (j=0; j < endtaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
614 output->data[k++] = endtag[j];
615 }
616
617 output->data[k++] = '\n';
618
619 if (i < bo->size-1 && (k+starttaglen) < PMIX_IOF_BASE_TAGGED_OUT_MAX) {
620 for (j=0; j < starttaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
621 output->data[k++] = starttag[j];
622 endtagged = false;
623 }
624 } else {
625 endtagged = true;
626 }
627 }
628 } else {
629 output->data[k++] = bo->bytes[i];
630 }
631 } else {
632 if ('\n' == bo->bytes[i]) {
633
634 for (j=0; j < endtaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
635 output->data[k++] = endtag[j];
636 }
637
638 output->data[k++] = '\n';
639
640 if (i < bo->size-1) {
641 for (j=0; j < starttaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
642 output->data[k++] = starttag[j];
643 endtagged = false;
644 }
645 } else {
646 endtagged = true;
647 }
648 } else {
649 output->data[k++] = bo->bytes[i];
650 }
651 }
652 }
653 if (!endtagged && k < PMIX_IOF_BASE_TAGGED_OUT_MAX) {
654
655 for (j=0; j < endtaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
656 output->data[k++] = endtag[j];
657 }
658 output->data[k] = '\n';
659 }
660 output->numbytes = k;
661
662 process:
663
664 pmix_list_append(&channel->outputs, &output->super);
665
666
667 num_buffered = pmix_list_get_size(&channel->outputs);
668
669
670 if (!channel->pending) {
671
672 PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
673 "%s write:output adding write event",
674 PMIX_NAME_PRINT(&pmix_globals.myid)));
675 PMIX_IOF_SINK_ACTIVATE(channel);
676 }
677
678 return num_buffered;
679 }
680
681 void pmix_iof_static_dump_output(pmix_iof_sink_t *sink)
682 {
683 bool dump;
684 int num_written;
685 pmix_iof_write_event_t *wev = &sink->wev;
686 pmix_iof_write_output_t *output;
687
688 if (!pmix_list_is_empty(&wev->outputs)) {
689 dump = false;
690
691 while (NULL != (output = (pmix_iof_write_output_t*)pmix_list_remove_first(&wev->outputs))) {
692 if (!dump) {
693 num_written = write(wev->fd, output->data, output->numbytes);
694 if (num_written < output->numbytes) {
695
696 dump = true;
697 }
698 }
699 PMIX_RELEASE(output);
700 }
701 }
702 }
703
704 void pmix_iof_write_handler(int _fd, short event, void *cbdata)
705 {
706 pmix_iof_sink_t *sink = (pmix_iof_sink_t*)cbdata;
707 pmix_iof_write_event_t *wev = &sink->wev;
708 pmix_list_item_t *item;
709 pmix_iof_write_output_t *output;
710 int num_written, total_written = 0;
711
712 PMIX_ACQUIRE_OBJECT(sink);
713
714 PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
715 "%s write:handler writing data to %d",
716 PMIX_NAME_PRINT(&pmix_globals.myid),
717 wev->fd));
718
719 while (NULL != (item = pmix_list_remove_first(&wev->outputs))) {
720 output = (pmix_iof_write_output_t*)item;
721 if (0 == output->numbytes) {
722
723 PMIX_RELEASE(sink);
724 return;
725 }
726 num_written = write(wev->fd, output->data, output->numbytes);
727 if (num_written < 0) {
728 if (EAGAIN == errno || EINTR == errno) {
729
730 pmix_list_prepend(&wev->outputs, item);
731
732 if (pmix_globals.output_limit < pmix_list_get_size(&wev->outputs)) {
733 pmix_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
734 goto ABORT;
735 }
736
737
738
739 goto NEXT_CALL;
740 }
741
742
743
744 PMIX_RELEASE(output);
745 goto ABORT;
746 } else if (num_written < output->numbytes) {
747
748 memmove(output->data, &output->data[num_written], output->numbytes - num_written);
749
750 output->numbytes -= num_written;
751
752 pmix_list_prepend(&wev->outputs, item);
753
754 if (pmix_globals.output_limit < pmix_list_get_size(&wev->outputs)) {
755 pmix_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
756 goto ABORT;
757 }
758
759
760
761 goto NEXT_CALL;
762 }
763 PMIX_RELEASE(output);
764
765 total_written += num_written;
766 if(wev->always_writable && (PMIX_IOF_SINK_BLOCKSIZE <= total_written)){
767
768
769
770
771 goto NEXT_CALL;
772 }
773 }
774 ABORT:
775 wev->pending = false;
776 PMIX_POST_OBJECT(wev);
777 return;
778 NEXT_CALL:
779 PMIX_IOF_SINK_ACTIVATE(wev);
780 }
781
782
/**
 * Decide whether the given descriptor may be read right now.
 *
 * @param fd descriptor to examine
 * @return false only when HAVE_TCGETPGRP is defined, fd refers to a
 *         terminal, and the terminal's foreground process group differs
 *         from this process's group; true in every other case.
 */
bool pmix_iof_stdin_check(int fd)
{
    bool readable = true;

#if defined(HAVE_TCGETPGRP)
    if (isatty(fd) && (getpgrp() != tcgetpgrp(fd))) {
        readable = false;
    }
#endif
    return readable;
}
792
793 void pmix_iof_stdin_cb(int fd, short event, void *cbdata)
794 {
795 bool should_process;
796 pmix_iof_read_event_t *stdinev = (pmix_iof_read_event_t*)cbdata;
797
798 PMIX_ACQUIRE_OBJECT(stdinev);
799
800 should_process = pmix_iof_stdin_check(0);
801
802 if (should_process) {
803 PMIX_IOF_READ_ACTIVATE(stdinev);
804 } else {
805 pmix_event_del(&stdinev->ev);
806 stdinev->active = false;
807 PMIX_POST_OBJECT(stdinev);
808 }
809 }
810
811 static void iof_stdin_cbfunc(struct pmix_peer_t *peer,
812 pmix_ptl_hdr_t *hdr,
813 pmix_buffer_t *buf, void *cbdata)
814 {
815 pmix_iof_read_event_t *stdinev = (pmix_iof_read_event_t*)cbdata;
816 int cnt;
817 pmix_status_t rc, ret;
818
819 PMIX_ACQUIRE_OBJECT(stdinev);
820
821
822 cnt = 1;
823 PMIX_BFROPS_UNPACK(rc, peer, buf, &ret, &cnt, PMIX_STATUS);
824 if (PMIX_SUCCESS != rc) {
825 PMIX_ERROR_LOG(rc);
826 pmix_event_del(&stdinev->ev);
827 stdinev->active = false;
828 PMIX_POST_OBJECT(stdinev);
829 return;
830 }
831
832 if (PMIX_SUCCESS != ret) {
833 pmix_event_del(&stdinev->ev);
834 stdinev->active = false;
835 PMIX_POST_OBJECT(stdinev);
836 if (PMIX_ERR_IOF_COMPLETE != ret) {
837
838 PMIx_Notify_event(PMIX_ERR_IOF_FAILURE,
839 &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
840 NULL, 0, NULL, NULL);
841 }
842 return;
843 }
844
845 pmix_iof_stdin_cb(0, 0, stdinev);
846 }
847
848
849 void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata)
850 {
851 pmix_iof_read_event_t *rev = (pmix_iof_read_event_t*)cbdata;
852 unsigned char data[PMIX_IOF_BASE_MSG_MAX];
853 int32_t numbytes;
854 int fd;
855 pmix_status_t rc;
856 pmix_buffer_t *msg;
857 pmix_cmd_t cmd = PMIX_IOF_PUSH_CMD;
858 pmix_byte_object_t bo;
859
860 PMIX_ACQUIRE_OBJECT(rev);
861
862
863
864
865 fd = fileno(stdin);
866
867
868 memset(data, 0, PMIX_IOF_BASE_MSG_MAX);
869 numbytes = read(fd, data, sizeof(data));
870
871 if (numbytes < 0) {
872
873
874
875 if (EAGAIN == errno || EINTR == errno) {
876 PMIX_IOF_READ_ACTIVATE(rev);
877 return;
878 }
879
880 PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
881 "%s iof:read handler Error on stdin",
882 PMIX_NAME_PRINT(&pmix_globals.myid)));
883
884
885
886
887 numbytes = 0;
888 }
889
890
891
892 rev->active = false;
893
894
895
896 msg = PMIX_NEW(pmix_buffer_t);
897 if (NULL == msg) {
898
899 return;
900 }
901 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
902 msg, &cmd, 1, PMIX_COMMAND);
903 if (PMIX_SUCCESS != rc) {
904 PMIX_ERROR_LOG(rc);
905 PMIX_RELEASE(msg);
906 return;
907 }
908
909 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
910 msg, &rev->ntargets, 1, PMIX_SIZE);
911 if (PMIX_SUCCESS != rc) {
912 PMIX_ERROR_LOG(rc);
913 PMIX_RELEASE(msg);
914 return;
915 }
916
917 if (0 < rev->ntargets) {
918 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
919 msg, rev->targets, rev->ntargets, PMIX_PROC);
920 if (PMIX_SUCCESS != rc) {
921 PMIX_ERROR_LOG(rc);
922 PMIX_RELEASE(msg);
923 return;
924 }
925 }
926
927 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
928 msg, &rev->ndirs, 1, PMIX_SIZE);
929 if (PMIX_SUCCESS != rc) {
930 PMIX_ERROR_LOG(rc);
931 PMIX_RELEASE(msg);
932 return;
933 }
934
935 if (0 < rev->ndirs) {
936 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
937 msg, rev->directives, rev->ndirs, PMIX_INFO);
938 if (PMIX_SUCCESS != rc) {
939 PMIX_ERROR_LOG(rc);
940 PMIX_RELEASE(msg);
941 return;
942 }
943 }
944
945
946 bo.bytes = (char*)data;
947 bo.size = numbytes;
948 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
949 msg, &bo, 1, PMIX_BYTE_OBJECT);
950 if (PMIX_SUCCESS != rc) {
951 PMIX_ERROR_LOG(rc);
952 PMIX_RELEASE(msg);
953 return;
954 }
955
956
957 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
958 msg, iof_stdin_cbfunc, rev);
959 if (PMIX_SUCCESS != rc) {
960 PMIX_ERROR_LOG(rc);
961 PMIX_RELEASE(msg);
962 }
963
964 return;
965 }
966
967
968 static void iof_sink_construct(pmix_iof_sink_t* ptr)
969 {
970 PMIX_CONSTRUCT(&ptr->wev, pmix_iof_write_event_t);
971 ptr->xoff = false;
972 ptr->exclusive = false;
973 ptr->closed = false;
974 }
975 static void iof_sink_destruct(pmix_iof_sink_t* ptr)
976 {
977 if (0 <= ptr->wev.fd) {
978 PMIX_OUTPUT_VERBOSE((20, pmix_client_globals.iof_output,
979 "%s iof: closing sink for process %s on fd %d",
980 PMIX_NAME_PRINT(&pmix_globals.myid),
981 PMIX_NAME_PRINT(&ptr->name), ptr->wev.fd));
982 PMIX_DESTRUCT(&ptr->wev);
983 }
984 }
985 PMIX_CLASS_INSTANCE(pmix_iof_sink_t,
986 pmix_list_item_t,
987 iof_sink_construct,
988 iof_sink_destruct);
989
990
991 static void iof_read_event_construct(pmix_iof_read_event_t* rev)
992 {
993 rev->fd = -1;
994 rev->active = false;
995 rev->tv.tv_sec = 0;
996 rev->tv.tv_usec = 0;
997 rev->targets = NULL;
998 rev->ntargets = 0;
999 rev->directives = NULL;
1000 rev->ndirs = 0;
1001 }
1002 static void iof_read_event_destruct(pmix_iof_read_event_t* rev)
1003 {
1004 pmix_event_del(&rev->ev);
1005 if (0 <= rev->fd) {
1006 PMIX_OUTPUT_VERBOSE((20, pmix_client_globals.iof_output,
1007 "%s iof: closing fd %d",
1008 PMIX_NAME_PRINT(&pmix_globals.myid), rev->fd));
1009 close(rev->fd);
1010 rev->fd = -1;
1011 }
1012 if (NULL != rev->targets) {
1013 PMIX_PROC_FREE(rev->targets, rev->ntargets);
1014 }
1015 if (NULL != rev->directives) {
1016 PMIX_INFO_FREE(rev->directives, rev->ndirs);
1017 }
1018 }
1019 PMIX_CLASS_INSTANCE(pmix_iof_read_event_t,
1020 pmix_object_t,
1021 iof_read_event_construct,
1022 iof_read_event_destruct);
1023
1024 static void iof_write_event_construct(pmix_iof_write_event_t* wev)
1025 {
1026 wev->pending = false;
1027 wev->always_writable = false;
1028 wev->fd = -1;
1029 PMIX_CONSTRUCT(&wev->outputs, pmix_list_t);
1030 wev->tv.tv_sec = 0;
1031 wev->tv.tv_usec = 0;
1032 }
1033 static void iof_write_event_destruct(pmix_iof_write_event_t* wev)
1034 {
1035 pmix_event_del(&wev->ev);
1036 if (2 < wev->fd) {
1037 PMIX_OUTPUT_VERBOSE((20, pmix_client_globals.iof_output,
1038 "%s iof: closing fd %d for write event",
1039 PMIX_NAME_PRINT(&pmix_globals.myid), wev->fd));
1040 close(wev->fd);
1041 }
1042 PMIX_DESTRUCT(&wev->outputs);
1043 }
1044 PMIX_CLASS_INSTANCE(pmix_iof_write_event_t,
1045 pmix_list_item_t,
1046 iof_write_event_construct,
1047 iof_write_event_destruct);
1048
1049 PMIX_CLASS_INSTANCE(pmix_iof_write_output_t,
1050 pmix_list_item_t,
1051 NULL, NULL);