This source file includes the following definitions.
- allreduce_kary_tree_top
- allreduce_kary_tree_bottom
- ompi_coll_portals4_allreduce_intra
- ompi_coll_portals4_iallreduce_intra
- ompi_coll_portals4_iallreduce_intra_fini
1
2
3
4
5
6
7
8
9
10
11
12
13
14 #include "ompi_config.h"
15 #include "coll_portals4.h"
16 #include "coll_portals4_request.h"
17
18 #include <stdio.h>
19
20 #include "mpi.h"
21 #include "ompi/constants.h"
22 #include "ompi/datatype/ompi_datatype.h"
23 #include "ompi/datatype/ompi_datatype_internal.h"
24 #include "ompi/op/op.h"
25 #include "opal/util/bit_ops.h"
26 #include "ompi/mca/pml/pml.h"
27 #include "ompi/mca/coll/coll.h"
28 #include "ompi/mca/coll/base/base.h"
29
30
31 #define COLL_PORTALS4_ALLREDUCE_MAX_SEGMENT 128
32 #define COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN 2
33
34 static int
35 allreduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
36 MPI_Datatype dtype, MPI_Op op,
37 struct ompi_communicator_t *comm,
38 ompi_coll_portals4_request_t *request,
39 mca_coll_portals4_module_t *module)
40 {
41 bool is_sync = request->is_sync;
42 int ret, i;
43 int size = ompi_comm_size(comm);
44 int rank = ompi_comm_rank(comm);
45 ptl_rank_t parent, child[COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN];
46 unsigned int child_nb;
47 size_t internal_count, length;
48 ptl_handle_md_t zero_md_h, data_md_h;
49 ptl_handle_me_t me_h;
50 ptl_me_t me;
51 ptl_match_bits_t match_bits_ack, match_bits_rtr, match_bits;
52 ptl_ct_event_t ct;
53 ptl_op_t ptl_op;
54 ptl_datatype_t ptl_dtype;
55
56 request->type = OMPI_COLL_PORTALS4_TYPE_ALLREDUCE;
57
58
59
60
61
62 for (i = 0 ; i < COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN ; i++) {
63 child[i] = PTL_INVALID_RANK;
64 }
65
66 parent = PTL_INVALID_RANK;
67
68 zero_md_h = mca_coll_portals4_component.zero_md_h;
69 data_md_h = mca_coll_portals4_component.data_md_h;
70
71 internal_count = opal_atomic_add_fetch_size_t(&module->coll_count, 1);
72
73
74
75
76 ret = ompi_datatype_type_size(dtype, &length);
77 length *= count;
78
79 request->u.allreduce.is_optim = is_reduce_optimizable(dtype, length, op, &ptl_dtype, &ptl_op);
80
81 if (request->u.allreduce.is_optim) {
82
83
84
85
86
87
88
89
90 get_k_ary_tree(COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN,
91 rank, size, PTL_FIRST_RANK, &parent, child, &child_nb);
92 request->u.allreduce.child_nb = child_nb;
93
94
95
96
97
98
99 COLL_PORTALS4_SET_BITS(match_bits_ack, ompi_comm_get_cid(comm), 1, 0,
100 COLL_PORTALS4_ALLREDUCE, 0, internal_count);
101
102 COLL_PORTALS4_SET_BITS(match_bits_rtr, ompi_comm_get_cid(comm), 0, 1,
103 COLL_PORTALS4_ALLREDUCE, 0, internal_count);
104
105 COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
106 COLL_PORTALS4_ALLREDUCE, 0, internal_count);
107
108 if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.allreduce.trig_ct_h)) != 0) {
109 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
110 }
111
112 if (sendbuf != MPI_IN_PLACE) {
113
114
115
116 memcpy(recvbuf, sendbuf, length);
117 }
118
119
120
121
122 memset(&me, 0, sizeof(ptl_me_t));
123 me.start = recvbuf;
124 me.length = length;
125 me.ct_handle = request->u.allreduce.trig_ct_h;
126 me.uid = mca_coll_portals4_component.uid;
127 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
128 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
129 PTL_ME_EVENT_CT_COMM;
130 me.match_id.phys.nid = PTL_NID_ANY;
131 me.match_id.phys.pid = PTL_PID_ANY;
132 me.match_bits = match_bits;
133 me.ignore_bits = 0;
134
135 if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
136 mca_coll_portals4_component.pt_idx,
137 &me, PTL_PRIORITY_LIST, NULL,
138 &request->u.allreduce.data_me_h)) != 0) {
139 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
140 }
141
142 if (child_nb) {
143 if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.allreduce.ack_ct_h)) != 0) {
144 return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
145 }
146
147 for (i = 0 ; i < COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN ; i++) {
148 if (child[i] != PTL_INVALID_RANK) {
149
150
151
152
153
154
155 memset(&me, 0, sizeof(ptl_me_t));
156 me.start = NULL;
157 me.length = 0;
158 me.min_free = 0;
159 me.uid = mca_coll_portals4_component.uid;
160 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
161 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
162 PTL_ME_USE_ONCE |
163 PTL_ME_EVENT_CT_COMM;
164 me.match_id.phys.nid = PTL_NID_ANY;
165 me.match_id.phys.pid = PTL_PID_ANY;
166 me.match_bits = match_bits_ack;
167 me.ignore_bits = 0;
168 me.ct_handle = request->u.allreduce.ack_ct_h;
169
170 if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
171 mca_coll_portals4_component.pt_idx,
172 &me, PTL_PRIORITY_LIST,
173 NULL,
174 &me_h)) != 0) {
175 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
176 }
177
178
179
180
181
182
183 if ((ret = PtlTriggeredPut (data_md_h,
184 (uint64_t)recvbuf,
185 length, PTL_NO_ACK_REQ,
186 ompi_coll_portals4_get_peer(comm, child[i]),
187 mca_coll_portals4_component.pt_idx,
188 match_bits, 0, NULL, 0,
189 request->u.allreduce.trig_ct_h,
190 (rank != PTL_FIRST_RANK) ? child_nb + 2 : child_nb)) != 0) {
191 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
192 }
193 }
194
195 }
196 }
197
198 if (rank != PTL_FIRST_RANK) {
199
200
201 if ((ret = PtlTriggeredAtomic(data_md_h,
202 (uint64_t)recvbuf,
203 length, PTL_NO_ACK_REQ,
204 ompi_coll_portals4_get_peer(comm, parent),
205 mca_coll_portals4_component.pt_idx,
206 match_bits, 0, NULL, 0,
207 ptl_op, ptl_dtype, request->u.allreduce.trig_ct_h,
208 child_nb + 1)) != 0) {
209 return opal_stderr("PtlTriggeredAtomic failed", __FILE__, __LINE__, ret);
210 }
211
212 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
213 ompi_coll_portals4_get_peer(comm, parent),
214 mca_coll_portals4_component.pt_idx,
215 match_bits_ack, 0, NULL, 0,
216 request->u.allreduce.trig_ct_h,
217 child_nb + 2)) != 0) {
218 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
219 }
220
221
222
223
224
225
226 memset(&me, 0, sizeof(ptl_me_t));
227 me.start = NULL;
228 me.length = 0;
229 me.min_free = 0;
230 me.uid = mca_coll_portals4_component.uid;
231 me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
232 PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
233 PTL_ME_USE_ONCE |
234 PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
235 me.match_id.phys.nid = PTL_NID_ANY;
236 me.match_id.phys.pid = PTL_PID_ANY;
237 me.match_bits = match_bits_rtr;
238 me.ignore_bits = 0;
239 me.ct_handle = request->u.allreduce.trig_ct_h;
240
241 if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
242 mca_coll_portals4_component.pt_idx,
243 &me, PTL_PRIORITY_LIST,
244 NULL,
245 &me_h)) != 0) {
246 return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
247 }
248 }
249
250
251
252
253
254
255
256 if (child_nb) {
257 for (i = 0 ; i < COLL_PORTALS4_ALLREDUCE_MAX_CHILDREN ; i++) {
258 if (child[i] != PTL_INVALID_RANK) {
259
260
261
262
263
264
265 if ((ret = PtlPut(zero_md_h, 0, 0, PTL_NO_ACK_REQ,
266 ompi_coll_portals4_get_peer(comm, child[i]),
267 mca_coll_portals4_component.pt_idx,
268 match_bits_rtr, 0, NULL, 0)) != PTL_OK)
269 return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
270 }
271 }
272 }
273
274 if (child_nb) {
275 if (is_sync) {
276 if ((ret = PtlCTWait(request->u.allreduce.ack_ct_h,
277 child_nb, &ct)) != 0) {
278 opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
279 }
280 }
281 else {
282 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
283 ompi_coll_portals4_get_peer(comm, rank),
284 mca_coll_portals4_component.finish_pt_idx,
285 0, 0, NULL, (uintptr_t) request,
286 request->u.allreduce.ack_ct_h,
287 child_nb)) != 0) {
288 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
289 }
290 }
291 }
292 else {
293 if (is_sync) {
294 if ((ret = PtlCTWait(request->u.allreduce.trig_ct_h,
295 (rank != PTL_FIRST_RANK) ? 2 : 0, &ct)) != 0) {
296 opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
297 }
298 }
299 else {
300 if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
301 ompi_coll_portals4_get_peer(comm, rank),
302 mca_coll_portals4_component.finish_pt_idx,
303 0, 0, NULL, (uintptr_t) request,
304 request->u.allreduce.trig_ct_h,
305 (rank != PTL_FIRST_RANK) ? 2 : 0)) != 0) {
306 return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
307 }
308 }
309 }
310
311 }
312 else {
313 opal_output_verbose(100, ompi_coll_base_framework.framework_output,
314 "rank %d - optimization not supported, falling back to previous handler\n", rank);
315
316 if (request->is_sync) {
317 if ((module->previous_allreduce) && (module->previous_allreduce_module)) {
318 ret = module->previous_allreduce(sendbuf, recvbuf, count, dtype, op,
319 comm, module->previous_allreduce_module);
320 }
321 else {
322 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
323 "rank %d - no previous allreduce handler is available, aborting\n", rank);
324 return (OMPI_ERROR);
325 }
326 }
327 else {
328 if ((module->previous_iallreduce) && (module->previous_iallreduce_module)) {
329 ret = module->previous_iallreduce(sendbuf, recvbuf, count, dtype, op,
330 comm, request->fallback_request, module->previous_iallreduce_module);
331 }
332 else {
333 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
334 "rank %d - no previous iallreduce handler is available, aborting\n", rank);
335 return (OMPI_ERROR);
336 }
337 }
338 return ret;
339 }
340 return (OMPI_SUCCESS);
341 }
342
343 static int
344 allreduce_kary_tree_bottom(ompi_coll_portals4_request_t *request)
345 {
346 int ret;
347
348 if (request->u.allreduce.is_optim) {
349 PtlAtomicSync();
350
351 if (request->u.allreduce.child_nb) {
352 ret = PtlCTFree(request->u.allreduce.ack_ct_h);
353 if (PTL_OK != ret) {
354 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
355 "%s:%d: PtlCTFree failed: %d\n",
356 __FILE__, __LINE__, ret);
357 return OMPI_ERROR;
358 }
359 }
360
361 do {
362 ret = PtlMEUnlink(request->u.allreduce.data_me_h);
363 } while (PTL_IN_USE == ret);
364 if (PTL_OK != ret) {
365 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
366 "%s:%d: PtlMEUnlink failed: %d\n",
367 __FILE__, __LINE__, ret);
368 return OMPI_ERROR;
369 }
370
371 ret = PtlCTFree(request->u.allreduce.trig_ct_h);
372 if (PTL_OK != ret) {
373 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
374 "%s:%d: PtlCTFree failed: %d\n",
375 __FILE__, __LINE__, ret);
376 return OMPI_ERROR;
377 }
378 }
379
380 return (OMPI_SUCCESS);
381 }
382
383 int ompi_coll_portals4_allreduce_intra(const void* sendbuf, void* recvbuf, int count,
384 MPI_Datatype dtype, MPI_Op op,
385 struct ompi_communicator_t *comm,
386 struct mca_coll_base_module_2_3_0_t *module)
387 {
388 mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
389 ompi_coll_portals4_request_t *request;
390
391 OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
392 if (NULL == request) {
393 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
394 "%s:%d: request alloc failed\n",
395 __FILE__, __LINE__);
396 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
397 }
398
399 request->is_sync = true;
400 request->fallback_request = NULL;
401
402 allreduce_kary_tree_top(sendbuf, recvbuf, count,
403 dtype, op, comm, request, portals4_module);
404
405 allreduce_kary_tree_bottom(request);
406
407 OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
408 return (OMPI_SUCCESS);
409 }
410
411
412 int ompi_coll_portals4_iallreduce_intra(const void* sendbuf, void* recvbuf, int count,
413 MPI_Datatype dtype, MPI_Op op,
414 struct ompi_communicator_t *comm,
415 ompi_request_t ** ompi_request,
416 struct mca_coll_base_module_2_3_0_t *module)
417 {
418 mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
419 ompi_coll_portals4_request_t *request;
420
421 OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
422 if (NULL == request) {
423 opal_output_verbose(1, ompi_coll_base_framework.framework_output,
424 "%s:%d: request alloc failed\n",
425 __FILE__, __LINE__);
426 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
427 }
428 *ompi_request = &request->super;
429 request->fallback_request = ompi_request;
430 request->is_sync = false;
431
432 allreduce_kary_tree_top(sendbuf, recvbuf, count,
433 dtype, op, comm, request, portals4_module);
434
435 opal_output_verbose(10, ompi_coll_base_framework.framework_output, "iallreduce");
436 return (OMPI_SUCCESS);
437 }
438
439
440 int
441 ompi_coll_portals4_iallreduce_intra_fini(struct ompi_coll_portals4_request_t *request)
442 {
443 allreduce_kary_tree_bottom(request);
444 ompi_request_complete(&request->super, true);
445
446 return (OMPI_SUCCESS);
447 }