1 /*
2 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2006 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
13 * All rights reserved.
14 * Copyright (c) 2010-2018 Cisco Systems, Inc. All rights reserved
15 * Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
16 * $COPYRIGHT$
17 *
18 * Additional copyrights may follow
19 *
20 * $HEADER$
21 */
22
23 #ifndef _MCA_OOB_TCP_SENDRECV_H_
24 #define _MCA_OOB_TCP_SENDRECV_H_
25
26 #include "orte_config.h"
27
28 #include "opal/class/opal_list.h"
29 #include "opal/util/string_copy.h"
30
31 #include "orte/mca/rml/base/base.h"
32 #include "orte/util/threads.h"
33 #include "oob_tcp.h"
34 #include "oob_tcp_hdr.h"
35
36 /* forward declare */
37 struct mca_oob_tcp_peer_t;
38
/* Per-message state for an outgoing TCP OOB transfer.
 *
 * Instances are created with OBJ_NEW by the queue macros in this file,
 * which fill in the header, point at the payload, set the send cursor to
 * the start of the header, and hand the object to MCA_OOB_TCP_QUEUE_MSG.
 */
typedef struct {
    opal_list_item_t super;          /* base class; NOTE(review): presumably lets the
                                      * object sit on a peer's send queue */
    opal_event_t ev;                 /* NOTE(review): presumably the event that
                                      * ORTE_THREADSHIFT arms for this object */
    struct mca_oob_tcp_peer_t *peer; /* target peer; set by MCA_OOB_TCP_QUEUE_MSG */
    bool activate;                   /* true => activate the peer's send event now,
                                      * false => leave the message pending */
    mca_oob_tcp_hdr_t hdr;           /* wire header; the queue macros run
                                      * MCA_OOB_TCP_HDR_HTON on it before sending */
    orte_rml_send_t *msg;            /* originating RML message (QUEUE_SEND/PENDING) */
    char *data;                      /* raw payload pointer (relay / error paths) */
    bool hdr_sent;                   /* NOTE(review): presumably "header fully written" */
    int iovnum;                      /* NOTE(review): presumably the iovec index in progress */
    char *sdptr;                     /* next byte to write; initialised to &hdr */
    size_t sdbytes;                  /* bytes left at sdptr; initialised to sizeof(hdr) */
} mca_oob_tcp_send_t;
OBJ_CLASS_DECLARATION(mca_oob_tcp_send_t);
54
/* Per-message state for an incoming TCP OOB transfer.
 *
 * In this header a received message is consumed by MCA_OOB_TCP_QUEUE_RELAY
 * (which copies header fields and reuses the data pointer) and by
 * ORTE_ACTIVATE_TCP_MSG_ERROR (which copies the header, takes over the data
 * buffer, and sets data to NULL here).
 */
typedef struct {
    opal_list_item_t super;  /* base class */
    mca_oob_tcp_hdr_t hdr;   /* header as received; NOTE(review): presumably
                              * in host byte order, since the macros here apply
                              * HTON to copies of it — confirm */
    bool hdr_recvd;          /* NOTE(review): presumably "header fully read" */
    char *data;              /* payload buffer; ownership may be transferred
                              * away (ORTE_ACTIVATE_TCP_MSG_ERROR NULLs it) */
    char *rdptr;             /* NOTE(review): presumably the next byte to read */
    size_t rdbytes;          /* NOTE(review): presumably bytes still to read */
} mca_oob_tcp_recv_t;
OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
65
/* MCA_OOB_TCP_QUEUE_MSG - hand a prepared send object to a peer.
 *
 * Stamps the send object with its target peer and activation flag, then
 * thread-shifts it into the peer's event base, where mca_oob_tcp_queue_msg
 * does the queuing.  If no message currently occupies the peer's "ready"
 * slot, the new message takes it; otherwise it is appended to the peer's
 * message queue.
 *
 * When the flag is true the peer's send event is checked and activated if
 * it is not already active, so transmission can start immediately.  When
 * false the message waits as "pending", e.g. until the connection
 * procedure completes.
 *
 *   p - pointer to the target mca_oob_tcp_peer_t
 *   s - pointer to the mca_oob_tcp_send_t being queued
 *   f - bool: activate the peer's send event now?
 */
