This source file includes following definitions.
- send_self_exe
- orte_rml_oob_send_nb
- orte_rml_oob_send_buffer_nb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "orte_config.h"
24 #include "opal/types.h"
25
26 #include "opal/dss/dss.h"
27 #include "opal/util/output.h"
28
29 #include "orte/mca/errmgr/errmgr.h"
30 #include "orte/mca/oob/base/base.h"
31 #include "orte/util/name_fns.h"
32 #include "orte/util/threads.h"
33 #include "orte/runtime/orte_globals.h"
34
35 #include "orte/mca/rml/base/base.h"
36 #include "orte/mca/rml/rml_types.h"
37 #include "rml_oob.h"
38
39 static void send_self_exe(int fd, short args, void* data)
40 {
41 orte_self_send_xfer_t *xfer = (orte_self_send_xfer_t*)data;
42
43 ORTE_ACQUIRE_OBJECT(xfer);
44
45 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
46 "%s rml_send_to_self callback executing for tag %d",
47 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->tag));
48
49
50
51
52 if (NULL != xfer->iov) {
53 if (NULL != xfer->cbfunc.iov) {
54
55 xfer->cbfunc.iov(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->iov, xfer->count,
56 xfer->tag, xfer->cbdata);
57 }
58 } else if (NULL != xfer->buffer) {
59 if (NULL != xfer->cbfunc.buffer) {
60
61 xfer->cbfunc.buffer(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->buffer,
62 xfer->tag, xfer->cbdata);
63 }
64 } else {
65
66 abort();
67 }
68
69
70 OBJ_RELEASE(xfer);
71 }
72
73 int orte_rml_oob_send_nb(orte_process_name_t* peer,
74 struct iovec* iov,
75 int count,
76 orte_rml_tag_t tag,
77 orte_rml_callback_fn_t cbfunc,
78 void* cbdata)
79 {
80 orte_rml_recv_t *rcv;
81 orte_rml_send_t *snd;
82 int bytes;
83 orte_self_send_xfer_t *xfer;
84 int i;
85 char* ptr;
86
87 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
88 "%s rml_send to peer %s at tag %d",
89 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
90 ORTE_NAME_PRINT(peer), tag));
91
92 if (ORTE_RML_TAG_INVALID == tag) {
93
94 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
95 return ORTE_ERR_BAD_PARAM;
96 }
97 if (NULL == peer ||
98 OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
99
100 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
101 return ORTE_ERR_BAD_PARAM;
102 }
103
104
105
106
107 if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {
108 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
109 "%s rml_send_iovec_to_self at tag %d",
110 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 xfer = OBJ_NEW(orte_self_send_xfer_t);
129 xfer->iov = iov;
130 xfer->count = count;
131 xfer->cbfunc.iov = cbfunc;
132 xfer->tag = tag;
133 xfer->cbdata = cbdata;
134
135 ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);
136
137
138 rcv = OBJ_NEW(orte_rml_recv_t);
139 rcv->sender = *peer;
140 rcv->tag = tag;
141
142 bytes = 0;
143 for (i = 0 ; i < count ; ++i) {
144 bytes += iov[i].iov_len;
145 }
146
147 if (0 < bytes) {
148 rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
149 rcv->iov.iov_len = bytes;
150
151 ptr = (char*)rcv->iov.iov_base;
152 for (i = 0 ; i < count ; ++i) {
153 memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
154 ptr += iov[i].iov_len;
155 }
156 }
157
158
159
160 ORTE_RML_ACTIVATE_MESSAGE(rcv);
161 return ORTE_SUCCESS;
162 }
163
164 snd = OBJ_NEW(orte_rml_send_t);
165 snd->dst = *peer;
166 snd->origin = *ORTE_PROC_MY_NAME;
167 snd->tag = tag;
168 snd->iov = iov;
169 snd->count = count;
170 snd->cbfunc.iov = cbfunc;
171 snd->cbdata = cbdata;
172
173
174 ORTE_OOB_SEND(snd);
175
176 return ORTE_SUCCESS;
177 }
178
179 int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
180 opal_buffer_t* buffer,
181 orte_rml_tag_t tag,
182 orte_rml_buffer_callback_fn_t cbfunc,
183 void* cbdata)
184 {
185 orte_rml_recv_t *rcv;
186 orte_rml_send_t *snd;
187 orte_self_send_xfer_t *xfer;
188
189 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
190 "%s rml_send_buffer to peer %s at tag %d",
191 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
192 ORTE_NAME_PRINT(peer), tag));
193
194 if (ORTE_RML_TAG_INVALID == tag) {
195
196 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
197 return ORTE_ERR_BAD_PARAM;
198 }
199 if (NULL == peer ||
200 OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
201
202 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
203 return ORTE_ERR_BAD_PARAM;
204 }
205
206
207
208
209 if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {
210 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
211 "%s rml_send_iovec_to_self at tag %d",
212 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230 xfer = OBJ_NEW(orte_self_send_xfer_t);
231 xfer->buffer = buffer;
232 xfer->cbfunc.buffer = cbfunc;
233 xfer->tag = tag;
234 xfer->cbdata = cbdata;
235
236 ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);
237
238
239 rcv = OBJ_NEW(orte_rml_recv_t);
240 rcv->sender = *peer;
241 rcv->tag = tag;
242 rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
243 memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
244 rcv->iov.iov_len = buffer->bytes_used;
245
246
247
248 ORTE_RML_ACTIVATE_MESSAGE(rcv);
249 return ORTE_SUCCESS;
250 }
251
252 snd = OBJ_NEW(orte_rml_send_t);
253 snd->dst = *peer;
254 snd->origin = *ORTE_PROC_MY_NAME;
255 snd->tag = tag;
256 snd->buffer = buffer;
257 snd->cbfunc.buffer = cbfunc;
258 snd->cbdata = cbdata;
259
260
261 ORTE_OOB_SEND(snd);
262
263 return ORTE_SUCCESS;
264 }