root/opal/mca/btl/usnic/btl_usnic_recv.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. opal_btl_usnic_recv_call

   1 /*
   2  * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006      Sandia National Laboratories. All rights
  13  *                         reserved.
  14  * Copyright (c) 2008-2019 Cisco Systems, Inc.  All rights reserved
  15  * Copyright (c) 2012      Los Alamos National Security, LLC.  All rights
  16  *                         reserved.
  17  * $COPYRIGHT$
  18  *
  19  * Additional copyrights may follow
  20  *
  21  * $HEADER$
  22  */
  23 
  24 #include "opal_config.h"
  25 
  26 #include <unistd.h>
  27 
  28 #include "opal_stdint.h"
  29 #include "opal/mca/memchecker/base/base.h"
  30 #include "opal/constants.h"
  31 
  32 #include "opal/mca/btl/btl.h"
  33 #include "opal/mca/btl/base/base.h"
  34 
  35 #include "btl_usnic.h"
  36 #include "btl_usnic_frag.h"
  37 #include "btl_usnic_endpoint.h"
  38 #include "btl_usnic_module.h"
  39 #include "btl_usnic_proc.h"
  40 #include "btl_usnic_ack.h"
  41 #include "btl_usnic_recv.h"
  42 #include "btl_usnic_util.h"
  43 
  44 
  45 /*
  46  * We have received a segment, take action based on the
  47  * packet type in the BTL header
  48  */
  49 void opal_btl_usnic_recv_call(opal_btl_usnic_module_t *module,
  50                               opal_btl_usnic_recv_segment_t *seg,
  51                               opal_btl_usnic_channel_t *channel)
  52 {
  53     opal_btl_usnic_segment_t *bseg;
  54     mca_btl_active_message_callback_t* reg;
  55     opal_btl_usnic_endpoint_t *endpoint;
  56     opal_btl_usnic_btl_chunk_header_t *chunk_hdr;
  57     opal_btl_usnic_btl_header_t *hdr;
  58     uint32_t window_index;
  59     int rc;
  60 #if MSGDEBUG1
  61     char local_ip[IPV4STRADDRLEN];
  62     char remote_ip[IPV4STRADDRLEN];
  63 #endif
  64 
  65     bseg = &seg->rs_base;
  66 
  67     ++module->stats.num_total_recvs;
  68 
  69     /* Valgrind help */
  70     opal_memchecker_base_mem_defined((void*)(seg->rs_protocol_header),
  71                                      seg->rs_len);
  72 
  73     /* Find out who sent this segment */
  74     endpoint = seg->rs_endpoint;
  75     if (FAKE_RECV_DROP || OPAL_UNLIKELY(NULL == endpoint)) {
  76         /* No idea who this was from, so drop it */
  77 #if MSGDEBUG1
  78         opal_output(0, "=== Unknown sender; dropped: seq %" UDSEQ,
  79                     bseg->us_btl_header->pkt_seq);
  80 #endif
  81         ++module->stats.num_unk_recvs;
  82         goto repost_no_endpoint;
  83     }
  84 
  85 #if MSGDEBUG1
  86     struct opal_btl_usnic_modex_t *modex;
  87 
  88     modex = &module->local_modex;
  89     opal_btl_usnic_snprintf_ipv4_addr(local_ip, sizeof(local_ip),
  90                                       modex->ipv4_addr,
  91                                       modex->netmask);
  92     modex = &endpoint->endpoint_remote_modex;
  93     opal_btl_usnic_snprintf_ipv4_addr(remote_ip, sizeof(remote_ip),
  94                                       modex->ipv4_addr,
  95                                       modex->netmask);
  96 #endif
  97 
  98     /***********************************************************************/
  99     /* Segment is an incoming frag */
 100     if (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG == bseg->us_btl_header->payload_type) {
 101 
 102         /* do the receive bookkeeping */
 103         rc = opal_btl_usnic_recv_frag_bookkeeping(module, seg, channel);
 104         if (rc != 0) {
 105             return;
 106         }
 107 
 108         hdr = seg->rs_base.us_btl_header;
 109 
 110 #if MSGDEBUG1
 111         opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ ", len=%d\n",
 112                     (void*) endpoint, hdr->pkt_seq, hdr->payload_len);
 113 #if 0
 114 
 115         opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ " from %s to %s: GOOD! (rel seq %d, lowest seq %" UDSEQ ", highest seq: %" UDSEQ ", rwstart %d) seg %p, module %p\n",
 116                     (void*) endpoint,
 117                     seg->rs_base.us_btl_header->pkt_seq,
 118                     remote_ip, local_ip,
 119                     window_index,
 120                     endpoint->endpoint_next_contig_seq_to_recv,
 121                     endpoint->endpoint_highest_seq_rcvd,
 122                     endpoint->endpoint_rfstart,
 123                     (void*) seg, (void*) module);
 124         if (hdr->put_addr != NULL) {
 125             opal_output(0, "  put_addr = %p\n",
 126                     seg->rs_base.us_btl_header->put_addr);
 127         }
 128 #endif
 129 #endif
 130 
 131         /* If this it not a PUT, Pass this segment up to the PML.
 132          * Be sure to get the payload length from the BTL header because
 133          * the L2 layer may artificially inflate (or otherwise change)
 134          * the frame length to meet minimum sizes, add protocol information,
 135          * etc.
 136          */
 137         if (hdr->put_addr == NULL) {
 138             reg = mca_btl_base_active_message_trigger + hdr->tag;
 139             seg->rs_segment.seg_len = hdr->payload_len;
 140 #if MSGDEBUG2
 141                 opal_output(0, "small recv complete, pass up %u bytes, tag=%d\n",
 142                         (unsigned)bseg->us_btl_header->payload_len,
 143                         (int)bseg->us_btl_header->tag);
 144 #endif
 145             reg->cbfunc(&module->super, hdr->tag, &seg->rs_desc, reg->cbdata);
 146 
 147         /*
 148          * If this is a PUT, need to copy it to user buffer
 149          */
 150         } else {
 151 #if MSGDEBUG1
 152             opal_output(0, "Copy %d PUT bytes to %p\n",
 153                 seg->rs_base.us_btl_header->payload_len,
 154                 (void*)seg->rs_base.us_btl_header->put_addr);
 155 #endif
 156             memcpy(seg->rs_base.us_btl_header->put_addr,
 157                     seg->rs_base.us_payload.raw,
 158                     seg->rs_base.us_btl_header->payload_len);
 159         }
 160 
 161         /* do not jump to repost, already done by bookkeeping */
 162         return;
 163     }
 164 
 165     /***********************************************************************/
 166     /* Segment is an incoming chunk */
 167     if (OPAL_BTL_USNIC_PAYLOAD_TYPE_CHUNK == bseg->us_btl_header->payload_type) {
 168         int frag_index;
 169         opal_btl_usnic_rx_frag_info_t *fip;
 170 
 171         /* Is incoming sequence # ok? */
 172         if (OPAL_UNLIKELY(opal_btl_usnic_check_rx_seq(endpoint, seg,
 173                         &window_index) != 0)) {
 174             goto repost;
 175         }
 176 
 177 #if MSGDEBUG1
 178         opal_output(0, "<-- Received CHUNK fid %d ep %p, seq %" UDSEQ " from %s to %s: GOOD! (rel seq %d, lowest seq %" UDSEQ ", highest seq: %" UDSEQ ", rwstart %d) seg %p, module %p\n",
 179                     seg->rs_base.us_btl_chunk_header->ch_frag_id,
 180                     (void*) endpoint,
 181                     seg->rs_base.us_btl_chunk_header->ch_hdr.pkt_seq,
 182                     remote_ip, local_ip,
 183                     window_index,
 184                     endpoint->endpoint_next_contig_seq_to_recv,
 185                     endpoint->endpoint_highest_seq_rcvd,
 186                     endpoint->endpoint_rfstart,
 187                     (void*) seg, (void*) module);
 188 #endif
 189 
 190         /* start a new fragment if not one in progress
 191          * alloc memory, etc.  when last byte arrives, dealloc the
 192          * frag_id and pass data to PML
 193          */
 194         chunk_hdr = seg->rs_base.us_btl_chunk_header;
 195         frag_index = chunk_hdr->ch_frag_id % MAX_ACTIVE_FRAGS;
 196         fip = &(endpoint->endpoint_rx_frag_info[frag_index]);
 197 
 198         /* frag_id == 0 means this slot it empty, grab it! */
 199         if (0 == fip->rfi_frag_id) {
 200             fip->rfi_frag_id = chunk_hdr->ch_frag_id;
 201             fip->rfi_frag_size = chunk_hdr->ch_frag_size;
 202             if (chunk_hdr->ch_hdr.put_addr == NULL) {
 203                 int pool;
 204 
 205                 fip->rfi_data = NULL;
 206 
 207                 /* See which data pool this should come from,
 208                  * or if it should be malloc()ed
 209                  */
 210                 pool = usnic_fls(chunk_hdr->ch_frag_size-1);
 211                 if (pool >= module->first_pool &&
 212                         pool <= module->last_pool) {
 213                     opal_free_list_item_t* item;
 214                     opal_btl_usnic_rx_buf_t *rx_buf;
 215                     USNIC_COMPAT_FREE_LIST_GET(&module->module_recv_buffers[pool], item);
 216                     rx_buf = (opal_btl_usnic_rx_buf_t *)item;
 217                     if (OPAL_LIKELY(NULL != rx_buf)) {
 218                         fip->rfi_fl_elt = item;
 219                         fip->rfi_data = rx_buf->buf;
 220                         fip->rfi_data_pool = pool;
 221                         fip->rfi_data_in_pool = true;
 222                     }
 223                 }
 224                 if (fip->rfi_data == NULL) {
 225                     fip->rfi_data = malloc(chunk_hdr->ch_frag_size);
 226                     fip->rfi_data_in_pool = false;
 227                 }
 228                 if (fip->rfi_data == NULL) {
 229                     opal_btl_usnic_util_abort("malloc failed", __FILE__, __LINE__);
 230                 }
 231 #if MSGDEBUG1
 232                 opal_output(0, "Start large recv to %p, size=%"PRIu32"\n",
 233                     (void *)fip->rfi_data, chunk_hdr->ch_frag_size);
 234 #endif
 235             } else {
 236 #if MSGDEBUG1
 237                 opal_output(0, "Start PUT to %p\n",
 238                         (void *)chunk_hdr->ch_hdr.put_addr);
 239 #endif
 240                 fip->rfi_data = chunk_hdr->ch_hdr.put_addr;
 241             }
 242             fip->rfi_bytes_left = chunk_hdr->ch_frag_size;
 243             fip->rfi_frag_id = chunk_hdr->ch_frag_id;
 244 
 245         /* frag_id is not 0 - it must match, drop if not */
 246         } else if (fip->rfi_frag_id != chunk_hdr->ch_frag_id) {
 247             ++module->stats.num_badfrag_recvs;
 248             goto repost;
 249         }
 250 #if MSGDEBUG1
 251         opal_output(0, "put_addr=%p, copy_addr=%p, off=%d\n",
 252                 chunk_hdr->ch_hdr.put_addr,
 253                 fip->rfi_data+chunk_hdr->ch_frag_offset,
 254                 chunk_hdr->ch_frag_offset);
 255 #endif
 256 
 257         /* Stats */
 258         ++module->stats.num_chunk_recvs;
 259 
 260         /* validate offset and len to be within fragment */
 261         assert(chunk_hdr->ch_frag_offset + chunk_hdr->ch_hdr.payload_len <=
 262                 fip->rfi_frag_size);
 263         assert(fip->rfi_frag_size == chunk_hdr->ch_frag_size);
 264 
 265         /* copy the data into place */
 266         memcpy(fip->rfi_data + chunk_hdr->ch_frag_offset, (char *)(chunk_hdr+1),
 267                 chunk_hdr->ch_hdr.payload_len);
 268 
 269         /* update sliding window */
 270         opal_btl_usnic_update_window(endpoint, window_index);
 271 
 272         fip->rfi_bytes_left -= chunk_hdr->ch_hdr.payload_len;
 273         if (0 == fip->rfi_bytes_left) {
 274             mca_btl_base_descriptor_t desc;
 275             mca_btl_base_segment_t segment;
 276 
 277             segment.seg_addr.pval = fip->rfi_data;
 278             segment.seg_len = fip->rfi_frag_size;
 279             desc.USNIC_RECV_LOCAL = &segment;
 280             desc.USNIC_RECV_LOCAL_COUNT = 1;
 281 
 282             /* only up to PML if this was not a put */
 283             if (chunk_hdr->ch_hdr.put_addr == NULL) {
 284 
 285                 /* Pass this segment up to the PML */
 286 #if MSGDEBUG2
 287                 opal_output(0, "large recv complete, pass up %p, %u bytes, tag=%d\n",
 288                         desc.USNIC_RECV_LOCAL->seg_addr.pval,
 289                         (unsigned)desc.USNIC_RECV_LOCAL->seg_len,
 290                         (int)chunk_hdr->ch_hdr.tag);
 291 #endif
 292                 reg = mca_btl_base_active_message_trigger +
 293                     chunk_hdr->ch_hdr.tag;
 294 
 295                 /* mca_pml_ob1_recv_frag_callback_frag() */
 296                 reg->cbfunc(&module->super, chunk_hdr->ch_hdr.tag,
 297                         &desc, reg->cbdata);
 298 
 299                 /* free temp buffer for non-put */
 300                 if (fip->rfi_data_in_pool) {
 301                     USNIC_COMPAT_FREE_LIST_RETURN(&module->module_recv_buffers[fip->rfi_data_pool],
 302                                                   fip->rfi_fl_elt);
 303                 } else {
 304                     free(fip->rfi_data);
 305                 }
 306 
 307 #if MSGDEBUG1
 308             } else {
 309                 opal_output(0, "PUT recv complete, no callback\n");
 310 #endif
 311             }
 312 
 313             /* release the fragment ID */
 314             fip->rfi_frag_id = 0;
 315 
 316             /* force immediate ACK */
 317             endpoint->endpoint_acktime = 0;
 318         }
 319         goto repost;
 320     }
 321 
 322     /***********************************************************************/
 323     /* Frag is an incoming ACK */
 324     else if (OPAL_LIKELY(OPAL_BTL_USNIC_PAYLOAD_TYPE_ACK ==
 325                          bseg->us_btl_header->payload_type)) {
 326         opal_btl_usnic_seq_t ack_seq;
 327 
 328         /* sequence being ACKed */
 329         ack_seq = bseg->us_btl_header->ack_seq;
 330 
 331         /* Stats */
 332         ++module->stats.num_ack_recvs;
 333 
 334 #if MSGDEBUG1
 335         opal_output(0, "    Received ACK for sequence number %" UDSEQ " from %s to %s\n",
 336                     bseg->us_btl_header->ack_seq, remote_ip, local_ip);
 337 #endif
 338         OPAL_THREAD_LOCK(&btl_usnic_lock);
 339         opal_btl_usnic_handle_ack(endpoint, ack_seq);
 340         OPAL_THREAD_UNLOCK(&btl_usnic_lock);
 341         goto repost;
 342     }
 343 
 344     /***********************************************************************/
 345     /* Have no idea what the frag is; drop it */
 346     else {
 347         ++module->stats.num_unk_recvs;
 348         if (module->stats.num_unk_recvs < 10) {
 349             opal_output_verbose(15, USNIC_OUT, "unrecognized payload type %d", bseg->us_btl_header->payload_type);
 350             opal_output_verbose(15, USNIC_OUT, "base = %p, proto = %p, hdr = %p", bseg->us_list.ptr, seg->rs_protocol_header, (void*) bseg->us_btl_header);
 351             opal_btl_usnic_dump_hex(15, USNIC_OUT, bseg->us_list.ptr, 96+sizeof(*bseg->us_btl_header));
 352         }
 353         goto repost;
 354     }
 355 
 356     /***********************************************************************/
 357  repost:
 358 
 359     /* if endpoint exiting, and all ACKs received, release the endpoint */
 360     if (endpoint->endpoint_exiting && ENDPOINT_DRAINED(endpoint)) {
 361         OBJ_RELEASE(endpoint);
 362     }
 363  repost_no_endpoint:
 364     ++module->stats.num_recv_reposts;
 365 
 366     /* Add recv to linked list for reposting */
 367     seg->rs_next = channel->repost_recv_head;
 368     channel->repost_recv_head = seg;
 369 }

/* [<][>][^][v][top][bottom][index][help] */