root/ompi/mca/pml/ob1/pml_ob1_recvreq.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_pml_ob1_recv_request_process_pending
  2. mca_pml_ob1_recv_request_free
  3. mca_pml_ob1_recv_request_cancel
  4. mca_pml_ob1_recv_request_construct
  5. mca_pml_ob1_recv_request_destruct
  6. mca_pml_ob1_recv_ctl_completion
  7. mca_pml_ob1_put_completion
  8. mca_pml_ob1_recv_request_ack_send_btl
  9. mca_pml_ob1_recv_request_ack
  10. mca_pml_ob1_recv_request_get_frag_failed
  11. mca_pml_ob1_rget_completion
  12. mca_pml_ob1_recv_request_put_frag
  13. mca_pml_ob1_recv_request_get_frag
  14. mca_pml_ob1_recv_request_progress_frag
  15. mca_pml_ob1_recv_request_frag_copy_start
  16. mca_pml_ob1_recv_request_frag_copy_finished
  17. mca_pml_ob1_recv_request_progress_rget
  18. mca_pml_ob1_recv_request_progress_rndv
  19. mca_pml_ob1_recv_request_progress_match
  20. mca_pml_ob1_recv_request_matched_probe
  21. mca_pml_ob1_recv_request_schedule_once
  22. append_recv_req_to_queue
  23. recv_req_match_specific_proc
  24. recv_req_match_wild
  25. mca_pml_ob1_recv_req_start

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2019 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2008      UT-Battelle, LLC. All rights reserved.
  14  * Copyright (c) 2011      Sandia National Laboratories. All rights reserved.
  15  * Copyright (c) 2012-2015 NVIDIA Corporation.  All rights reserved.
  16  * Copyright (c) 2011-2017 Los Alamos National Security, LLC. All rights
  17  *                         reserved.
  18  * Copyright (c) 2012      FUJITSU LIMITED.  All rights reserved.
  19  * Copyright (c) 2014-2016 Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * Copyright (c) 2018      Sandia National Laboratories
  22  *                         All rights reserved.
  23  * $COPYRIGHT$
  24  *
  25  * Additional copyrights may follow
  26  *
  27  * $HEADER$
  28  */
  29 
  30 #include "ompi_config.h"
  31 
  32 #include "opal/mca/mpool/mpool.h"
  33 #include "opal/util/arch.h"
  34 #include "ompi/runtime/ompi_spc.h"
  35 #include "ompi/mca/pml/pml.h"
  36 #include "ompi/mca/bml/bml.h"
  37 #include "pml_ob1_comm.h"
  38 #include "pml_ob1_recvreq.h"
  39 #include "pml_ob1_recvfrag.h"
  40 #include "pml_ob1_sendreq.h"
  41 #include "pml_ob1_rdmafrag.h"
  42 #include "ompi/mca/bml/base/base.h"
  43 #include "ompi/memchecker.h"
  44 #if OPAL_CUDA_SUPPORT
  45 #include "opal/datatype/opal_datatype_cuda.h"
  46 #include "opal/mca/common/cuda/common_cuda.h"
  47 #endif /* OPAL_CUDA_SUPPORT */
  48 
  49 #if OPAL_CUDA_SUPPORT
  50 int mca_pml_ob1_cuda_need_buffers(mca_pml_ob1_recv_request_t* recvreq,
  51                                   mca_btl_base_module_t* btl);
  52 #endif /* OPAL_CUDA_SUPPORT */
  53 
  54 void mca_pml_ob1_recv_request_process_pending(void)
  55 {
  56     mca_pml_ob1_recv_request_t* recvreq;
  57     int rc, i, s = (int)opal_list_get_size(&mca_pml_ob1.recv_pending);
  58 
  59     for(i = 0; i < s; i++) {
  60         OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
  61         recvreq = (mca_pml_ob1_recv_request_t*)
  62             opal_list_remove_first(&mca_pml_ob1.recv_pending);
  63         OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
  64         if( OPAL_UNLIKELY(NULL == recvreq) )
  65             break;
  66         recvreq->req_pending = false;
  67         rc = mca_pml_ob1_recv_request_schedule_exclusive(recvreq, NULL);
  68         if(OMPI_ERR_OUT_OF_RESOURCE == rc)
  69             break;
  70     }
  71 }
  72 
  73 static int mca_pml_ob1_recv_request_free(struct ompi_request_t** request)
  74 {
  75     mca_pml_ob1_recv_request_t* recvreq = *(mca_pml_ob1_recv_request_t**)request;
  76     assert (false == recvreq->req_recv.req_base.req_free_called);
  77 
  78     recvreq->req_recv.req_base.req_free_called = true;
  79     PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_NOTIFY,
  80                              &(recvreq->req_recv.req_base), PERUSE_RECV );
  81 
  82     if( true == recvreq->req_recv.req_base.req_pml_complete ) {
  83         /* make buffer defined when the request is compeleted,
  84            and before releasing the objects. */
  85         MEMCHECKER(
  86                    memchecker_call(&opal_memchecker_base_mem_defined,
  87                                    recvreq->req_recv.req_base.req_addr,
  88                                    recvreq->req_recv.req_base.req_count,
  89                                    recvreq->req_recv.req_base.req_datatype);
  90                    );
  91 
  92         MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
  93     }
  94 
  95     *request = MPI_REQUEST_NULL;
  96     return OMPI_SUCCESS;
  97 }
  98 
  99 static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request, int complete)
 100 {
 101     mca_pml_ob1_recv_request_t* request = (mca_pml_ob1_recv_request_t*)ompi_request;
 102     ompi_communicator_t *comm = request->req_recv.req_base.req_comm;
 103     mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;
 104 
 105     /* The rest should be protected behind the match logic lock */
 106     OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
 107     if( true == request->req_match_received ) { /* way to late to cancel this one */
 108         OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
 109         assert( OMPI_ANY_TAG != ompi_request->req_status.MPI_TAG ); /* not matched isn't it */
 110         return OMPI_SUCCESS;
 111     }
 112 
 113 #if MCA_PML_OB1_CUSTOM_MATCH
 114     custom_match_prq_cancel(ob1_comm->prq, request);
 115 #else
 116     if( request->req_recv.req_base.req_peer == OMPI_ANY_SOURCE ) {
 117         opal_list_remove_item( &ob1_comm->wild_receives, (opal_list_item_t*)request );
 118     } else {
 119         mca_pml_ob1_comm_proc_t* proc = mca_pml_ob1_peer_lookup (comm, request->req_recv.req_base.req_peer);
 120         opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request);
 121     }
 122 #endif
 123     PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
 124                              &(request->req_recv.req_base), PERUSE_RECV );
 125     /**
 126      * As now the PML is done with this request we have to force the pml_complete
 127      * to true. Otherwise, the request will never be freed.
 128      */
 129     request->req_recv.req_base.req_pml_complete = true;
 130     OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
 131 
 132     ompi_request->req_status._cancelled = true;
 133     /* This macro will set the req_complete to true so the MPI Test/Wait* functions
 134      * on this request will be able to complete. As the status is marked as
 135      * cancelled the cancel state will be detected.
 136      */
 137     MCA_PML_OB1_RECV_REQUEST_MPI_COMPLETE(request);
 138     /*
 139      * Receive request cancelled, make user buffer accessible.
 140      */
 141     MEMCHECKER(
 142         memchecker_call(&opal_memchecker_base_mem_defined,
 143                         request->req_recv.req_base.req_addr,
 144                         request->req_recv.req_base.req_count,
 145                         request->req_recv.req_base.req_datatype);
 146     );
 147     return OMPI_SUCCESS;
 148 }
 149 
 150 static void mca_pml_ob1_recv_request_construct(mca_pml_ob1_recv_request_t* request)
 151 {
 152     /* the request type is set by the superclass */
 153     request->req_recv.req_base.req_ompi.req_start = mca_pml_ob1_start;
 154     request->req_recv.req_base.req_ompi.req_free = mca_pml_ob1_recv_request_free;
 155     request->req_recv.req_base.req_ompi.req_cancel = mca_pml_ob1_recv_request_cancel;
 156     request->req_rdma_cnt = 0;
 157     request->local_handle = NULL;
 158     OBJ_CONSTRUCT(&request->lock, opal_mutex_t);
 159 }
 160 
 161 static void mca_pml_ob1_recv_request_destruct(mca_pml_ob1_recv_request_t* request)
 162 {
 163     OBJ_DESTRUCT(&request->lock);
 164     if (OPAL_UNLIKELY(request->local_handle)) {
 165         mca_bml_base_deregister_mem (request->rdma_bml, request->local_handle);
 166         request->local_handle = NULL;
 167     }
 168 }
 169 
 170 OBJ_CLASS_INSTANCE(
 171     mca_pml_ob1_recv_request_t,
 172     mca_pml_base_recv_request_t,
 173     mca_pml_ob1_recv_request_construct,
 174     mca_pml_ob1_recv_request_destruct);
 175 
 176 
 177 /*
 178  * Release resources.
 179  */
 180 
 181 static void mca_pml_ob1_recv_ctl_completion( mca_btl_base_module_t* btl,
 182                                              struct mca_btl_base_endpoint_t* ep,
 183                                              struct mca_btl_base_descriptor_t* des,
 184                                              int status )
 185 {
 186     mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*)des->des_context;
 187 
 188     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
 189 }
 190 
 191 /*
 192  * Put operation has completed remotely - update request status
 193  */
 194 
 195 static void mca_pml_ob1_put_completion (mca_pml_ob1_rdma_frag_t *frag, int64_t rdma_size)
 196 {
 197     mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
 198     mca_bml_base_btl_t *bml_btl = frag->rdma_bml;
 199 
 200     OPAL_THREAD_ADD_FETCH32(&recvreq->req_pipeline_depth, -1);
 201 
 202     assert ((uint64_t) rdma_size == frag->rdma_length);
 203     MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
 204 
 205     if (OPAL_LIKELY(0 < rdma_size)) {
 206 
 207         /* check completion status */
 208         OPAL_THREAD_ADD_FETCH_SIZE_T(&recvreq->req_bytes_received, rdma_size);
 209         SPC_USER_OR_MPI(recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG, (ompi_spc_value_t)rdma_size,
 210                         OMPI_SPC_BYTES_RECEIVED_USER, OMPI_SPC_BYTES_RECEIVED_MPI);
 211         if (recv_request_pml_complete_check(recvreq) == false &&
 212             recvreq->req_rdma_offset < recvreq->req_send_offset) {
 213             /* schedule additional rdma operations */
 214             mca_pml_ob1_recv_request_schedule(recvreq, bml_btl);
 215         }
 216     }
 217 
 218     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
 219 }
 220 
 221 /*
 222  *
 223  */
 224 
 225 int mca_pml_ob1_recv_request_ack_send_btl(
 226         ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl,
 227         uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset,
 228         uint64_t size, bool nordma)
 229 {
 230     mca_btl_base_descriptor_t* des;
 231     mca_pml_ob1_ack_hdr_t* ack;
 232     int rc;
 233 
 234     /* allocate descriptor */
 235     mca_bml_base_alloc(bml_btl, &des, MCA_BTL_NO_ORDER,
 236                        sizeof(mca_pml_ob1_ack_hdr_t),
 237                        MCA_BTL_DES_FLAGS_PRIORITY | MCA_BTL_DES_FLAGS_BTL_OWNERSHIP |
 238                        MCA_BTL_DES_SEND_ALWAYS_CALLBACK | MCA_BTL_DES_FLAGS_SIGNAL);
 239     if( OPAL_UNLIKELY(NULL == des) ) {
 240         return OMPI_ERR_OUT_OF_RESOURCE;
 241     }
 242 
 243     /* fill out header */
 244     ack = (mca_pml_ob1_ack_hdr_t*)des->des_segments->seg_addr.pval;
 245     mca_pml_ob1_ack_hdr_prepare (ack, nordma ? MCA_PML_OB1_HDR_FLAGS_NORDMA : 0,
 246                                  hdr_src_req, hdr_dst_req, hdr_send_offset, size);
 247 
 248     ob1_hdr_hton(ack, MCA_PML_OB1_HDR_TYPE_ACK, proc);
 249 
 250     /* initialize descriptor */
 251     des->des_cbfunc = mca_pml_ob1_recv_ctl_completion;
 252 
 253     rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_ACK);
 254     SPC_RECORD(OMPI_SPC_BYTES_RECEIVED_MPI, (ompi_spc_value_t)size);
 255     if( OPAL_LIKELY( rc >= 0 ) ) {
 256         return OMPI_SUCCESS;
 257     }
 258     mca_bml_base_free(bml_btl, des);
 259     return OMPI_ERR_OUT_OF_RESOURCE;
 260 }
 261 
 262 static int mca_pml_ob1_recv_request_ack(
 263     mca_pml_ob1_recv_request_t* recvreq,
 264     mca_pml_ob1_rendezvous_hdr_t* hdr,
 265     size_t bytes_received)
 266 {
 267     ompi_proc_t* proc = (ompi_proc_t*)recvreq->req_recv.req_base.req_proc;
 268     mca_bml_base_endpoint_t* bml_endpoint = NULL;
 269 
 270     bml_endpoint = mca_bml_base_get_endpoint (proc);
 271 
 272     /* by default copy everything */
 273     recvreq->req_send_offset = bytes_received;
 274     if(hdr->hdr_msg_length > bytes_received) {
 275         size_t rdma_num = mca_pml_ob1_rdma_pipeline_btls_count (bml_endpoint);
 276         /*
 277          * lookup request buffer to determine if memory is already
 278          * registered.
 279          */
 280 
 281         if(opal_convertor_need_buffers(&recvreq->req_recv.req_base.req_convertor) == 0 &&
 282            hdr->hdr_match.hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_CONTIG &&
 283            rdma_num != 0) {
 284             unsigned char *base;
 285             opal_convertor_get_current_pointer( &recvreq->req_recv.req_base.req_convertor, (void**)&(base) );
 286 
 287             if(hdr->hdr_match.hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PIN)
 288                 recvreq->req_rdma_cnt = mca_pml_ob1_rdma_btls(bml_endpoint,
 289                         base, recvreq->req_recv.req_bytes_packed,
 290                         recvreq->req_rdma );
 291             else
 292                 recvreq->req_rdma_cnt = 0;
 293 
 294             /* memory is already registered on both sides */
 295             if (recvreq->req_rdma_cnt != 0) {
 296                 recvreq->req_send_offset = hdr->hdr_msg_length;
 297                 /* are rdma devices available for long rdma protocol */
 298             } else if(bml_endpoint->btl_send_limit < hdr->hdr_msg_length) {
 299                 /* use convertor to figure out the rdma offset for this request */
 300                 recvreq->req_send_offset = hdr->hdr_msg_length -
 301                     bml_endpoint->btl_pipeline_send_length;
 302 
 303                 if(recvreq->req_send_offset < bytes_received)
 304                     recvreq->req_send_offset = bytes_received;
 305 
 306                 /* use converter to figure out the rdma offset for this
 307                  * request */
 308                 opal_convertor_set_position(&recvreq->req_recv.req_base.req_convertor,
 309                         &recvreq->req_send_offset);
 310 
 311                 recvreq->req_rdma_cnt =
 312                     mca_pml_ob1_rdma_pipeline_btls(bml_endpoint,
 313                             recvreq->req_send_offset - bytes_received,
 314                             recvreq->req_rdma);
 315             }
 316         }
 317         /* nothing to send by copy in/out - no need to ack */
 318         if(recvreq->req_send_offset == hdr->hdr_msg_length)
 319             return OMPI_SUCCESS;
 320     }
 321 
 322     /* let know to shedule function there is no need to put ACK flag. If not all message went over
 323      * RDMA then we cancel the GET protocol in order to switch back to send/recv. In this case send
 324      * back the remote send request, the peer kept a poointer to the frag locally. In the future we
 325      * might want to cancel the fragment itself, in which case we will have to send back the remote
 326      * fragment instead of the remote request.
 327      */
 328     recvreq->req_ack_sent = true;
 329     return mca_pml_ob1_recv_request_ack_send(proc, hdr->hdr_src_req.lval,
 330                                              recvreq, recvreq->req_send_offset, 0,
 331                                              recvreq->req_send_offset == bytes_received);
 332 }
 333 
 334 static int mca_pml_ob1_recv_request_put_frag (mca_pml_ob1_rdma_frag_t *frag);
 335 
 336 static int mca_pml_ob1_recv_request_get_frag_failed (mca_pml_ob1_rdma_frag_t *frag, int rc)
 337 {
 338     mca_pml_ob1_recv_request_t *recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
 339     ompi_proc_t *proc = (ompi_proc_t *) recvreq->req_recv.req_base.req_proc;
 340 
 341     if (OMPI_ERR_NOT_AVAILABLE == rc) {
 342         /* get isn't supported for this transfer. tell peer to fallback on put */
 343         rc = mca_pml_ob1_recv_request_put_frag (frag);
 344         if (OMPI_SUCCESS == rc){
 345             return OMPI_SUCCESS;
 346         } else if (OMPI_ERR_OUT_OF_RESOURCE == rc) {
 347             OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
 348             opal_list_append (&mca_pml_ob1.rdma_pending, (opal_list_item_t*)frag);
 349             OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
 350 
 351             return OMPI_SUCCESS;
 352         }
 353     }
 354 
 355     if (++frag->retries < mca_pml_ob1.rdma_retries_limit &&
 356         OMPI_ERR_OUT_OF_RESOURCE == rc) {
 357         OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
 358         opal_list_append(&mca_pml_ob1.rdma_pending, (opal_list_item_t*)frag);
 359         OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
 360 
 361         return OMPI_SUCCESS;
 362     }
 363 
 364     /* tell peer to fall back on send for this region */
 365     rc = mca_pml_ob1_recv_request_ack_send(proc, frag->rdma_hdr.hdr_rget.hdr_rndv.hdr_src_req.lval,
 366                                            recvreq, frag->rdma_offset, frag->rdma_length, false);
 367     MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
 368     return rc;
 369 }
 370 
 371 /**
 372  * Return resources used by the RDMA
 373  */
 374 
 375 static void mca_pml_ob1_rget_completion (mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* ep,
 376                                          void *local_address, mca_btl_base_registration_handle_t *local_handle,
 377                                          void *context, void *cbdata, int status)
 378 {
 379     mca_bml_base_btl_t *bml_btl = (mca_bml_base_btl_t *) context;
 380     mca_pml_ob1_rdma_frag_t *frag = (mca_pml_ob1_rdma_frag_t *) cbdata;
 381     mca_pml_ob1_recv_request_t *recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
 382 
 383     /* check completion status */
 384     if (OPAL_UNLIKELY(OMPI_SUCCESS != status)) {
 385         status = mca_pml_ob1_recv_request_get_frag_failed (frag, status);
 386         if (OPAL_UNLIKELY(OMPI_SUCCESS != status)) {
 387             /* TSW - FIX */
 388             OMPI_ERROR_LOG(status);
 389             ompi_rte_abort(-1, NULL);
 390         }
 391     } else {
 392         /* is receive request complete */
 393         OPAL_THREAD_ADD_FETCH_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length);
 394         SPC_USER_OR_MPI(recvreq->req_recv.req_base.req_tag, (ompi_spc_value_t)frag->rdma_length,
 395                         OMPI_SPC_BYTES_RECEIVED_USER, OMPI_SPC_BYTES_RECEIVED_MPI);
 396         /* TODO: re-add order */
 397         mca_pml_ob1_send_fin (recvreq->req_recv.req_base.req_proc,
 398                               bml_btl, frag->rdma_hdr.hdr_rget.hdr_frag,
 399                               frag->rdma_length, 0, 0);
 400 
 401         recv_request_pml_complete_check(recvreq);
 402 
 403         MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
 404     }
 405 
 406     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
 407 }
 408 
 409 
 410 static int mca_pml_ob1_recv_request_put_frag (mca_pml_ob1_rdma_frag_t *frag)
 411 {
 412     mca_pml_ob1_recv_request_t *recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
 413 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
 414     ompi_proc_t* proc = (ompi_proc_t*)recvreq->req_recv.req_base.req_proc;
 415 #endif
 416     mca_btl_base_registration_handle_t *local_handle = NULL;
 417     mca_bml_base_btl_t *bml_btl = frag->rdma_bml;
 418     mca_btl_base_descriptor_t *ctl;
 419     mca_pml_ob1_rdma_hdr_t *hdr;
 420     size_t reg_size;
 421     int rc;
 422 
 423     reg_size = bml_btl->btl->btl_registration_handle_size;
 424 
 425     if (frag->local_handle) {
 426         local_handle = frag->local_handle;
 427     } else if (recvreq->local_handle) {
 428         local_handle = recvreq->local_handle;
 429     }
 430 
 431     /* prepare a descriptor for rdma control message */
 432     mca_bml_base_alloc (bml_btl, &ctl, MCA_BTL_NO_ORDER, sizeof (mca_pml_ob1_rdma_hdr_t) + reg_size,
 433                         MCA_BTL_DES_FLAGS_PRIORITY | MCA_BTL_DES_FLAGS_BTL_OWNERSHIP |
 434                         MCA_BTL_DES_SEND_ALWAYS_CALLBACK | MCA_BTL_DES_FLAGS_SIGNAL);
 435     if (OPAL_UNLIKELY(NULL == ctl)) {
 436         return OMPI_ERR_OUT_OF_RESOURCE;
 437     }
 438     ctl->des_cbfunc = mca_pml_ob1_recv_ctl_completion;
 439 
 440     /* fill in rdma header */
 441     hdr = (mca_pml_ob1_rdma_hdr_t *) ctl->des_segments->seg_addr.pval;
 442     mca_pml_ob1_rdma_hdr_prepare (hdr, (!recvreq->req_ack_sent) ? MCA_PML_OB1_HDR_TYPE_ACK : 0,
 443                                   recvreq->remote_req_send.lval, frag, recvreq, frag->rdma_offset,
 444                                   frag->local_address, frag->rdma_length, local_handle,
 445                                   reg_size);
 446     ob1_hdr_hton(hdr, MCA_PML_OB1_HDR_TYPE_PUT, proc);
 447 
 448     frag->cbfunc = mca_pml_ob1_put_completion;
 449 
 450     recvreq->req_ack_sent = true;
 451 
 452     PERUSE_TRACE_COMM_OMPI_EVENT( PERUSE_COMM_REQ_XFER_CONTINUE,
 453                                   &(recvreq->req_recv.req_base), frag->rdma_length,
 454                                   PERUSE_RECV);
 455 
 456     /* send rdma request to peer */
 457     rc = mca_bml_base_send (bml_btl, ctl, MCA_PML_OB1_HDR_TYPE_PUT);
 458     /* Increment counter for bytes_put even though they probably haven't all been received yet */
 459     SPC_RECORD(OMPI_SPC_BYTES_PUT, (ompi_spc_value_t)frag->rdma_length);
 460     if (OPAL_UNLIKELY(rc < 0)) {
 461         mca_bml_base_free (bml_btl, ctl);
 462         return rc;
 463     }
 464 
 465     return OMPI_SUCCESS;
 466 }
 467 
 468 /*
 469  *
 470  */
 471 int mca_pml_ob1_recv_request_get_frag (mca_pml_ob1_rdma_frag_t *frag)
 472 {
 473     mca_pml_ob1_recv_request_t *recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
 474     mca_btl_base_registration_handle_t *local_handle = NULL;
 475     mca_bml_base_btl_t *bml_btl = frag->rdma_bml;
 476     int rc;
 477 
 478     /* prepare descriptor */
 479     if (bml_btl->btl->btl_register_mem && !frag->local_handle && !recvreq->local_handle) {
 480         mca_bml_base_register_mem (bml_btl, frag->local_address, frag->rdma_length, MCA_BTL_REG_FLAG_LOCAL_WRITE |
 481                                    MCA_BTL_REG_FLAG_REMOTE_WRITE, &frag->local_handle);
 482         if (OPAL_UNLIKELY(NULL == frag->local_handle)) {
 483             return mca_pml_ob1_recv_request_get_frag_failed (frag, OMPI_ERR_OUT_OF_RESOURCE);
 484         }
 485     }
 486 
 487     if (frag->local_handle) {
 488         local_handle = frag->local_handle;
 489     } else if (recvreq->local_handle) {
 490         local_handle = recvreq->local_handle;
 491     }
 492 
 493     PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE,
 494                                  &(((mca_pml_ob1_recv_request_t *) frag->rdma_req)->req_recv.req_base),
 495                                  frag->rdma_length, PERUSE_RECV);
 496 
 497     /* queue up get request */
 498     rc = mca_bml_base_get (bml_btl, frag->local_address, frag->remote_address, local_handle,
 499                            (mca_btl_base_registration_handle_t *) frag->remote_handle, frag->rdma_length,
 500                            0, MCA_BTL_NO_ORDER, mca_pml_ob1_rget_completion, frag);
 501     /* Increment counter for bytes_get even though they probably haven't all been received yet */
 502     SPC_RECORD(OMPI_SPC_BYTES_GET, (ompi_spc_value_t)frag->rdma_length);
 503     if( OPAL_UNLIKELY(OMPI_SUCCESS > rc) ) {
 504         return mca_pml_ob1_recv_request_get_frag_failed (frag, OMPI_ERR_OUT_OF_RESOURCE);
 505     }
 506 
 507     return OMPI_SUCCESS;
 508 }
 509 
 510 
 511 
 512 
 513 /*
 514  * Update the recv request status to reflect the number of bytes
 515  * received and actually delivered to the application.
 516  */
 517 
 518 void mca_pml_ob1_recv_request_progress_frag( mca_pml_ob1_recv_request_t* recvreq,
 519                                              mca_btl_base_module_t* btl,
 520                                              mca_btl_base_segment_t* segments,
 521                                              size_t num_segments )
 522 {
 523     size_t bytes_received, data_offset = 0;
 524     size_t bytes_delivered __opal_attribute_unused__; /* is being set to zero in MCA_PML_OB1_RECV_REQUEST_UNPACK */
 525     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
 526 
 527     bytes_received = mca_pml_ob1_compute_segment_length_base (segments, num_segments,
 528                                                               sizeof(mca_pml_ob1_frag_hdr_t));
 529     data_offset     = hdr->hdr_frag.hdr_frag_offset;
 530 
 531     /*
 532      *  Make user buffer accessible(defined) before unpacking.
 533      */
 534     MEMCHECKER(
 535                memchecker_call(&opal_memchecker_base_mem_defined,
 536                                recvreq->req_recv.req_base.req_addr,
 537                                recvreq->req_recv.req_base.req_count,
 538                                recvreq->req_recv.req_base.req_datatype);
 539                );
 540     MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
 541                                      segments,
 542                                      num_segments,
 543                                      sizeof(mca_pml_ob1_frag_hdr_t),
 544                                      data_offset,
 545                                      bytes_received,
 546                                      bytes_delivered );
 547     /*
 548      *  Unpacking finished, make the user buffer unaccessable again.
 549      */
 550     MEMCHECKER(
 551                memchecker_call(&opal_memchecker_base_mem_noaccess,
 552                                recvreq->req_recv.req_base.req_addr,
 553                                recvreq->req_recv.req_base.req_count,
 554                                recvreq->req_recv.req_base.req_datatype);
 555                );
 556 
 557     OPAL_THREAD_ADD_FETCH_SIZE_T(&recvreq->req_bytes_received, bytes_received);
 558     SPC_USER_OR_MPI(recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG, (ompi_spc_value_t)bytes_received,
 559                     OMPI_SPC_BYTES_RECEIVED_USER, OMPI_SPC_BYTES_RECEIVED_MPI);
 560     /* check completion status */
 561     if(recv_request_pml_complete_check(recvreq) == false &&
 562             recvreq->req_rdma_offset < recvreq->req_send_offset) {
 563         /* schedule additional rdma operations */
 564         mca_pml_ob1_recv_request_schedule(recvreq, NULL);
 565     }
 566 }
 567 
 568 #if OPAL_CUDA_SUPPORT /* CUDA_ASYNC_RECV */
 569 /**
 570  * This function is basically the first half of the code in the
 571  * mca_pml_ob1_recv_request_progress_frag function.  This fires off
 572  * the asynchronous copy and returns.  Unused fields in the descriptor
 573  * are used to pass extra information for when the asynchronous copy
 574  * completes.  No memchecker support in this function as copies are
 575  * happening asynchronously.
 576  */
 577 void mca_pml_ob1_recv_request_frag_copy_start( mca_pml_ob1_recv_request_t* recvreq,
 578                                                mca_btl_base_module_t* btl,
 579                                                mca_btl_base_segment_t* segments,
 580                                                size_t num_segments,
 581                                                mca_btl_base_descriptor_t* des)
 582 {
 583     int result;
 584     size_t bytes_received = 0, data_offset = 0;
 585     size_t bytes_delivered __opal_attribute_unused__; /* is being set to zero in MCA_PML_OB1_RECV_REQUEST_UNPACK */
 586     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
 587 
 588     OPAL_OUTPUT((-1, "start_frag_copy frag=%p", (void *)des));
 589 
 590     bytes_received = mca_pml_ob1_compute_segment_length_base (segments, num_segments,
 591                                                               sizeof(mca_pml_ob1_frag_hdr_t));
 592     data_offset     = hdr->hdr_frag.hdr_frag_offset;
 593 
 594     MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
 595                                      segments,
 596                                      num_segments,
 597                                      sizeof(mca_pml_ob1_frag_hdr_t),
 598                                      data_offset,
 599                                      bytes_received,
 600                                      bytes_delivered );
 601     /* Store the receive request in unused context pointer. */
 602     des->des_context = (void *)recvreq;
 603     /* Store the amount of bytes in unused cbdata pointer */
 604     des->des_cbdata = (void *) (intptr_t) bytes_delivered;
 605     /* Then record an event that will get triggered by a PML progress call which
 606      * checks the stream events.  If we get an error, abort.  Should get message
 607      * from CUDA code about what went wrong. */
 608     result = mca_common_cuda_record_htod_event("pml", des);
 609     if (OMPI_SUCCESS != result) {
 610         opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
 611         ompi_rte_abort(-1, NULL);
 612     }
 613 }
 614 
 615 /**
 616  * This function is basically the second half of the code in the
 617  * mca_pml_ob1_recv_request_progress_frag function.  The number of
 618  * bytes delivered is updated.  Then a call is made into the BTL so it
 619  * can free the fragment that held that data.  This is currently
 620  * called directly by the common CUDA code. No memchecker support
 621  * in this function as copies are happening asynchronously.
 622  */
 623 void mca_pml_ob1_recv_request_frag_copy_finished( mca_btl_base_module_t* btl,
 624                                                   struct mca_btl_base_endpoint_t* ep,
 625                                                   struct mca_btl_base_descriptor_t* des,
 626                                                   int status )
 627 {
 628     mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*)des->des_context;
 629     size_t bytes_received = (size_t) (intptr_t) des->des_cbdata;
 630 
 631     OPAL_OUTPUT((-1, "frag_copy_finished (delivered=%d), frag=%p", (int)bytes_received, (void *)des));
 632     /* Call into the BTL so it can free the descriptor.  At this point, it is
 633      * known that the data has been copied out of the descriptor. */
 634     des->des_cbfunc(NULL, NULL, des, 0);
 635 
 636     OPAL_THREAD_ADD_FETCH_SIZE_T(&recvreq->req_bytes_received, bytes_received);
 637     SPC_USER_OR_MPI(recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG, (ompi_spc_value_t)bytes_received,
 638                     OMPI_SPC_BYTES_RECEIVED_USER, OMPI_SPC_BYTES_RECEIVED_MPI);
 639     /* check completion status */
 640     if(recv_request_pml_complete_check(recvreq) == false &&
 641             recvreq->req_rdma_offset < recvreq->req_send_offset) {
 642         /* schedule additional rdma operations */
 643         mca_pml_ob1_recv_request_schedule(recvreq, NULL);
 644     }
 645 }
 646 #endif /* OPAL_CUDA_SUPPORT */
 647 
 648 /*
 649  * Update the recv request status to reflect the number of bytes
 650  * received and actually delivered to the application.
 651  */
 652 
 653 void mca_pml_ob1_recv_request_progress_rget( mca_pml_ob1_recv_request_t* recvreq,
 654                                              mca_btl_base_module_t* btl,
 655                                              mca_btl_base_segment_t* segments,
 656                                              size_t num_segments )
 657 {
 658     mca_pml_ob1_rget_hdr_t* hdr = (mca_pml_ob1_rget_hdr_t*)segments->seg_addr.pval;
 659     mca_bml_base_endpoint_t* bml_endpoint = NULL;
 660     size_t bytes_remaining, prev_sent, offset;
 661     mca_pml_ob1_rdma_frag_t *frag;
 662     mca_bml_base_btl_t *rdma_bml;
 663     int rc;
 664 
 665     prev_sent = offset = 0;
 666     recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
 667     recvreq->req_send_offset = 0;
 668     recvreq->req_rdma_offset = 0;
 669 
 670     MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_rndv.hdr_match);
 671 
 672     /* if receive buffer is not contiguous we can't just RDMA read into it, so
 673      * fall back to copy in/out protocol. It is a pity because buffer on the
 674      * sender side is already registered. We need to be smarter here, perhaps
 675      * do couple of RDMA reads */
 676     if (opal_convertor_need_buffers(&recvreq->req_recv.req_base.req_convertor) == true) {
 677 #if OPAL_CUDA_SUPPORT
 678         if (mca_pml_ob1_cuda_need_buffers(recvreq, btl))
 679 #endif /* OPAL_CUDA_SUPPORT */
 680         {
 681             mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, 0);
 682             return;
 683         }
 684     }
 685 
 686     /* lookup bml datastructures */
 687     bml_endpoint = mca_bml_base_get_endpoint (recvreq->req_recv.req_base.req_proc);
 688     rdma_bml = mca_bml_base_btl_array_find(&bml_endpoint->btl_rdma, btl);
 689 
 690 #if OPAL_CUDA_SUPPORT
 691     if (OPAL_UNLIKELY(NULL == rdma_bml)) {
 692         if (recvreq->req_recv.req_base.req_convertor.flags & CONVERTOR_CUDA) {
 693             mca_bml_base_btl_t *bml_btl;
 694             bml_btl = mca_bml_base_btl_array_find(&bml_endpoint->btl_send, btl);
 695             /* Check to see if this is a CUDA get */
 696             if (bml_btl->btl_flags & MCA_BTL_FLAGS_CUDA_GET) {
 697                 rdma_bml = bml_btl;
 698             }
 699         } else {
 700             /* Just default back to send and receive.  Must be mix of GPU and HOST memory. */
 701             mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, 0);
 702             return;
 703         }
 704     }
 705 #endif /* OPAL_CUDA_SUPPORT */
 706 
 707     if (OPAL_UNLIKELY(NULL == rdma_bml)) {
 708         opal_output(0, "[%s:%d] invalid bml for rdma get", __FILE__, __LINE__);
 709         ompi_rte_abort(-1, NULL);
 710     }
 711 
 712     bytes_remaining = hdr->hdr_rndv.hdr_msg_length;
 713 
 714     /* save the request for put fallback */
 715     recvreq->remote_req_send = hdr->hdr_rndv.hdr_src_req;
 716     recvreq->rdma_bml = rdma_bml;
 717 
 718     /* try to register the entire buffer */
 719     if (rdma_bml->btl->btl_register_mem) {
 720         void *data_ptr;
 721         uint32_t flags = MCA_BTL_REG_FLAG_LOCAL_WRITE | MCA_BTL_REG_FLAG_REMOTE_WRITE;
 722 #if OPAL_CUDA_GDR_SUPPORT
 723         if (recvreq->req_recv.req_base.req_convertor.flags & CONVERTOR_CUDA) {
 724             flags |= MCA_BTL_REG_FLAG_CUDA_GPU_MEM;
 725         }
 726 #endif /* OPAL_CUDA_GDR_SUPPORT */
 727 
 728 
 729         offset = 0;
 730 
 731         OPAL_THREAD_LOCK(&recvreq->lock);
 732         opal_convertor_set_position( &recvreq->req_recv.req_base.req_convertor, &offset);
 733         opal_convertor_get_current_pointer (&recvreq->req_recv.req_base.req_convertor, &data_ptr);
 734         OPAL_THREAD_UNLOCK(&recvreq->lock);
 735 
 736         mca_bml_base_register_mem (rdma_bml, data_ptr, bytes_remaining, flags, &recvreq->local_handle);
 737         /* It is not an error if the memory region can not be registered here. The registration will
 738          * be attempted again for each get fragment. */
 739     }
 740 
 741     /* The while loop adds a fragmentation mechanism. The variable bytes_remaining holds the num
 742      * of bytes left to be send. In each iteration we send the max possible bytes supported
 743      * by the HCA. The field  frag->rdma_length holds the actual num of  bytes that were
 744      * sent in each iteration. We subtract this number from bytes_remaining and continue to
 745      * the next iteration with the updated size.
 746      * Also - In each iteration we update the location in the buffer to be used for writing
 747      * the message ,and the location to read from. This is done using the offset variable that
 748      * accumulates the number of bytes that were sent so far.
 749      *
 750      * NTH: This fragmentation may go away if we change the btls to require them to handle
 751      * get fragmentation internally. This is a reasonable solution since some btls do not
 752      * need any fragmentation (sm, vader, self, etc). Remove this loop if this ends up
 753      * being the case. */
 754     while (bytes_remaining > 0) {
 755         /* allocate/initialize a fragment */
 756         MCA_PML_OB1_RDMA_FRAG_ALLOC(frag);
 757         if (OPAL_UNLIKELY(NULL == frag)) {
 758             /* GLB - FIX */
 759              OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
 760              ompi_rte_abort(-1, NULL);
 761         }
 762 
 763         memcpy (frag->remote_handle, hdr + 1, btl->btl_registration_handle_size);
 764 
 765         /* update the read location */
 766         frag->remote_address = hdr->hdr_src_ptr + offset;
 767 
 768         /* updating the write location */
 769         OPAL_THREAD_LOCK(&recvreq->lock);
 770         opal_convertor_set_position( &recvreq->req_recv.req_base.req_convertor, &offset);
 771         opal_convertor_get_current_pointer (&recvreq->req_recv.req_base.req_convertor, &frag->local_address);
 772         OPAL_THREAD_UNLOCK(&recvreq->lock);
 773 
 774         frag->rdma_bml = rdma_bml;
 775 
 776         frag->rdma_hdr.hdr_rget = *hdr;
 777         frag->retries       = 0;
 778         frag->rdma_req      = recvreq;
 779         frag->rdma_state    = MCA_PML_OB1_RDMA_GET;
 780         frag->local_handle  = NULL;
 781         frag->rdma_offset   = offset;
 782 
 783         if (bytes_remaining > rdma_bml->btl->btl_get_limit) {
 784             frag->rdma_length = rdma_bml->btl->btl_get_limit;
 785         } else {
 786             frag->rdma_length = bytes_remaining;
 787         }
 788 
 789         prev_sent = frag->rdma_length;
 790 
 791         /* NTH: TODO -- handle error conditions gracefully */
 792         rc = mca_pml_ob1_recv_request_get_frag(frag);
 793         if (OMPI_SUCCESS != rc) {
 794             break;
 795         }
 796 
 797         bytes_remaining -= prev_sent;
 798         offset += prev_sent;
 799     }
 800 }
 801 
 802 /*
 803  * Update the recv request status to reflect the number of bytes
 804  * received and actually delivered to the application.
 805  */
 806 
 807 void mca_pml_ob1_recv_request_progress_rndv( mca_pml_ob1_recv_request_t* recvreq,
 808                                              mca_btl_base_module_t* btl,
 809                                              mca_btl_base_segment_t* segments,
 810                                              size_t num_segments )
 811 {
 812     size_t bytes_received = 0;
 813     size_t bytes_delivered __opal_attribute_unused__; /* is being set to zero in MCA_PML_OB1_RECV_REQUEST_UNPACK */
 814     size_t data_offset = 0;
 815     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
 816 
 817     bytes_received = mca_pml_ob1_compute_segment_length_base (segments, num_segments,
 818                                                               sizeof(mca_pml_ob1_rendezvous_hdr_t));
 819 
 820     recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
 821     recvreq->remote_req_send = hdr->hdr_rndv.hdr_src_req;
 822     recvreq->req_rdma_offset = bytes_received;
 823     MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
 824     mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
 825     /**
 826      * The PUT protocol do not attach any data to the original request.
 827      * Therefore, we might want to avoid unpacking if there is nothing to
 828      * unpack.
 829      */
 830     if( 0 < bytes_received ) {
 831         MEMCHECKER(
 832                    memchecker_call(&opal_memchecker_base_mem_defined,
 833                                    recvreq->req_recv.req_base.req_addr,
 834                                    recvreq->req_recv.req_base.req_count,
 835                                    recvreq->req_recv.req_base.req_datatype);
 836                    );
 837         MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
 838                                          segments,
 839                                          num_segments,
 840                                          sizeof(mca_pml_ob1_rendezvous_hdr_t),
 841                                          data_offset,
 842                                          bytes_received,
 843                                          bytes_delivered );
 844         MEMCHECKER(
 845                    memchecker_call(&opal_memchecker_base_mem_noaccess,
 846                                    recvreq->req_recv.req_base.req_addr,
 847                                    recvreq->req_recv.req_base.req_count,
 848                                    recvreq->req_recv.req_base.req_datatype);
 849                    );
 850         OPAL_THREAD_ADD_FETCH_SIZE_T(&recvreq->req_bytes_received, bytes_received);
 851         SPC_USER_OR_MPI(recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG, (ompi_spc_value_t)bytes_received,
 852                         OMPI_SPC_BYTES_RECEIVED_USER, OMPI_SPC_BYTES_RECEIVED_MPI);
 853     }
 854     /* check completion status */
 855     if(recv_request_pml_complete_check(recvreq) == false &&
 856        recvreq->req_rdma_offset < recvreq->req_send_offset) {
 857         /* schedule additional rdma operations */
 858         mca_pml_ob1_recv_request_schedule(recvreq, NULL);
 859     }
 860 
 861 #if OPAL_CUDA_SUPPORT /* CUDA_ASYNC_RECV */
 862     /* If BTL supports it and this is a CUDA buffer being received into,
 863      * have all subsequent FRAGS copied in asynchronously. */
 864     if ((recvreq->req_recv.req_base.req_convertor.flags & CONVERTOR_CUDA) &&
 865         (btl->btl_flags & MCA_BTL_FLAGS_CUDA_COPY_ASYNC_RECV)) {
 866         void *strm = mca_common_cuda_get_htod_stream();
 867         opal_cuda_set_copy_function_async(&recvreq->req_recv.req_base.req_convertor, strm);
 868     }
 869 #endif
 870 
 871 }
 872 
 873 /*
 874  * Update the recv request status to reflect the number of bytes
 875  * received and actually delivered to the application.
 876  */
 877 void mca_pml_ob1_recv_request_progress_match( mca_pml_ob1_recv_request_t* recvreq,
 878                                               mca_btl_base_module_t* btl,
 879                                               mca_btl_base_segment_t* segments,
 880                                               size_t num_segments )
 881 {
 882     size_t bytes_received, data_offset = 0;
 883     size_t bytes_delivered __opal_attribute_unused__; /* is being set to zero in MCA_PML_OB1_RECV_REQUEST_UNPACK */
 884     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
 885 
 886     bytes_received = mca_pml_ob1_compute_segment_length_base (segments, num_segments,
 887                                                               OMPI_PML_OB1_MATCH_HDR_LEN);
 888 
 889     recvreq->req_recv.req_bytes_packed = bytes_received;
 890 
 891     MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
 892     /*
 893      *  Make user buffer accessable(defined) before unpacking.
 894      */
 895     MEMCHECKER(
 896                memchecker_call(&opal_memchecker_base_mem_defined,
 897                                recvreq->req_recv.req_base.req_addr,
 898                                recvreq->req_recv.req_base.req_count,
 899                                recvreq->req_recv.req_base.req_datatype);
 900                );
 901     MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
 902                                      segments,
 903                                      num_segments,
 904                                      OMPI_PML_OB1_MATCH_HDR_LEN,
 905                                      data_offset,
 906                                      bytes_received,
 907                                      bytes_delivered);
 908     /*
 909      *  Unpacking finished, make the user buffer unaccessable again.
 910      */
 911     MEMCHECKER(
 912                memchecker_call(&opal_memchecker_base_mem_noaccess,
 913                                recvreq->req_recv.req_base.req_addr,
 914                                recvreq->req_recv.req_base.req_count,
 915                                recvreq->req_recv.req_base.req_datatype);
 916                );
 917 
 918     /*
 919      * No need for atomic here, as we know there is only one fragment
 920      * for this request.
 921      */
 922     recvreq->req_bytes_received += bytes_received;
 923     SPC_USER_OR_MPI(recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG, (ompi_spc_value_t)bytes_received,
 924                     OMPI_SPC_BYTES_RECEIVED_USER, OMPI_SPC_BYTES_RECEIVED_MPI);
 925     recv_request_pml_complete(recvreq);
 926 }
 927 
 928 
 929 /**
 930  * Handle completion of a probe request
 931  */
 932 
 933 void mca_pml_ob1_recv_request_matched_probe( mca_pml_ob1_recv_request_t* recvreq,
 934                                              mca_btl_base_module_t* btl,
 935                                              mca_btl_base_segment_t* segments,
 936                                              size_t num_segments )
 937 {
 938     size_t bytes_packed = 0;
 939     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
 940 
 941     switch(hdr->hdr_common.hdr_type) {
 942         case MCA_PML_OB1_HDR_TYPE_MATCH:
 943             bytes_packed = mca_pml_ob1_compute_segment_length_base (segments, num_segments,
 944                                                                     OMPI_PML_OB1_MATCH_HDR_LEN);
 945             break;
 946         case MCA_PML_OB1_HDR_TYPE_RNDV:
 947         case MCA_PML_OB1_HDR_TYPE_RGET:
 948 
 949             bytes_packed = hdr->hdr_rndv.hdr_msg_length;
 950             break;
 951     }
 952 
 953     /* set completion status */
 954     recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag;
 955     recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src;
 956     recvreq->req_bytes_received = bytes_packed;
 957     recvreq->req_bytes_expected = bytes_packed;
 958 
 959     recv_request_pml_complete(recvreq);
 960 }
 961 
 962 
 963 /*
 964  * Schedule RDMA protocol.
 965  *
 966 */
 967 
 968 int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
 969                                             mca_bml_base_btl_t *start_bml_btl )
 970 {
 971     mca_bml_base_btl_t* bml_btl;
 972     int num_tries = recvreq->req_rdma_cnt, num_fail = 0;
 973     size_t i, prev_bytes_remaining = 0;
 974     size_t bytes_remaining = recvreq->req_send_offset -
 975         recvreq->req_rdma_offset;
 976 
 977     /* if starting bml_btl is provided schedule next fragment on it first */
 978     if(start_bml_btl != NULL) {
 979         for(i = 0; i < recvreq->req_rdma_cnt; i++) {
 980             if(recvreq->req_rdma[i].bml_btl != start_bml_btl)
 981                 continue;
 982             /* something left to be send? */
 983             if( OPAL_LIKELY(recvreq->req_rdma[i].length) )
 984                 recvreq->req_rdma_idx = i;
 985             break;
 986         }
 987     }
 988 
 989     while(bytes_remaining > 0 &&
 990           recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
 991         mca_pml_ob1_rdma_frag_t *frag = NULL;
 992         mca_btl_base_module_t *btl;
 993         int rc, rdma_idx;
 994         void *data_ptr;
 995         size_t size;
 996 
 997         if(prev_bytes_remaining == bytes_remaining) {
 998             if(++num_fail == num_tries) {
 999                 OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
1000                 if(false == recvreq->req_pending) {
1001                     opal_list_append(&mca_pml_ob1.recv_pending,
1002                             (opal_list_item_t*)recvreq);
1003                     recvreq->req_pending = true;
1004                 }
1005                 OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
1006                 return OMPI_ERR_OUT_OF_RESOURCE;
1007             }
1008         } else {
1009             num_fail = 0;
1010             prev_bytes_remaining = bytes_remaining;
1011         }
1012 
1013         do {
1014             rdma_idx = recvreq->req_rdma_idx;
1015             bml_btl = recvreq->req_rdma[rdma_idx].bml_btl;
1016             size = recvreq->req_rdma[rdma_idx].length;
1017             if(++recvreq->req_rdma_idx >= recvreq->req_rdma_cnt)
1018                 recvreq->req_rdma_idx = 0;
1019         } while(!size);
1020         btl = bml_btl->btl;
1021 
1022          /* NTH: Note: I feel this protocol needs work to better improve resource
1023           * usage when running with a leave pinned protocol. */
1024         /* GB: We should always abide by the BTL RDMA pipeline fragment limit (if one is set) */
1025         if ((btl->btl_rdma_pipeline_frag_size != 0) && (size > btl->btl_rdma_pipeline_frag_size)) {
1026             size = btl->btl_rdma_pipeline_frag_size;
1027         }
1028 
1029         MCA_PML_OB1_RDMA_FRAG_ALLOC(frag);
1030         if (OPAL_UNLIKELY(NULL == frag)) {
1031             continue;
1032         }
1033 
1034         /* take lock to protect convertor against concurrent access
1035          * from unpack */
1036         OPAL_THREAD_LOCK(&recvreq->lock);
1037         opal_convertor_set_position (&recvreq->req_recv.req_base.req_convertor,
1038                                      &recvreq->req_rdma_offset);
1039         opal_convertor_get_current_pointer (&recvreq->req_recv.req_base.req_convertor, &data_ptr);
1040         OPAL_THREAD_UNLOCK(&recvreq->lock);
1041 
1042         if (btl->btl_register_mem) {
1043             mca_bml_base_register_mem (bml_btl, data_ptr, size, MCA_BTL_REG_FLAG_REMOTE_WRITE,
1044                                        &frag->local_handle);
1045             if (OPAL_UNLIKELY(NULL == frag->local_handle)) {
1046                 MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
1047                 continue;
1048             }
1049         }
1050 
1051         /* fill in the minimum information needed to handle the fin message */
1052         frag->cbfunc        = mca_pml_ob1_put_completion;
1053         frag->rdma_length   = size;
1054         frag->rdma_req      = recvreq;
1055         frag->rdma_bml      = bml_btl;
1056         frag->local_address = data_ptr;
1057         frag->rdma_offset   = recvreq->req_rdma_offset;
1058 
1059         rc = mca_pml_ob1_recv_request_put_frag (frag);
1060         if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
1061             /* update request state */
1062             recvreq->req_rdma_offset += size;
1063             OPAL_THREAD_ADD_FETCH32(&recvreq->req_pipeline_depth, 1);
1064             recvreq->req_rdma[rdma_idx].length -= size;
1065             bytes_remaining -= size;
1066         } else {
1067             MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
1068         }
1069     }
1070 
1071     return OMPI_SUCCESS;
1072 }
1073 
/* True when the request type of R is one of the four probe types
 * (IPROBE, PROBE, IMPROBE, MPROBE). */
