This source file includes the following definitions.
- accept_connection
- ping
- send_nb
- recv_handler
- ft_event
- ft_event
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "orte_config.h"
28 #include "orte/types.h"
29 #include "opal/types.h"
30
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #ifdef HAVE_SYS_TYPES_H
35 #include <sys/types.h>
36 #endif
37 #include <fcntl.h>
38 #ifdef HAVE_NETINET_IN_H
39 #include <netinet/in.h>
40 #endif
41 #ifdef HAVE_ARPA_INET_H
42 #include <arpa/inet.h>
43 #endif
44 #ifdef HAVE_NETDB_H
45 #include <netdb.h>
46 #endif
47 #include <ctype.h>
48
49 #include "opal/runtime/opal_progress_threads.h"
50 #include "opal/util/show_help.h"
51 #include "opal/util/error.h"
52 #include "opal/util/output.h"
53 #include "opal/opal_socket_errno.h"
54 #include "opal/util/if.h"
55 #include "opal/util/net.h"
56 #include "opal/util/argv.h"
57 #include "opal/class/opal_hash_table.h"
58
59 #include "orte/mca/errmgr/errmgr.h"
60 #include "orte/mca/ess/ess.h"
61 #include "orte/mca/routed/routed.h"
62 #include "orte/util/name_fns.h"
63 #include "orte/util/parse_options.h"
64 #include "orte/util/show_help.h"
65 #include "orte/util/threads.h"
66 #include "orte/runtime/orte_globals.h"
67
68 #include "orte/mca/oob/tcp/oob_tcp.h"
69 #include "orte/mca/oob/tcp/oob_tcp_component.h"
70 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
71 #include "orte/mca/oob/tcp/oob_tcp_common.h"
72 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
73 #include "orte/mca/oob/tcp/oob_tcp_sendrecv.h"
74
75 static void accept_connection(const int accepted_fd,
76 const struct sockaddr *addr);
77 static void ping(const orte_process_name_t *proc);
78 static void send_nb(orte_rml_send_t *msg);
79 static void ft_event(int state);
80
81 mca_oob_tcp_module_t mca_oob_tcp_module = {
82 .accept_connection = accept_connection,
83 .ping = ping,
84 .send_nb = send_nb,
85 .ft_event = ft_event
86 };
87
88
89
90
91 static void recv_handler(int sd, short flags, void* user);
92
93
94
95
96
97
98
99 static void accept_connection(const int accepted_fd,
100 const struct sockaddr *addr)
101 {
102 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
103 "%s accept_connection: %s:%d\n",
104 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
105 opal_net_get_hostname(addr),
106 opal_net_get_port(addr));
107
108
109 orte_oob_tcp_set_socket_options(accepted_fd);
110
111
112
113
114 ORTE_ACTIVATE_TCP_ACCEPT_STATE(accepted_fd, addr, recv_handler);
115 }
116
117
118 static void ping(const orte_process_name_t *proc)
119 {
120 mca_oob_tcp_peer_t *peer;
121
122 opal_output_verbose(2, orte_oob_base_framework.framework_output,
123 "%s:[%s:%d] processing ping to peer %s",
124 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
125 __FILE__, __LINE__,
126 ORTE_NAME_PRINT(proc));
127
128
129 if (NULL == (peer = mca_oob_tcp_peer_lookup(proc))) {
130
131
132
133
134
135 opal_output_verbose(2, orte_oob_base_framework.framework_output,
136 "%s:[%s:%d] hop %s unknown",
137 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
138 __FILE__, __LINE__,
139 ORTE_NAME_PRINT(proc));
140 ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, NULL, proc, mca_oob_tcp_component_hop_unknown);
141 return;
142 }
143
144
145 if (NULL == peer->ev_base) {
146
147 ORTE_OOB_TCP_NEXT_BASE(peer);
148 }
149
150
151 if (MCA_OOB_TCP_CONNECTED == peer->state) {
152 opal_output_verbose(2, orte_oob_base_framework.framework_output,
153 "%s:[%s:%d] already connected to peer %s",
154 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
155 __FILE__, __LINE__,
156 ORTE_NAME_PRINT(proc));
157 return;
158 }
159
160
161 if (MCA_OOB_TCP_CONNECTING == peer->state ||
162 MCA_OOB_TCP_CONNECT_ACK == peer->state) {
163 opal_output_verbose(2, orte_oob_base_framework.framework_output,
164 "%s:[%s:%d] already connecting to peer %s",
165 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
166 __FILE__, __LINE__,
167 ORTE_NAME_PRINT(proc));
168 return;
169 }
170
171
172 peer->state = MCA_OOB_TCP_CONNECTING;
173 ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
174 }
175
176 static void send_nb(orte_rml_send_t *msg)
177 {
178 mca_oob_tcp_peer_t *peer;
179 orte_process_name_t hop;
180
181
182
183 hop = orte_routed.get_route(&msg->dst);
184
185 if (NULL == (peer = mca_oob_tcp_peer_lookup(&hop))) {
186
187
188
189
190
191 opal_output_verbose(2, orte_oob_base_framework.framework_output,
192 "%s:[%s:%d] processing send to peer %s:%d seq_num = %d hop %s unknown",
193 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
194 __FILE__, __LINE__,
195 ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
196 ORTE_NAME_PRINT(&hop));
197 ORTE_ACTIVATE_TCP_NO_ROUTE(msg, &hop, mca_oob_tcp_component_no_route);
198 return;
199 }
200
201 opal_output_verbose(2, orte_oob_base_framework.framework_output,
202 "%s:[%s:%d] processing send to peer %s:%d seq_num = %d via %s",
203 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
204 __FILE__, __LINE__,
205 ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
206 ORTE_NAME_PRINT(&peer->name));
207
208 if (NULL == peer->ev_base) {
209
210 ORTE_OOB_TCP_NEXT_BASE(peer);
211 }
212
213 if (MCA_OOB_TCP_CONNECTED == peer->state) {
214 opal_output_verbose(2, orte_oob_base_framework.framework_output,
215 "%s tcp:send_nb: already connected to %s - queueing for send",
216 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
217 ORTE_NAME_PRINT(&peer->name));
218 MCA_OOB_TCP_QUEUE_SEND(msg, peer);
219 return;
220 }
221
222
223
224
225 MCA_OOB_TCP_QUEUE_PENDING(msg, peer);
226
227 if (MCA_OOB_TCP_CONNECTING != peer->state &&
228 MCA_OOB_TCP_CONNECT_ACK != peer->state) {
229
230
231
232
233
234 opal_output_verbose(2, orte_oob_base_framework.framework_output,
235 "%s tcp:send_nb: initiating connection to %s",
236 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
237 ORTE_NAME_PRINT(&peer->name));
238 peer->state = MCA_OOB_TCP_CONNECTING;
239 ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
240 }
241 }
242
243
244
245
246
247
248
249
250 static void recv_handler(int sd, short flg, void *cbdata)
251 {
252 mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
253 int flags;
254 mca_oob_tcp_hdr_t hdr;
255 mca_oob_tcp_peer_t *peer;
256
257 ORTE_ACQUIRE_OBJECT(op);
258
259 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
260 "%s:tcp:recv:handler called",
261 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
262
263
264 if (ORTE_SUCCESS != mca_oob_tcp_peer_recv_connect_ack(NULL, sd, &hdr)) {
265 goto cleanup;
266 }
267
268
269 if (MCA_OOB_TCP_IDENT == hdr.type) {
270 if (NULL == (peer = mca_oob_tcp_peer_lookup(&hdr.origin))) {
271
272 mca_oob_tcp_peer_close(peer);
273 goto cleanup;
274 }
275
276 if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
277 opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_GETFL) failed: %s (%d)",
278 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
279 } else {
280 flags |= O_NONBLOCK;
281 if (fcntl(sd, F_SETFL, flags) < 0) {
282 opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_SETFL) failed: %s (%d)",
283 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
284 }
285 }
286
287 peer->sd = sd;
288 if (mca_oob_tcp_peer_accept(peer) == false) {
289 if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
290 opal_output(0, "%s-%s mca_oob_tcp_recv_connect: "
291 "rejected connection from %s connection state %d",
292 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
293 ORTE_NAME_PRINT(&(peer->name)),
294 ORTE_NAME_PRINT(&(hdr.origin)),
295 peer->state);
296 }
297 CLOSE_THE_SOCKET(sd);
298 }
299 }
300
301 cleanup:
302 OBJ_RELEASE(op);
303 }
304
305
#if OPAL_ENABLE_FT_CR == 0
/*
 * Fault-tolerance event hook for builds without checkpoint/restart
 * support (OPAL_ENABLE_FT_CR == 0): every state is ignored.
 */
static void ft_event(int state)
{
    (void)state;
}

#else
/*
 * Fault-tolerance event hook for checkpoint/restart builds.
 *
 * No state currently triggers any action. The handling originally
 * written for this function is disabled; it was meant to do:
 *   - OPAL_CRS_CHECKPOINT: opal_event_disable()
 *   - OPAL_CRS_CONTINUE:   opal_event_enable()
 *   - OPAL_CRS_RESTART:    hand every entry of
 *       mca_oob_tcp_module.peer_list to MCA_OOB_TCP_PEER_RETURN,
 *       destruct and re-construct peer_free, peer_names, peers and
 *       peer_list, then opal_event_enable()
 */
static void ft_event(int state)
{
    switch (state) {
    case OPAL_CRS_CHECKPOINT:
    case OPAL_CRS_CONTINUE:
    case OPAL_CRS_TERM:
    default:
        /* nothing to do for any state */
        break;
    }
}
#endif