root/ompi/mca/mtl/ofi/mtl_ofi.h


DEFINITIONS

This source file includes the following definitions:
  1. set_thread_context
  2. get_thread_context
  3. ompi_mtl_ofi_context_progress
  4. ompi_mtl_ofi_progress
  5. ompi_mtl_ofi_get_error
  6. ompi_mtl_ofi_send_callback
  7. ompi_mtl_ofi_send_error_callback
  8. ompi_mtl_ofi_send_ack_callback
  9. ompi_mtl_ofi_send_ack_error_callback
  10. ompi_mtl_ofi_isend_callback
  11. ompi_mtl_ofi_map_comm_to_ctxt
  12. ompi_mtl_ofi_ssend_recv
  13. ompi_mtl_ofi_send_generic
  14. ompi_mtl_ofi_isend_generic
  15. ompi_mtl_ofi_recv_callback
  16. ompi_mtl_ofi_recv_error_callback
  17. ompi_mtl_ofi_irecv_generic
  18. ompi_mtl_ofi_mrecv_callback
  19. ompi_mtl_ofi_mrecv_error_callback
  20. ompi_mtl_ofi_imrecv
  21. ompi_mtl_ofi_probe_callback
  22. ompi_mtl_ofi_probe_error_callback
  23. ompi_mtl_ofi_iprobe_generic
  24. ompi_mtl_ofi_improbe_generic
  25. ompi_mtl_ofi_cancel
  26. ompi_mtl_ofi_init_contexts
  27. ompi_mtl_ofi_finalize_contexts
  28. ompi_mtl_ofi_add_comm
  29. ompi_mtl_ofi_del_comm
  30. ompi_mtl_ofi_send
  31. ompi_mtl_ofi_isend
  32. ompi_mtl_ofi_irecv
  33. ompi_mtl_ofi_iprobe
  34. ompi_mtl_ofi_improbe

   1 /*
   2  * Copyright (c) 2013-2018 Intel, Inc. All rights reserved
   3  * Copyright (c) 2017      Los Alamos National Security, LLC. All rights
   4  *                         reserved.
   5  *
   6  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #ifndef MTL_OFI_H_HAS_BEEN_INCLUDED
  15 #define MTL_OFI_H_HAS_BEEN_INCLUDED
  16 
  17 #include "ompi/mca/mtl/mtl.h"
  18 #include "ompi/mca/mtl/base/base.h"
  19 #include "opal/datatype/opal_convertor.h"
  20 #include "opal/util/show_help.h"
  21 #include "opal/util/printf.h"
  22 
  23 #include <rdma/fabric.h>
  24 #include <rdma/fi_cm.h>
  25 #include <rdma/fi_domain.h>
  26 #include <rdma/fi_endpoint.h>
  27 #include <rdma/fi_errno.h>
  28 #include <rdma/fi_tagged.h>
  29 
  30 #include "ompi_config.h"
  31 #include "ompi/proc/proc.h"
  32 #include "ompi/mca/mtl/mtl.h"
  33 #include "opal/class/opal_list.h"
  34 #include "ompi/communicator/communicator.h"
  35 #include "opal/datatype/opal_convertor.h"
  36 #include "ompi/mca/mtl/base/base.h"
  37 #include "ompi/mca/mtl/base/mtl_base_datatype.h"
  38 #include "ompi/message/message.h"
  39 
  40 #include "mtl_ofi_opt.h"
  41 #include "mtl_ofi_types.h"
  42 #include "mtl_ofi_request.h"
  43 #include "mtl_ofi_endpoint.h"
  44 #include "mtl_ofi_compat.h"
  45 
  46 BEGIN_C_DECLS
  47 
  48 extern mca_mtl_ofi_module_t ompi_mtl_ofi;
  49 extern mca_base_framework_t ompi_mtl_base_framework;
  50 
  51 extern int ompi_mtl_ofi_del_procs(struct mca_mtl_base_module_t *mtl,
  52                                   size_t nprocs,
  53                                   struct ompi_proc_t **procs);
  54 
  55 int ompi_mtl_ofi_progress_no_inline(void);
  56 
  57 #if OPAL_HAVE_THREAD_LOCAL
  58 extern opal_thread_local int per_thread_ctx;
  59 extern opal_thread_local struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
  60 #endif
  61 
  62 /* Set OFI context for operations which generate completion events */
  63 __opal_attribute_always_inline__ static inline void
  64 set_thread_context(int ctxt)
  65 {
  66 #if OPAL_HAVE_THREAD_LOCAL
  67     per_thread_ctx = ctxt;
  68     return;
  69 #endif
  70 }
  71 
  72 /* Retrieve OFI context to use for CQ poll */
  73 __opal_attribute_always_inline__ static inline void
  74 get_thread_context(int *ctxt)
  75 {
  76 #if OPAL_HAVE_THREAD_LOCAL
  77     *ctxt = per_thread_ctx;
  78 #endif
  79     return;
  80 }
  81 
  82 #define MTL_OFI_CONTEXT_LOCK(ctxt_id) \
  83 OPAL_LIKELY(!opal_mutex_atomic_trylock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock))
  84 
  85 #define MTL_OFI_CONTEXT_UNLOCK(ctxt_id) \
  86 opal_mutex_atomic_unlock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock)
  87 
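     /*
      * Drain the completion queue bound to OFI context ctxt_id: dispatch each
      * completion to its request's event callback and, for entries reported via
      * fi_cq_readerr(), to the request's error callback.  Unrecoverable CQ errors
      * or callback failures abort the MPI job.  Returns the number of completion
      * events processed.
      */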
  88 __opal_attribute_always_inline__ static inline int
  89 ompi_mtl_ofi_context_progress(int ctxt_id)
  90 {
  91     int count = 0, i, events_read;
  92     ompi_mtl_ofi_request_t *ofi_req = NULL;
  93     struct fi_cq_err_entry error = { 0 };
  94     ssize_t ret;
  95 #if !OPAL_HAVE_THREAD_LOCAL
  96     struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
  97 #endif
  98 
  99     /**
 100      * Read the work completions from the CQ.
 101      * From the completion's op_context, we get the associated OFI request.
 102      * Call the request's callback.
 103      */
 104     while (true) {
 105         ret = fi_cq_read(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, (void *)&wc,
 106                          ompi_mtl_ofi.ofi_progress_event_count);
 107         if (ret > 0) {
 108             count += ret;
 109             events_read = ret;
 110             for (i = 0; i < events_read; i++) {
 111                 if (NULL != wc[i].op_context) {
 112                     ofi_req = TO_OFI_REQ(wc[i].op_context);
 113                     assert(ofi_req);
 114                     ret = ofi_req->event_callback(&wc[i], ofi_req);
 115                     if (OMPI_SUCCESS != ret) {
 116                         opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
 117                                        "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
 118                                        __FILE__, __LINE__, ret);
 119                         fflush(stderr);
 120                         exit(1);
 121                     }
 122                 }
 123             }
 124         } else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
 125             /**
 126              * An error occurred and is being reported via the CQ.
 127              * Read the error and forward it to the upper layer.
 128              */
 129             ret = fi_cq_readerr(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
 130                                 &error,
 131                                 0);
 132             if (0 > ret) {
 133                 opal_output(0, "%s:%d: Error returned from fi_cq_readerr: %s(%zd).\n"
 134                                "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
 135                                __FILE__, __LINE__, fi_strerror(-ret), ret);
 136                 fflush(stderr);
 137                 exit(1);
 138             }
 139 
 140             assert(error.op_context);
 141             ofi_req = TO_OFI_REQ(error.op_context);
 142             assert(ofi_req);
 143             ret = ofi_req->error_callback(&error, ofi_req);
 144             if (OMPI_SUCCESS != ret) {
 145                 opal_output(0, "%s:%d: Error returned by request error callback: %zd.\n"
 146                                "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
 147                                __FILE__, __LINE__, ret);
 148                 fflush(stderr);
 149                 exit(1);
 150             }
 151         } else {
 152             if (ret == -FI_EAGAIN || ret == -EINTR) {
 153                 break;
 154             } else {
 155                 opal_output(0, "%s:%d: Error returned from fi_cq_read: %s(%zd).\n"
 156                                "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
 157                                __FILE__, __LINE__, fi_strerror(-ret), ret);
 158                 fflush(stderr);
 159                 exit(1);
 160             }
 161         }
 162     }
 163 
 164     return count;
 165 }
 166 
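     /*
      * Progress the OFI context recorded for the calling thread (context 0 when
      * none has been recorded).  Under MPI_THREAD_MULTIPLE the per-context lock
      * is taken with a trylock and, if nothing was completed locally, the other
      * contexts are occasionally progressed in round-robin order.
      */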
 167 __opal_attribute_always_inline__ static inline int
 168 ompi_mtl_ofi_progress(void)
 169 {
 170     int count = 0, ctxt_id = 0, i;
 171     static volatile uint32_t num_calls = 0;
 172 
 173     get_thread_context(&ctxt_id);
 174 
 175     if (ompi_mtl_ofi.mpi_thread_multiple) {
 176         if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
 177             count += ompi_mtl_ofi_context_progress(ctxt_id);
 178             MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
 179         }
 180     } else {
 181         count += ompi_mtl_ofi_context_progress(ctxt_id);
 182     }
 183 
 184 #if OPAL_HAVE_THREAD_LOCAL
 185     /*
 186      * Try to progress other CQs in round-robin fashion.
 187      * Other CQs are progressed only when no events were read from the
 188      * calling thread's own CQ, and only on every 16th call.
 189      */
 190     if (OPAL_UNLIKELY((count == 0) && ompi_mtl_ofi.mpi_thread_multiple &&
 191         (((num_calls++) & 0xF) == 0 ))) {
 192         for (i = 0; i < ompi_mtl_ofi.total_ctxts_used - 1; i++) {
 193             ctxt_id = (ctxt_id + 1) % ompi_mtl_ofi.total_ctxts_used;
 194 
 195             if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
 196                 count += ompi_mtl_ofi_context_progress(ctxt_id);
 197                 MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
 198             }
 199 
 200             /* Upon any work done, exit to let other threads take the lock */
 201             if (OPAL_LIKELY(count > 0)) {
 202                 break;
 203             }
 204         }
 205     }
 206 #endif
 207 
 208     return count;
 209 }
 210 
 211 /**
 212  * When attempting to execute an OFI operation we need to handle
 213  * resource-exhaustion cases. When a call to an OFI operation fails with
 214  * -FI_EAGAIN, the OFI MTL attempts to progress any pending completion
 215  * queue events that may be preventing additional operations from being
 216  * enqueued. The call is then retried for as long as it keeps returning
 217  * -FI_EAGAIN.
 218  */
 219 #define MTL_OFI_RETRY_UNTIL_DONE(FUNC, RETURN)         \
 220     do {                                               \
 221         do {                                           \
 222             RETURN = FUNC;                             \
 223             if (OPAL_LIKELY(0 == RETURN)) {break;}     \
 224             if (OPAL_LIKELY(RETURN == -FI_EAGAIN)) {   \
 225                 ompi_mtl_ofi_progress();               \
 226             }                                          \
 227         } while (OPAL_LIKELY(-FI_EAGAIN == RETURN));   \
 228     } while (0);
 229 
 230 #define MTL_OFI_LOG_FI_ERR(err, string)                                     \
 231     do {                                                                    \
 232         opal_output_verbose(1, ompi_mtl_base_framework.framework_output,    \
 233                             "%s:%d:%s: %s\n",                               \
 234                             __FILE__, __LINE__, string, fi_strerror(-err)); \
 235     } while(0);
 236 
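     /*
      * Illustrative sketch only (not part of the upstream MTL): shows the pattern
      * used throughout this file to issue a libfabric call under the retry and
      * error-logging macros above.  The endpoint, buffer, destination, and tag
      * parameters are hypothetical placeholders supplied by the caller.
      */
     __opal_attribute_always_inline__ static inline int
     ompi_mtl_ofi_retry_example(struct fid_ep *tx_ep, const void *buf, size_t len,
                                fi_addr_t dest, uint64_t tag)
     {
         ssize_t ret;

         /* Retry the inject while the provider keeps returning -FI_EAGAIN,
          * progressing the completion queues between attempts. */
         MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(tx_ep, buf, len, dest, tag), ret);
         if (OPAL_UNLIKELY(0 > ret)) {
             MTL_OFI_LOG_FI_ERR(ret, "fi_tinject failed");
             return OMPI_ERROR;
         }

         return OMPI_SUCCESS;
     }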
 237 /* MTL interface functions */
 238 int ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl);
 239 
 240 __opal_attribute_always_inline__ static inline int
 241 ompi_mtl_ofi_get_error(int error_num)
 242 {
 243     int ret;
 244 
 245     switch (error_num) {
 246     case 0:
 247         ret = OMPI_SUCCESS;
 248         break;
 249     default:
 250         ret = OMPI_ERROR;
 251     }
 252 
 253     return ret;
 254 }
 255 
 256 __opal_attribute_always_inline__ static inline int
 257 ompi_mtl_ofi_send_callback(struct fi_cq_tagged_entry *wc,
 258                            ompi_mtl_ofi_request_t *ofi_req)
 259 {
 260     assert(ofi_req->completion_count > 0);
 261     ofi_req->completion_count--;
 262     return OMPI_SUCCESS;
 263 }
 264 
 265 __opal_attribute_always_inline__ static inline int
 266 ompi_mtl_ofi_send_error_callback(struct fi_cq_err_entry *error,
 267                                  ompi_mtl_ofi_request_t *ofi_req)
 268 {
 269     switch(error->err) {
 270         case FI_ETRUNC:
 271             ofi_req->status.MPI_ERROR = MPI_ERR_TRUNCATE;
 272             break;
 273         default:
 274             ofi_req->status.MPI_ERROR = MPI_ERR_INTERN;
 275     }
 276     return ofi_req->event_callback(NULL, ofi_req);
 277 }
 278 
 279 __opal_attribute_always_inline__ static inline int
 280 ompi_mtl_ofi_send_ack_callback(struct fi_cq_tagged_entry *wc,
 281                                ompi_mtl_ofi_request_t *ofi_req)
 282 {
 283     ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;
 284 
 285     free(ofi_req);
 286 
 287     parent_req->event_callback(NULL, parent_req);
 288 
 289     return OMPI_SUCCESS;
 290 }
 291 
 292 __opal_attribute_always_inline__ static inline int
 293 ompi_mtl_ofi_send_ack_error_callback(struct fi_cq_err_entry *error,
 294                                      ompi_mtl_ofi_request_t *ofi_req)
 295 {
 296     ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;
 297 
 298     free(ofi_req);
 299 
 300     parent_req->status.MPI_ERROR = MPI_ERR_INTERN;
 301 
 302     return parent_req->error_callback(error, parent_req);
 303 }
 304 
 305 __opal_attribute_always_inline__ static inline int
 306 ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
 307                             ompi_mtl_ofi_request_t *ofi_req)
 308 {
 309     assert(ofi_req->completion_count > 0);
 310     ofi_req->completion_count--;
 311 
 312     if (0 == ofi_req->completion_count) {
 313         /* Request completed */
 314         if (OPAL_UNLIKELY(NULL != ofi_req->buffer)) {
 315             free(ofi_req->buffer);
 316             ofi_req->buffer = NULL;
 317         }
 318 
 319         ofi_req->super.ompi_req->req_status.MPI_ERROR =
 320             ofi_req->status.MPI_ERROR;
 321 
 322         ofi_req->super.completion_callback(&ofi_req->super);
 323     }
 324 
 325     return OMPI_SUCCESS;
 326 }
 327 
 328 /* Return OFI context ID associated with the specific communicator */
 329 __opal_attribute_always_inline__ static inline int
 330 ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id)
 331 {
 332     /* For the non-thread-grouping use case, only one context is used and it
 333      * is associated with MPI_COMM_WORLD, so use that. */
 334     if (0 == ompi_mtl_ofi.thread_grouping) {
 335         comm_id = 0;
 336     }
 337 
 338     return ompi_mtl_ofi.comm_to_context[comm_id];
 339 }
 340 
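     /*
      * Set up the sender side of the synchronous-send handshake: allocate an ack
      * request, post a zero-byte fi_trecv() that matches the sync_send_ack tag
      * exactly, bump the parent request's completion count, and set the SYNC_SEND
      * bit in the outgoing match bits so that the receiver knows to send the ack.
      */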
 341 __opal_attribute_always_inline__ static inline int
 342 ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
 343                   struct ompi_communicator_t *comm,
 344                   fi_addr_t *src_addr,
 345                   ompi_mtl_ofi_request_t *ofi_req,
 346                   mca_mtl_ofi_endpoint_t *endpoint,
 347                   uint64_t *match_bits,
 348                   int tag)
 349 {
 350     ssize_t ret = OMPI_SUCCESS;
 351     int ctxt_id = 0;
 352 
 353     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
 354     set_thread_context(ctxt_id);
 355 
 356     ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
 357     assert(ack_req);
 358 
 359     ack_req->parent = ofi_req;
 360     ack_req->event_callback = ompi_mtl_ofi_send_ack_callback;
 361     ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;
 362 
 363     ofi_req->completion_count += 1;
 364 
 365     MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
 366                                       NULL,
 367                                       0,
 368                                       NULL,
 369                                       *src_addr,
 370                                       *match_bits | ompi_mtl_ofi.sync_send_ack,
 371                                       0, /* Exact match, no ignore bits */
 372                                       (void *) &ack_req->ctx), ret);
 373     if (OPAL_UNLIKELY(0 > ret)) {
 374         opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
 375                             "%s:%d: fi_trecv failed: %s(%zd)",
 376                             __FILE__, __LINE__, fi_strerror(-ret), ret);
 377         free(ack_req);
 378         return ompi_mtl_ofi_get_error(ret);
 379     }
 380 
 381     /* The SYNC_SEND tag bit is set for the send operation only. */
 382     MTL_OFI_SET_SYNC_SEND(*match_bits);
 383     return OMPI_SUCCESS;
 384 }
 385 
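     /*
      * Blocking send path: pack the datatype, use fi_tinject()/fi_tinjectdata()
      * for payloads up to max_inject_size and fi_tsend()/fi_tsenddata() otherwise
      * (the CQ-data variants carry the source rank), then spin on
      * ompi_mtl_ofi_progress() until every expected completion has arrived.
      */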
 386 __opal_attribute_always_inline__ static inline int
 387 ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl,
 388                   struct ompi_communicator_t *comm,
 389                   int dest,
 390                   int tag,
 391                   struct opal_convertor_t *convertor,
 392                   mca_pml_base_send_mode_t mode,
 393                   bool ofi_cq_data)
 394 {
 395     ssize_t ret = OMPI_SUCCESS;
 396     ompi_mtl_ofi_request_t ofi_req;
 397     int ompi_ret, ctxt_id = 0;
 398     void *start;
 399     bool free_after;
 400     size_t length;
 401     uint64_t match_bits;
 402     ompi_proc_t *ompi_proc = NULL;
 403     mca_mtl_ofi_endpoint_t *endpoint = NULL;
 404     ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
 405     fi_addr_t src_addr = 0;
 406     fi_addr_t sep_peer_fiaddr = 0;
 407 
 408     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
 409     set_thread_context(ctxt_id);
 410 
 411     /**
 412      * Create a send request, start it and wait until it completes.
 413      */
 414     ofi_req.event_callback = ompi_mtl_ofi_send_callback;
 415     ofi_req.error_callback = ompi_mtl_ofi_send_error_callback;
 416 
 417     ompi_proc = ompi_comm_peer_lookup(comm, dest);
 418     endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
 419 
 420     /* For Scalable Endpoints, gather target receive context */
 421     sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
 422 
 423     ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
 424     if (OMPI_SUCCESS != ompi_ret) return ompi_ret;
 425 
 426     ofi_req.buffer = (free_after) ? start : NULL;
 427     ofi_req.length = length;
 428     ofi_req.status.MPI_ERROR = OMPI_SUCCESS;
 429     ofi_req.completion_count = 0;
 430 
 431     if (ofi_cq_data) {
 432         match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
 433         src_addr = sep_peer_fiaddr;
 434     } else {
 435         match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
 436                                              comm->c_my_rank, tag);
 437         /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
 438     }
 439 
 440     if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
 441         ofi_req.status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &src_addr,
 442                                                            &ofi_req, endpoint,
 443                                                            &match_bits, tag);
 444         if (OPAL_UNLIKELY(ofi_req.status.MPI_ERROR != OMPI_SUCCESS))
 445             goto free_request_buffer;
 446     }
 447 
 448     if (ompi_mtl_ofi.max_inject_size >= length) {
 449         if (ofi_cq_data) {
 450             MTL_OFI_RETRY_UNTIL_DONE(fi_tinjectdata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 451                                             start,
 452                                             length,
 453                                             comm->c_my_rank,
 454                                             sep_peer_fiaddr,
 455                                             match_bits), ret);
 456         } else {
 457             MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 458                                             start,
 459                                             length,
 460                                             sep_peer_fiaddr,
 461                                             match_bits), ret);
 462         }
 463         if (OPAL_UNLIKELY(0 > ret)) {
 464             MTL_OFI_LOG_FI_ERR(ret,
 465                                ofi_cq_data ? "fi_tinjectdata failed"
 466                                : "fi_tinject failed");
 467             if (ack_req) {
 468                 fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, &ack_req->ctx);
 469                 free(ack_req);
 470             }
 471 
 472             ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
 473             goto free_request_buffer;
 474         }
 475     } else {
 476         ofi_req.completion_count += 1;
 477         if (ofi_cq_data) {
 478             MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 479                                           start,
 480                                           length,
 481                                           NULL,
 482                                           comm->c_my_rank,
 483                                           sep_peer_fiaddr,
 484                                           match_bits,
 485                                           (void *) &ofi_req.ctx), ret);
 486         } else {
 487             MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 488                                           start,
 489                                           length,
 490                                           NULL,
 491                                           sep_peer_fiaddr,
 492                                           match_bits,
 493                                           (void *) &ofi_req.ctx), ret);
 494         }
 495         if (OPAL_UNLIKELY(0 > ret)) {
 496             MTL_OFI_LOG_FI_ERR(ret,
 497                                ofi_cq_data ? "fi_tsenddata failed"
 498                                : "fi_tsend failed");
 499             ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
 500             goto free_request_buffer;
 501         }
 502     }
 503 
 504     /**
 505      * Wait until the request is completed.
 506      * ompi_mtl_ofi_send_callback() updates this variable.
 507      */
 508     while (0 < ofi_req.completion_count) {
 509         ompi_mtl_ofi_progress();
 510     }
 511 
 512 free_request_buffer:
 513     if (OPAL_UNLIKELY(NULL != ofi_req.buffer)) {
 514         free(ofi_req.buffer);
 515     }
 516 
 517     return ofi_req.status.MPI_ERROR;
 518 }
 519 
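     /*
      * Non-blocking send path: same tag construction and synchronous-send
      * handling as the blocking path, but the payload always goes through
      * fi_tsend()/fi_tsenddata() and completion is reported later via
      * ompi_mtl_ofi_isend_callback().
      */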
 520 __opal_attribute_always_inline__ static inline int
 521 ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl,
 522                    struct ompi_communicator_t *comm,
 523                    int dest,
 524                    int tag,
 525                    struct opal_convertor_t *convertor,
 526                    mca_pml_base_send_mode_t mode,
 527                    bool blocking,
 528                    mca_mtl_request_t *mtl_request,
 529                    bool ofi_cq_data)
 530 {
 531     ssize_t ret = OMPI_SUCCESS;
 532     ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request;
 533     int ompi_ret, ctxt_id = 0;
 534     void *start;
 535     size_t length;
 536     bool free_after;
 537     uint64_t match_bits;
 538     ompi_proc_t *ompi_proc = NULL;
 539     mca_mtl_ofi_endpoint_t *endpoint = NULL;
 540     ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
 541     fi_addr_t sep_peer_fiaddr = 0;
 542 
 543     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
 544     set_thread_context(ctxt_id);
 545 
 546     ofi_req->event_callback = ompi_mtl_ofi_isend_callback;
 547     ofi_req->error_callback = ompi_mtl_ofi_send_error_callback;
 548 
 549     ompi_proc = ompi_comm_peer_lookup(comm, dest);
 550     endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
 551 
 552     /* For Scalable Endpoints, gather target receive context */
 553     sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
 554 
 555     ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
 556     if (OMPI_SUCCESS != ompi_ret) return ompi_ret;
 557 
 558     ofi_req->buffer = (free_after) ? start : NULL;
 559     ofi_req->length = length;
 560     ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
 561     ofi_req->completion_count = 1;
 562 
 563     if (ofi_cq_data) {
 564         match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
 565     } else {
 566         match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
 567                           comm->c_my_rank, tag);
 568         /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
 569     }
 570 
 571     if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
 572         ofi_req->status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &sep_peer_fiaddr,
 573                                                            ofi_req, endpoint,
 574                                                            &match_bits, tag);
 575         if (OPAL_UNLIKELY(ofi_req->status.MPI_ERROR != OMPI_SUCCESS))
 576             goto free_request_buffer;
 577     }
 578 
 579     if (ofi_cq_data) {
 580         MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 581                                       start,
 582                                       length,
 583                                       NULL,
 584                                       comm->c_my_rank,
 585                                       sep_peer_fiaddr,
 586                                       match_bits,
 587                                       (void *) &ofi_req->ctx), ret);
 588     } else {
 589         MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 590                                       start,
 591                                       length,
 592                                       NULL,
 593                                       sep_peer_fiaddr,
 594                                       match_bits,
 595                                       (void *) &ofi_req->ctx), ret);
 596     }
 597     if (OPAL_UNLIKELY(0 > ret)) {
 598         MTL_OFI_LOG_FI_ERR(ret,
 599                            ofi_cq_data ? "fi_tsenddata failed"
 600                            : "fi_tsend failed");
 601         ofi_req->status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
 602     }
 603 
 604 free_request_buffer:
 605     if (OPAL_UNLIKELY(OMPI_SUCCESS != ofi_req->status.MPI_ERROR
 606             && NULL != ofi_req->buffer)) {
 607         free(ofi_req->buffer);
 608     }
 609 
 610     return ofi_req->status.MPI_ERROR;
 611 }
 612 
 613 /**
 614  * Called when a completion for a posted recv is received.
 615  */
 616 __opal_attribute_always_inline__ static inline int
 617 ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
 618                            ompi_mtl_ofi_request_t *ofi_req)
 619 {
 620     int ompi_ret, ctxt_id = 0;
 621     ssize_t ret;
 622     ompi_proc_t *ompi_proc = NULL;
 623     mca_mtl_ofi_endpoint_t *endpoint = NULL;
 624     int src = mtl_ofi_get_source(wc);
 625     ompi_status_public_t *status = NULL;
 626     struct fi_msg_tagged tagged_msg;
 627 
 628     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(ofi_req->comm->c_contextid);
 629 
 630     assert(ofi_req->super.ompi_req);
 631     status = &ofi_req->super.ompi_req->req_status;
 632 
 633     /**
 634      * Any event associated with a request starts it.
 635      * This prevents a started request from being cancelled.
 636      */
 637     ofi_req->req_started = true;
 638 
 639     status->MPI_SOURCE = src;
 640     status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
 641     status->_ucount = wc->len;
 642 
 643     if (OPAL_UNLIKELY(wc->len > ofi_req->length)) {
 644         opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
 645                             "truncate expected: %ld %ld",
 646                             wc->len, ofi_req->length);
 647         status->MPI_ERROR = MPI_ERR_TRUNCATE;
 648     }
 649 
 650     /**
 651      * Unpack data into recv buffer if necessary.
 652      */
 653     if (OPAL_UNLIKELY(ofi_req->buffer)) {
 654         ompi_ret = ompi_mtl_datatype_unpack(ofi_req->convertor,
 655                                             ofi_req->buffer,
 656                                             wc->len);
 657         if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
 658             opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
 659                                 "%s:%d: ompi_mtl_datatype_unpack failed: %d",
 660                                 __FILE__, __LINE__, ompi_ret);
 661             status->MPI_ERROR = ompi_ret;
 662         }
 663     }
 664 
 665     /**
 666      * We can only accept MTL_OFI_SYNC_SEND in the standard recv callback.
 667      * MTL_OFI_SYNC_SEND_ACK should only be received in the send_ack
 668      * callback.
 669      */
 670     assert(!MTL_OFI_IS_SYNC_SEND_ACK(wc->tag));
 671 
 672     /**
 673      * If this recv is part of an MPI_Ssend operation, then we send an
 674      * acknowledgment back to the sender.
 675      * The ack message is sent without generating a completion event in
 676      * the completion queue: FI_COMPLETION is not set in the flags passed to
 677      * fi_tsendmsg() (the transmit context is bound with FI_SELECTIVE_COMPLETION).
 678      * This is done because the zero-byte message requires no
 679      * notification on the send side upon successful completion.
 680      * If a failure occurs, the provider reports the error via
 681      * fi_cq_readerr() during OFI progress. Once the message has been
 682      * successfully processed, the request is marked as completed.
 683      */
 684     if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
 685         /**
 686          * If the recv request was posted for any source,
 687          * we need to extract the source's actual address.
 688          */
 689         if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
 690             ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
 691             endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
 692             ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
 693         }
 694 
 695         tagged_msg.msg_iov = NULL;
 696         tagged_msg.desc = NULL;
 697         tagged_msg.iov_count = 0;
 698         tagged_msg.addr = ofi_req->remote_addr;
 699         /**
 700          * We must continue to use the user's original tag but remove the
 701          * sync_send protocol tag bit and instead apply the sync_send_ack
 702          * tag bit to complete the initiator's sync send receive.
 703          */
 704         tagged_msg.tag = (wc->tag | ompi_mtl_ofi.sync_send_ack) & ~ompi_mtl_ofi.sync_send;
 705         tagged_msg.context = NULL;
 706         tagged_msg.data = 0;
 707 
 708         MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
 709                                  &tagged_msg, 0), ret);
 710         if (OPAL_UNLIKELY(0 > ret)) {
 711             MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed");
 712             status->MPI_ERROR = OMPI_ERROR;
 713         }
 714     }
 715 
 716     ofi_req->super.completion_callback(&ofi_req->super);
 717 
 718     return OMPI_SUCCESS;
 719 }
 720 
 721 /**
 722  * Called when an error occurred on a recv request.
 723  */
 724 __opal_attribute_always_inline__ static inline int
 725 ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry *error,
 726                                  ompi_mtl_ofi_request_t *ofi_req)
 727 {
 728     ompi_status_public_t *status;
 729     assert(ofi_req->super.ompi_req);
 730     status = &ofi_req->super.ompi_req->req_status;
 731     status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
 732     status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);
 733 
 734     switch (error->err) {
 735         case FI_ETRUNC:
 736             status->MPI_ERROR = MPI_ERR_TRUNCATE;
 737             break;
 738         case FI_ECANCELED:
 739             status->_cancelled = true;
 740             break;
 741         default:
 742             status->MPI_ERROR = MPI_ERR_INTERN;
 743     }
 744 
 745     ofi_req->super.completion_callback(&ofi_req->super);
 746     return OMPI_SUCCESS;
 747 }
 748 
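     /*
      * Post a non-blocking receive: build the match/ignore bits for (comm, src,
      * tag), resolve the peer address when directed receive can be used (CQ-data
      * mode with a known source), allocate a temporary buffer if the convertor
      * requires one, and hand everything to fi_trecv().
      */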
 749 __opal_attribute_always_inline__ static inline int
 750 ompi_mtl_ofi_irecv_generic(struct mca_mtl_base_module_t *mtl,
 751                    struct ompi_communicator_t *comm,
 752                    int src,
 753                    int tag,
 754                    struct opal_convertor_t *convertor,
 755                    mca_mtl_request_t *mtl_request,
 756                    bool ofi_cq_data)
 757 {
 758     int ompi_ret = OMPI_SUCCESS, ctxt_id = 0;
 759     ssize_t ret;
 760     uint64_t match_bits, mask_bits;
 761     fi_addr_t remote_addr = ompi_mtl_ofi.any_addr;
 762     ompi_proc_t *ompi_proc = NULL;
 763     mca_mtl_ofi_endpoint_t *endpoint = NULL;
 764     ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
 765     void *start;
 766     size_t length;
 767     bool free_after;
 768 
 769     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
 770     set_thread_context(ctxt_id);
 771 
 772     if (ofi_cq_data) {
 773         if (MPI_ANY_SOURCE != src) {
 774             ompi_proc = ompi_comm_peer_lookup(comm, src);
 775             endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
 776             remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
 777         }
 778 
 779         mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
 780                                     tag);
 781     } else {
 782         mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
 783                                 tag);
 784         /* src_addr is ignored when FI_DIRECTED_RECV is not used */
 785     }
 786 
 787     ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
 788                                           &start,
 789                                           &length,
 790                                           &free_after);
 791     if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
 792         return ompi_ret;
 793     }
 794 
 795     ofi_req->type = OMPI_MTL_OFI_RECV;
 796     ofi_req->event_callback = ompi_mtl_ofi_recv_callback;
 797     ofi_req->error_callback = ompi_mtl_ofi_recv_error_callback;
 798     ofi_req->comm = comm;
 799     ofi_req->buffer = (free_after) ? start : NULL;
 800     ofi_req->length = length;
 801     ofi_req->convertor = convertor;
 802     ofi_req->req_started = false;
 803     ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
 804     ofi_req->remote_addr = remote_addr;
 805     ofi_req->match_bits = match_bits;
 806 
 807     MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
 808                                       start,
 809                                       length,
 810                                       NULL,
 811                                       remote_addr,
 812                                       match_bits,
 813                                       mask_bits,
 814                                       (void *)&ofi_req->ctx), ret);
 815     if (OPAL_UNLIKELY(0 > ret)) {
 816         if (NULL != ofi_req->buffer) {
 817             free(ofi_req->buffer);
 818         }
 819         MTL_OFI_LOG_FI_ERR(ret, "fi_trecv failed");
 820         return ompi_mtl_ofi_get_error(ret);
 821     }
 822 
 823     return OMPI_SUCCESS;
 824 }
 825 
 826 /**
 827  * Called when a mrecv request completes.
 828  */
 829 __opal_attribute_always_inline__ static inline int
 830 ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
 831                             ompi_mtl_ofi_request_t *ofi_req)
 832 {
 833     struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
 834     ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
 835     status->MPI_SOURCE = mtl_ofi_get_source(wc);
 836     status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
 837     status->MPI_ERROR = MPI_SUCCESS;
 838     status->_ucount = wc->len;
 839 
 840     free(ofi_req);
 841 
 842     mrecv_req->completion_callback(mrecv_req);
 843 
 844     return OMPI_SUCCESS;
 845 }
 846 
 847 /**
 848  * Called when an error occurred on an mrecv request.
 849  */
 850 __opal_attribute_always_inline__ static inline int
 851 ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry *error,
 852                                   ompi_mtl_ofi_request_t *ofi_req)
 853 {
 854     struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
 855     ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
 856     status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
 857     status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);
 858 
 859     switch (error->err) {
 860         case FI_ETRUNC:
 861             status->MPI_ERROR = MPI_ERR_TRUNCATE;
 862             break;
 863         case FI_ECANCELED:
 864             status->_cancelled = true;
 865             break;
 866         default:
 867             status->MPI_ERROR = MPI_ERR_INTERN;
 868     }
 869 
 870     free(ofi_req);
 871 
 872     mrecv_req->completion_callback(mrecv_req);
 873 
 874     return OMPI_SUCCESS;
 875 }
 876 
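     /*
      * Receive a message previously matched by improbe: reuse the request that
      * holds the FI_CLAIM context and complete the transfer with fi_trecvmsg()
      * using the FI_CLAIM flag.
      */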
 877 __opal_attribute_always_inline__ static inline int
 878 ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
 879                     struct opal_convertor_t *convertor,
 880                     struct ompi_message_t **message,
 881                     struct mca_mtl_request_t *mtl_request)
 882 {
 883     ompi_mtl_ofi_request_t *ofi_req =
 884         (ompi_mtl_ofi_request_t *)(*message)->req_ptr;
 885     void *start;
 886     size_t length;
 887     bool free_after;
 888     struct iovec iov;
 889     struct fi_msg_tagged msg;
 890     int ompi_ret, ctxt_id = 0;
 891     ssize_t ret;
 892     uint64_t msgflags = FI_CLAIM | FI_COMPLETION;
 893     struct ompi_communicator_t *comm = (*message)->comm;
 894 
 895     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
 896     set_thread_context(ctxt_id);
 897 
 898     ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
 899                                           &start,
 900                                           &length,
 901                                           &free_after);
 902     if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
 903         return ompi_ret;
 904     }
 905 
 906     ofi_req->type = OMPI_MTL_OFI_RECV;
 907     ofi_req->event_callback = ompi_mtl_ofi_mrecv_callback;
 908     ofi_req->error_callback = ompi_mtl_ofi_mrecv_error_callback;
 909     ofi_req->buffer = (free_after) ? start : NULL;
 910     ofi_req->length = length;
 911     ofi_req->convertor = convertor;
 912     ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
 913     ofi_req->mrecv_req = mtl_request;
 914 
 915     /**
 916      * fi_trecvmsg with FI_CLAIM
 917      */
 918     iov.iov_base = start;
 919     iov.iov_len = length;
 920     msg.msg_iov = &iov;
 921     msg.desc = NULL;
 922     msg.iov_count = 1;
 923     msg.addr = 0;
 924     msg.tag = ofi_req->match_bits;
 925     msg.ignore = ofi_req->mask_bits;
 926     msg.context = (void *)&ofi_req->ctx;
 927     msg.data = 0;
 928 
 929     MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
 930     if (OPAL_UNLIKELY(0 > ret)) {
 931         MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
 932         return ompi_mtl_ofi_get_error(ret);
 933     }
 934 
 935     return OMPI_SUCCESS;
 936 }
 937 
 938 /**
 939  * Called when a probe request completes.
 940  */
 941 __opal_attribute_always_inline__ static inline int
 942 ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry *wc,
 943                             ompi_mtl_ofi_request_t *ofi_req)
 944 {
 945     ofi_req->match_state = 1;
 946     ofi_req->match_bits = wc->tag;
 947     ofi_req->status.MPI_SOURCE = mtl_ofi_get_source(wc);
 948     ofi_req->status.MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
 949     ofi_req->status.MPI_ERROR = MPI_SUCCESS;
 950     ofi_req->status._ucount = wc->len;
 951     ofi_req->completion_count--;
 952 
 953     return OMPI_SUCCESS;
 954 }
 955 
 956 /**
 957  * Called when a probe request encounters an error.
 958  */
 959 __opal_attribute_always_inline__ static inline int
 960 ompi_mtl_ofi_probe_error_callback(struct fi_cq_err_entry *error,
 961                                   ompi_mtl_ofi_request_t *ofi_req)
 962 {
 963     ofi_req->status.MPI_ERROR = MPI_ERR_INTERN;
 964     ofi_req->completion_count--;
 965 
 966     return OMPI_SUCCESS;
 967 }
 968 
 969 __opal_attribute_always_inline__ static inline int
 970 ompi_mtl_ofi_iprobe_generic(struct mca_mtl_base_module_t *mtl,
 971                     struct ompi_communicator_t *comm,
 972                     int src,
 973                     int tag,
 974                     int *flag,
 975                     struct ompi_status_public_t *status,
 976                     bool ofi_cq_data)
 977 {
 978     struct ompi_mtl_ofi_request_t ofi_req;
 979     ompi_proc_t *ompi_proc = NULL;
 980     mca_mtl_ofi_endpoint_t *endpoint = NULL;
 981     fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
 982     uint64_t match_bits, mask_bits;
 983     ssize_t ret;
 984     struct fi_msg_tagged msg;
 985     uint64_t msgflags = FI_PEEK | FI_COMPLETION;
 986     int ctxt_id = 0;
 987 
 988     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
 989     set_thread_context(ctxt_id);
 990 
 991     if (ofi_cq_data) {
 992         /* If the source is known, use its peer_fiaddr. */
 993         if (MPI_ANY_SOURCE != src) {
 994             ompi_proc = ompi_comm_peer_lookup( comm, src );
 995             endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
 996             remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
 997         }
 998 
 999         mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
1000                                     tag);
1001     }
1002     else {
1003         mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
1004                                 tag);
1005         /* src_addr is ignored when FI_DIRECTED_RECV is not used */
1006     }
1007 
1008     /**
1009      * fi_trecvmsg with FI_PEEK:
1010      * Initiate a search for a match in the hardware or software queue.
1011      * The search can complete immediately with -FI_ENOMSG.
1012      * If successful, libfabric will enqueue a context entry into the completion
1013      * queue to make the search nonblocking.  This code will poll until the
1014      * entry is enqueued.
1015      */
1016     msg.msg_iov = NULL;
1017     msg.desc = NULL;
1018     msg.iov_count = 0;
1019     msg.addr = remote_proc;
1020     msg.tag = match_bits;
1021     msg.ignore = mask_bits;
1022     msg.context = (void *)&ofi_req.ctx;
1023     msg.data = 0;
1024 
1025     ofi_req.type = OMPI_MTL_OFI_PROBE;
1026     ofi_req.event_callback = ompi_mtl_ofi_probe_callback;
1027     ofi_req.error_callback = ompi_mtl_ofi_probe_error_callback;
1028     ofi_req.completion_count = 1;
1029     ofi_req.match_state = 0;
1030 
1031     MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
1032     if (-FI_ENOMSG == ret) {
1033         /**
1034          * The search request completed but no matching message was found.
1035          */
1036         *flag = 0;
1037         return OMPI_SUCCESS;
1038     } else if (OPAL_UNLIKELY(0 > ret)) {
1039         MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
1040         return ompi_mtl_ofi_get_error(ret);
1041     }
1042 
1043     while (0 < ofi_req.completion_count) {
1044         opal_progress();
1045     }
1046 
1047     *flag = ofi_req.match_state;
1048     if (1 == *flag) {
1049         if (MPI_STATUS_IGNORE != status) {
1050             *status = ofi_req.status;
1051         }
1052     }
1053 
1054     return OMPI_SUCCESS;
1055 }
1056 
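     /*
      * Like the iprobe path, but the FI_PEEK search also carries FI_CLAIM so that
      * a matched message is claimed by this request and can later be received
      * with ompi_mtl_ofi_imrecv() via the returned ompi_message_t.
      */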
1057 __opal_attribute_always_inline__ static inline int
1058 ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t *mtl,
1059                      struct ompi_communicator_t *comm,
1060                      int src,
1061                      int tag,
1062                      int *matched,
1063                      struct ompi_message_t **message,
1064                      struct ompi_status_public_t *status,
1065                      bool ofi_cq_data)
1066 {
1067     struct ompi_mtl_ofi_request_t *ofi_req;
1068     ompi_proc_t *ompi_proc = NULL;
1069     mca_mtl_ofi_endpoint_t *endpoint = NULL;
1070     fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
1071     uint64_t match_bits, mask_bits;
1072     ssize_t ret;
1073     struct fi_msg_tagged msg;
1074     uint64_t msgflags = FI_PEEK | FI_CLAIM | FI_COMPLETION;
1075     int ctxt_id = 0;
1076 
1077     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(comm->c_contextid);
1078     set_thread_context(ctxt_id);
1079 
1080     ofi_req = malloc(sizeof *ofi_req);
1081     if (NULL == ofi_req) {
1082         return OMPI_ERROR;
1083     }
1084 
1085     /**
1086      * If the source is known, use its peer_fiaddr.
1087      */
1088 
1089     if (ofi_cq_data) {
1090         if (MPI_ANY_SOURCE != src) {
1091             ompi_proc = ompi_comm_peer_lookup( comm, src );
1092             endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
1093             remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
1094         }
1095 
1096         mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
1097                                     tag);
1098     }
1099     else {
1100         /* src_addr is ignored when FI_DIRECTED_RECV is not used */
1101         mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
1102                                 tag);
1103     }
1104 
1105     /**
1106      * fi_trecvmsg with FI_PEEK and FI_CLAIM:
1107      * Initiate a search for a match in the hardware or software queue.
1108      * The search can complete immediately with -FI_ENOMSG.
1109      * If successful, libfabric will enqueue a context entry into the completion
1110      * queue to make the search nonblocking.  This code will poll until the
1111      * entry is enqueued.
1112      */
1113     msg.msg_iov = NULL;
1114     msg.desc = NULL;
1115     msg.iov_count = 0;
1116     msg.addr = remote_proc;
1117     msg.tag = match_bits;
1118     msg.ignore = mask_bits;
1119     msg.context = (void *)&ofi_req->ctx;
1120     msg.data = 0;
1121 
1122     ofi_req->type = OMPI_MTL_OFI_PROBE;
1123     ofi_req->event_callback = ompi_mtl_ofi_probe_callback;
1124     ofi_req->error_callback = ompi_mtl_ofi_probe_error_callback;
1125     ofi_req->completion_count = 1;
1126     ofi_req->match_state = 0;
1127     ofi_req->mask_bits = mask_bits;
1128 
1129     MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
1130     if (-FI_ENOMSG == ret) {
1131         /**
1132          * The search request completed but no matching message was found.
1133          */
1134         *matched = 0;
1135         free(ofi_req);
1136         return OMPI_SUCCESS;
1137     } else if (OPAL_UNLIKELY(0 > ret)) {
1138         MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
1139         free(ofi_req);
1140         return ompi_mtl_ofi_get_error(ret);
1141     }
1142 
1143     while (0 < ofi_req->completion_count) {
1144         opal_progress();
1145     }
1146 
1147     *matched = ofi_req->match_state;
1148     if (1 == *matched) {
1149         if (MPI_STATUS_IGNORE != status) {
1150             *status = ofi_req->status;
1151         }
1152 
1153         (*message) = ompi_message_alloc();
1154         if (NULL == (*message)) {
1155             return OMPI_ERR_OUT_OF_RESOURCE;
1156         }
1157 
1158         (*message)->comm = comm;
1159         (*message)->req_ptr = ofi_req;
1160         (*message)->peer = ofi_req->status.MPI_SOURCE;
1161         (*message)->count = ofi_req->status._ucount;
1162 
1163     } else {
1164         (*message) = MPI_MESSAGE_NULL;
1165         free(ofi_req);
1166     }
1167 
1168     return OMPI_SUCCESS;
1169 }
1170 
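     /*
      * Cancel support: send requests cannot be cancelled; a receive is cancelled
      * with fi_cancel() only if no completion event for it has been seen yet,
      * otherwise the request's status is marked as not cancelled.
      */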
1171 __opal_attribute_always_inline__ static inline int
1172 ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
1173                     mca_mtl_request_t *mtl_request,
1174                     int flag)
1175 {
1176     int ret, ctxt_id = 0;
1177     ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
1178 
1179     ctxt_id = ompi_mtl_ofi_map_comm_to_ctxt(ofi_req->comm->c_contextid);
1180 
1181     switch (ofi_req->type) {
1182         case OMPI_MTL_OFI_SEND:
1183             /**
1184              * Cannot cancel sends yet
1185              */
1186             break;
1187 
1188         case OMPI_MTL_OFI_RECV:
1189             /**
1190              * Cancel a receive request only if it hasn't been matched yet.
1191              * The event queue needs to be drained to make sure there isn't
1192              * any pending receive completion event.
1193              */
1194             ompi_mtl_ofi_progress();
1195 
1196             if (!ofi_req->req_started) {
1197                 ret = fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
1198                                &ofi_req->ctx);
1199                 if (0 == ret) {
1200                     if (ofi_req->req_started)
1201                         goto ofi_cancel_not_possible;
1202                 } else {
1203 ofi_cancel_not_possible:
1204                     /**
1205                      * Could not cancel the request.
1206                      */
1207                     ofi_req->super.ompi_req->req_status._cancelled = false;
1208                 }
1209             }
1210             break;
1211 
1212         default:
1213             return OMPI_ERROR;
1214     }
1215 
1216     return OMPI_SUCCESS;
1217 }
1218 
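     /*
      * Create the OFI objects backing a new communicator context: for scalable
      * endpoints, open a TX/RX context pair and a tagged CQ, bind them with
      * FI_SELECTIVE_COMPLETION, and enable them; for regular endpoints only the
      * per-context lock is initialized.  The progress function is registered
      * once, when MPI_COMM_WORLD is added.
      */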
1219 static int ompi_mtl_ofi_init_contexts(struct mca_mtl_base_module_t *mtl,
1220                                       struct ompi_communicator_t *comm,
1221                                       mca_mtl_ofi_ep_type ep_type)
1222 {
1223     int ret;
1224     int ctxt_id = ompi_mtl_ofi.total_ctxts_used;
1225     struct fi_cq_attr cq_attr = {0};
1226     cq_attr.format = FI_CQ_FORMAT_TAGGED;
1227     cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;
1228 
1229     if (OFI_REGULAR_EP == ep_type) {
1230         /*
1231          * For regular endpoints, just create the lock object and register
1232          * the progress function.
1233          */
1234         goto init_regular_ep;
1235     }
1236 
1237     /*
1238      * We only create up to the maximum number of contexts requested by the
1239      * user. If the user enables the thread-grouping feature and creates more
1240      * communicators than there are available contexts, then we set the
1241      * threshold context_id so that new communicators created beyond the
1242      * threshold will be assigned to contexts in a round-robin fashion.
1243      */
1244     if (ompi_mtl_ofi.num_ofi_contexts <= ompi_mtl_ofi.total_ctxts_used) {
1245         ompi_mtl_ofi.comm_to_context[comm->c_contextid] = comm->c_contextid %
1246                                                           ompi_mtl_ofi.total_ctxts_used;
1247         if (!ompi_mtl_ofi.threshold_comm_context_id) {
1248             ompi_mtl_ofi.threshold_comm_context_id = comm->c_contextid;
1249 
1250             opal_show_help("help-mtl-ofi.txt", "SEP thread grouping ctxt limit", true, ctxt_id,
1251                            ompi_process_info.nodename, __FILE__, __LINE__);
1252         }
1253 
1254         return OMPI_SUCCESS;
1255     }
1256 
1257     /* Init context info for Scalable EPs */
1258     ret = fi_tx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, NULL);
1259     if (ret) {
1260         MTL_OFI_LOG_FI_ERR(ret, "fi_tx_context failed");
1261         goto init_error;
1262     }
1263 
1264     ret = fi_rx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, NULL);
1265     if (ret) {
1266         MTL_OFI_LOG_FI_ERR(ret, "fi_rx_context failed");
1267         goto init_error;
1268     }
1269 
1270     ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, NULL);
1271     if (ret) {
1272         MTL_OFI_LOG_FI_ERR(ret, "fi_cq_open failed");
1273         goto init_error;
1274     }
1275 
1276     /* Bind CQ to TX/RX context object */
1277     ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
1278                      FI_TRANSMIT | FI_SELECTIVE_COMPLETION);
1279     if (0 != ret) {
1280         MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_TRANSMIT) failed");
1281         goto init_error;
1282     }
1283 
1284     ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
1285                      FI_RECV | FI_SELECTIVE_COMPLETION);
1286     if (0 != ret) {
1287         MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_RECV) failed");
1288         goto init_error;
1289     }
1290 
1291     /* Enable Endpoint for communication. This commits the bind operations */
1292     ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
1293     if (0 != ret) {
1294         MTL_OFI_LOG_FI_ERR(ret, "fi_enable (send context) failed");
1295         goto init_error;
1296     }
1297 
1298     ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
1299     if (0 != ret) {
1300         MTL_OFI_LOG_FI_ERR(ret, "fi_enable (recv context) failed");
1301         goto init_error;
1302     }
1303 
1304 init_regular_ep:
1305     /* Initialize per-context lock */
1306     OBJ_CONSTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock, opal_mutex_t);
1307 
1308     if (MPI_COMM_WORLD == comm) {
1309         ret = opal_progress_register(ompi_mtl_ofi_progress_no_inline);
1310         if (OMPI_SUCCESS != ret) {
1311             opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
1312                                 "%s:%d: opal_progress_register failed: %d\n",
1313                                 __FILE__, __LINE__, ret);
1314             goto init_error;
1315         }
1316     }
1317 
1318     ompi_mtl_ofi.comm_to_context[comm->c_contextid] = ompi_mtl_ofi.total_ctxts_used;
1319     ompi_mtl_ofi.total_ctxts_used++;
1320 
1321     return OMPI_SUCCESS;
1322 
1323 init_error:
1324     if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep) {
1325         (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
1326     }
1327 
1328     if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep) {
1329         (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
1330     }
1331 
1332     if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq) {
1333         (void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq);
1334     }
1335 
1336     return ret;
1337 }
1338 
1339 static int ompi_mtl_ofi_finalize_contexts(struct mca_mtl_base_module_t *mtl,
1340                                           struct ompi_communicator_t *comm,
1341                                           mca_mtl_ofi_ep_type ep_type)
1342 {
1343     int ret = OMPI_SUCCESS, ctxt_id = 0;
1344 
1345     if (OFI_REGULAR_EP == ep_type) {
1346         /* For regular EPs, simply destruct Lock object and exit */
1347         goto finalize_regular_ep;
1348     }
1349 
1350     if (ompi_mtl_ofi.thread_grouping &&
1351         ompi_mtl_ofi.threshold_comm_context_id &&
1352         ((uint32_t) ompi_mtl_ofi.threshold_comm_context_id <= comm->c_contextid)) {
1353         return OMPI_SUCCESS;
1354     }
1355 
1356     ctxt_id = ompi_mtl_ofi.thread_grouping ?
1357            ompi_mtl_ofi.comm_to_context[comm->c_contextid] : 0;
1358 
1359     /*
1360      * For regular EPs, the TX/RX contexts are aliased to the SEP object, which
1361      * is closed in ompi_mtl_ofi_finalize(), so skip handling them here.
1362      */
1363     if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep))) {
1364         goto finalize_err;
1365     }
1366 
1367     if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep))) {
1368         goto finalize_err;
1369     }
1370 
1371     if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq))) {
1372         goto finalize_err;
1373     }
1374 
1375 finalize_regular_ep:
1376     /* Destroy context lock */
1377     OBJ_DESTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock);
1378 
1379     return OMPI_SUCCESS;
1380 
1381 finalize_err:
1382     opal_show_help("help-mtl-ofi.txt", "OFI call fail", true,
1383                    "fi_close",
1384                    ompi_process_info.nodename, __FILE__, __LINE__,
1385                    fi_strerror(-ret), ret);
1386 
1387     return OMPI_ERROR;
1388 }
1389 
1390 __opal_attribute_always_inline__ static inline int
1391 ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl,
1392                       struct ompi_communicator_t *comm)
1393 {
1394     int ret;
1395     mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ?
1396                                   OFI_REGULAR_EP : OFI_SCALABLE_EP;
1397 
1398     /*
1399      * If thread grouping enabled, add new OFI context for each communicator
1400      * other than MPI_COMM_SELF.
1401      */
1402     if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
1403         /* If no thread grouping, add new OFI context only
1404          * for MPI_COMM_WORLD.
1405          */
1406         (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
1407 
1408         ret = ompi_mtl_ofi_init_contexts(mtl, comm, ep_type);
1409 
1410         if (OMPI_SUCCESS != ret) {
1411             goto error;
1412         }
1413     }
1414 
1415     return OMPI_SUCCESS;
1416 
1417 error:
1418     return OMPI_ERROR;
1419 }
1420 
1421 __opal_attribute_always_inline__ static inline int
1422 ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl,
1423                       struct ompi_communicator_t *comm)
1424 {
1425     int ret = OMPI_SUCCESS;
1426     mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ?
1427                                   OFI_REGULAR_EP : OFI_SCALABLE_EP;
1428 
1429     /*
1430      * Clean up OFI contexts information.
1431      */
1432     if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
1433         (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
1434 
1435         ret = ompi_mtl_ofi_finalize_contexts(mtl, comm, ep_type);
1436     }
1437 
1438     return ret;
1439 }
1440 
1441 #ifdef MCA_ompi_mtl_DIRECT_CALL
1442 
1443 __opal_attribute_always_inline__ static inline int
1444 ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
1445                   struct ompi_communicator_t *comm,
1446                   int dest,
1447                   int tag,
1448                   struct opal_convertor_t *convertor,
1449                   mca_pml_base_send_mode_t mode)
1450 {
1451     return ompi_mtl_ofi_send_generic(mtl, comm, dest, tag,
1452                                     convertor, mode,
1453                                     ompi_mtl_ofi.fi_cq_data);
1454 }
1455 
1456 __opal_attribute_always_inline__ static inline int
1457 ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
1458                struct ompi_communicator_t *comm,
1459                int dest,
1460                int tag,
1461                struct opal_convertor_t *convertor,
1462                mca_pml_base_send_mode_t mode,
1463                bool blocking,
1464                mca_mtl_request_t *mtl_request)
1465 {
1466     return ompi_mtl_ofi_isend_generic(mtl, comm, dest, tag,
1467                                     convertor, mode, blocking, mtl_request,
1468                                     ompi_mtl_ofi.fi_cq_data);
1469 }
1470 
1471 __opal_attribute_always_inline__ static inline int
1472 ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
1473                struct ompi_communicator_t *comm,
1474                int src,
1475                int tag,
1476                struct opal_convertor_t *convertor,
1477                mca_mtl_request_t *mtl_request)
1478 {
1479     return ompi_mtl_ofi_irecv_generic(mtl, comm, src, tag,
1480                                     convertor, mtl_request,
1481                                     ompi_mtl_ofi.fi_cq_data);
1482 }
1483 
1484 __opal_attribute_always_inline__ static inline int
1485 ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
1486                 struct ompi_communicator_t *comm,
1487                 int src,
1488                 int tag,
1489                 int *flag,
1490                 struct ompi_status_public_t *status)
1491 {
1492     return ompi_mtl_ofi_iprobe_generic(mtl, comm, src, tag,
1493                                     flag, status,
1494                                     ompi_mtl_ofi.fi_cq_data);
1495 }
1496 
1497 __opal_attribute_always_inline__ static inline int
1498 ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
1499                  struct ompi_communicator_t *comm,
1500                  int src,
1501                  int tag,
1502                  int *matched,
1503                  struct ompi_message_t **message,
1504                  struct ompi_status_public_t *status)
1505 {
1506     return ompi_mtl_ofi_improbe_generic(mtl, comm, src, tag,
1507                                     matched, message, status,
1508                                     ompi_mtl_ofi.fi_cq_data);
1509 }
1510 #endif
1511 
1512 END_C_DECLS
1513 
1514 #endif  /* MTL_OFI_H_HAS_BEEN_INCLUDED */
