root/orte/mca/oob/tcp/oob_tcp_sendrecv.h

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2006 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2010-2018 Cisco Systems, Inc.  All rights reserved
  15  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #ifndef _MCA_OOB_TCP_SENDRECV_H_
  24 #define _MCA_OOB_TCP_SENDRECV_H_
  25 
  26 #include "orte_config.h"
  27 
  28 #include "opal/class/opal_list.h"
  29 #include "opal/util/string_copy.h"
  30 
  31 #include "orte/mca/rml/base/base.h"
  32 #include "orte/util/threads.h"
  33 #include "oob_tcp.h"
  34 #include "oob_tcp_hdr.h"
  35 
  36 /* forward declare */
  37 struct mca_oob_tcp_peer_t;
  38 
  39 /* tcp structure for sending a message */
  40 typedef struct {
  41     opal_list_item_t super;
  42     opal_event_t ev;
  43     struct mca_oob_tcp_peer_t *peer;
  44     bool activate;
  45     mca_oob_tcp_hdr_t hdr;
  46     orte_rml_send_t *msg;
  47     char *data;
  48     bool hdr_sent;
  49     int iovnum;
  50     char *sdptr;
  51     size_t sdbytes;
  52 } mca_oob_tcp_send_t;
  53 OBJ_CLASS_DECLARATION(mca_oob_tcp_send_t);
  54 
  55 /* tcp structure for recving a message */
  56 typedef struct {
  57     opal_list_item_t super;
  58     mca_oob_tcp_hdr_t hdr;
  59     bool hdr_recvd;
  60     char *data;
  61     char *rdptr;
  62     size_t rdbytes;
  63 } mca_oob_tcp_recv_t;
  64 OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
  65 
  66 /* Queue a message to be sent to a specified peer. The macro
  67  * checks to see if a message is already in position to be
  68  * sent - if it is, then the message provided is simply added
  69  * to the peer's message queue. If not, then the provided message
  70  * is placed in the "ready" position
  71  *
  72  * If the provided boolean is true, then the send event for the
  73  * peer is checked and activated if not already active. This allows
  74  * the macro to either immediately send the message, or to queue
  75  * it as "pending" for later transmission - e.g., after the
  76  * connection procedure is completed
  77  *
  78  * p => pointer to mca_oob_tcp_peer_t
  79  * s => pointer to mca_oob_tcp_send_t
  80  * f => true if send event is to be activated
  81  */
  82 #define MCA_OOB_TCP_QUEUE_MSG(p, s, f)                                  \
  83     do {                                                                \
  84         (s)->peer = (struct mca_oob_tcp_peer_t*)(p);                    \
  85         (s)->activate = (f);                                            \
  86         ORTE_THREADSHIFT((s), (p)->ev_base,                             \
  87                          mca_oob_tcp_queue_msg, ORTE_MSG_PRI);          \
  88     } while(0)
  89 
  90 /* queue a message to be sent by one of our modules - must
  91  * provide the following params:
  92  *
  93  * m - the RML message to be sent
  94  * p - the final recipient
  95  */
  96 #define MCA_OOB_TCP_QUEUE_SEND(m, p)                                    \
  97     do {                                                                \
  98         mca_oob_tcp_send_t *_s;                                         \
  99         int i;                                                          \
 100         opal_output_verbose(5, orte_oob_base_framework.framework_output, \
 101                             "%s:[%s:%d] queue send to %s",              \
 102                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),        \
 103                              __FILE__, __LINE__,                        \
 104                             ORTE_NAME_PRINT(&((m)->dst)));              \
 105         _s = OBJ_NEW(mca_oob_tcp_send_t);                              \
 106         /* setup the header */                                          \
 107         _s->hdr.origin = (m)->origin;                                  \
 108         _s->hdr.dst = (m)->dst;                                        \
 109         _s->hdr.type = MCA_OOB_TCP_USER;                               \
 110         _s->hdr.tag = (m)->tag;                                        \
 111         _s->hdr.seq_num = (m)->seq_num;                                \
 112         /* point to the actual message */                               \
 113         _s->msg = (m);                                                 \
 114         /* set the total number of bytes to be sent */                  \
 115         if (NULL != (m)->buffer) {                                      \
 116             _s->hdr.nbytes = (m)->buffer->bytes_used;                  \
 117         } else if (NULL != (m)->iov) {                                  \
 118             _s->hdr.nbytes = 0;                                        \
 119             for (i=0; i < (m)->count; i++) {                            \
 120                 _s->hdr.nbytes += (m)->iov[i].iov_len;                 \
 121             }                                                           \
 122         } else {                                                        \
 123             _s->hdr.nbytes = (m)->count;                               \
 124         }                                                               \
 125         /* prep header for xmission */                                  \
 126         MCA_OOB_TCP_HDR_HTON(&_s->hdr);                                \
 127         /* start the send with the header */                            \
 128         _s->sdptr = (char*)&_s->hdr;                                  \
 129         _s->sdbytes = sizeof(mca_oob_tcp_hdr_t);                       \
 130         /* add to the msg queue for this peer */                        \
 131         MCA_OOB_TCP_QUEUE_MSG((p), _s, true);                          \
 132     } while(0)
 133 
 134 /* queue a message to be sent by one of our modules upon completing
 135  * the connection process - must provide the following params:
 136  *
 137  * m - the RML message to be sent
 138  * p - the final recipient
 139  */
 140 #define MCA_OOB_TCP_QUEUE_PENDING(m, p)                                 \
 141     do {                                                                \
 142         mca_oob_tcp_send_t *_s;                                        \
 143         int i;                                                          \
 144         opal_output_verbose(5, orte_oob_base_framework.framework_output, \
 145                             "%s:[%s:%d] queue pending to %s",           \
 146                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
 147                             __FILE__, __LINE__,                         \
 148                             ORTE_NAME_PRINT(&((m)->dst)));              \
 149         _s = OBJ_NEW(mca_oob_tcp_send_t);                              \
 150         /* setup the header */                                          \
 151         _s->hdr.origin = (m)->origin;                                  \
 152         _s->hdr.dst = (m)->dst;                                        \
 153         _s->hdr.type = MCA_OOB_TCP_USER;                               \
 154         _s->hdr.tag = (m)->tag;                                        \
 155         _s->hdr.seq_num = (m)->seq_num;                                \
 156         /* point to the actual message */                               \
 157         _s->msg = (m);                                                 \
 158         /* set the total number of bytes to be sent */                  \
 159         if (NULL != (m)->buffer) {                                      \
 160             _s->hdr.nbytes = (m)->buffer->bytes_used;                  \
 161         } else if (NULL != (m)->iov) {                                  \
 162             _s->hdr.nbytes = 0;                                        \
 163             for (i=0; i < (m)->count; i++) {                            \
 164                 _s->hdr.nbytes += (m)->iov[i].iov_len;                 \
 165             }                                                           \
 166         } else {                                                        \
 167             _s->hdr.nbytes = (m)->count;                               \
 168         }                                                               \
 169         /* prep header for xmission */                                  \
 170         MCA_OOB_TCP_HDR_HTON(&_s->hdr);                                \
 171         /* start the send with the header */                            \
 172         _s->sdptr = (char*)&_s->hdr;                                  \
 173         _s->sdbytes = sizeof(mca_oob_tcp_hdr_t);                       \
 174         /* add to the msg queue for this peer */                        \
 175         MCA_OOB_TCP_QUEUE_MSG((p), _s, false);                         \
 176     } while(0)
 177 
 178 /* queue a message for relay by one of our modules - must
 179  * provide the following params:
 180  *
 181  * m = the mca_oob_tcp_recv_t that was received
 182  * p - the next hop
 183 */
 184 #define MCA_OOB_TCP_QUEUE_RELAY(m, p)                                   \
 185     do {                                                                \
 186         mca_oob_tcp_send_t *_s;                                        \
 187         opal_output_verbose(5, orte_oob_base_framework.framework_output, \
 188                             "%s:[%s:%d] queue relay to %s",             \
 189                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
 190                             __FILE__, __LINE__,                         \
 191                             ORTE_NAME_PRINT(&((p)->name)));             \
 192         _s = OBJ_NEW(mca_oob_tcp_send_t);                              \
 193         /* setup the header */                                          \
 194         _s->hdr.origin = (m)->hdr.origin;                              \
 195         _s->hdr.dst = (m)->hdr.dst;                                    \
 196         _s->hdr.type = MCA_OOB_TCP_USER;                               \
 197         _s->hdr.tag = (m)->hdr.tag;                                    \
 198         (void)opal_string_copy(_s->hdr.routed, (m)->hdr.routed,         \
 199                       ORTE_MAX_RTD_SIZE);                               \
 200         /* point to the actual message */                               \
 201         _s->data = (m)->data;                                          \
 202         /* set the total number of bytes to be sent */                  \
 203         _s->hdr.nbytes = (m)->hdr.nbytes;                              \
 204         /* prep header for xmission */                                  \
 205         MCA_OOB_TCP_HDR_HTON(&_s->hdr);                                \
 206         /* start the send with the header */                            \
 207         _s->sdptr = (char*)&_s->hdr;                                  \
 208         _s->sdbytes = sizeof(mca_oob_tcp_hdr_t);                       \
 209         /* add to the msg queue for this peer */                        \
 210         MCA_OOB_TCP_QUEUE_MSG((p), _s, true);                          \
 211     } while(0)
 212 
 213 /* State machine for processing message */
 214 typedef struct {
 215     opal_object_t super;
 216     opal_event_t ev;
 217     orte_rml_send_t *msg;
 218 } mca_oob_tcp_msg_op_t;
 219 OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
 220 
 221 #define ORTE_ACTIVATE_TCP_POST_SEND(ms, cbfunc)                         \
 222     do {                                                                \
 223         mca_oob_tcp_msg_op_t *mop;                                      \
 224         opal_output_verbose(5, orte_oob_base_framework.framework_output, \
 225                             "%s:[%s:%d] post send to %s",               \
 226                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
 227                             __FILE__, __LINE__,                         \
 228                             ORTE_NAME_PRINT(&((ms)->dst)));             \
 229         mop = OBJ_NEW(mca_oob_tcp_msg_op_t);                            \
 230         mop->msg = (ms);                                                \
 231         ORTE_THREADSHIFT(mop, (ms)->peer->ev_base,                      \
 232                          (cbfunc), ORTE_MSG_PRI);                       \
 233     } while(0);
 234 
 235 typedef struct {
 236     opal_object_t super;
 237     opal_event_t ev;
 238     orte_rml_send_t *rmsg;
 239     mca_oob_tcp_send_t *snd;
 240     orte_process_name_t hop;
 241 } mca_oob_tcp_msg_error_t;
 242 OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
 243 
 244 #define ORTE_ACTIVATE_TCP_MSG_ERROR(s, r, h, cbfunc)                    \
 245     do {                                                                \
 246         mca_oob_tcp_msg_error_t *mop;                                   \
 247         mca_oob_tcp_send_t *snd;                                        \
 248         mca_oob_tcp_recv_t *proxy;                                      \
 249         opal_output_verbose(5, orte_oob_base_framework.framework_output, \
 250                             "%s:[%s:%d] post msg error to %s",          \
 251                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
 252                             __FILE__, __LINE__,                         \
 253                             ORTE_NAME_PRINT((h)));                      \
 254         mop = OBJ_NEW(mca_oob_tcp_msg_error_t);                         \
 255         if (NULL != (s)) {                                              \
 256             mop->snd = (s);                                             \
 257         } else if (NULL != (r)) {                                       \
 258             /* use a proxy so we can pass NULL into the macro */        \
 259             proxy = (r);                                                \
 260             /* create a send object for this message */                 \
 261             snd = OBJ_NEW(mca_oob_tcp_send_t);                          \
 262             mop->snd = snd;                                             \
 263             /* transfer and prep the header */                          \
 264             snd->hdr = proxy->hdr;                                      \
 265             MCA_OOB_TCP_HDR_HTON(&snd->hdr);                            \
 266             /* point to the data */                                     \
 267             snd->data = proxy->data;                                    \
 268             /* start the message with the header */                     \
 269             snd->sdptr = (char*)&snd->hdr;                              \
 270             snd->sdbytes = sizeof(mca_oob_tcp_hdr_t);                   \
 271             /* protect the data */                                      \
 272             proxy->data = NULL;                                         \
 273         }                                                               \
 274         mop->hop.jobid = (h)->jobid;                                    \
 275         mop->hop.vpid = (h)->vpid;                                      \
 276         /* this goes to the OOB framework, so use that event base */    \
 277         ORTE_THREADSHIFT(mop, orte_oob_base.ev_base,                    \
 278                          (cbfunc), ORTE_MSG_PRI);                       \
 279     } while(0)
 280 
 281 #define ORTE_ACTIVATE_TCP_NO_ROUTE(r, h, c)                             \
 282     do {                                                                \
 283         mca_oob_tcp_msg_error_t *mop;                                   \
 284         opal_output_verbose(5, orte_oob_base_framework.framework_output, \
 285                             "%s:[%s:%d] post no route to %s",           \
 286                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
 287                             __FILE__, __LINE__,                         \
 288                             ORTE_NAME_PRINT((h)));                      \
 289         mop = OBJ_NEW(mca_oob_tcp_msg_error_t);                         \
 290         mop->rmsg = (r);                                                \
 291         mop->hop.jobid = (h)->jobid;                                    \
 292         mop->hop.vpid = (h)->vpid;                                      \
 293         /* this goes to the component, so use the framework             \
 294          * event base */                                                \
 295         ORTE_THREADSHIFT(mop, orte_oob_base.ev_base,                    \
 296                          (c), ORTE_MSG_PRI);                            \
 297     } while(0)
 298 
 299 #endif /* _MCA_OOB_TCP_SENDRECV_H_ */

/* [<][>][^][v][top][bottom][index][help] */