root/ompi/mca/pml/yalla/pml_yalla.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. send_ep_address
  2. recv_ep_address
  3. mca_pml_yalla_mem_release_cb
  4. mca_pml_yalla_open
  5. mca_pml_yalla_close
  6. mca_pml_yalla_init
  7. mca_pml_yalla_cleanup
  8. mca_pml_yalla_add_procs
  9. mca_pml_yalla_del_procs
  10. mca_pml_yalla_enable
  11. mca_pml_yalla_progress
  12. mca_pml_yalla_add_comm
  13. mca_pml_yalla_del_comm
  14. mca_pml_yalla_irecv_init
  15. mca_pml_yalla_irecv
  16. mca_pml_yalla_recv
  17. mca_pml_yalla_isend_init
  18. mca_pml_yalla_bsend
  19. mca_pml_yalla_isend
  20. mca_pml_yalla_send
  21. mca_pml_yalla_iprobe
  22. mca_pml_yalla_probe
  23. mca_pml_yalla_improbe
  24. mca_pml_yalla_mprobe
  25. mca_pml_yalla_imrecv
  26. mca_pml_yalla_mrecv
  27. mca_pml_yalla_start
  28. mca_pml_yalla_dump

   1 /*
   2  * Copyright (C) 2001-2011 Mellanox Technologies Ltd. ALL RIGHTS RESERVED.
   3  * Copyright (c) 2015      Research Organization for Information Science
   4  *                         and Technology (RIST). All rights reserved.
   5  * Copyright (c) 2018      Cisco Systems, Inc.  All rights reserved
   6  * $COPYRIGHT$
   7  *
   8  * Additional copyrights may follow
   9  *
  10  * $HEADER$
  11  */
  12 
  13 #ifdef HAVE_ALLOCA_H
  14 #include <alloca.h>
  15 #endif
  16 
  17 #include "pml_yalla.h"
  18 #include "pml_yalla_request.h"
  19 
  20 #include "opal/runtime/opal.h"
  21 #include "opal/memoryhooks/memory.h"
  22 #include "opal/mca/memory/base/base.h"
  23 #include "opal/mca/pmix/pmix.h"
  24 #include "ompi/mca/pml/base/pml_base_bsend.h"
  25 #include "ompi/message/message.h"
  26 
  27 #define MODEX_KEY "yalla-mxm"
  28 
  29 mca_pml_yalla_module_t ompi_pml_yalla = {
  30     {
  31         mca_pml_yalla_add_procs,
  32         mca_pml_yalla_del_procs,
  33         mca_pml_yalla_enable,
  34         NULL,
  35         mca_pml_yalla_add_comm,
  36         mca_pml_yalla_del_comm,
  37         mca_pml_yalla_irecv_init,
  38         mca_pml_yalla_irecv,
  39         mca_pml_yalla_recv,
  40         mca_pml_yalla_isend_init,
  41         mca_pml_yalla_isend,
  42         mca_pml_yalla_send,
  43         mca_pml_yalla_iprobe,
  44         mca_pml_yalla_probe,
  45         mca_pml_yalla_start,
  46         mca_pml_yalla_improbe,
  47         mca_pml_yalla_mprobe,
  48         mca_pml_yalla_imrecv,
  49         mca_pml_yalla_mrecv,
  50         mca_pml_yalla_dump,
  51         NULL, /* FT */
  52         1ul << ((sizeof(mxm_ctxid_t)*8) - 1),
  53         1ul << ((sizeof(mxm_tag_t)*8 - 1) - 1),
  54     },
  55     NULL,
  56     NULL,
  57     NULL,
  58     NULL
  59 };
  60 
  61 static int send_ep_address(void)
  62 {
  63     mxm_error_t error;
  64     void *address;
  65     size_t addrlen;
  66     int rc;
  67 
  68     addrlen = 0;
  69     error = mxm_ep_get_address(ompi_pml_yalla.mxm_ep, NULL, &addrlen);
  70     PML_YALLA_ASSERT(error == MXM_ERR_BUFFER_TOO_SMALL);
  71 
  72     address = alloca(addrlen);
  73     error = mxm_ep_get_address(ompi_pml_yalla.mxm_ep, address, &addrlen);
  74     if (MXM_OK != error) {
  75         PML_YALLA_ERROR("%s", "Failed to get EP address");
  76         return OMPI_ERROR;
  77     }
  78 
  79     OPAL_MODEX_SEND(rc, OPAL_PMIX_GLOBAL,
  80                     &mca_pml_yalla_component.pmlm_version, address, addrlen);
  81     if (OMPI_SUCCESS != rc) {
  82         PML_YALLA_ERROR("%s", "Open MPI couldn't distribute EP connection details");
  83         return OMPI_ERROR;
  84     }
  85 
  86     return OMPI_SUCCESS;
  87 }
  88 
  89 static int recv_ep_address(ompi_proc_t *proc, void **address_p, size_t *addrlen_p)
  90 {
  91     int rc;
  92 
  93     OPAL_MODEX_RECV(rc, &mca_pml_yalla_component.pmlm_version, &proc->super.proc_name,
  94                     address_p, addrlen_p);
  95     if (rc < 0) {
  96         PML_YALLA_ERROR("%s", "Failed to receive EP address");
  97     }
  98     return rc;
  99 }
 100 
 101 static void mca_pml_yalla_mem_release_cb(void *buf, size_t length,
 102                                          void *cbdata, bool from_alloc)
 103 {
 104     mxm_mem_unmap(ompi_pml_yalla.mxm_context, buf, length,
 105                   from_alloc ? MXM_MEM_UNMAP_MARK_INVALID : 0);
 106 }
 107 
 108 int mca_pml_yalla_open(void)
 109 {
 110     mxm_error_t error;
 111 
 112     PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_open");
 113 
 114     (void)mca_base_framework_open(&opal_memory_base_framework, 0);
 115 
 116     /* Set memory hooks */
 117     if ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) ==
 118         ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) &
 119          opal_mem_hooks_support_level()))
 120     {
 121         PML_YALLA_VERBOSE(1, "%s", "enabling on-demand memory mapping");
 122         opal_setenv("MXM_MPI_MEM_ON_DEMAND_MAP", "y", false, &environ);
 123         ompi_pml_yalla.using_mem_hooks = 1;
 124     } else {
 125         PML_YALLA_VERBOSE(1, "%s", "disabling on-demand memory mapping");
 126         ompi_pml_yalla.using_mem_hooks = 0;
 127     }
 128     opal_setenv("MXM_MPI_SINGLE_THREAD", ompi_mpi_thread_multiple ? "n" : "y",
 129                 false, &environ);
 130 
 131     /* Read options */
 132     error = mxm_config_read_opts(&ompi_pml_yalla.ctx_opts, &ompi_pml_yalla.ep_opts,
 133                                  "MPI", NULL, 0);
 134     if (MXM_OK != error) {
 135         return OMPI_ERROR;
 136     }
 137 
 138     error = mxm_init(ompi_pml_yalla.ctx_opts, &ompi_pml_yalla.mxm_context);
 139     if (MXM_OK != error) {
 140         return OMPI_ERROR;
 141     }
 142 
 143     return OMPI_SUCCESS;
 144 }
 145 
 146 int mca_pml_yalla_close(void)
 147 {
 148     PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_close");
 149 
 150     if (ompi_pml_yalla.ctx_opts != NULL) {
 151         mxm_config_free_context_opts(ompi_pml_yalla.ctx_opts);
 152     }
 153     if (ompi_pml_yalla.ep_opts != NULL) {
 154         mxm_config_free_ep_opts(ompi_pml_yalla.ep_opts);
 155     }
 156     if (ompi_pml_yalla.mxm_context != NULL) {
 157         mxm_cleanup(ompi_pml_yalla.mxm_context);
 158         ompi_pml_yalla.mxm_context = NULL;
 159     }
 160     mca_base_framework_close(&opal_memory_base_framework);
 161     return 0;
 162 }
 163 
 164 int mca_pml_yalla_init(void)
 165 {
 166     mxm_error_t error;
 167     int rc;
 168 
 169     PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_init");
 170 
 171     if (ompi_pml_yalla.using_mem_hooks) {
 172         opal_mem_hooks_register_release(mca_pml_yalla_mem_release_cb, NULL);
 173     }
 174 
 175     error = mxm_ep_create(ompi_pml_yalla.mxm_context, ompi_pml_yalla.ep_opts,
 176                           &ompi_pml_yalla.mxm_ep);
 177     if (MXM_OK != error) {
 178         return OMPI_ERROR;
 179     }
 180 
 181     rc = send_ep_address();
 182     if (rc < 0) {
 183         return rc;
 184     }
 185 
 186     OBJ_CONSTRUCT(&ompi_pml_yalla.send_reqs, mca_pml_yalla_freelist_t);
 187     OBJ_CONSTRUCT(&ompi_pml_yalla.bsend_reqs, mca_pml_yalla_freelist_t);
 188     OBJ_CONSTRUCT(&ompi_pml_yalla.recv_reqs, mca_pml_yalla_freelist_t);
 189     OBJ_CONSTRUCT(&ompi_pml_yalla.convs, mca_pml_yalla_freelist_t);
 190 
 191     opal_progress_register(mca_pml_yalla_progress);
 192     
 193     ompi_pml_yalla.super.pml_flags |= MCA_PML_BASE_FLAG_REQUIRE_WORLD;
 194 
 195     PML_YALLA_VERBOSE(2, "created mxm context %p ep %p", (void *)ompi_pml_yalla.mxm_context,
 196                       (void *)ompi_pml_yalla.mxm_ep);
 197     return OMPI_SUCCESS;
 198 }
 199 
 200 int mca_pml_yalla_cleanup(void)
 201 {
 202     PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_cleanup");
 203 
 204     opal_progress_unregister(mca_pml_yalla_progress);
 205 
 206     OBJ_DESTRUCT(&ompi_pml_yalla.convs);
 207     OBJ_DESTRUCT(&ompi_pml_yalla.recv_reqs);
 208     OBJ_DESTRUCT(&ompi_pml_yalla.bsend_reqs);
 209     OBJ_DESTRUCT(&ompi_pml_yalla.send_reqs);
 210 
 211     if (ompi_pml_yalla.mxm_ep) {
 212         mxm_ep_destroy(ompi_pml_yalla.mxm_ep);
 213         ompi_pml_yalla.mxm_ep = NULL;
 214     }
 215     if (ompi_pml_yalla.using_mem_hooks) {
 216         opal_mem_hooks_unregister_release(mca_pml_yalla_mem_release_cb);
 217     }
 218 
 219     return OMPI_SUCCESS;
 220 }
 221 
 222 int mca_pml_yalla_add_procs(struct ompi_proc_t **procs, size_t nprocs)
 223 {
 224     size_t i;
 225     int ret;
 226     void *address;
 227     mxm_conn_h conn;
 228     size_t addrlen;
 229     mxm_error_t error;
 230 
 231     if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("yalla",
 232                                                               procs,
 233                                                               nprocs))) {
 234         return ret;
 235     }
 236 
 237     for (i = 0; i < nprocs; ++i) {
 238         ret = recv_ep_address(procs[i], &address, &addrlen);
 239         if (ret < 0) {
 240             return ret;
 241         }
 242 
 243         if (procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) {
 244             PML_YALLA_VERBOSE(3, "already connected to proc. %s",
 245                               OPAL_NAME_PRINT(procs[i]->super.proc_name));
 246             continue;
 247         }
 248 
 249         PML_YALLA_VERBOSE(2, "connecting to proc. %s",
 250                           OPAL_NAME_PRINT(procs[i]->super.proc_name));
 251         error = mxm_ep_connect(ompi_pml_yalla.mxm_ep, address, &conn);
 252         free(address);
 253 
 254         if (MXM_OK != error) {
 255             PML_YALLA_ERROR("%s", "Failed to connect");
 256             return OMPI_ERROR;
 257         }
 258 
 259         procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = conn;
 260     }
 261 
 262     return OMPI_SUCCESS;
 263 }
 264 
 265 int mca_pml_yalla_del_procs(struct ompi_proc_t **procs, size_t nprocs)
 266 {
 267     size_t i;
 268     int ret;
 269 
 270     if (ompi_mpi_state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
 271         PML_YALLA_VERBOSE(3, "%s", "using bulk powerdown");
 272         mxm_ep_powerdown(ompi_pml_yalla.mxm_ep);
 273     }
 274 
 275     for (i = 0; i < nprocs; ++i) {
 276         mxm_ep_disconnect(procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]);
 277         PML_YALLA_VERBOSE(2, "disconnected from rank %s", OPAL_NAME_PRINT(procs[i]->super.proc_name));
 278         procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
 279     }
 280     if (OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
 281         return ret;
 282     }
 283     return OMPI_SUCCESS;
 284 }
 285 
 286 int mca_pml_yalla_enable(bool enable)
 287 {
 288     mca_pml_yalla_init_reqs();
 289     mca_pml_yalla_init_datatype();
 290     return OMPI_SUCCESS;
 291 }
 292 
 293 int mca_pml_yalla_progress(void)
 294 {
 295     mxm_progress(ompi_pml_yalla.mxm_context);
 296     return OMPI_SUCCESS;
 297 }
 298 
 299 int mca_pml_yalla_add_comm(struct ompi_communicator_t* comm)
 300 {
 301     mxm_error_t error;
 302     mxm_mq_h mq;
 303 
 304     error = mxm_mq_create(ompi_pml_yalla.mxm_context, comm->c_contextid, &mq);
 305     if (MXM_OK != error) {
 306         return OMPI_ERROR;
 307     }
 308 
 309     comm->c_pml_comm = (void*)mq;
 310     PML_YALLA_VERBOSE(2, "created mq ctxid %d for comm %s", comm->c_contextid,
 311                       comm->c_name);
 312     return OMPI_SUCCESS;
 313 }
 314 
 315 int mca_pml_yalla_del_comm(struct ompi_communicator_t* comm)
 316 {
 317     mxm_mq_h mq = (void*)comm->c_pml_comm;
 318 
 319     if (ompi_pml_yalla.mxm_context == NULL) {
 320         PML_YALLA_ERROR("%s", "Destroying communicator after MXM context is destroyed");
 321         return OMPI_ERROR;
 322     }
 323 
 324     PML_YALLA_VERBOSE(2, "destroying mq ctxid %d of comm %s", comm->c_contextid,
 325                       comm->c_name);
 326     mxm_mq_destroy(mq);
 327     return OMPI_SUCCESS;
 328 }
 329 
 330 int mca_pml_yalla_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype,
 331                              int src, int tag, struct ompi_communicator_t* comm,
 332                              struct ompi_request_t **request)
 333 {
 334     mca_pml_yalla_recv_request_t *rreq;
 335 
 336     rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, src, tag, comm,
 337                                    OMPI_REQUEST_INACTIVE);
 338     rreq->super.ompi.req_persistent = true;
 339     rreq->super.flags = 0;
 340     *request = &rreq->super.ompi;
 341     PML_YALLA_VERBOSE(9, "init recv request %p src %d tag %d comm %s", (void*)(*request),
 342                       src, tag, comm->c_name);
 343     return OMPI_SUCCESS;
 344 }
 345 
 346 int mca_pml_yalla_irecv(void *buf, size_t count, ompi_datatype_t *datatype,
 347                         int src, int tag, struct ompi_communicator_t* comm,
 348                         struct ompi_request_t **request)
 349 {
 350     mca_pml_yalla_recv_request_t *rreq;
 351     mxm_error_t error;
 352 
 353     rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, src, tag, comm,
 354                                    OMPI_REQUEST_ACTIVE);
 355     rreq->super.ompi.req_persistent = false;
 356     rreq->super.flags = 0;
 357 
 358     PML_YALLA_VERBOSE(8, "receive request *%p=%p from %d tag %d dtype %s count %zu",
 359                       (void *)request, (void *)rreq, src, tag, datatype->name, count);
 360 
 361     error = mxm_req_recv(&rreq->mxm);
 362     if (MXM_OK != error) {
 363         return OMPI_ERROR;
 364     }
 365 
 366     *request = &rreq->super.ompi;
 367     return OMPI_SUCCESS;
 368 }
 369 
 370 int mca_pml_yalla_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src,
 371                        int tag, struct ompi_communicator_t* comm,
 372                        ompi_status_public_t* status)
 373 {
 374     mxm_recv_req_t rreq;
 375     mxm_error_t error;
 376     int rc;
 377 
 378     PML_YALLA_INIT_MXM_RECV_REQ(&rreq, buf, count, datatype, src, tag, comm, recv);
 379     PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(&rreq);
 380 
 381     PML_YALLA_VERBOSE(8, "receive from %d tag %d dtype %s count %zu", src, tag,
 382                       datatype->name, count);
 383 
 384     error = mxm_req_recv(&rreq);
 385     if (MXM_OK != error) {
 386         return OMPI_ERROR;
 387     }
 388 
 389     PML_YALLA_WAIT_MXM_REQ(&rreq.base);
 390     PML_YALLA_VERBOSE(8, "receive completed with status %s source %d rtag %d(%d/0x%x) len %zu",
 391                       mxm_error_string(rreq.base.error),
 392                       rreq.completion.sender_imm, rreq.completion.sender_tag,
 393                       rreq.tag, rreq.tag_mask,
 394                       rreq.completion.actual_len);
 395     rc = PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.actual_len, status);
 396     PML_YALLA_FREE_BLOCKING_MXM_REQ(&rreq.base);
 397 
 398     return rc;
 399 }
 400 
 401 int mca_pml_yalla_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype,
 402                              int dst, int tag, mca_pml_base_send_mode_t mode,
 403                              struct ompi_communicator_t* comm,
 404                              struct ompi_request_t **request)
 405 {
 406     mca_pml_yalla_send_request_t *sreq;
 407 
 408     sreq = MCA_PML_YALLA_SREQ_INIT((void *)buf, count, datatype, dst, tag, mode, comm,
 409                                    OMPI_REQUEST_INACTIVE);
 410     sreq->super.ompi.req_persistent = true;
 411     sreq->super.flags = MCA_PML_YALLA_REQUEST_FLAG_SEND;
 412     if (mode == MCA_PML_BASE_SEND_BUFFERED) {
 413         sreq->super.flags |= MCA_PML_YALLA_REQUEST_FLAG_BSEND;
 414     }
 415 
 416     *request = &sreq->super.ompi;
 417     PML_YALLA_VERBOSE(9, "init send request %p dst %d tag %d comm %s", (void *)*request,
 418                       dst, tag, comm->c_name);
 419     return OMPI_SUCCESS;
 420 }
 421 
 422 static int mca_pml_yalla_bsend(mxm_send_req_t *mxm_sreq)
 423 {
 424     mca_pml_yalla_bsend_request_t *bsreq = (mca_pml_yalla_bsend_request_t *)PML_YALLA_FREELIST_GET(&ompi_pml_yalla.bsend_reqs);
 425     mxm_error_t error;
 426     size_t length;
 427 
 428     /* Create a new send request using MPI internal buffer */
 429 
 430     bsreq->mxm.base.state     = mxm_sreq->base.state;
 431     bsreq->mxm.base.mq        = mxm_sreq->base.mq;
 432     bsreq->mxm.base.conn      = mxm_sreq->base.conn;
 433 
 434     bsreq->mxm.base.data_type = MXM_REQ_DATA_BUFFER;
 435     switch (mxm_sreq->base.data_type) {
 436     case MXM_REQ_DATA_BUFFER:
 437         length = mxm_sreq->base.data.buffer.length;
 438         bsreq->mxm.base.data.buffer.ptr = mca_pml_base_bsend_request_alloc_buf(length);
 439         bsreq->mxm.base.data.buffer.length = length;
 440         memcpy(bsreq->mxm.base.data.buffer.ptr, mxm_sreq->base.data.buffer.ptr, length);
 441         break;
 442     case MXM_REQ_DATA_STREAM:
 443         length = mxm_sreq->base.data.stream.length;
 444         bsreq->mxm.base.data.buffer.ptr = mca_pml_base_bsend_request_alloc_buf(length);
 445         bsreq->mxm.base.data.buffer.length = length;
 446         mxm_sreq->base.data.stream.cb(bsreq->mxm.base.data.buffer.ptr, length,
 447                                       0, mxm_sreq->base.context);
 448         break;
 449     default:
 450         return OMPI_ERROR;
 451     }
 452 
 453     bsreq->mxm.opcode         = mxm_sreq->opcode;
 454     bsreq->mxm.flags          = mxm_sreq->flags;
 455     bsreq->mxm.op.send        = mxm_sreq->op.send;
 456 
 457     error = mxm_req_send(&bsreq->mxm);
 458     if (MXM_OK != error) {
 459         return OMPI_ERROR;
 460     }
 461 
 462     /* Make the completion handler believe it's ok to release the original request */
 463     mxm_sreq->base.state = MXM_REQ_COMPLETED;
 464 
 465     return OMPI_SUCCESS;
 466 }
 467 
 468 int mca_pml_yalla_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
 469                         int dst, int tag, mca_pml_base_send_mode_t mode,
 470                         struct ompi_communicator_t* comm,
 471                         struct ompi_request_t **request)
 472 {
 473     mca_pml_yalla_send_request_t *sreq;
 474     mxm_error_t error;
 475     int rc;
 476 
 477     sreq = MCA_PML_YALLA_SREQ_INIT((void *)buf, count, datatype, dst, tag, mode, comm,
 478                                    OMPI_REQUEST_ACTIVE);
 479     sreq->super.ompi.req_persistent = false;
 480     sreq->super.flags = 0;
 481 
 482     PML_YALLA_VERBOSE(8, "send request *%p=%p to %d mode %d tag %d dtype %s count %zu",
 483                       (void *)request, (void *)sreq, dst, mode, tag, datatype->name, count);
 484 
 485     if (mode == MCA_PML_BASE_SEND_BUFFERED) {
 486         rc = mca_pml_yalla_bsend(&sreq->mxm);
 487         sreq->super.ompi.req_status.MPI_ERROR = rc;
 488         ompi_request_complete(&sreq->super.ompi, true);
 489         *request = &sreq->super.ompi;
 490         return rc;
 491     }
 492 
 493     error = mxm_req_send(&sreq->mxm);
 494     if (MXM_OK != error) {
 495         return OMPI_ERROR;
 496     }
 497 
 498     *request = &sreq->super.ompi;
 499     return OMPI_SUCCESS;
 500 }
 501 
 502 int mca_pml_yalla_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst,
 503                        int tag, mca_pml_base_send_mode_t mode,
 504                        struct ompi_communicator_t* comm)
 505 {
 506     mxm_send_req_t sreq;
 507     mxm_error_t error;
 508 
 509     PML_YALLA_INIT_MXM_SEND_REQ(&sreq, (void *)buf, count, datatype, dst, tag, mode, comm, send);
 510     PML_YALLA_INIT_BLOCKING_MXM_SEND_REQ(&sreq);
 511 
 512     PML_YALLA_VERBOSE(8, "send to %d tag %d dtype %s count %zu", dst, tag,
 513                       datatype->name, count);
 514 
 515     if (mode == MCA_PML_BASE_SEND_BUFFERED) {
 516         return mca_pml_yalla_bsend(&sreq);
 517     }
 518 
 519     error = mxm_req_send(&sreq);
 520     if (MXM_OK != error) {
 521         return OMPI_ERROR;
 522     }
 523 
 524     PML_YALLA_WAIT_MXM_REQ(&sreq.base);
 525     if (MXM_OK != sreq.base.error) {
 526         return OMPI_ERROR;
 527     }
 528 
 529     PML_YALLA_FREE_BLOCKING_MXM_REQ(&sreq.base);
 530 
 531     return OMPI_SUCCESS;
 532 }
 533 
 534 int mca_pml_yalla_iprobe(int src, int tag, struct ompi_communicator_t* comm,
 535                          int *matched, ompi_status_public_t* status)
 536 {
 537     mxm_recv_req_t rreq;
 538     mxm_error_t error;
 539 
 540     PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
 541 
 542     error = mxm_req_probe(&rreq);
 543     switch (error) {
 544     case MXM_OK:
 545         *matched = 1;
 546         PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
 547         return OMPI_SUCCESS;
 548     case MXM_ERR_NO_MESSAGE:
 549         *matched = 0;
 550         return OMPI_SUCCESS;
 551     default:
 552         return OMPI_ERROR;
 553     }
 554 
 555     return OMPI_SUCCESS;
 556 }
 557 
 558 int mca_pml_yalla_probe(int src, int tag, struct ompi_communicator_t* comm,
 559                         ompi_status_public_t* status)
 560 {
 561     mxm_recv_req_t rreq;
 562     mxm_error_t error;
 563 
 564     PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
 565     for (;;) {
 566         error = mxm_req_probe(&rreq);
 567         switch (error) {
 568         case MXM_OK:
 569             PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
 570             return OMPI_SUCCESS;
 571         case MXM_ERR_NO_MESSAGE:
 572             break;
 573         default:
 574             return OMPI_ERROR;
 575         }
 576 
 577         opal_progress();
 578     }
 579 }
 580 
 581 int mca_pml_yalla_improbe(int src, int tag, struct ompi_communicator_t* comm,
 582                           int *matched, struct ompi_message_t **message,
 583                           ompi_status_public_t* status)
 584 {
 585     mxm_recv_req_t rreq;
 586     mxm_message_h mxm_msg;
 587     mxm_error_t error;
 588 
 589     PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
 590 
 591     error = mxm_req_mprobe(&rreq, &mxm_msg);
 592     switch (error) {
 593     case MXM_OK:
 594         *matched = 1;
 595         PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
 596         PML_YALLA_SET_MESSAGE(&rreq, comm, mxm_msg, message);
 597         return OMPI_SUCCESS;
 598     case MXM_ERR_NO_MESSAGE:
 599         *matched = 0;
 600         return OMPI_SUCCESS;
 601     default:
 602         return OMPI_ERROR;
 603     }
 604 
 605     return OMPI_SUCCESS;
 606 }
 607 
 608 int mca_pml_yalla_mprobe(int src, int tag, struct ompi_communicator_t* comm,
 609                          struct ompi_message_t **message,
 610                          ompi_status_public_t* status)
 611 {
 612     mxm_recv_req_t rreq;
 613     mxm_message_h mxm_msg;
 614     mxm_error_t error;
 615 
 616     PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
 617     for (;;) {
 618         error = mxm_req_mprobe(&rreq, &mxm_msg);
 619         switch (error) {
 620         case MXM_OK:
 621             PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
 622             PML_YALLA_SET_MESSAGE(&rreq, comm, mxm_msg, message);
 623             return OMPI_SUCCESS;
 624         case MXM_ERR_NO_MESSAGE:
 625             break;
 626         default:
 627             return OMPI_ERROR;
 628         }
 629 
 630         opal_progress();
 631     }
 632 }
 633 
 634 int mca_pml_yalla_imrecv(void *buf, size_t count, ompi_datatype_t *datatype,
 635                          struct ompi_message_t **message,
 636                          struct ompi_request_t **request)
 637 {
 638     mca_pml_yalla_recv_request_t *rreq;
 639     mxm_error_t error;
 640 
 641     rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, -1, 0, (*message)->comm,
 642                                    OMPI_REQUEST_ACTIVE);
 643     rreq->super.ompi.req_persistent = false;
 644     rreq->super.flags = 0;
 645 
 646     PML_YALLA_VERBOSE(8, "receive request *%p=%p message *%p=%p dtype %s count %zu",
 647                       (void *)request, (void *)rreq, (void *)message, (void *)(*message), datatype->name, count);
 648 
 649     error = mxm_message_recv(&rreq->mxm, (*message)->req_ptr);
 650     if (MXM_OK != error) {
 651         return OMPI_ERROR;
 652     }
 653 
 654     PML_YALLA_MESSAGE_RELEASE(message);
 655 
 656     *request = &rreq->super.ompi;
 657     return OMPI_SUCCESS;
 658 }
 659 
 660 int mca_pml_yalla_mrecv(void *buf, size_t count, ompi_datatype_t *datatype,
 661                         struct ompi_message_t **message,
 662                         ompi_status_public_t* status)
 663 {
 664     mxm_recv_req_t rreq;
 665     mxm_error_t error;
 666 
 667     PML_YALLA_INIT_MXM_RECV_REQ(&rreq, buf, count, datatype, -1, 0, (*message)->comm, recv);
 668     PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(&rreq);
 669 
 670     PML_YALLA_VERBOSE(8, "receive message *%p=%p dtype %s count %zu", (void *)message,
 671                       (void *)*message, datatype->name, count);
 672 
 673     error = mxm_message_recv(&rreq, (*message)->req_ptr);
 674     if (MXM_OK != error) {
 675         return OMPI_ERROR;
 676     }
 677 
 678     PML_YALLA_MESSAGE_RELEASE(message);
 679 
 680     PML_YALLA_WAIT_MXM_REQ(&rreq.base);
 681     PML_YALLA_VERBOSE(8, "receive completed with status %s source %d rtag %d(%d/0x%x) len %zu",
 682                       mxm_error_string(rreq.base.error),
 683                       rreq.completion.sender_imm, rreq.completion.sender_tag,
 684                       rreq.tag, rreq.tag_mask,
 685                       rreq.completion.actual_len);
 686     return PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.actual_len, status);
 687 }
 688 
 689 int mca_pml_yalla_start(size_t count, ompi_request_t** requests)
 690 {
 691     mca_pml_yalla_base_request_t *req;
 692     mxm_error_t error;
 693     size_t i;
 694     int rc;
 695 
 696     for (i = 0; i < count; ++i) {
 697         req = (mca_pml_yalla_base_request_t *)requests[i];
 698 
 699         if ((req == NULL) || (OMPI_REQUEST_PML != req->ompi.req_type)) {
 700             /* Skip irrelevant requests */
 701             continue;
 702         }
 703 
 704         PML_YALLA_ASSERT(req->ompi.req_state != OMPI_REQUEST_INVALID);
 705         PML_YALLA_RESET_OMPI_REQ(&req->ompi, OMPI_REQUEST_ACTIVE);
 706 
 707         if (req->flags & MCA_PML_YALLA_REQUEST_FLAG_SEND) {
 708             mca_pml_yalla_send_request_t *sreq;
 709             sreq = (mca_pml_yalla_send_request_t *)req;
 710             PML_YALLA_RESET_PML_REQ(req, PML_YALLA_MXM_REQBASE(sreq));
 711 
 712             if (req->flags & MCA_PML_YALLA_REQUEST_FLAG_BSEND) {
 713                 PML_YALLA_VERBOSE(8, "start bsend request %p", (void *)sreq);
 714                 rc = mca_pml_yalla_bsend(&sreq->mxm);
 715                 sreq->super.ompi.req_status.MPI_ERROR = rc;
 716                 ompi_request_complete(&sreq->super.ompi, true);
 717                 if (OMPI_SUCCESS != rc) {
 718                     return rc;
 719                 }
 720             } else {
 721                 PML_YALLA_VERBOSE(8, "start send request %p", (void *)sreq);
 722                 error = mxm_req_send(&sreq->mxm);
 723                 if (MXM_OK != error) {
 724                     return OMPI_ERROR;
 725                 }
 726             }
 727         } else {
 728             mca_pml_yalla_recv_request_t *rreq;
 729             rreq = (mca_pml_yalla_recv_request_t *)req;
 730             PML_YALLA_RESET_PML_REQ(req, PML_YALLA_MXM_REQBASE(rreq));
 731 
 732             PML_YALLA_VERBOSE(8, "start recv request %p", (void *)req);
 733             error = mxm_req_recv(&rreq->mxm);
 734             if (MXM_OK != error) {
 735                 return OMPI_ERROR;
 736             }
 737         }
 738     }
 739     return OMPI_SUCCESS;
 740 }
 741 
 742 int mca_pml_yalla_dump(struct ompi_communicator_t* comm, int verbose)
 743 {
 744     return OMPI_SUCCESS;
 745 }

/* [<][>][^][v][top][bottom][index][help] */