This source file includes following definitions.
- setup_scatter_buffers_linear
- setup_scatter_handles
- setup_sync_handles
- cleanup_scatter_handles
- cleanup_sync_handles
- ompi_coll_portals4_scatter_intra_linear_top
- ompi_coll_portals4_scatter_intra_linear_bottom
- ompi_coll_portals4_scatter_intra
- ompi_coll_portals4_iscatter_intra
- ompi_coll_portals4_iscatter_intra_fini
1
2
3
4
5
6
7
8
9
10
11 #include "ompi_config.h"
12
13 #include "mpi.h"
14 #include "ompi/constants.h"
15 #include "ompi/datatype/ompi_datatype.h"
16 #include "opal/util/bit_ops.h"
17 #include "ompi/mca/pml/pml.h"
18 #include "ompi/mca/coll/coll.h"
19 #include "ompi/mca/coll/base/base.h"
20
21 #include "coll_portals4.h"
22 #include "coll_portals4_request.h"
23
24
25 #undef RTR_USES_TRIGGERED_PUT
26
27
28 #define VRANK(ra, ro, si) ((ra - ro + si) % si)
29
30
31 static int
32 setup_scatter_buffers_linear(struct ompi_communicator_t *comm,
33 ompi_coll_portals4_request_t *request,
34 mca_coll_portals4_module_t *portals4_module)
35 {
36 int ret, line;
37
38 int8_t i_am_root = (request->u.scatter.my_rank == request->u.scatter.root_rank);
39
40 ompi_coll_portals4_create_send_converter (&request->u.scatter.send_converter,
41 request->u.scatter.pack_src_buf,
42 ompi_comm_peer_lookup(comm, request->u.scatter.my_rank),
43 request->u.scatter.pack_src_count,
44 request->u.scatter.pack_src_dtype);
45 opal_convertor_get_packed_size(&request->u.scatter.send_converter, &request->u.scatter.packed_size);
46 OBJ_DESTRUCT(&request->u.scatter.send_converter);
47
48
49
50
51 if (i_am_root) {
52
53
54
55
56 request->u.scatter.scatter_bytes=request->u.scatter.packed_size * (ptrdiff_t)request->u.scatter.size;
57
58
59
60 request->u.scatter.scatter_buf = (char *) malloc(request->u.scatter.scatter_bytes);
61 if (NULL == request->u.scatter.scatter_buf) {
62 ret = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
63 }
64 request->u.scatter.free_after = 1;
65
66 for (int32_t i=0;i<request->u.scatter.size;i++) {
67 uint32_t iov_count = 1;
68 struct iovec iov;
69 size_t max_data;
70
71 uint64_t offset = request->u.scatter.pack_src_extent * request->u.scatter.pack_src_count * i;
72
73 opal_output_verbose(30, ompi_coll_base_framework.framework_output,
74 "%s:%d:rank(%d): offset(%lu)",
75 __FILE__, __LINE__, request->u.scatter.my_rank,
76 offset);
77
78 ompi_coll_portals4_create_send_converter (&request->u.scatter.send_converter,
79 request->u.scatter.pack_src_buf + offset,
80 ompi_comm_peer_lookup(comm, request->u.scatter.my_rank),
81 request->u.scatter.pack_src_count,
82 request->u.scatter.pack_src_dtype);
83
84 iov.iov_len = request->u.scatter.packed_size;
85 iov.iov_base = (IOVBASE_TYPE *) ((char *)request->u.scatter.scatter_buf + (request->u.scatter.packed_size*i));
86 opal_convertor_pack(&request->u.scatter.send_converter, &iov, &iov_count, &max_data);
87
88 OBJ_DESTRUCT(&request->u.scatter.send_converter);
89 }
90
91 opal_output_verbose(30, ompi_coll_base_framework.framework_output,
92 "%s:%d:rank(%d): root - scatter_buf(%p) - scatter_bytes(%lu)=packed_size(%ld) * size(%d)",
93 __FILE__, __LINE__, request->u.scatter.my_rank,
94 request->u.scatter.scatter_buf, request->u.scatter.scatter_bytes,
95 request->u.scatter.packed_size, request->u.scatter.size);
96 } else {
97 request->u.scatter.scatter_bytes=request->u.scatter.packed_size;
98 request->u.scatter.scatter_buf = (char *) malloc(request->u.scatter.scatter_bytes);
99 if (NULL == request->u.scatter.scatter_buf) {
100 ret = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
101 }
102 request->u.scatter.free_after = 1;
103
104 opal_output_verbose(30, ompi_coll_base_framework.framework_output,
105 "%s:%d:rank(%d): leaf - scatter_buf(%p) - scatter_bytes(%lu)=packed_size(%ld)",
106 __FILE__, __LINE__, request->u.scatter.my_rank,
107 request->u.scatter.scatter_buf, request->u.scatter.scatter_bytes,
108 request->u.scatter.packed_size);
109 }
110
111 return OMPI_SUCCESS;
112
113 err_hdlr:
114 opal_output(ompi_coll_base_framework.framework_output,
115 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
116 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
117
118 return ret;
119 }
120
121 static int
122 setup_scatter_handles(struct ompi_communicator_t *comm,
123 ompi_coll_portals4_request_t *request,
124 mca_coll_portals4_module_t *portals4_module)
125 {
126 int ret, line;
127
128 ptl_me_t me;
129
130 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
131 "coll:portals4:setup_scatter_handles enter rank %d", request->u.scatter.my_rank));
132
133
134
135
136 COLL_PORTALS4_SET_BITS(request->u.scatter.scatter_match_bits, ompi_comm_get_cid(comm),
137 0, 0, COLL_PORTALS4_SCATTER, 0, request->u.scatter.coll_count);
138
139 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
140 "coll:portals4:setup_scatter_handles rank(%d) scatter_match_bits(0x%016lX)",
141 request->u.scatter.my_rank, request->u.scatter.scatter_match_bits));
142
143 ret = PtlCTAlloc(mca_coll_portals4_component.ni_h,
144 &request->u.scatter.scatter_cth);
145 if (PTL_OK != ret) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr; }
146
147 request->u.scatter.scatter_mdh = mca_coll_portals4_component.data_md_h;
148
149 me.start = request->u.scatter.scatter_buf;
150 me.length = request->u.scatter.scatter_bytes;
151 me.ct_handle = request->u.scatter.scatter_cth;
152 me.min_free = 0;
153 me.uid = mca_coll_portals4_component.uid;
154 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
155 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
156 PTL_ME_EVENT_CT_COMM;
157 me.match_id.phys.nid = PTL_NID_ANY;
158 me.match_id.phys.pid = PTL_PID_ANY;
159 me.match_bits = request->u.scatter.scatter_match_bits;
160 me.ignore_bits = 0;
161 ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
162 mca_coll_portals4_component.pt_idx,
163 &me,
164 PTL_PRIORITY_LIST,
165 NULL,
166 &request->u.scatter.scatter_meh);
167 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
168
169 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
170 "coll:portals4:setup_scatter_handles exit rank %d", request->u.scatter.my_rank));
171
172 return OMPI_SUCCESS;
173
174 err_hdlr:
175 opal_output(ompi_coll_base_framework.framework_output,
176 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
177 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
178
179 return ret;
180 }
181
182 static int
183 setup_sync_handles(struct ompi_communicator_t *comm,
184 ompi_coll_portals4_request_t *request,
185 mca_coll_portals4_module_t *portals4_module)
186 {
187 int ret, line;
188
189 ptl_me_t me;
190
191 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
192 "coll:portals4:setup_sync_handles enter rank %d", request->u.scatter.my_rank));
193
194
195
196
197 COLL_PORTALS4_SET_BITS(request->u.scatter.sync_match_bits, ompi_comm_get_cid(comm),
198 0, 1, COLL_PORTALS4_SCATTER, 0, request->u.scatter.coll_count);
199
200 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
201 "coll:portals4:setup_sync_handles rank(%d) sync_match_bits(0x%016lX)",
202 request->u.scatter.my_rank, request->u.scatter.sync_match_bits));
203
204 ret = PtlCTAlloc(mca_coll_portals4_component.ni_h,
205 &request->u.scatter.sync_cth);
206 if (PTL_OK != ret) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr; }
207
208 request->u.scatter.sync_mdh = mca_coll_portals4_component.zero_md_h;
209
210 me.start = NULL;
211 me.length = 0;
212 me.ct_handle = request->u.scatter.sync_cth;
213 me.min_free = 0;
214 me.uid = mca_coll_portals4_component.uid;
215 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
216 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
217 PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
218 me.match_id.phys.nid = PTL_NID_ANY;
219 me.match_id.phys.pid = PTL_PID_ANY;
220 me.match_bits = request->u.scatter.sync_match_bits;
221 me.ignore_bits = 0;
222 ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
223 mca_coll_portals4_component.pt_idx,
224 &me,
225 PTL_PRIORITY_LIST,
226 NULL,
227 &request->u.scatter.sync_meh);
228 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
229
230 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
231 "coll:portals4:setup_sync_handles exit rank %d", request->u.scatter.my_rank));
232
233 return OMPI_SUCCESS;
234
235 err_hdlr:
236 opal_output(ompi_coll_base_framework.framework_output,
237 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
238 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
239
240 return ret;
241 }
242
243 static int
244 cleanup_scatter_handles(ompi_coll_portals4_request_t *request)
245 {
246 int ret, line;
247
248 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
249 "coll:portals4:cleanup_scatter_handles enter rank %d", request->u.scatter.my_rank));
250
251
252
253
254 do {
255 ret = PtlMEUnlink(request->u.scatter.scatter_meh);
256 } while (PTL_IN_USE == ret);
257 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
258
259 ret = PtlCTFree(request->u.scatter.scatter_cth);
260 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
261
262 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
263 "coll:portals4:cleanup_scatter_handles exit rank %d", request->u.scatter.my_rank));
264
265 return OMPI_SUCCESS;
266
267 err_hdlr:
268 opal_output(ompi_coll_base_framework.framework_output,
269 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
270 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
271
272 return ret;
273 }
274
275 static int
276 cleanup_sync_handles(ompi_coll_portals4_request_t *request)
277 {
278 int ret, line;
279 int ptl_ret;
280
281 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
282 "coll:portals4:cleanup_sync_handles enter rank %d", request->u.scatter.my_rank));
283
284
285
286
287 do {
288 ret = PtlMEUnlink(request->u.scatter.sync_meh);
289 } while (PTL_IN_USE == ret);
290 if (PTL_OK != ret) { ptl_ret = ret; ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
291
292 ret = PtlCTFree(request->u.scatter.sync_cth);
293 if (PTL_OK != ret) { ptl_ret = ret; ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
294
295 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
296 "coll:portals4:cleanup_sync_handles exit rank %d", request->u.scatter.my_rank));
297
298 return OMPI_SUCCESS;
299
300 err_hdlr:
301 opal_output(ompi_coll_base_framework.framework_output,
302 "%s:%4d:%4d\tError occurred (ptl_ret=%d) ret=%d, rank %2d",
303 __FILE__, __LINE__, line, ptl_ret, ret, request->u.scatter.my_rank);
304
305 return ret;
306 }
307
308 static int
309 ompi_coll_portals4_scatter_intra_linear_top(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
310 void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
311 int root,
312 struct ompi_communicator_t *comm,
313 ompi_coll_portals4_request_t *request,
314 mca_coll_base_module_t *module)
315 {
316 mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
317 int ret, line;
318 ptl_ct_event_t ct;
319
320 ptl_ct_event_t sync_incr_event;
321
322 int8_t i_am_root;
323
324 int32_t expected_rtrs = 0;
325 int32_t expected_puts = 0;
326 int32_t expected_acks = 0;
327 int32_t expected_ops = 0;
328
329 int32_t expected_chained_rtrs = 0;
330 int32_t expected_chained_acks = 0;
331
332 ptl_size_t number_of_fragment = 1;
333
334 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
335 "coll:portals4:scatter_intra_linear_top enter rank %d", request->u.scatter.my_rank));
336
337 request->type = OMPI_COLL_PORTALS4_TYPE_SCATTER;
338 request->u.scatter.scatter_buf = NULL;
339 request->u.scatter.scatter_mdh = PTL_INVALID_HANDLE;
340 request->u.scatter.scatter_cth = PTL_INVALID_HANDLE;
341 request->u.scatter.scatter_meh = PTL_INVALID_HANDLE;
342 request->u.scatter.sync_mdh = PTL_INVALID_HANDLE;
343 request->u.scatter.sync_cth = PTL_INVALID_HANDLE;
344 request->u.scatter.sync_meh = PTL_INVALID_HANDLE;
345
346 request->u.scatter.my_rank = ompi_comm_rank(comm);
347 request->u.scatter.size = ompi_comm_size(comm);
348 request->u.scatter.root_rank = root;
349 request->u.scatter.sbuf = sbuf;
350 request->u.scatter.rbuf = rbuf;
351
352 request->u.scatter.pack_src_buf = sbuf;
353 request->u.scatter.pack_src_count = scount;
354 request->u.scatter.pack_src_dtype = sdtype;
355 ompi_datatype_get_extent(request->u.scatter.pack_src_dtype,
356 &request->u.scatter.pack_src_lb,
357 &request->u.scatter.pack_src_extent);
358 ompi_datatype_get_true_extent(request->u.scatter.pack_src_dtype,
359 &request->u.scatter.pack_src_true_lb,
360 &request->u.scatter.pack_src_true_extent);
361
362 if ((root == request->u.scatter.my_rank) && (rbuf == MPI_IN_PLACE)) {
363 request->u.scatter.unpack_dst_buf = NULL;
364 request->u.scatter.unpack_dst_count = 0;
365 request->u.scatter.unpack_dst_dtype = MPI_DATATYPE_NULL;
366 } else {
367 request->u.scatter.unpack_dst_buf = rbuf;
368 request->u.scatter.unpack_dst_count = rcount;
369 request->u.scatter.unpack_dst_dtype = rdtype;
370 request->u.scatter.unpack_dst_offset = 0;
371 ompi_datatype_get_extent(request->u.scatter.unpack_dst_dtype,
372 &request->u.scatter.unpack_dst_lb,
373 &request->u.scatter.unpack_dst_extent);
374 ompi_datatype_get_true_extent(request->u.scatter.unpack_dst_dtype,
375 &request->u.scatter.unpack_dst_true_lb,
376 &request->u.scatter.unpack_dst_true_extent);
377 }
378
379 opal_output_verbose(30, ompi_coll_base_framework.framework_output,
380 "%s:%d:rank(%d): request->u.scatter.unpack_dst_offset(%lu)",
381 __FILE__, __LINE__, request->u.scatter.my_rank,
382 request->u.scatter.unpack_dst_offset);
383
384
385
386
387
388 i_am_root = (request->u.scatter.my_rank == request->u.scatter.root_rank);
389
390 request->u.scatter.coll_count = opal_atomic_add_fetch_size_t(&portals4_module->coll_count, 1);
391
392 ret = setup_scatter_buffers_linear(comm, request, portals4_module);
393 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
394
395 ret = setup_scatter_handles(comm, request, portals4_module);
396 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
397
398 ret = setup_sync_handles(comm, request, portals4_module);
399 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
400
401 number_of_fragment = (request->u.scatter.packed_size > mca_coll_portals4_component.ni_limits.max_msg_size) ?
402 (request->u.scatter.packed_size + mca_coll_portals4_component.ni_limits.max_msg_size - 1) / mca_coll_portals4_component.ni_limits.max_msg_size :
403 1;
404 opal_output_verbose(90, ompi_coll_base_framework.framework_output,
405 "%s:%d:rank %d:number_of_fragment = %lu",
406 __FILE__, __LINE__, request->u.scatter.my_rank, number_of_fragment);
407
408
409
410
411 if (i_am_root) {
412
413 expected_rtrs = request->u.scatter.size - 1;
414 expected_acks = request->u.scatter.size - 1;
415
416
417 expected_puts = 0;
418 expected_chained_rtrs = 1;
419 expected_chained_acks = 1;
420
421
422 sync_incr_event.success=1;
423 sync_incr_event.failure=0;
424 ret = PtlTriggeredCTInc(request->u.scatter.scatter_cth,
425 sync_incr_event,
426 request->u.scatter.sync_cth,
427 expected_rtrs);
428 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
429
430 ret = PtlTriggeredCTInc(request->u.scatter.scatter_cth,
431 sync_incr_event,
432 request->u.scatter.sync_cth,
433 expected_rtrs + expected_acks);
434 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
435
436
437 for (int32_t i=0;i<request->u.scatter.size;i++) {
438
439 if (i == request->u.scatter.my_rank) {
440 continue;
441 }
442
443 ptl_size_t offset = request->u.scatter.packed_size * i;
444 ptl_size_t size_sent = 0;
445 ptl_size_t size_left = request->u.scatter.packed_size;
446
447 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
448 "%s:%d:rank(%d): offset(%lu)=rank(%d) * packed_size(%ld)",
449 __FILE__, __LINE__, request->u.scatter.my_rank,
450 offset, i, request->u.scatter.packed_size);
451
452 for (ptl_size_t j=0; j<number_of_fragment; j++) {
453
454 ptl_size_t frag_size = (size_left > mca_coll_portals4_component.ni_limits.max_msg_size) ?
455 mca_coll_portals4_component.ni_limits.max_msg_size :
456 size_left;
457
458 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
459 "%s:%d:rank(%d): frag(%lu),offset_frag (%lu) frag_size(%lu)",
460 __FILE__, __LINE__, request->u.scatter.my_rank,
461 j, size_sent, frag_size));
462
463 ret = PtlTriggeredPut(request->u.scatter.scatter_mdh,
464 (ptl_size_t)request->u.scatter.scatter_buf + offset + size_sent,
465 frag_size,
466 PTL_NO_ACK_REQ,
467 ompi_coll_portals4_get_peer(comm, i),
468 mca_coll_portals4_component.pt_idx,
469 request->u.scatter.scatter_match_bits,
470 size_sent,
471 NULL,
472 0,
473 request->u.scatter.scatter_cth,
474 expected_chained_rtrs);
475 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
476
477 size_left -= frag_size;
478 size_sent += frag_size;
479 }
480 }
481 } else {
482
483
484
485 expected_rtrs = 0;
486 expected_acks = 0;
487
488
489 expected_puts = number_of_fragment;
490 expected_chained_rtrs = 0;
491 expected_chained_acks = 0;
492 }
493
494 expected_ops = expected_chained_rtrs + expected_puts;
495
496
497
498
499
500 if (!i_am_root) {
501 ret = PtlTriggeredPut(request->u.scatter.sync_mdh,
502 0,
503 0,
504 PTL_NO_ACK_REQ,
505 ompi_coll_portals4_get_peer(comm, request->u.scatter.root_rank),
506 mca_coll_portals4_component.pt_idx,
507 request->u.scatter.sync_match_bits,
508 0,
509 NULL,
510 0,
511 request->u.scatter.scatter_cth,
512 expected_ops);
513 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
514 }
515
516 expected_ops += expected_chained_acks;
517
518 if (!request->u.scatter.is_sync) {
519
520
521
522 ret = PtlTriggeredPut(mca_coll_portals4_component.zero_md_h,
523 0,
524 0,
525 PTL_NO_ACK_REQ,
526 ompi_coll_portals4_get_peer(comm, request->u.scatter.my_rank),
527 mca_coll_portals4_component.finish_pt_idx,
528 0,
529 0,
530 NULL,
531 (uintptr_t) request,
532 request->u.scatter.scatter_cth,
533 expected_ops);
534 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
535 }
536
537
538
539
540 if (!i_am_root) {
541 ret = PtlPut(request->u.scatter.sync_mdh,
542 0,
543 0,
544 PTL_NO_ACK_REQ,
545 ompi_coll_portals4_get_peer(comm, request->u.scatter.root_rank),
546 mca_coll_portals4_component.pt_idx,
547 request->u.scatter.sync_match_bits,
548 0,
549 NULL,
550 0);
551 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
552 }
553
554 if (request->u.scatter.is_sync) {
555 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
556 "calling CTWait(expected_ops=%d)\n", expected_ops);
557
558
559
560
561 ret = PtlCTWait(request->u.scatter.scatter_cth, expected_ops, &ct);
562 if (PTL_OK != ret) { ret = OMPI_ERROR; line = __LINE__; goto err_hdlr; }
563
564 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
565 "completed CTWait(expected_ops=%d)\n", expected_ops);
566 }
567
568 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
569 "coll:portals4:scatter_intra_linear_top exit rank %d", request->u.scatter.my_rank));
570
571 return OMPI_SUCCESS;
572
573 err_hdlr:
574 if (NULL != request->u.scatter.scatter_buf)
575 free(request->u.scatter.scatter_buf);
576
577 opal_output(ompi_coll_base_framework.framework_output,
578 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
579 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
580
581 return ret;
582 }
583
584 static int
585 ompi_coll_portals4_scatter_intra_linear_bottom(struct ompi_communicator_t *comm,
586 ompi_coll_portals4_request_t *request)
587 {
588 int ret, line;
589
590 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
591 "coll:portals4:scatter_intra_linear_bottom enter rank %d", request->u.scatter.my_rank));
592
593 ret = cleanup_scatter_handles(request);
594 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
595
596 ret = cleanup_sync_handles(request);
597 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
598
599 if (NULL != request->u.scatter.unpack_dst_buf) {
600 uint32_t iov_count = 1;
601 struct iovec iov;
602 size_t max_data;
603
604 ompi_coll_portals4_create_recv_converter (&request->u.scatter.recv_converter,
605 request->u.scatter.unpack_dst_buf,
606 ompi_comm_peer_lookup(comm, request->u.scatter.my_rank),
607 request->u.scatter.unpack_dst_count,
608 request->u.scatter.unpack_dst_dtype);
609
610 iov.iov_len = request->u.scatter.packed_size;
611 if (request->u.scatter.my_rank == request->u.scatter.root_rank) {
612
613 uint64_t offset = request->u.scatter.pack_src_extent * request->u.scatter.pack_src_count * request->u.scatter.my_rank;
614 iov.iov_base = (IOVBASE_TYPE *)((char *)request->u.scatter.scatter_buf + offset);
615 } else {
616 iov.iov_base = (IOVBASE_TYPE *)request->u.scatter.scatter_buf;
617 }
618 opal_convertor_unpack(&request->u.scatter.recv_converter, &iov, &iov_count, &max_data);
619
620 OBJ_DESTRUCT(&request->u.scatter.recv_converter);
621 }
622
623 if (request->u.scatter.free_after)
624 free(request->u.scatter.scatter_buf);
625
626 request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
627
628 ompi_request_complete(&request->super, true);
629
630 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
631 "coll:portals4:scatter_intra_linear_bottom exit rank %d", request->u.scatter.my_rank));
632
633 return OMPI_SUCCESS;
634
635 err_hdlr:
636 request->super.req_status.MPI_ERROR = ret;
637
638 if (request->u.scatter.free_after)
639 free(request->u.scatter.scatter_buf);
640
641 opal_output(ompi_coll_base_framework.framework_output,
642 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
643 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
644
645 return ret;
646 }
647
648 int
649 ompi_coll_portals4_scatter_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
650 void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
651 int root,
652 struct ompi_communicator_t *comm,
653 mca_coll_base_module_t *module)
654 {
655 int ret, line;
656
657 ompi_coll_portals4_request_t *request;
658
659 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
660 "coll:portals4:scatter_intra enter rank %d", ompi_comm_rank(comm)));
661
662
663
664
665 OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
666 if (NULL == request) {
667 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
668 }
669 request->u.scatter.is_sync = 1;
670
671
672
673
674
675
676
677 ret = ompi_coll_portals4_scatter_intra_linear_top(sbuf, scount, sdtype,
678 rbuf, rcount, rdtype,
679 root,
680 comm,
681 request,
682 module);
683 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
684
685 ret = ompi_coll_portals4_scatter_intra_linear_bottom(comm, request);
686 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
687
688
689
690
691 OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
692
693 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
694 "coll:portals4:scatter_intra exit rank %d", request->u.scatter.my_rank));
695
696 return OMPI_SUCCESS;
697
698 err_hdlr:
699 opal_output(ompi_coll_base_framework.framework_output,
700 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
701 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
702
703 return ret;
704 }
705
706
707 int
708 ompi_coll_portals4_iscatter_intra(const void *sbuf, int scount, struct ompi_datatype_t *sdtype,
709 void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
710 int root,
711 struct ompi_communicator_t *comm,
712 ompi_request_t **ompi_request,
713 mca_coll_base_module_t *module)
714 {
715 int ret, line;
716
717 ompi_coll_portals4_request_t *request;
718
719 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
720 "coll:portals4:iscatter_intra enter rank %d", ompi_comm_rank(comm)));
721
722
723
724
725 OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
726 if (NULL == request) {
727 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; line = __LINE__; goto err_hdlr;
728 }
729 *ompi_request = &request->super;
730 request->u.scatter.is_sync = 0;
731
732
733
734
735
736
737
738 ret = ompi_coll_portals4_scatter_intra_linear_top(sbuf, scount, sdtype,
739 rbuf, rcount, rdtype,
740 root,
741 comm,
742 request,
743 module);
744 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
745
746 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
747 "coll:portals4:iscatter_intra exit rank %d", request->u.scatter.my_rank));
748
749 return OMPI_SUCCESS;
750
751 err_hdlr:
752 opal_output(ompi_coll_base_framework.framework_output,
753 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
754 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
755
756 return ret;
757 }
758
759
760 int
761 ompi_coll_portals4_iscatter_intra_fini(ompi_coll_portals4_request_t *request)
762 {
763 int ret, line;
764
765 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
766 "coll:portals4:iscatter_intra_fini enter rank %d", request->u.scatter.my_rank));
767
768
769
770
771 ret = ompi_coll_portals4_scatter_intra_linear_bottom(request->super.req_mpi_object.comm, request);
772 if (MPI_SUCCESS != ret) { line = __LINE__; goto err_hdlr; }
773
774 OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
775 "coll:portals4:iscatter_intra_fini exit rank %d", request->u.scatter.my_rank));
776
777 return OMPI_SUCCESS;
778
779 err_hdlr:
780 opal_output(ompi_coll_base_framework.framework_output,
781 "%s:%4d:%4d\tError occurred ret=%d, rank %2d",
782 __FILE__, __LINE__, line, ret, request->u.scatter.my_rank);
783
784 return ret;
785 }