root/orte/mca/rml/oob/rml_oob_send.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. send_self_exe
  2. orte_rml_oob_send_nb
  3. orte_rml_oob_send_buffer_nb

   1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2006 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "orte_config.h"
  24 #include "opal/types.h"
  25 
  26 #include "opal/dss/dss.h"
  27 #include "opal/util/output.h"
  28 
  29 #include "orte/mca/errmgr/errmgr.h"
  30 #include "orte/mca/oob/base/base.h"
  31 #include "orte/util/name_fns.h"
  32 #include "orte/util/threads.h"
  33 #include "orte/runtime/orte_globals.h"
  34 
  35 #include "orte/mca/rml/base/base.h"
  36 #include "orte/mca/rml/rml_types.h"
  37 #include "rml_oob.h"
  38 
  39 static void send_self_exe(int fd, short args, void* data)
  40 {
  41     orte_self_send_xfer_t *xfer = (orte_self_send_xfer_t*)data;
  42 
  43     ORTE_ACQUIRE_OBJECT(xfer);
  44 
  45     OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
  46                          "%s rml_send_to_self callback executing for tag %d",
  47                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->tag));
  48 
  49     /* execute the send callback function - note that
  50      * send-to-self always returns a SUCCESS status
  51      */
  52     if (NULL != xfer->iov) {
  53         if (NULL != xfer->cbfunc.iov) {
  54             /* non-blocking iovec send */
  55             xfer->cbfunc.iov(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->iov, xfer->count,
  56                              xfer->tag, xfer->cbdata);
  57         }
  58     } else if (NULL != xfer->buffer) {
  59         if (NULL != xfer->cbfunc.buffer) {
  60             /* non-blocking buffer send */
  61             xfer->cbfunc.buffer(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->buffer,
  62                                 xfer->tag, xfer->cbdata);
  63         }
  64     } else {
  65         /* should never happen */
  66         abort();
  67     }
  68 
  69     /* cleanup the memory */
  70     OBJ_RELEASE(xfer);
  71 }
  72 
  73 int orte_rml_oob_send_nb(orte_process_name_t* peer,
  74                          struct iovec* iov,
  75                          int count,
  76                          orte_rml_tag_t tag,
  77                          orte_rml_callback_fn_t cbfunc,
  78                          void* cbdata)
  79 {
  80     orte_rml_recv_t *rcv;
  81     orte_rml_send_t *snd;
  82     int bytes;
  83     orte_self_send_xfer_t *xfer;
  84     int i;
  85     char* ptr;
  86 
  87     OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
  88                          "%s rml_send to peer %s at tag %d",
  89                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  90                          ORTE_NAME_PRINT(peer), tag));
  91 
  92     if (ORTE_RML_TAG_INVALID == tag) {
  93         /* cannot send to an invalid tag */
  94         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
  95         return ORTE_ERR_BAD_PARAM;
  96     }
  97     if (NULL == peer ||
  98         OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
  99         /* cannot send to an invalid peer */
 100         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 101         return ORTE_ERR_BAD_PARAM;
 102     }
 103 
 104     /* if this is a message to myself, then just post the message
 105      * for receipt - no need to dive into the oob
 106      */
 107     if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {  /* local delivery */
 108         OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
 109                              "%s rml_send_iovec_to_self at tag %d",
 110                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
 111         /* send to self is a tad tricky - we really don't want
 112          * to track the send callback function throughout the recv
 113          * process and execute it upon receipt as this would provide
 114          * very different timing from a non-self message. Specifically,
 115          * if we just retain a pointer to the incoming data
 116          * and then execute the send callback prior to the receive,
 117          * then the caller will think we are done with the data and
 118          * can release it. So we have to copy the data in order to
 119          * execute the send callback prior to receiving the message.
 120          *
 121          * In truth, this really is a better mimic of the non-self
 122          * message behavior. If we actually pushed the message out
 123          * on the wire and had it loop back, then we would receive
 124          * a new block of data anyway.
 125          */
 126 
 127         /* setup the send callback */
 128         xfer = OBJ_NEW(orte_self_send_xfer_t);
 129         xfer->iov = iov;
 130         xfer->count = count;
 131         xfer->cbfunc.iov = cbfunc;
 132         xfer->tag = tag;
 133         xfer->cbdata = cbdata;
 134         /* setup the event for the send callback */
 135         ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);
 136 
 137         /* copy the message for the recv */
 138         rcv = OBJ_NEW(orte_rml_recv_t);
 139         rcv->sender = *peer;
 140         rcv->tag = tag;
 141         /* get the total number of bytes in the iovec array */
 142         bytes = 0;
 143         for (i = 0 ; i < count ; ++i) {
 144             bytes += iov[i].iov_len;
 145         }
 146         /* get the required memory allocation */
 147         if (0 < bytes) {
 148             rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
 149             rcv->iov.iov_len = bytes;
 150             /* transfer the bytes */
 151             ptr =  (char*)rcv->iov.iov_base;
 152             for (i = 0 ; i < count ; ++i) {
 153                 memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
 154                 ptr += iov[i].iov_len;
 155             }
 156         }
 157         /* post the message for receipt - since the send callback was posted
 158          * first and has the same priority, it will execute first
 159          */
 160         ORTE_RML_ACTIVATE_MESSAGE(rcv);
 161         return ORTE_SUCCESS;
 162     }
 163 
 164     snd = OBJ_NEW(orte_rml_send_t);
 165     snd->dst = *peer;
 166     snd->origin = *ORTE_PROC_MY_NAME;
 167     snd->tag = tag;
 168     snd->iov = iov;
 169     snd->count = count;
 170     snd->cbfunc.iov = cbfunc;
 171     snd->cbdata = cbdata;
 172 
 173     /* activate the OOB send state */
 174     ORTE_OOB_SEND(snd);
 175 
 176     return ORTE_SUCCESS;
 177 }
 178 
 179 int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
 180                                 opal_buffer_t* buffer,
 181                                 orte_rml_tag_t tag,
 182                                 orte_rml_buffer_callback_fn_t cbfunc,
 183                                 void* cbdata)
 184 {
 185     orte_rml_recv_t *rcv;
 186     orte_rml_send_t *snd;
 187     orte_self_send_xfer_t *xfer;
 188 
 189     OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
 190                          "%s rml_send_buffer to peer %s at tag %d",
 191                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 192                          ORTE_NAME_PRINT(peer), tag));
 193 
 194     if (ORTE_RML_TAG_INVALID == tag) {
 195         /* cannot send to an invalid tag */
 196         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 197         return ORTE_ERR_BAD_PARAM;
 198     }
 199     if (NULL == peer ||
 200         OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
 201         /* cannot send to an invalid peer */
 202         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 203         return ORTE_ERR_BAD_PARAM;
 204     }
 205 
 206     /* if this is a message to myself, then just post the message
 207      * for receipt - no need to dive into the oob
 208      */
 209     if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) {  /* local delivery */
 210         OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
 211                              "%s rml_send_iovec_to_self at tag %d",
 212                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
 213         /* send to self is a tad tricky - we really don't want
 214          * to track the send callback function throughout the recv
 215          * process and execute it upon receipt as this would provide
 216          * very different timing from a non-self message. Specifically,
 217          * if we just retain a pointer to the incoming data
 218          * and then execute the send callback prior to the receive,
 219          * then the caller will think we are done with the data and
 220          * can release it. So we have to copy the data in order to
 221          * execute the send callback prior to receiving the message.
 222          *
 223          * In truth, this really is a better mimic of the non-self
 224          * message behavior. If we actually pushed the message out
 225          * on the wire and had it loop back, then we would receive
 226          * a new block of data anyway.
 227          */
 228 
 229         /* setup the send callback */
 230         xfer = OBJ_NEW(orte_self_send_xfer_t);
 231         xfer->buffer = buffer;
 232         xfer->cbfunc.buffer = cbfunc;
 233         xfer->tag = tag;
 234         xfer->cbdata = cbdata;
 235         /* setup the event for the send callback */
 236         ORTE_THREADSHIFT(xfer, orte_event_base, send_self_exe, ORTE_MSG_PRI);
 237 
 238         /* copy the message for the recv */
 239         rcv = OBJ_NEW(orte_rml_recv_t);
 240         rcv->sender = *peer;
 241         rcv->tag = tag;
 242         rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
 243         memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
 244         rcv->iov.iov_len = buffer->bytes_used;
 245         /* post the message for receipt - since the send callback was posted
 246          * first and has the same priority, it will execute first
 247          */
 248         ORTE_RML_ACTIVATE_MESSAGE(rcv);
 249         return ORTE_SUCCESS;
 250     }
 251 
 252     snd = OBJ_NEW(orte_rml_send_t);
 253     snd->dst = *peer;
 254     snd->origin = *ORTE_PROC_MY_NAME;
 255     snd->tag = tag;
 256     snd->buffer = buffer;
 257     snd->cbfunc.buffer = cbfunc;
 258     snd->cbdata = cbdata;
 259 
 260     /* activate the OOB send state */
 261     ORTE_OOB_SEND(snd);
 262 
 263     return ORTE_SUCCESS;
 264 }

/* [<][>][^][v][top][bottom][index][help] */