1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4 * University Research and Technology
5 * Corporation. All rights reserved.
6 * Copyright (c) 2004-2006 The University of Tennessee and The University
7 * of Tennessee Research Foundation. All rights
8 * reserved.
9 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10 * University of Stuttgart. All rights reserved.
11 * Copyright (c) 2004-2005 The Regents of the University of California.
12 * All rights reserved.
13 * Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
14 * reserved.
15 * Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
16 * $COPYRIGHT$
17 *
18 * Additional copyrights may follow
19 *
20 * $HEADER$
21 */
22
23 #include "orte_config.h"
24 #include "opal/types.h"
25
26 #include "opal/dss/dss.h"
27 #include "opal/util/output.h"
28
29 #include "orte/mca/errmgr/errmgr.h"
30 #include "orte/mca/oob/base/base.h"
31 #include "orte/util/name_fns.h"
32 #include "orte/util/threads.h"
33 #include "orte/runtime/orte_globals.h"
34
35 #include "orte/mca/rml/base/base.h"
36 #include "orte/mca/rml/rml_types.h"
37 #include "rml_oob.h"
38
39 static void send_self_exe(int fd, short args, void* data)
40 {
41 orte_self_send_xfer_t *xfer = (orte_self_send_xfer_t*)data;
42
43 ORTE_ACQUIRE_OBJECT(xfer);
44
45 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
46 "%s rml_send_to_self callback executing for tag %d",
47 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->tag));
48
49 /* execute the send callback function - note that
50 * send-to-self always returns a SUCCESS status
51 */
52 if (NULL != xfer->iov) {
53 if (NULL != xfer->cbfunc.iov) {
54 /* non-blocking iovec send */
55 xfer->cbfunc.iov(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->iov, xfer->count,
56 xfer->tag, xfer->cbdata);
57 }
58 } else if (NULL != xfer->buffer) {
59 if (NULL != xfer->cbfunc.buffer) {
60 /* non-blocking buffer send */
61 xfer->cbfunc.buffer(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->buffer,
62 xfer->tag, xfer->cbdata);
63 }
64 } else {
65 /* should never happen */
66 abort();
67 }
68
69 /* cleanup the memory */
70 OBJ_RELEASE(xfer);
71 }
72
73 int orte_rml_oob_send_nb(orte_process_name_t* peer,
74 struct iovec* iov,
75 int count,
76 orte_rml_tag_t tag,
77 orte_rml_callback_fn_t cbfunc,
78 void* cbdata)
79 {
80 orte_rml_recv_t *rcv;
81 orte_rml_send_t *snd;
82 int bytes;
83 orte_self_send_xfer_t *xfer;
84 int i;
85 char* ptr;
86
87 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
88 "%s rml_send to peer %s at tag %d",
89 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
90 ORTE_NAME_PRINT(peer), tag));
91
92 if (ORTE_RML_TAG_INVALID == tag) {
93 /* cannot send to an invalid tag */
94 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
95 return ORTE_ERR_BAD_PARAM;
96 }
97 if (NULL == peer ||
98 OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
99 /* cannot send to an invalid peer */
100 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
101 return ORTE_ERR_BAD_PARAM;
102 }
103
104 /* if this is a message to myself, then just post the message
105 * for receipt - no need to dive into the oob
106 */
107 if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
108 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
109 "%s rml_send_iovec_to_self at tag %d",
110 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
111 /* send to self is a tad tricky - we really don't want
112 * to track the send callback function throughout the recv
113 * process and execute it upon receipt as this would provide
114 * very different timing from a non-self message. Specifically,
115 * if we just retain a pointer to the incoming data
116 * and then execute the send callback prior to the receive,
117 * then the caller will think we are done with the data and
118 * can release it. So we have to copy the data in order to
119 * execute the send callback prior to receiving the message.
120 *
121 * In truth, this really is a better mimic of the non-self
122 * message behavior. If we actually pushed the message out
123 * on the wire and had it loop back, then we would receive
124 * a new block of data anyway.
125 */
126
127 /* setup the send callback */
128 xfer = OBJ_NEW(orte_self_send_xfer_t);
129 xfer->iov = iov;
130 xfer->count = count;
131 xfer->cbfunc.iov = cbfunc;
132 xfer->tag = tag;
133 xfer->cbdata = cbdata;
134 /* setup the event for the send callback */
135 ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);
136
137 /* copy the message for the recv */
138 rcv = OBJ_NEW(orte_rml_recv_t);
139 rcv->sender = *peer;
140 rcv->tag = tag;
141 /* get the total number of bytes in the iovec array */
142 bytes = 0;
143 for (i = 0 ; i < count ; ++i) {
144 bytes += iov[i].iov_len;
145 }
146 /* get the required memory allocation */
147 if (0 < bytes) {
148 rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
149 rcv->iov.iov_len = bytes;
150 /* transfer the bytes */
151 ptr = (char*)rcv->iov.iov_base;
152 for (i = 0 ; i < count ; ++i) {
153 memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
154 ptr += iov[i].iov_len;
155 }
156 }
157 /* post the message for receipt - since the send callback was posted
158 * first and has the same priority, it will execute first
159 */
160 ORTE_RML_ACTIVATE_MESSAGE(rcv);
161 return ORTE_SUCCESS;
162 }
163
164 snd = OBJ_NEW(orte_rml_send_t);
165 snd->dst = *peer;
166 snd->origin = *ORTE_PROC_MY_NAME;
167 snd->tag = tag;
168 snd->iov = iov;
169 snd->count = count;
170 snd->cbfunc.iov = cbfunc;
171 snd->cbdata = cbdata;
172
173 /* activate the OOB send state */
174 ORTE_OOB_SEND(snd);
175
176 return ORTE_SUCCESS;
177 }
178
179 int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
180 opal_buffer_t* buffer,
181 orte_rml_tag_t tag,
182 orte_rml_buffer_callback_fn_t cbfunc,
183 void* cbdata)
184 {
185 orte_rml_recv_t *rcv;
186 orte_rml_send_t *snd;
187 orte_self_send_xfer_t *xfer;
188
189 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
190 "%s rml_send_buffer to peer %s at tag %d",
191 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
192 ORTE_NAME_PRINT(peer), tag));
193
194 if (ORTE_RML_TAG_INVALID == tag) {
195 /* cannot send to an invalid tag */
196 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
197 return ORTE_ERR_BAD_PARAM;
198 }
199 if (NULL == peer ||
200 OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
201 /* cannot send to an invalid peer */
202 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
203 return ORTE_ERR_BAD_PARAM;
204 }
205
206 /* if this is a message to myself, then just post the message
207 * for receipt - no need to dive into the oob
208 */
209 if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
210 OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
211 "%s rml_send_iovec_to_self at tag %d",
212 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
213 /* send to self is a tad tricky - we really don't want
214 * to track the send callback function throughout the recv
215 * process and execute it upon receipt as this would provide
216 * very different timing from a non-self message. Specifically,
217 * if we just retain a pointer to the incoming data
218 * and then execute the send callback prior to the receive,
219 * then the caller will think we are done with the data and
220 * can release it. So we have to copy the data in order to
221 * execute the send callback prior to receiving the message.
222 *
223 * In truth, this really is a better mimic of the non-self
224 * message behavior. If we actually pushed the message out
225 * on the wire and had it loop back, then we would receive
226 * a new block of data anyway.
227 */
228
229 /* setup the send callback */
230 xfer = OBJ_NEW(orte_self_send_xfer_t);
231 xfer->buffer = buffer;
232 xfer->cbfunc.buffer = cbfunc;
233 xfer->tag = tag;
234 xfer->cbdata = cbdata;
235 /* setup the event for the send callback */
236 ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);
237
238 /* copy the message for the recv */
239 rcv = OBJ_NEW(orte_rml_recv_t);
240 rcv->sender = *peer;
241 rcv->tag = tag;
242 rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
243 memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
244 rcv->iov.iov_len = buffer->bytes_used;
245 /* post the message for receipt - since the send callback was posted
246 * first and has the same priority, it will execute first
247 */
248 ORTE_RML_ACTIVATE_MESSAGE(rcv);
249 return ORTE_SUCCESS;
250 }
251
252 snd = OBJ_NEW(orte_rml_send_t);
253 snd->dst = *peer;
254 snd->origin = *ORTE_PROC_MY_NAME;
255 snd->tag = tag;
256 snd->buffer = buffer;
257 snd->cbfunc.buffer = cbfunc;
258 snd->cbdata = cbdata;
259
260 /* activate the OOB send state */
261 ORTE_OOB_SEND(snd);
262
263 return ORTE_SUCCESS;
264 }