This source file includes the following definitions.
- send_cb
- orte_iof_orted_send_xonxoff
- orte_iof_orted_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "orte_config.h"
24 #include "orte/constants.h"
25
26 #include <errno.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30 #include <string.h>
31
32 #include "opal/dss/dss.h"
33
34 #include "orte/mca/rml/rml.h"
35 #include "orte/mca/rml/rml_types.h"
36 #include "orte/mca/errmgr/errmgr.h"
37 #include "orte/util/name_fns.h"
38 #include "orte/runtime/orte_globals.h"
39
40 #include "orte/mca/iof/iof_types.h"
41 #include "orte/mca/iof/base/base.h"
42
43 #include "iof_orted.h"
44
45 static void send_cb(int status, orte_process_name_t *peer,
46 opal_buffer_t *buf, orte_rml_tag_t tag,
47 void *cbdata)
48 {
49
50 OBJ_RELEASE(buf);
51 }
52
53 void orte_iof_orted_send_xonxoff(orte_iof_tag_t tag)
54 {
55 opal_buffer_t *buf;
56 int rc;
57
58 buf = OBJ_NEW(opal_buffer_t);
59
60
61
62
63 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
64 ORTE_ERROR_LOG(rc);
65 OBJ_RELEASE(buf);
66 return;
67 }
68
69 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
70 "%s sending %s",
71 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
72 (ORTE_IOF_XON == tag) ? "xon" : "xoff"));
73
74
75 if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
76 send_cb, NULL))) {
77 ORTE_ERROR_LOG(rc);
78 }
79 }
80
81
82
83
84
85
86
87
88
89 void orte_iof_orted_recv(int status, orte_process_name_t* sender,
90 opal_buffer_t* buffer, orte_rml_tag_t tag,
91 void* cbdata)
92 {
93 unsigned char data[ORTE_IOF_BASE_MSG_MAX];
94 orte_iof_tag_t stream;
95 int32_t count, numbytes;
96 orte_process_name_t target;
97 orte_iof_proc_t *proct;
98 int rc;
99
100
101 count = 1;
102 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
103 ORTE_ERROR_LOG(rc);
104 return;
105 }
106
107
108 if (ORTE_IOF_STDIN != stream) {
109 ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
110 return;
111 }
112
113
114 count = 1;
115 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &count, ORTE_NAME))) {
116 ORTE_ERROR_LOG(rc);
117 return;
118 }
119
120
121 numbytes=ORTE_IOF_BASE_MSG_MAX;
122 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
123 ORTE_ERROR_LOG(rc);
124 return;
125 }
126
127
128 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
129 "%s unpacked %d bytes for local proc %s",
130 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
131 ORTE_NAME_PRINT(&target)));
132
133
134 OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
135
136 if (target.jobid == proct->name.jobid) {
137
138 if (ORTE_VPID_WILDCARD == target.vpid ||
139 proct->name.vpid == target.vpid) {
140 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
141 "%s writing data to local proc %s",
142 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
143 ORTE_NAME_PRINT(&proct->name)));
144 if (NULL == proct->stdinev) {
145 continue;
146 }
147
148
149
150
151 if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdinev->wev)) {
152
153
154
155 if (!mca_iof_orted_component.xoff) {
156 mca_iof_orted_component.xoff = true;
157 orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
158 }
159 }
160 }
161 }
162 }
163 }