This source file includes the following definitions.
- mca_oob_tcp_queue_msg
- send_msg
- mca_oob_tcp_send_handler
- read_bytes
- mca_oob_tcp_recv_handler
- snd_cons
- snd_des
- rcv_cons
- err_cons
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "orte_config.h"
32
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <fcntl.h>
37 #ifdef HAVE_SYS_UIO_H
38 #include <sys/uio.h>
39 #endif
40 #ifdef HAVE_NET_UIO_H
41 #include <net/uio.h>
42 #endif
43 #ifdef HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46 #include "opal/opal_socket_errno.h"
47 #ifdef HAVE_NETINET_IN_H
48 #include <netinet/in.h>
49 #endif
50 #ifdef HAVE_ARPA_INET_H
51 #include <arpa/inet.h>
52 #endif
53 #ifdef HAVE_NETINET_TCP_H
54 #include <netinet/tcp.h>
55 #endif
56
57 #include "opal_stdint.h"
58 #include "opal/types.h"
59 #include "opal/mca/backtrace/backtrace.h"
60 #include "opal/util/output.h"
61 #include "opal/util/net.h"
62 #include "opal/util/error.h"
63 #include "opal/class/opal_hash_table.h"
64 #include "opal/mca/event/event.h"
65
66 #include "orte/util/name_fns.h"
67 #include "orte/util/threads.h"
68 #include "orte/runtime/orte_globals.h"
69 #include "orte/mca/errmgr/errmgr.h"
70 #include "orte/mca/ess/ess.h"
71 #include "orte/mca/routed/routed.h"
72 #include "orte/mca/state/state.h"
73 #include "orte/runtime/orte_wait.h"
74
75 #include "oob_tcp.h"
76 #include "orte/mca/oob/tcp/oob_tcp_component.h"
77 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
78 #include "orte/mca/oob/tcp/oob_tcp_common.h"
79 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
80
81 #define OOB_SEND_MAX_RETRIES 3
82
83 void mca_oob_tcp_queue_msg(int sd, short args, void *cbdata)
84 {
85 mca_oob_tcp_send_t *snd = (mca_oob_tcp_send_t*)cbdata;
86 mca_oob_tcp_peer_t *peer;
87
88 ORTE_ACQUIRE_OBJECT(snd);
89 peer = (mca_oob_tcp_peer_t*)snd->peer;
90
91
92 if (NULL == peer->send_msg) {
93 peer->send_msg = snd;
94 } else {
95
96 opal_list_append(&peer->send_queue, &snd->super);
97 }
98 if (snd->activate) {
99
100 if (MCA_OOB_TCP_CONNECTED != peer->state) {
101 peer->state = MCA_OOB_TCP_CONNECTING;
102 ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
103 } else {
104
105 if (!peer->send_ev_active) {
106 peer->send_ev_active = true;
107 ORTE_POST_OBJECT(peer);
108 opal_event_add(&peer->send_event, 0);
109 }
110 }
111 }
112 }
113
114 static int send_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_send_t* msg)
115 {
116 struct iovec iov[2];
117 int iov_count, retries = 0;
118 ssize_t remain = msg->sdbytes, rc;
119
120 iov[0].iov_base = msg->sdptr;
121 iov[0].iov_len = msg->sdbytes;
122 if (!msg->hdr_sent) {
123 if (NULL != msg->data) {
124
125 iov[1].iov_base = msg->data;
126 } else if (NULL != msg->msg->buffer) {
127
128 iov[1].iov_base = msg->msg->buffer->base_ptr;
129 } else {
130 iov[1].iov_base = msg->msg->data;
131 }
132 iov[1].iov_len = ntohl(msg->hdr.nbytes);
133 remain += ntohl(msg->hdr.nbytes);
134 iov_count = 2;
135 } else {
136 iov_count = 1;
137 }
138
139 retry:
140 rc = writev(peer->sd, iov, iov_count);
141 if (OPAL_LIKELY(rc == remain)) {
142
143 msg->hdr_sent = true;
144 msg->sdbytes = 0;
145 msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
146 return ORTE_SUCCESS;
147 } else if (rc < 0) {
148 if (opal_socket_errno == EINTR) {
149 goto retry;
150 } else if (opal_socket_errno == EAGAIN) {
151
152
153
154
155 ++retries;
156 if (retries < OOB_SEND_MAX_RETRIES) {
157 goto retry;
158 }
159 return ORTE_ERR_RESOURCE_BUSY;
160 } else if (opal_socket_errno == EWOULDBLOCK) {
161
162
163
164
165 ++retries;
166 if (retries < OOB_SEND_MAX_RETRIES) {
167 goto retry;
168 }
169 return ORTE_ERR_WOULD_BLOCK;
170 } else {
171
172 opal_output(0, "oob:tcp: send_msg: write failed: %s (%d) [sd = %d]",
173 strerror(opal_socket_errno),
174 opal_socket_errno, peer->sd);
175 return ORTE_ERR_UNREACH;
176 }
177 } else {
178
179
180
181 if ((size_t)rc < msg->sdbytes) {
182
183 msg->sdptr = (char *)msg->sdptr + rc;
184 msg->sdbytes -= rc;
185 } else {
186
187 msg->hdr_sent = true;
188 rc -= msg->sdbytes;
189 assert(2 == iov_count);
190 msg->sdptr = (char *)iov[1].iov_base + rc;
191 msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
192 }
193 return ORTE_ERR_RESOURCE_BUSY;
194 }
195 }
196
197
198
199
200
201 void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
202 {
203 mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
204 mca_oob_tcp_send_t* msg;
205 int rc;
206
207 ORTE_ACQUIRE_OBJECT(peer);
208 msg = peer->send_msg;
209
210 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
211 "%s tcp:send_handler called to send to peer %s",
212 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
213 ORTE_NAME_PRINT(&peer->name));
214
215 switch (peer->state) {
216 case MCA_OOB_TCP_CONNECTING:
217 case MCA_OOB_TCP_CLOSED:
218 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
219 "%s tcp:send_handler %s",
220 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
221 mca_oob_tcp_state_print(peer->state));
222 mca_oob_tcp_peer_complete_connect(peer);
223
224
225
226 if (peer->send_ev_active) {
227 opal_event_del(&peer->send_event);
228 peer->send_ev_active = false;
229 }
230 break;
231 case MCA_OOB_TCP_CONNECTED:
232 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
233 "%s tcp:send_handler SENDING TO %s",
234 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
235 (NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
236 if (NULL != msg) {
237 opal_output_verbose(2, orte_oob_base_framework.framework_output,
238 "oob:tcp:send_handler SENDING MSG");
239 if (ORTE_SUCCESS == (rc = send_msg(peer, msg))) {
240
241 if (NULL != msg->data || NULL == msg->msg) {
242
243 opal_output_verbose(2, orte_oob_base_framework.framework_output,
244 "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
245 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
246 ORTE_NAME_PRINT(&(peer->name)),
247 (int)ntohl(msg->hdr.nbytes), peer->sd);
248 OBJ_RELEASE(msg);
249 peer->send_msg = NULL;
250 } else if (NULL != msg->msg->buffer) {
251
252 opal_output_verbose(2, orte_oob_base_framework.framework_output,
253 "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
254 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
255 ORTE_NAME_PRINT(&(peer->name)),
256 (int)ntohl(msg->hdr.nbytes), peer->sd);
257 msg->msg->status = ORTE_SUCCESS;
258 ORTE_RML_SEND_COMPLETE(msg->msg);
259 OBJ_RELEASE(msg);
260 peer->send_msg = NULL;
261 } else if (NULL != msg->msg->data) {
262
263
264
265
266 opal_output_verbose(2, orte_oob_base_framework.framework_output,
267 "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
268 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
269 ORTE_NAME_PRINT(&(peer->name)),
270 (int)ntohl(msg->hdr.nbytes), peer->sd);
271 msg->msg->status = ORTE_SUCCESS;
272 OBJ_RELEASE(msg);
273 peer->send_msg = NULL;
274 } else {
275
276 msg->iovnum++;
277 if (msg->iovnum < msg->msg->count) {
278 msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
279 msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
280
281
282
283
284 return;
285 } else {
286
287 opal_output_verbose(2, orte_oob_base_framework.framework_output,
288 "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
289 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
290 ORTE_NAME_PRINT(&(peer->name)),
291 (int)ntohl(msg->hdr.nbytes), peer->sd);
292 msg->msg->status = ORTE_SUCCESS;
293 ORTE_RML_SEND_COMPLETE(msg->msg);
294 OBJ_RELEASE(msg);
295 peer->send_msg = NULL;
296 }
297 }
298
299 } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
300 ORTE_ERR_WOULD_BLOCK == rc) {
301
302 return;
303 } else {
304
305 opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d",
306 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
307 ORTE_NAME_PRINT(&(peer->name)), peer->sd);
308 opal_event_del(&peer->send_event);
309 msg->msg->status = rc;
310 ORTE_RML_SEND_COMPLETE(msg->msg);
311 OBJ_RELEASE(msg);
312 peer->send_msg = NULL;
313 ORTE_FORCED_TERMINATE(1);
314 return;
315 }
316
317
318
319
320
321
322
323 peer->send_msg = (mca_oob_tcp_send_t*)
324 opal_list_remove_first(&peer->send_queue);
325 }
326
327
328 if (NULL == peer->send_msg && peer->send_ev_active) {
329 opal_event_del(&peer->send_event);
330 peer->send_ev_active = false;
331 }
332 break;
333 default:
334 opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d) on socket %d",
335 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
336 ORTE_NAME_PRINT(&(peer->name)),
337 peer->state, peer->sd);
338 if (peer->send_ev_active) {
339 opal_event_del(&peer->send_event);
340 peer->send_ev_active = false;
341 }
342 break;
343 }
344 }
345
346 static int read_bytes(mca_oob_tcp_peer_t* peer)
347 {
348 int rc;
349
350
351 while (0 < peer->recv_msg->rdbytes) {
352 rc = read(peer->sd, peer->recv_msg->rdptr, peer->recv_msg->rdbytes);
353 if (rc < 0) {
354 if(opal_socket_errno == EINTR) {
355 continue;
356 } else if (opal_socket_errno == EAGAIN) {
357
358
359
360
361 return ORTE_ERR_RESOURCE_BUSY;
362 } else if (opal_socket_errno == EWOULDBLOCK) {
363
364
365
366
367 return ORTE_ERR_WOULD_BLOCK;
368 }
369
370
371
372
373 opal_output_verbose(OOB_TCP_DEBUG_FAIL, orte_oob_base_framework.framework_output,
374 "%s-%s mca_oob_tcp_msg_recv: readv failed: %s (%d)",
375 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
376 ORTE_NAME_PRINT(&(peer->name)),
377 strerror(opal_socket_errno),
378 opal_socket_errno);
379
380
381
382
383 return ORTE_ERR_COMM_FAILURE;
384 } else if (rc == 0) {
385
386
387
388 opal_output_verbose(OOB_TCP_DEBUG_FAIL, orte_oob_base_framework.framework_output,
389 "%s-%s mca_oob_tcp_msg_recv: peer closed connection",
390 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
391 ORTE_NAME_PRINT(&(peer->name)));
392
393 if (peer->recv_ev_active) {
394 opal_event_del(&peer->recv_event);
395 peer->recv_ev_active = false;
396 }
397 if (peer->timer_ev_active) {
398 opal_event_del(&peer->timer_event);
399 peer->timer_ev_active = false;
400 }
401 if (peer->send_ev_active) {
402 opal_event_del(&peer->send_event);
403 peer->send_ev_active = false;
404 }
405 if (NULL != peer->recv_msg) {
406 OBJ_RELEASE(peer->recv_msg);
407 peer->recv_msg = NULL;
408 }
409 mca_oob_tcp_peer_close(peer);
410
411
412
413 return ORTE_ERR_WOULD_BLOCK;
414 }
415
416 peer->recv_msg->rdbytes -= rc;
417 peer->recv_msg->rdptr += rc;
418 }
419
420
421 return ORTE_SUCCESS;
422 }
423
424
425
426
427
428
429 void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
430 {
431 mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
432 int rc;
433 orte_rml_send_t *snd;
434
435 ORTE_ACQUIRE_OBJECT(peer);
436
437 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
438 "%s:tcp:recv:handler called for peer %s",
439 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
440 ORTE_NAME_PRINT(&peer->name));
441
442 switch (peer->state) {
443 case MCA_OOB_TCP_CONNECT_ACK:
444 if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(peer, peer->sd, NULL))) {
445 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
446 "%s:tcp:recv:handler starting send/recv events",
447 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
448
449 if (!peer->recv_ev_active) {
450 peer->recv_ev_active = true;
451 ORTE_POST_OBJECT(peer);
452 opal_event_add(&peer->recv_event, 0);
453 }
454 if (peer->timer_ev_active) {
455 opal_event_del(&peer->timer_event);
456 peer->timer_ev_active = false;
457 }
458
459 if (NULL == peer->send_msg) {
460 peer->send_msg = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue);
461 }
462 if (NULL != peer->send_msg && !peer->send_ev_active) {
463 peer->send_ev_active = true;
464 ORTE_POST_OBJECT(peer);
465 opal_event_add(&peer->send_event, 0);
466 }
467
468 peer->state = MCA_OOB_TCP_CONNECTED;
469 } else if (ORTE_ERR_UNREACH != rc) {
470
471
472
473 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
474 "%s UNABLE TO COMPLETE CONNECT ACK WITH %s",
475 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
476 ORTE_NAME_PRINT(&peer->name));
477 opal_event_del(&peer->recv_event);
478 ORTE_FORCED_TERMINATE(1);
479 return;
480 }
481 break;
482 case MCA_OOB_TCP_CONNECTED:
483 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
484 "%s:tcp:recv:handler CONNECTED",
485 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
486
487 if (NULL == peer->recv_msg) {
488 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
489 "%s:tcp:recv:handler allocate new recv msg",
490 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
491 peer->recv_msg = OBJ_NEW(mca_oob_tcp_recv_t);
492 if (NULL == peer->recv_msg) {
493 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
494 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
495 ORTE_NAME_PRINT(&(peer->name)));
496 return;
497 }
498
499 peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
500 peer->recv_msg->rdbytes = sizeof(mca_oob_tcp_hdr_t);
501 }
502
503 if (!peer->recv_msg->hdr_recvd) {
504 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
505 "%s:tcp:recv:handler read hdr",
506 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
507 if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
508
509 peer->recv_msg->hdr_recvd = true;
510
511 MCA_OOB_TCP_HDR_NTOH(&peer->recv_msg->hdr);
512
513 if (0 == peer->recv_msg->hdr.nbytes) {
514 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
515 "%s RECVD ZERO-BYTE MESSAGE FROM %s for tag %d",
516 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
517 ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag);
518 peer->recv_msg->data = NULL;
519 } else {
520 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
521 "%s:tcp:recv:handler allocate data region of size %lu",
522 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)peer->recv_msg->hdr.nbytes);
523
524 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
525
526 peer->recv_msg->rdptr = peer->recv_msg->data;
527 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
528 }
529
530 } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
531 ORTE_ERR_WOULD_BLOCK == rc) {
532
533 return;
534 } else {
535
536 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
537 "%s:tcp:recv:handler error reading bytes - closing connection",
538 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
539 mca_oob_tcp_peer_close(peer);
540 return;
541 }
542 }
543
544 if (peer->recv_msg->hdr_recvd) {
545
546
547
548
549 if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
550
551 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
552 "%s RECVD COMPLETE MESSAGE FROM %s (ORIGIN %s) OF %d BYTES FOR DEST %s TAG %d",
553 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
554 ORTE_NAME_PRINT(&peer->name),
555 ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin),
556 (int)peer->recv_msg->hdr.nbytes,
557 ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
558 peer->recv_msg->hdr.tag);
559
560
561 if (peer->recv_msg->hdr.dst.jobid == ORTE_PROC_MY_NAME->jobid &&
562 peer->recv_msg->hdr.dst.vpid == ORTE_PROC_MY_NAME->vpid) {
563
564 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
565 "%s DELIVERING TO RML tag = %d seq_num = %d",
566 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
567 peer->recv_msg->hdr.tag,
568 peer->recv_msg->hdr.seq_num);
569 ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin,
570 peer->recv_msg->hdr.tag,
571 peer->recv_msg->hdr.seq_num,
572 peer->recv_msg->data,
573 peer->recv_msg->hdr.nbytes);
574 OBJ_RELEASE(peer->recv_msg);
575 } else {
576
577
578 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
579 "%s TCP PROMOTING ROUTED MESSAGE FOR %s TO OOB",
580 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
581 ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst));
582 snd = OBJ_NEW(orte_rml_send_t);
583 snd->dst = peer->recv_msg->hdr.dst;
584 snd->origin = peer->recv_msg->hdr.origin;
585 snd->tag = peer->recv_msg->hdr.tag;
586 snd->data = peer->recv_msg->data;
587 snd->seq_num = peer->recv_msg->hdr.seq_num;
588 snd->count = peer->recv_msg->hdr.nbytes;
589 snd->cbfunc.iov = NULL;
590 snd->cbdata = NULL;
591
592 ORTE_OOB_SEND(snd);
593
594 peer->recv_msg->data = NULL;
595
596 OBJ_RELEASE(peer->recv_msg);
597 }
598 peer->recv_msg = NULL;
599 return;
600 } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
601 ORTE_ERR_WOULD_BLOCK == rc) {
602
603 return;
604 } else {
605
606 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to recv message",
607 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
608 ORTE_NAME_PRINT(&(peer->name)));
609
610 opal_event_del(&peer->recv_event);
611 ORTE_FORCED_TERMINATE(1);
612 return;
613 }
614 }
615 break;
616 default:
617 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
618 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
619 ORTE_NAME_PRINT(&(peer->name)),
620 peer->state);
621
622 break;
623 }
624 }
625
626 static void snd_cons(mca_oob_tcp_send_t *ptr)
627 {
628 memset(&ptr->hdr, 0, sizeof(mca_oob_tcp_hdr_t));
629 ptr->msg = NULL;
630 ptr->data = NULL;
631 ptr->hdr_sent = false;
632 ptr->iovnum = 0;
633 ptr->sdptr = NULL;
634 ptr->sdbytes = 0;
635 }
636
637
638
639
640
641
642 static void snd_des(mca_oob_tcp_send_t *ptr)
643 {
644 if (NULL != ptr->data) {
645 free(ptr->data);
646 }
647 }
648 OBJ_CLASS_INSTANCE(mca_oob_tcp_send_t,
649 opal_list_item_t,
650 snd_cons, snd_des);
651
652 static void rcv_cons(mca_oob_tcp_recv_t *ptr)
653 {
654 memset(&ptr->hdr, 0, sizeof(mca_oob_tcp_hdr_t));
655 ptr->hdr_recvd = false;
656 ptr->rdptr = NULL;
657 ptr->rdbytes = 0;
658 }
659 OBJ_CLASS_INSTANCE(mca_oob_tcp_recv_t,
660 opal_list_item_t,
661 rcv_cons, NULL);
662
663 static void err_cons(mca_oob_tcp_msg_error_t *ptr)
664 {
665 ptr->rmsg = NULL;
666 ptr->snd = NULL;
667 }
668 OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_error_t,
669 opal_object_t,
670 err_cons, NULL);