#define MCA_OOB_TCP_QUEUE_MSG(p, s, f)                                  \
    do {                                                                \
        (s)->activate = (f);                                            \
        (s)->peer = (struct mca_oob_tcp_peer_t *)(p);                   \
        /* finish the work inside the peer's own event base */          \
        ORTE_THREADSHIFT((s), (p)->ev_base,                             \
                         mca_oob_tcp_queue_msg, ORTE_MSG_PRI);          \
    } while (0)
89
/* MCA_OOB_TCP_QUEUE_SEND - wrap an RML message in a TCP send object and
 * queue it on a peer with the peer's send event activated.
 *
 *   m - orte_rml_send_t * : the RML message to be sent (evaluated once)
 *   p - pointer to the mca_oob_tcp_peer_t to queue it on
 *
 * The payload size written to the header comes from, in priority order:
 * the buffer's bytes_used, the sum of the iovec lengths, or the message's
 * count field.
 *
 * The locals carry a "_qs_" prefix so they cannot capture identifiers
 * used in the caller's arguments.  (Previously a call such as
 * MCA_OOB_TCP_QUEUE_SEND(msgs[i], peer) bound to the macro's own
 * uninitialised loop counter "i".)
 */
#define MCA_OOB_TCP_QUEUE_SEND(m, p)                                     \
    do {                                                                \
        orte_rml_send_t *_qs_m = (m);                                   \
        mca_oob_tcp_send_t *_qs_s;                                      \
        int _qs_i;                                                      \
        opal_output_verbose(5, orte_oob_base_framework.framework_output, \
                            "%s:[%s:%d] queue send to %s",              \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            __FILE__, __LINE__,                         \
                            ORTE_NAME_PRINT(&(_qs_m->dst)));            \
        _qs_s = OBJ_NEW(mca_oob_tcp_send_t);                            \
        /* setup the header */                                          \
        _qs_s->hdr.origin = _qs_m->origin;                              \
        _qs_s->hdr.dst = _qs_m->dst;                                    \
        _qs_s->hdr.type = MCA_OOB_TCP_USER;                             \
        _qs_s->hdr.tag = _qs_m->tag;                                    \
        _qs_s->hdr.seq_num = _qs_m->seq_num;                            \
        /* point to the actual message */                               \
        _qs_s->msg = _qs_m;                                             \
        /* set the total number of bytes to be sent */                  \
        if (NULL != _qs_m->buffer) {                                    \
            _qs_s->hdr.nbytes = _qs_m->buffer->bytes_used;              \
        } else if (NULL != _qs_m->iov) {                                \
            _qs_s->hdr.nbytes = 0;                                      \
            for (_qs_i = 0; _qs_i < _qs_m->count; _qs_i++) {            \
                _qs_s->hdr.nbytes += _qs_m->iov[_qs_i].iov_len;         \
            }                                                           \
        } else {                                                        \
            _qs_s->hdr.nbytes = _qs_m->count;                           \
        }                                                               \
        /* prep header for transmission (host-to-network conversion) */ \
        MCA_OOB_TCP_HDR_HTON(&_qs_s->hdr);                              \
        /* start the send with the header */                            \
        _qs_s->sdptr = (char *)&_qs_s->hdr;                             \
        _qs_s->sdbytes = sizeof(mca_oob_tcp_hdr_t);                     \
        /* add to the msg queue for this peer and activate sending */   \
        MCA_OOB_TCP_QUEUE_MSG((p), _qs_s, true);                        \
    } while (0)
133
/* MCA_OOB_TCP_QUEUE_PENDING - wrap an RML message in a TCP send object and
 * queue it on a peer WITHOUT activating the peer's send event, so it is
 * transmitted once the connection process completes.
 *
 *   m - orte_rml_send_t * : the RML message to be sent (evaluated once)
 *   p - pointer to the mca_oob_tcp_peer_t to queue it on
 *
 * The payload size written to the header comes from, in priority order:
 * the buffer's bytes_used, the sum of the iovec lengths, or the message's
 * count field.
 *
 * The locals carry a "_qp_" prefix so they cannot capture identifiers
 * used in the caller's arguments.  (Previously a call such as
 * MCA_OOB_TCP_QUEUE_PENDING(msgs[i], peer) bound to the macro's own
 * uninitialised loop counter "i".)
 */
