This source file includes the following definitions:
- send_ep_address
- recv_ep_address
- mca_pml_yalla_mem_release_cb
- mca_pml_yalla_open
- mca_pml_yalla_close
- mca_pml_yalla_init
- mca_pml_yalla_cleanup
- mca_pml_yalla_add_procs
- mca_pml_yalla_del_procs
- mca_pml_yalla_enable
- mca_pml_yalla_progress
- mca_pml_yalla_add_comm
- mca_pml_yalla_del_comm
- mca_pml_yalla_irecv_init
- mca_pml_yalla_irecv
- mca_pml_yalla_recv
- mca_pml_yalla_isend_init
- mca_pml_yalla_bsend
- mca_pml_yalla_isend
- mca_pml_yalla_send
- mca_pml_yalla_iprobe
- mca_pml_yalla_probe
- mca_pml_yalla_improbe
- mca_pml_yalla_mprobe
- mca_pml_yalla_imrecv
- mca_pml_yalla_mrecv
- mca_pml_yalla_start
- mca_pml_yalla_dump
1
2
3
4
5
6
7
8
9
10
11
12
13 #ifdef HAVE_ALLOCA_H
14 #include <alloca.h>
15 #endif
16
17 #include "pml_yalla.h"
18 #include "pml_yalla_request.h"
19
20 #include "opal/runtime/opal.h"
21 #include "opal/memoryhooks/memory.h"
22 #include "opal/mca/memory/base/base.h"
23 #include "opal/mca/pmix/pmix.h"
24 #include "ompi/mca/pml/base/pml_base_bsend.h"
25 #include "ompi/message/message.h"
26
27 #define MODEX_KEY "yalla-mxm"
28
29 mca_pml_yalla_module_t ompi_pml_yalla = {
30 {
31 mca_pml_yalla_add_procs,
32 mca_pml_yalla_del_procs,
33 mca_pml_yalla_enable,
34 NULL,
35 mca_pml_yalla_add_comm,
36 mca_pml_yalla_del_comm,
37 mca_pml_yalla_irecv_init,
38 mca_pml_yalla_irecv,
39 mca_pml_yalla_recv,
40 mca_pml_yalla_isend_init,
41 mca_pml_yalla_isend,
42 mca_pml_yalla_send,
43 mca_pml_yalla_iprobe,
44 mca_pml_yalla_probe,
45 mca_pml_yalla_start,
46 mca_pml_yalla_improbe,
47 mca_pml_yalla_mprobe,
48 mca_pml_yalla_imrecv,
49 mca_pml_yalla_mrecv,
50 mca_pml_yalla_dump,
51 NULL,
52 1ul << ((sizeof(mxm_ctxid_t)*8) - 1),
53 1ul << ((sizeof(mxm_tag_t)*8 - 1) - 1),
54 },
55 NULL,
56 NULL,
57 NULL,
58 NULL
59 };
60
61 static int send_ep_address(void)
62 {
63 mxm_error_t error;
64 void *address;
65 size_t addrlen;
66 int rc;
67
68 addrlen = 0;
69 error = mxm_ep_get_address(ompi_pml_yalla.mxm_ep, NULL, &addrlen);
70 PML_YALLA_ASSERT(error == MXM_ERR_BUFFER_TOO_SMALL);
71
72 address = alloca(addrlen);
73 error = mxm_ep_get_address(ompi_pml_yalla.mxm_ep, address, &addrlen);
74 if (MXM_OK != error) {
75 PML_YALLA_ERROR("%s", "Failed to get EP address");
76 return OMPI_ERROR;
77 }
78
79 OPAL_MODEX_SEND(rc, OPAL_PMIX_GLOBAL,
80 &mca_pml_yalla_component.pmlm_version, address, addrlen);
81 if (OMPI_SUCCESS != rc) {
82 PML_YALLA_ERROR("%s", "Open MPI couldn't distribute EP connection details");
83 return OMPI_ERROR;
84 }
85
86 return OMPI_SUCCESS;
87 }
88
89 static int recv_ep_address(ompi_proc_t *proc, void **address_p, size_t *addrlen_p)
90 {
91 int rc;
92
93 OPAL_MODEX_RECV(rc, &mca_pml_yalla_component.pmlm_version, &proc->super.proc_name,
94 address_p, addrlen_p);
95 if (rc < 0) {
96 PML_YALLA_ERROR("%s", "Failed to receive EP address");
97 }
98 return rc;
99 }
100
101 static void mca_pml_yalla_mem_release_cb(void *buf, size_t length,
102 void *cbdata, bool from_alloc)
103 {
104 mxm_mem_unmap(ompi_pml_yalla.mxm_context, buf, length,
105 from_alloc ? MXM_MEM_UNMAP_MARK_INVALID : 0);
106 }
107
108 int mca_pml_yalla_open(void)
109 {
110 mxm_error_t error;
111
112 PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_open");
113
114 (void)mca_base_framework_open(&opal_memory_base_framework, 0);
115
116
117 if ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) ==
118 ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) &
119 opal_mem_hooks_support_level()))
120 {
121 PML_YALLA_VERBOSE(1, "%s", "enabling on-demand memory mapping");
122 opal_setenv("MXM_MPI_MEM_ON_DEMAND_MAP", "y", false, &environ);
123 ompi_pml_yalla.using_mem_hooks = 1;
124 } else {
125 PML_YALLA_VERBOSE(1, "%s", "disabling on-demand memory mapping");
126 ompi_pml_yalla.using_mem_hooks = 0;
127 }
128 opal_setenv("MXM_MPI_SINGLE_THREAD", ompi_mpi_thread_multiple ? "n" : "y",
129 false, &environ);
130
131
132 error = mxm_config_read_opts(&ompi_pml_yalla.ctx_opts, &ompi_pml_yalla.ep_opts,
133 "MPI", NULL, 0);
134 if (MXM_OK != error) {
135 return OMPI_ERROR;
136 }
137
138 error = mxm_init(ompi_pml_yalla.ctx_opts, &ompi_pml_yalla.mxm_context);
139 if (MXM_OK != error) {
140 return OMPI_ERROR;
141 }
142
143 return OMPI_SUCCESS;
144 }
145
146 int mca_pml_yalla_close(void)
147 {
148 PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_close");
149
150 if (ompi_pml_yalla.ctx_opts != NULL) {
151 mxm_config_free_context_opts(ompi_pml_yalla.ctx_opts);
152 }
153 if (ompi_pml_yalla.ep_opts != NULL) {
154 mxm_config_free_ep_opts(ompi_pml_yalla.ep_opts);
155 }
156 if (ompi_pml_yalla.mxm_context != NULL) {
157 mxm_cleanup(ompi_pml_yalla.mxm_context);
158 ompi_pml_yalla.mxm_context = NULL;
159 }
160 mca_base_framework_close(&opal_memory_base_framework);
161 return 0;
162 }
163
164 int mca_pml_yalla_init(void)
165 {
166 mxm_error_t error;
167 int rc;
168
169 PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_init");
170
171 if (ompi_pml_yalla.using_mem_hooks) {
172 opal_mem_hooks_register_release(mca_pml_yalla_mem_release_cb, NULL);
173 }
174
175 error = mxm_ep_create(ompi_pml_yalla.mxm_context, ompi_pml_yalla.ep_opts,
176 &ompi_pml_yalla.mxm_ep);
177 if (MXM_OK != error) {
178 return OMPI_ERROR;
179 }
180
181 rc = send_ep_address();
182 if (rc < 0) {
183 return rc;
184 }
185
186 OBJ_CONSTRUCT(&ompi_pml_yalla.send_reqs, mca_pml_yalla_freelist_t);
187 OBJ_CONSTRUCT(&ompi_pml_yalla.bsend_reqs, mca_pml_yalla_freelist_t);
188 OBJ_CONSTRUCT(&ompi_pml_yalla.recv_reqs, mca_pml_yalla_freelist_t);
189 OBJ_CONSTRUCT(&ompi_pml_yalla.convs, mca_pml_yalla_freelist_t);
190
191 opal_progress_register(mca_pml_yalla_progress);
192
193 ompi_pml_yalla.super.pml_flags |= MCA_PML_BASE_FLAG_REQUIRE_WORLD;
194
195 PML_YALLA_VERBOSE(2, "created mxm context %p ep %p", (void *)ompi_pml_yalla.mxm_context,
196 (void *)ompi_pml_yalla.mxm_ep);
197 return OMPI_SUCCESS;
198 }
199
200 int mca_pml_yalla_cleanup(void)
201 {
202 PML_YALLA_VERBOSE(1, "%s", "mca_pml_yalla_cleanup");
203
204 opal_progress_unregister(mca_pml_yalla_progress);
205
206 OBJ_DESTRUCT(&ompi_pml_yalla.convs);
207 OBJ_DESTRUCT(&ompi_pml_yalla.recv_reqs);
208 OBJ_DESTRUCT(&ompi_pml_yalla.bsend_reqs);
209 OBJ_DESTRUCT(&ompi_pml_yalla.send_reqs);
210
211 if (ompi_pml_yalla.mxm_ep) {
212 mxm_ep_destroy(ompi_pml_yalla.mxm_ep);
213 ompi_pml_yalla.mxm_ep = NULL;
214 }
215 if (ompi_pml_yalla.using_mem_hooks) {
216 opal_mem_hooks_unregister_release(mca_pml_yalla_mem_release_cb);
217 }
218
219 return OMPI_SUCCESS;
220 }
221
222 int mca_pml_yalla_add_procs(struct ompi_proc_t **procs, size_t nprocs)
223 {
224 size_t i;
225 int ret;
226 void *address;
227 mxm_conn_h conn;
228 size_t addrlen;
229 mxm_error_t error;
230
231 if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("yalla",
232 procs,
233 nprocs))) {
234 return ret;
235 }
236
237 for (i = 0; i < nprocs; ++i) {
238 ret = recv_ep_address(procs[i], &address, &addrlen);
239 if (ret < 0) {
240 return ret;
241 }
242
243 if (procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) {
244 PML_YALLA_VERBOSE(3, "already connected to proc. %s",
245 OPAL_NAME_PRINT(procs[i]->super.proc_name));
246 continue;
247 }
248
249 PML_YALLA_VERBOSE(2, "connecting to proc. %s",
250 OPAL_NAME_PRINT(procs[i]->super.proc_name));
251 error = mxm_ep_connect(ompi_pml_yalla.mxm_ep, address, &conn);
252 free(address);
253
254 if (MXM_OK != error) {
255 PML_YALLA_ERROR("%s", "Failed to connect");
256 return OMPI_ERROR;
257 }
258
259 procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = conn;
260 }
261
262 return OMPI_SUCCESS;
263 }
264
265 int mca_pml_yalla_del_procs(struct ompi_proc_t **procs, size_t nprocs)
266 {
267 size_t i;
268 int ret;
269
270 if (ompi_mpi_state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
271 PML_YALLA_VERBOSE(3, "%s", "using bulk powerdown");
272 mxm_ep_powerdown(ompi_pml_yalla.mxm_ep);
273 }
274
275 for (i = 0; i < nprocs; ++i) {
276 mxm_ep_disconnect(procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]);
277 PML_YALLA_VERBOSE(2, "disconnected from rank %s", OPAL_NAME_PRINT(procs[i]->super.proc_name));
278 procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
279 }
280 if (OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
281 return ret;
282 }
283 return OMPI_SUCCESS;
284 }
285
286 int mca_pml_yalla_enable(bool enable)
287 {
288 mca_pml_yalla_init_reqs();
289 mca_pml_yalla_init_datatype();
290 return OMPI_SUCCESS;
291 }
292
293 int mca_pml_yalla_progress(void)
294 {
295 mxm_progress(ompi_pml_yalla.mxm_context);
296 return OMPI_SUCCESS;
297 }
298
299 int mca_pml_yalla_add_comm(struct ompi_communicator_t* comm)
300 {
301 mxm_error_t error;
302 mxm_mq_h mq;
303
304 error = mxm_mq_create(ompi_pml_yalla.mxm_context, comm->c_contextid, &mq);
305 if (MXM_OK != error) {
306 return OMPI_ERROR;
307 }
308
309 comm->c_pml_comm = (void*)mq;
310 PML_YALLA_VERBOSE(2, "created mq ctxid %d for comm %s", comm->c_contextid,
311 comm->c_name);
312 return OMPI_SUCCESS;
313 }
314
315 int mca_pml_yalla_del_comm(struct ompi_communicator_t* comm)
316 {
317 mxm_mq_h mq = (void*)comm->c_pml_comm;
318
319 if (ompi_pml_yalla.mxm_context == NULL) {
320 PML_YALLA_ERROR("%s", "Destroying communicator after MXM context is destroyed");
321 return OMPI_ERROR;
322 }
323
324 PML_YALLA_VERBOSE(2, "destroying mq ctxid %d of comm %s", comm->c_contextid,
325 comm->c_name);
326 mxm_mq_destroy(mq);
327 return OMPI_SUCCESS;
328 }
329
330 int mca_pml_yalla_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype,
331 int src, int tag, struct ompi_communicator_t* comm,
332 struct ompi_request_t **request)
333 {
334 mca_pml_yalla_recv_request_t *rreq;
335
336 rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, src, tag, comm,
337 OMPI_REQUEST_INACTIVE);
338 rreq->super.ompi.req_persistent = true;
339 rreq->super.flags = 0;
340 *request = &rreq->super.ompi;
341 PML_YALLA_VERBOSE(9, "init recv request %p src %d tag %d comm %s", (void*)(*request),
342 src, tag, comm->c_name);
343 return OMPI_SUCCESS;
344 }
345
346 int mca_pml_yalla_irecv(void *buf, size_t count, ompi_datatype_t *datatype,
347 int src, int tag, struct ompi_communicator_t* comm,
348 struct ompi_request_t **request)
349 {
350 mca_pml_yalla_recv_request_t *rreq;
351 mxm_error_t error;
352
353 rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, src, tag, comm,
354 OMPI_REQUEST_ACTIVE);
355 rreq->super.ompi.req_persistent = false;
356 rreq->super.flags = 0;
357
358 PML_YALLA_VERBOSE(8, "receive request *%p=%p from %d tag %d dtype %s count %zu",
359 (void *)request, (void *)rreq, src, tag, datatype->name, count);
360
361 error = mxm_req_recv(&rreq->mxm);
362 if (MXM_OK != error) {
363 return OMPI_ERROR;
364 }
365
366 *request = &rreq->super.ompi;
367 return OMPI_SUCCESS;
368 }
369
370 int mca_pml_yalla_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src,
371 int tag, struct ompi_communicator_t* comm,
372 ompi_status_public_t* status)
373 {
374 mxm_recv_req_t rreq;
375 mxm_error_t error;
376 int rc;
377
378 PML_YALLA_INIT_MXM_RECV_REQ(&rreq, buf, count, datatype, src, tag, comm, recv);
379 PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(&rreq);
380
381 PML_YALLA_VERBOSE(8, "receive from %d tag %d dtype %s count %zu", src, tag,
382 datatype->name, count);
383
384 error = mxm_req_recv(&rreq);
385 if (MXM_OK != error) {
386 return OMPI_ERROR;
387 }
388
389 PML_YALLA_WAIT_MXM_REQ(&rreq.base);
390 PML_YALLA_VERBOSE(8, "receive completed with status %s source %d rtag %d(%d/0x%x) len %zu",
391 mxm_error_string(rreq.base.error),
392 rreq.completion.sender_imm, rreq.completion.sender_tag,
393 rreq.tag, rreq.tag_mask,
394 rreq.completion.actual_len);
395 rc = PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.actual_len, status);
396 PML_YALLA_FREE_BLOCKING_MXM_REQ(&rreq.base);
397
398 return rc;
399 }
400
401 int mca_pml_yalla_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype,
402 int dst, int tag, mca_pml_base_send_mode_t mode,
403 struct ompi_communicator_t* comm,
404 struct ompi_request_t **request)
405 {
406 mca_pml_yalla_send_request_t *sreq;
407
408 sreq = MCA_PML_YALLA_SREQ_INIT((void *)buf, count, datatype, dst, tag, mode, comm,
409 OMPI_REQUEST_INACTIVE);
410 sreq->super.ompi.req_persistent = true;
411 sreq->super.flags = MCA_PML_YALLA_REQUEST_FLAG_SEND;
412 if (mode == MCA_PML_BASE_SEND_BUFFERED) {
413 sreq->super.flags |= MCA_PML_YALLA_REQUEST_FLAG_BSEND;
414 }
415
416 *request = &sreq->super.ompi;
417 PML_YALLA_VERBOSE(9, "init send request %p dst %d tag %d comm %s", (void *)*request,
418 dst, tag, comm->c_name);
419 return OMPI_SUCCESS;
420 }
421
422 static int mca_pml_yalla_bsend(mxm_send_req_t *mxm_sreq)
423 {
424 mca_pml_yalla_bsend_request_t *bsreq = (mca_pml_yalla_bsend_request_t *)PML_YALLA_FREELIST_GET(&ompi_pml_yalla.bsend_reqs);
425 mxm_error_t error;
426 size_t length;
427
428
429
430 bsreq->mxm.base.state = mxm_sreq->base.state;
431 bsreq->mxm.base.mq = mxm_sreq->base.mq;
432 bsreq->mxm.base.conn = mxm_sreq->base.conn;
433
434 bsreq->mxm.base.data_type = MXM_REQ_DATA_BUFFER;
435 switch (mxm_sreq->base.data_type) {
436 case MXM_REQ_DATA_BUFFER:
437 length = mxm_sreq->base.data.buffer.length;
438 bsreq->mxm.base.data.buffer.ptr = mca_pml_base_bsend_request_alloc_buf(length);
439 bsreq->mxm.base.data.buffer.length = length;
440 memcpy(bsreq->mxm.base.data.buffer.ptr, mxm_sreq->base.data.buffer.ptr, length);
441 break;
442 case MXM_REQ_DATA_STREAM:
443 length = mxm_sreq->base.data.stream.length;
444 bsreq->mxm.base.data.buffer.ptr = mca_pml_base_bsend_request_alloc_buf(length);
445 bsreq->mxm.base.data.buffer.length = length;
446 mxm_sreq->base.data.stream.cb(bsreq->mxm.base.data.buffer.ptr, length,
447 0, mxm_sreq->base.context);
448 break;
449 default:
450 return OMPI_ERROR;
451 }
452
453 bsreq->mxm.opcode = mxm_sreq->opcode;
454 bsreq->mxm.flags = mxm_sreq->flags;
455 bsreq->mxm.op.send = mxm_sreq->op.send;
456
457 error = mxm_req_send(&bsreq->mxm);
458 if (MXM_OK != error) {
459 return OMPI_ERROR;
460 }
461
462
463 mxm_sreq->base.state = MXM_REQ_COMPLETED;
464
465 return OMPI_SUCCESS;
466 }
467
468 int mca_pml_yalla_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
469 int dst, int tag, mca_pml_base_send_mode_t mode,
470 struct ompi_communicator_t* comm,
471 struct ompi_request_t **request)
472 {
473 mca_pml_yalla_send_request_t *sreq;
474 mxm_error_t error;
475 int rc;
476
477 sreq = MCA_PML_YALLA_SREQ_INIT((void *)buf, count, datatype, dst, tag, mode, comm,
478 OMPI_REQUEST_ACTIVE);
479 sreq->super.ompi.req_persistent = false;
480 sreq->super.flags = 0;
481
482 PML_YALLA_VERBOSE(8, "send request *%p=%p to %d mode %d tag %d dtype %s count %zu",
483 (void *)request, (void *)sreq, dst, mode, tag, datatype->name, count);
484
485 if (mode == MCA_PML_BASE_SEND_BUFFERED) {
486 rc = mca_pml_yalla_bsend(&sreq->mxm);
487 sreq->super.ompi.req_status.MPI_ERROR = rc;
488 ompi_request_complete(&sreq->super.ompi, true);
489 *request = &sreq->super.ompi;
490 return rc;
491 }
492
493 error = mxm_req_send(&sreq->mxm);
494 if (MXM_OK != error) {
495 return OMPI_ERROR;
496 }
497
498 *request = &sreq->super.ompi;
499 return OMPI_SUCCESS;
500 }
501
502 int mca_pml_yalla_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst,
503 int tag, mca_pml_base_send_mode_t mode,
504 struct ompi_communicator_t* comm)
505 {
506 mxm_send_req_t sreq;
507 mxm_error_t error;
508
509 PML_YALLA_INIT_MXM_SEND_REQ(&sreq, (void *)buf, count, datatype, dst, tag, mode, comm, send);
510 PML_YALLA_INIT_BLOCKING_MXM_SEND_REQ(&sreq);
511
512 PML_YALLA_VERBOSE(8, "send to %d tag %d dtype %s count %zu", dst, tag,
513 datatype->name, count);
514
515 if (mode == MCA_PML_BASE_SEND_BUFFERED) {
516 return mca_pml_yalla_bsend(&sreq);
517 }
518
519 error = mxm_req_send(&sreq);
520 if (MXM_OK != error) {
521 return OMPI_ERROR;
522 }
523
524 PML_YALLA_WAIT_MXM_REQ(&sreq.base);
525 if (MXM_OK != sreq.base.error) {
526 return OMPI_ERROR;
527 }
528
529 PML_YALLA_FREE_BLOCKING_MXM_REQ(&sreq.base);
530
531 return OMPI_SUCCESS;
532 }
533
534 int mca_pml_yalla_iprobe(int src, int tag, struct ompi_communicator_t* comm,
535 int *matched, ompi_status_public_t* status)
536 {
537 mxm_recv_req_t rreq;
538 mxm_error_t error;
539
540 PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
541
542 error = mxm_req_probe(&rreq);
543 switch (error) {
544 case MXM_OK:
545 *matched = 1;
546 PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
547 return OMPI_SUCCESS;
548 case MXM_ERR_NO_MESSAGE:
549 *matched = 0;
550 return OMPI_SUCCESS;
551 default:
552 return OMPI_ERROR;
553 }
554
555 return OMPI_SUCCESS;
556 }
557
558 int mca_pml_yalla_probe(int src, int tag, struct ompi_communicator_t* comm,
559 ompi_status_public_t* status)
560 {
561 mxm_recv_req_t rreq;
562 mxm_error_t error;
563
564 PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
565 for (;;) {
566 error = mxm_req_probe(&rreq);
567 switch (error) {
568 case MXM_OK:
569 PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
570 return OMPI_SUCCESS;
571 case MXM_ERR_NO_MESSAGE:
572 break;
573 default:
574 return OMPI_ERROR;
575 }
576
577 opal_progress();
578 }
579 }
580
581 int mca_pml_yalla_improbe(int src, int tag, struct ompi_communicator_t* comm,
582 int *matched, struct ompi_message_t **message,
583 ompi_status_public_t* status)
584 {
585 mxm_recv_req_t rreq;
586 mxm_message_h mxm_msg;
587 mxm_error_t error;
588
589 PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
590
591 error = mxm_req_mprobe(&rreq, &mxm_msg);
592 switch (error) {
593 case MXM_OK:
594 *matched = 1;
595 PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
596 PML_YALLA_SET_MESSAGE(&rreq, comm, mxm_msg, message);
597 return OMPI_SUCCESS;
598 case MXM_ERR_NO_MESSAGE:
599 *matched = 0;
600 return OMPI_SUCCESS;
601 default:
602 return OMPI_ERROR;
603 }
604
605 return OMPI_SUCCESS;
606 }
607
608 int mca_pml_yalla_mprobe(int src, int tag, struct ompi_communicator_t* comm,
609 struct ompi_message_t **message,
610 ompi_status_public_t* status)
611 {
612 mxm_recv_req_t rreq;
613 mxm_message_h mxm_msg;
614 mxm_error_t error;
615
616 PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm);
617 for (;;) {
618 error = mxm_req_mprobe(&rreq, &mxm_msg);
619 switch (error) {
620 case MXM_OK:
621 PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status);
622 PML_YALLA_SET_MESSAGE(&rreq, comm, mxm_msg, message);
623 return OMPI_SUCCESS;
624 case MXM_ERR_NO_MESSAGE:
625 break;
626 default:
627 return OMPI_ERROR;
628 }
629
630 opal_progress();
631 }
632 }
633
634 int mca_pml_yalla_imrecv(void *buf, size_t count, ompi_datatype_t *datatype,
635 struct ompi_message_t **message,
636 struct ompi_request_t **request)
637 {
638 mca_pml_yalla_recv_request_t *rreq;
639 mxm_error_t error;
640
641 rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, -1, 0, (*message)->comm,
642 OMPI_REQUEST_ACTIVE);
643 rreq->super.ompi.req_persistent = false;
644 rreq->super.flags = 0;
645
646 PML_YALLA_VERBOSE(8, "receive request *%p=%p message *%p=%p dtype %s count %zu",
647 (void *)request, (void *)rreq, (void *)message, (void *)(*message), datatype->name, count);
648
649 error = mxm_message_recv(&rreq->mxm, (*message)->req_ptr);
650 if (MXM_OK != error) {
651 return OMPI_ERROR;
652 }
653
654 PML_YALLA_MESSAGE_RELEASE(message);
655
656 *request = &rreq->super.ompi;
657 return OMPI_SUCCESS;
658 }
659
660 int mca_pml_yalla_mrecv(void *buf, size_t count, ompi_datatype_t *datatype,
661 struct ompi_message_t **message,
662 ompi_status_public_t* status)
663 {
664 mxm_recv_req_t rreq;
665 mxm_error_t error;
666
667 PML_YALLA_INIT_MXM_RECV_REQ(&rreq, buf, count, datatype, -1, 0, (*message)->comm, recv);
668 PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(&rreq);
669
670 PML_YALLA_VERBOSE(8, "receive message *%p=%p dtype %s count %zu", (void *)message,
671 (void *)*message, datatype->name, count);
672
673 error = mxm_message_recv(&rreq, (*message)->req_ptr);
674 if (MXM_OK != error) {
675 return OMPI_ERROR;
676 }
677
678 PML_YALLA_MESSAGE_RELEASE(message);
679
680 PML_YALLA_WAIT_MXM_REQ(&rreq.base);
681 PML_YALLA_VERBOSE(8, "receive completed with status %s source %d rtag %d(%d/0x%x) len %zu",
682 mxm_error_string(rreq.base.error),
683 rreq.completion.sender_imm, rreq.completion.sender_tag,
684 rreq.tag, rreq.tag_mask,
685 rreq.completion.actual_len);
686 return PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.actual_len, status);
687 }
688
689 int mca_pml_yalla_start(size_t count, ompi_request_t** requests)
690 {
691 mca_pml_yalla_base_request_t *req;
692 mxm_error_t error;
693 size_t i;
694 int rc;
695
696 for (i = 0; i < count; ++i) {
697 req = (mca_pml_yalla_base_request_t *)requests[i];
698
699 if ((req == NULL) || (OMPI_REQUEST_PML != req->ompi.req_type)) {
700
701 continue;
702 }
703
704 PML_YALLA_ASSERT(req->ompi.req_state != OMPI_REQUEST_INVALID);
705 PML_YALLA_RESET_OMPI_REQ(&req->ompi, OMPI_REQUEST_ACTIVE);
706
707 if (req->flags & MCA_PML_YALLA_REQUEST_FLAG_SEND) {
708 mca_pml_yalla_send_request_t *sreq;
709 sreq = (mca_pml_yalla_send_request_t *)req;
710 PML_YALLA_RESET_PML_REQ(req, PML_YALLA_MXM_REQBASE(sreq));
711
712 if (req->flags & MCA_PML_YALLA_REQUEST_FLAG_BSEND) {
713 PML_YALLA_VERBOSE(8, "start bsend request %p", (void *)sreq);
714 rc = mca_pml_yalla_bsend(&sreq->mxm);
715 sreq->super.ompi.req_status.MPI_ERROR = rc;
716 ompi_request_complete(&sreq->super.ompi, true);
717 if (OMPI_SUCCESS != rc) {
718 return rc;
719 }
720 } else {
721 PML_YALLA_VERBOSE(8, "start send request %p", (void *)sreq);
722 error = mxm_req_send(&sreq->mxm);
723 if (MXM_OK != error) {
724 return OMPI_ERROR;
725 }
726 }
727 } else {
728 mca_pml_yalla_recv_request_t *rreq;
729 rreq = (mca_pml_yalla_recv_request_t *)req;
730 PML_YALLA_RESET_PML_REQ(req, PML_YALLA_MXM_REQBASE(rreq));
731
732 PML_YALLA_VERBOSE(8, "start recv request %p", (void *)req);
733 error = mxm_req_recv(&rreq->mxm);
734 if (MXM_OK != error) {
735 return OMPI_ERROR;
736 }
737 }
738 }
739 return OMPI_SUCCESS;
740 }
741
742 int mca_pml_yalla_dump(struct ompi_communicator_t* comm, int verbose)
743 {
744 return OMPI_SUCCESS;
745 }