#define IS_PROB_REQ(R)                                                   \
    (((R)->req_recv.req_base.req_type == MCA_PML_REQUEST_IPROBE)  ||     \
     ((R)->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE)   ||     \
     ((R)->req_recv.req_base.req_type == MCA_PML_REQUEST_IMPROBE) ||     \
     ((R)->req_recv.req_base.req_type == MCA_PML_REQUEST_MPROBE))

/* True when the request type of R is one of the two matched-probe types
 * (IMPROBE, MPROBE). */
#define IS_MPROB_REQ(R)                                                  \
    (((R)->req_recv.req_base.req_type == MCA_PML_REQUEST_IMPROBE) ||     \
     ((R)->req_recv.req_base.req_type == MCA_PML_REQUEST_MPROBE))
1082 
1083 static inline void append_recv_req_to_queue(opal_list_t *queue,
1084         mca_pml_ob1_recv_request_t *req)
1085 {
1086     opal_list_append(queue, (opal_list_item_t*)req);
1087 
1088 #if OMPI_WANT_PERUSE
1089     /**
1090      * We don't want to generate this kind of event for MPI_Probe.
1091      */
1092     if (req->req_recv.req_base.req_type != MCA_PML_REQUEST_PROBE &&
1093         req->req_recv.req_base.req_type != MCA_PML_REQUEST_MPROBE) {
1094         PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_INSERT_IN_POSTED_Q,
1095                                 &(req->req_recv.req_base), PERUSE_RECV);
1096     }
1097 #endif
1098 }
1099 
1100 /*
1101  *  this routine tries to match a posted receive.  If a match is found,
1102  *  it places the request in the appropriate matched receive list. This
1103  *  function has to be called with the communicator matching lock held.
1104 */
1105 
1106 #if MCA_PML_OB1_CUSTOM_MATCH
1107 static mca_pml_ob1_recv_frag_t*
1108 recv_req_match_specific_proc( const mca_pml_ob1_recv_request_t *req,
1109                               mca_pml_ob1_comm_proc_t *proc,
1110                               custom_match_umq_node** hold_prev,
1111                               custom_match_umq_node** hold_elem,
1112                               int* hold_index)
1113 #else
1114 static mca_pml_ob1_recv_frag_t*
1115 recv_req_match_specific_proc( const mca_pml_ob1_recv_request_t *req,
1116                               mca_pml_ob1_comm_proc_t *proc )
1117 #endif
1118 {
1119     if (NULL == proc) {
1120         return NULL;
1121     }
1122 
1123 #if !MCA_PML_OB1_CUSTOM_MATCH
1124     int tag = req->req_recv.req_base.req_tag;
1125     opal_list_t* unexpected_frags = &proc->unexpected_frags;
1126     mca_pml_ob1_recv_frag_t* frag;
1127 
1128     if(opal_list_get_size(unexpected_frags) == 0) {
1129         return NULL;
1130     }
1131 
1132     if( OMPI_ANY_TAG == tag ) {
1133         OPAL_LIST_FOREACH(frag, unexpected_frags, mca_pml_ob1_recv_frag_t) {
1134             if( frag->hdr.hdr_match.hdr_tag >= 0 )
1135                 return frag;
1136         }
1137     } else {
1138         OPAL_LIST_FOREACH(frag, unexpected_frags, mca_pml_ob1_recv_frag_t) {
1139             if( frag->hdr.hdr_match.hdr_tag == tag )
1140                 return frag;
1141         }
1142     }
1143     return NULL;
1144 #else
1145     return custom_match_umq_find_verify_hold(req->req_recv.req_base.req_comm->c_pml_comm->umq,
1146                                              req->req_recv.req_base.req_tag,
1147                                              req->req_recv.req_base.req_peer,
1148                                              hold_prev, hold_elem, hold_index);
1149 #endif
1150 }
1151 
1152 /*
1153  * this routine is used to try and match a wild posted receive - where
1154  * wild is determined by the value assigned to the source process
1155 */
1156 #if MCA_PML_OB1_CUSTOM_MATCH
1157 static mca_pml_ob1_recv_frag_t*
1158 recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
1159                      mca_pml_ob1_comm_proc_t **p,
1160                      custom_match_umq_node** hold_prev,
1161                      custom_match_umq_node** hold_elem,
1162                      int* hold_index)
1163 #else
1164 static mca_pml_ob1_recv_frag_t*
1165 recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
1166                      mca_pml_ob1_comm_proc_t **p)
1167 #endif
1168 {
1169     mca_pml_ob1_comm_t* comm = req->req_recv.req_base.req_comm->c_pml_comm;
1170     mca_pml_ob1_comm_proc_t **procp = comm->procs;
1171 
1172 #if MCA_PML_OB1_CUSTOM_MATCH
1173     mca_pml_ob1_recv_frag_t* frag;
1174     frag = custom_match_umq_find_verify_hold (comm->umq, req->req_recv.req_base.req_tag,
1175                                               req->req_recv.req_base.req_peer,
1176                                               hold_prev, hold_elem, hold_index);
1177 
1178     if (frag) {
1179         *p = procp[frag->hdr.hdr_match.hdr_src];
1180         req->req_recv.req_base.req_proc = procp[frag->hdr.hdr_match.hdr_src]->ompi_proc;
1181         prepare_recv_req_converter(req);
1182     } else {
1183         *p = NULL;
1184     }
1185 
1186     return frag;
1187 #else
1188 
1189     /*
1190      * Loop over all the outstanding messages to find one that matches.
1191      * There is an outer loop over lists of messages from each
1192      * process, then an inner loop over the messages from the
1193      * process.
1194      *
1195      * In order to avoid starvation do this in a round-robin fashion.
1196      */
1197     for (size_t i = comm->last_probed + 1; i < comm->num_procs; i++) {
1198         mca_pml_ob1_recv_frag_t* frag;
1199 
1200         /* loop over messages from the current proc */
1201         if((frag = recv_req_match_specific_proc(req, procp[i]))) {
1202             *p = procp[i];
1203             comm->last_probed = i;
1204             req->req_recv.req_base.req_proc = procp[i]->ompi_proc;
1205             prepare_recv_req_converter(req);
1206             return frag; /* match found */
1207         }
1208     }
1209     for (size_t i = 0; i <= comm->last_probed; i++) {
1210         mca_pml_ob1_recv_frag_t* frag;
1211 
1212         /* loop over messages from the current proc */
1213         if((frag = recv_req_match_specific_proc(req, procp[i]))) {
1214             *p = procp[i];
1215             comm->last_probed = i;
1216             req->req_recv.req_base.req_proc = procp[i]->ompi_proc;
1217             prepare_recv_req_converter(req);
1218             return frag; /* match found */
1219         }
1220     }
1221 
1222     *p = NULL;
1223     return NULL;
1224 #endif
1225 }
1226 
1227 
1228 void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
1229 {
1230     ompi_communicator_t *comm = req->req_recv.req_base.req_comm;
1231     mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;
1232     mca_pml_ob1_comm_proc_t* proc;
1233     mca_pml_ob1_recv_frag_t* frag;
1234     mca_pml_ob1_hdr_t* hdr;
1235 #if MCA_PML_OB1_CUSTOM_MATCH
1236     custom_match_umq_node* hold_prev;
1237     custom_match_umq_node* hold_elem;
1238     int hold_index;
1239 #else
1240     opal_list_t *queue;
1241 #endif
1242 
1243     /* init/re-init the request */
1244     req->req_lock = 0;
1245     req->req_pipeline_depth = 0;
1246     req->req_bytes_received = 0;
1247     req->req_bytes_expected = 0;
1248     /* What about req_rdma_cnt ? */
1249     req->req_rdma_idx = 0;
1250     req->req_pending = false;
1251     req->req_ack_sent = false;
1252 
1253     MCA_PML_BASE_RECV_START(&req->req_recv);
1254 
1255     OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
1256     /**
1257      * The laps of time between the ACTIVATE event and the SEARCH_UNEX one include
1258      * the cost of the request lock.
1259      */
1260     PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_BEGIN,
1261                             &(req->req_recv.req_base), PERUSE_RECV);
1262 
1263     /* assign sequence number */
1264     req->req_recv.req_base.req_sequence = ob1_comm->recv_sequence++;
1265 
1266     /* attempt to match posted recv */
1267     if(req->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) {
1268 #if MCA_PML_OB1_CUSTOM_MATCH
1269         frag = recv_req_match_wild(req, &proc, &hold_prev, &hold_elem, &hold_index);
1270 #else
1271         frag = recv_req_match_wild(req, &proc);
1272         queue = &ob1_comm->wild_receives;
1273 #endif
1274 #if !OPAL_ENABLE_HETEROGENEOUS_SUPPORT
1275         /* As we are in a homogeneous environment we know that all remote
1276          * architectures are exactly the same as the local one. Therefore,
1277          * we can safely construct the convertor based on the proc
1278          * information of rank 0.
1279          */
1280         if( NULL == frag ) {
1281             req->req_recv.req_base.req_proc = ompi_proc_local_proc;
1282             prepare_recv_req_converter(req);
1283         }
1284 #endif  /* !OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
1285     } else {
1286         proc = mca_pml_ob1_peer_lookup (comm, req->req_recv.req_base.req_peer);
1287         req->req_recv.req_base.req_proc = proc->ompi_proc;
1288 #if MCA_PML_OB1_CUSTOM_MATCH
1289         frag = recv_req_match_specific_proc(req, proc, &hold_prev, &hold_elem, &hold_index);
1290 #else
1291         frag = recv_req_match_specific_proc(req, proc);
1292         queue = &proc->specific_receives;
1293 #endif
1294         /* wildcard recv will be prepared on match */
1295         prepare_recv_req_converter(req);
1296     }
1297 
1298     if(OPAL_UNLIKELY(NULL == frag)) {
1299         PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_END,
1300                                 &(req->req_recv.req_base), PERUSE_RECV);
1301         /* We didn't find any matches.  Record this irecv so we can match
1302            it when the message comes in. */
1303         if(OPAL_LIKELY(req->req_recv.req_base.req_type != MCA_PML_REQUEST_IPROBE &&
1304                        req->req_recv.req_base.req_type != MCA_PML_REQUEST_IMPROBE))
1305 #if MCA_PML_OB1_CUSTOM_MATCH
1306             custom_match_prq_append(ob1_comm->prq, req,
1307                                     req->req_recv.req_base.req_tag,
1308                                     req->req_recv.req_base.req_peer);
1309 #else
1310             append_recv_req_to_queue(queue, req);
1311 #endif
1312         req->req_match_received = false;
1313         OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
1314     } else {
1315         if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
1316             PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
1317                                     &(req->req_recv.req_base), PERUSE_RECV);
1318 
1319             hdr = (mca_pml_ob1_hdr_t*)frag->segments->seg_addr.pval;
1320             PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_REMOVE_FROM_UNEX_Q,
1321                                    req->req_recv.req_base.req_comm,
1322                                    hdr->hdr_match.hdr_src,
1323                                    hdr->hdr_match.hdr_tag,
1324                                    PERUSE_RECV);
1325 
1326             PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_END,
1327                                     &(req->req_recv.req_base), PERUSE_RECV);
1328 
1329 #if MCA_PML_OB1_CUSTOM_MATCH
1330             custom_match_umq_remove_hold(req->req_recv.req_base.req_comm->c_pml_comm->umq, hold_prev, hold_elem, hold_index);
1331 #else
1332             opal_list_remove_item(&proc->unexpected_frags,
1333                                   (opal_list_item_t*)frag);
1334 #endif
1335             SPC_RECORD(OMPI_SPC_UNEXPECTED_IN_QUEUE, -1);
1336             OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
1337 
1338             switch(hdr->hdr_common.hdr_type) {
1339             case MCA_PML_OB1_HDR_TYPE_MATCH:
1340                 mca_pml_ob1_recv_request_progress_match(req, frag->btl, frag->segments,
1341                                                         frag->num_segments);
1342                 break;
1343             case MCA_PML_OB1_HDR_TYPE_RNDV:
1344                 mca_pml_ob1_recv_request_progress_rndv(req, frag->btl, frag->segments,
1345                                                        frag->num_segments);
1346                 break;
1347             case MCA_PML_OB1_HDR_TYPE_RGET:
1348                 mca_pml_ob1_recv_request_progress_rget(req, frag->btl, frag->segments,
1349                                                        frag->num_segments);
1350                 break;
1351             default:
1352                 assert(0);
1353             }
1354 
1355             MCA_PML_OB1_RECV_FRAG_RETURN(frag);
1356 
1357         } else if (OPAL_UNLIKELY(IS_MPROB_REQ(req))) {
1358             /* Remove the fragment from the match list, as it's now
1359                matched.  Stash it somewhere in the request (which,
1360                yes, is a complete hack), where it will be plucked out
1361                during the end of mprobe.  The request will then be
1362                "recreated" as a receive request, and the frag will be
1363                restarted with this request during mrecv */
1364 
1365 #if MCA_PML_OB1_CUSTOM_MATCH
1366             custom_match_umq_remove_hold(req->req_recv.req_base.req_comm->c_pml_comm->umq, hold_prev, hold_elem, hold_index);
1367 #else
1368             opal_list_remove_item(&proc->unexpected_frags,
1369                                   (opal_list_item_t*)frag);
1370 #endif
1371             SPC_RECORD(OMPI_SPC_UNEXPECTED_IN_QUEUE, -1);
1372             OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
1373 
1374             req->req_recv.req_base.req_addr = frag;
1375             mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
1376                                                    frag->segments, frag->num_segments);
1377 
1378         } else {
1379             OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
1380             mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
1381                                                    frag->segments, frag->num_segments);
1382         }
1383     }
1384 }

/* [<][>][^][v][top][bottom][index][help] */