#define MCA_OOB_TCP_QUEUE_PENDING(m, p)                                  \
    do {                                                                \
        orte_rml_send_t *_qp_m = (m);                                   \
        mca_oob_tcp_send_t *_qp_s;                                      \
        int _qp_i;                                                      \
        opal_output_verbose(5, orte_oob_base_framework.framework_output, \
                            "%s:[%s:%d] queue pending to %s",           \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            __FILE__, __LINE__,                         \
                            ORTE_NAME_PRINT(&(_qp_m->dst)));            \
        _qp_s = OBJ_NEW(mca_oob_tcp_send_t);                            \
        /* setup the header */                                          \
        _qp_s->hdr.origin = _qp_m->origin;                              \
        _qp_s->hdr.dst = _qp_m->dst;                                    \
        _qp_s->hdr.type = MCA_OOB_TCP_USER;                             \
        _qp_s->hdr.tag = _qp_m->tag;                                    \
        _qp_s->hdr.seq_num = _qp_m->seq_num;                            \
        /* point to the actual message */                               \
        _qp_s->msg = _qp_m;                                             \
        /* set the total number of bytes to be sent */                  \
        if (NULL != _qp_m->buffer) {                                    \
            _qp_s->hdr.nbytes = _qp_m->buffer->bytes_used;              \
        } else if (NULL != _qp_m->iov) {                                \
            _qp_s->hdr.nbytes = 0;                                      \
            for (_qp_i = 0; _qp_i < _qp_m->count; _qp_i++) {            \
                _qp_s->hdr.nbytes += _qp_m->iov[_qp_i].iov_len;         \
            }                                                           \
        } else {                                                        \
            _qp_s->hdr.nbytes = _qp_m->count;                           \
        }                                                               \
        /* prep header for transmission (host-to-network conversion) */ \
        MCA_OOB_TCP_HDR_HTON(&_qp_s->hdr);                              \
        /* start the send with the header */                            \
        _qp_s->sdptr = (char *)&_qp_s->hdr;                             \
        _qp_s->sdbytes = sizeof(mca_oob_tcp_hdr_t);                     \
        /* add to the msg queue for this peer, leave it pending */      \
        MCA_OOB_TCP_QUEUE_MSG((p), _qp_s, false);                       \
    } while (0)
177
/* MCA_OOB_TCP_QUEUE_RELAY - forward a received message to the next hop.
 *
 *   m - the mca_oob_tcp_recv_t that was received
 *   p - pointer to the mca_oob_tcp_peer_t that is the next hop
 *
 * The relayed header carries the original origin, destination, tag,
 * sequence number, routing string and payload size.  The payload pointer
 * is shared with the received message, not copied.  The send event is
 * activated immediately.
 *
 * hdr.seq_num is now copied as well.  Previously it was never assigned on
 * this path, unlike MCA_OOB_TCP_QUEUE_SEND and the whole-header copy in
 * ORTE_ACTIVATE_TCP_MSG_ERROR, so a relayed message lost its
 * originator's sequence number.
 */
#define MCA_OOB_TCP_QUEUE_RELAY(m, p)                                    \
    do {                                                                \
        mca_oob_tcp_send_t *_s;                                         \
        opal_output_verbose(5, orte_oob_base_framework.framework_output, \
                            "%s:[%s:%d] queue relay to %s",             \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            __FILE__, __LINE__,                         \
                            ORTE_NAME_PRINT(&((p)->name)));             \
        _s = OBJ_NEW(mca_oob_tcp_send_t);                               \
        /* setup the header from the received one */                    \
        _s->hdr.origin = (m)->hdr.origin;                               \
        _s->hdr.dst = (m)->hdr.dst;                                     \
        _s->hdr.type = MCA_OOB_TCP_USER;                                \
        _s->hdr.tag = (m)->hdr.tag;                                     \
        /* keep the originator's sequence number across the relay */    \
        _s->hdr.seq_num = (m)->hdr.seq_num;                             \
        (void)opal_string_copy(_s->hdr.routed, (m)->hdr.routed,         \
                               ORTE_MAX_RTD_SIZE);                      \
        /* point to the actual message */                               \
        _s->data = (m)->data;                                           \
        /* set the total number of bytes to be sent */                  \
        _s->hdr.nbytes = (m)->hdr.nbytes;                               \
        /* prep header for transmission (host-to-network conversion) */ \
        MCA_OOB_TCP_HDR_HTON(&_s->hdr);                                 \
        /* start the send with the header */                            \
        _s->sdptr = (char *)&_s->hdr;                                   \
        _s->sdbytes = sizeof(mca_oob_tcp_hdr_t);                        \
        /* add to the msg queue for this peer and activate sending */   \
        MCA_OOB_TCP_QUEUE_MSG((p), _s, true);                           \
    } while (0)
212
/* Carrier object used to hand an RML message to a callback running on
 * another event base; created and thread-shifted by
 * ORTE_ACTIVATE_TCP_POST_SEND.
 */
typedef struct {
    opal_object_t super;   /* base class */
    opal_event_t ev;       /* NOTE(review): presumably the event that
                            * ORTE_THREADSHIFT arms for this object */
    orte_rml_send_t *msg;  /* the message being posted */
} mca_oob_tcp_msg_op_t;
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
220
/* ORTE_ACTIVATE_TCP_POST_SEND - thread-shift a message into its peer's
 * event base so that cbfunc processes it there.
 *
 *   ms     - the message; it is stored in an orte_rml_send_t * field and
 *            must expose ->dst and ->peer->ev_base
 *   cbfunc - event callback invoked with the new mca_oob_tcp_msg_op_t
 *
 * The expansion now ends in "while (0)" with no semicolon, matching the
 * other macros in this header.  The old "while(0);" made
 * "if (x) ORTE_ACTIVATE_TCP_POST_SEND(m, cb); else ..." fail to compile.
 * The local is prefixed so an argument such as mop->msg no longer binds
 * to the macro's own uninitialised "mop".
 */
#define ORTE_ACTIVATE_TCP_POST_SEND(ms, cbfunc)                         \
    do {                                                                \
        mca_oob_tcp_msg_op_t *_ps_mop;                                  \
        opal_output_verbose(5, orte_oob_base_framework.framework_output, \
                            "%s:[%s:%d] post send to %s",               \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            __FILE__, __LINE__,                         \
                            ORTE_NAME_PRINT(&((ms)->dst)));             \
        _ps_mop = OBJ_NEW(mca_oob_tcp_msg_op_t);                        \
        _ps_mop->msg = (ms);                                            \
        ORTE_THREADSHIFT(_ps_mop, (ms)->peer->ev_base,                  \
                         (cbfunc), ORTE_MSG_PRI);                       \
    } while (0)
234
/* Carrier object for the message-error and no-route paths.
 *
 * ORTE_ACTIVATE_TCP_MSG_ERROR fills in snd, and ORTE_ACTIVATE_TCP_NO_ROUTE
 * fills in rmsg.  Both record the hop and thread-shift the object onto
 * orte_oob_base.ev_base for the supplied callback.
 */
typedef struct {
    opal_object_t super;        /* base class */
    opal_event_t ev;            /* NOTE(review): presumably the event that
                                 * ORTE_THREADSHIFT arms for this object */
    orte_rml_send_t *rmsg;      /* RML message (set on the no-route path) */
    mca_oob_tcp_send_t *snd;    /* TCP send object (set on the msg-error path) */
    orte_process_name_t hop;    /* process name passed as the macros' "h" */
} mca_oob_tcp_msg_error_t;
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
243
/* ORTE_ACTIVATE_TCP_MSG_ERROR - post a message error to the OOB framework.
 *
 *   s      - mca_oob_tcp_send_t * that failed, or NULL
 *   r      - mca_oob_tcp_recv_t * that failed, or NULL (used only if s is NULL)
 *   h      - orte_process_name_t * of the hop involved
 *   cbfunc - callback run on orte_oob_base.ev_base with the error object
 *
 * When only r is given, a fresh send object is built from r's header and
 * the payload buffer is transferred to it (r->data is set to NULL).
 *
 * s and r are each evaluated at most once, through typed locals, so a
 * bare NULL can still be passed.  The locals are prefixed ("_me_") so a
 * caller argument named snd, mop or proxy is no longer shadowed by the
 * macro's own uninitialised variables.
 */
#define ORTE_ACTIVATE_TCP_MSG_ERROR(s, r, h, cbfunc)                    \
    do {                                                                \
        mca_oob_tcp_msg_error_t *_me_mop;                               \
        mca_oob_tcp_send_t *_me_snd;                                    \
        mca_oob_tcp_recv_t *_me_proxy;                                  \
        opal_output_verbose(5, orte_oob_base_framework.framework_output, \
                            "%s:[%s:%d] post msg error to %s",          \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            __FILE__, __LINE__,                         \
                            ORTE_NAME_PRINT((h)));                      \
        _me_mop = OBJ_NEW(mca_oob_tcp_msg_error_t);                     \
        _me_snd = (s);                                                  \
        if (NULL != _me_snd) {                                          \
            _me_mop->snd = _me_snd;                                     \
        } else {                                                        \
            /* use a proxy so we can pass NULL into the macro */        \
            _me_proxy = (r);                                            \
            if (NULL != _me_proxy) {                                    \
                /* create a send object for this message */             \
                _me_snd = OBJ_NEW(mca_oob_tcp_send_t);                  \
                _me_mop->snd = _me_snd;                                 \
                /* transfer and prep the header */                      \
                _me_snd->hdr = _me_proxy->hdr;                          \
                MCA_OOB_TCP_HDR_HTON(&_me_snd->hdr);                    \
                /* point to the data */                                 \
                _me_snd->data = _me_proxy->data;                        \
                /* start the message with the header */                 \
                _me_snd->sdptr = (char *)&_me_snd->hdr;                 \
                _me_snd->sdbytes = sizeof(mca_oob_tcp_hdr_t);           \
                /* the send object now owns the data */                 \
                _me_proxy->data = NULL;                                 \
            }                                                           \
        }                                                               \
        _me_mop->hop.jobid = (h)->jobid;                                \
        _me_mop->hop.vpid = (h)->vpid;                                  \
        /* this goes to the OOB framework, so use that event base */    \
        ORTE_THREADSHIFT(_me_mop, orte_oob_base.ev_base,                \
                         (cbfunc), ORTE_MSG_PRI);                       \
    } while (0)
280
/* ORTE_ACTIVATE_TCP_NO_ROUTE - report that no route exists for an RML
 * message.
 *
 *   r - orte_rml_send_t * for which no route was found
 *   h - orte_process_name_t * of the hop involved
 *   c - callback run on orte_oob_base.ev_base with the error object
 *
 * The local is prefixed ("_nr_") so that arguments such as mop->rmsg or
 * &mop->hop, written inside a callback that already has a "mop", no
 * longer bind to the macro's own uninitialised variable.
 */
#define ORTE_ACTIVATE_TCP_NO_ROUTE(r, h, c)                             \
    do {                                                                \
        mca_oob_tcp_msg_error_t *_nr_mop;                               \
        opal_output_verbose(5, orte_oob_base_framework.framework_output, \
                            "%s:[%s:%d] post no route to %s",           \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            __FILE__, __LINE__,                         \
                            ORTE_NAME_PRINT((h)));                      \
        _nr_mop = OBJ_NEW(mca_oob_tcp_msg_error_t);                     \
        _nr_mop->rmsg = (r);                                            \
        _nr_mop->hop.jobid = (h)->jobid;                                \
        _nr_mop->hop.vpid = (h)->vpid;                                  \
        /* this goes to the component, so use the framework            */ \
        /* event base                                                  */ \
        ORTE_THREADSHIFT(_nr_mop, orte_oob_base.ev_base,                \
                         (c), ORTE_MSG_PRI);                            \
    } while (0)
298
299 #endif /* _MCA_OOB_TCP_SENDRECV_H_ */