This source file includes the following definitions:
- mca_coll_base_alltoallv_intra_basic_inplace
- ompi_coll_base_alltoallv_intra_pairwise
- ompi_coll_base_alltoallv_intra_basic_linear
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "ompi_config.h"
28
29 #include "mpi.h"
30 #include "ompi/constants.h"
31 #include "ompi/datatype/ompi_datatype.h"
32 #include "ompi/communicator/communicator.h"
33 #include "ompi/mca/coll/coll.h"
34 #include "ompi/mca/coll/base/coll_tags.h"
35 #include "ompi/mca/pml/pml.h"
36 #include "ompi/mca/coll/base/coll_base_functions.h"
37 #include "coll_base_topo.h"
38 #include "coll_base_util.h"
39
40 int
41 mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
42 struct ompi_datatype_t *rdtype,
43 struct ompi_communicator_t *comm,
44 mca_coll_base_module_t *module)
45 {
46 int i, j, size, rank, err=MPI_SUCCESS;
47 char *allocated_buffer, *tmp_buffer;
48 size_t max_size;
49 ptrdiff_t ext, gap = 0;
50
51
52
53 size = ompi_comm_size(comm);
54 rank = ompi_comm_rank(comm);
55
56
57 if (1 == size) {
58 return MPI_SUCCESS;
59 }
60
61 ompi_datatype_type_extent (rdtype, &ext);
62 for (i = 0, max_size = 0 ; i < size ; ++i) {
63 if (i == rank) {
64 continue;
65 }
66 size_t size = opal_datatype_span(&rdtype->super, rcounts[i], &gap);
67 max_size = size > max_size ? size : max_size;
68 }
69
70
71 if (OPAL_UNLIKELY(0 == max_size)) {
72 return MPI_SUCCESS;
73 }
74
75
76 allocated_buffer = calloc (max_size, 1);
77 if (NULL == allocated_buffer) {
78 return OMPI_ERR_OUT_OF_RESOURCE;
79 }
80 tmp_buffer = allocated_buffer - gap;
81
82
83
84 for (i = 0 ; i < size ; ++i) {
85 for (j = i+1 ; j < size ; ++j) {
86 if (i == rank && 0 != rcounts[j]) {
87
88 err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j],
89 tmp_buffer, (char *) rbuf + rdisps[j] * ext);
90 if (MPI_SUCCESS != err) { goto error_hndl; }
91
92
93 err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[j], rdtype,
94 j, MCA_COLL_BASE_TAG_ALLTOALLV,
95 (char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype,
96 j, MCA_COLL_BASE_TAG_ALLTOALLV,
97 comm, MPI_STATUS_IGNORE);
98 if (MPI_SUCCESS != err) { goto error_hndl; }
99 } else if (j == rank && 0 != rcounts[i]) {
100
101 err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i],
102 tmp_buffer, (char *) rbuf + rdisps[i] * ext);
103 if (MPI_SUCCESS != err) { goto error_hndl; }
104
105
106 err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[i], rdtype,
107 i, MCA_COLL_BASE_TAG_ALLTOALLV,
108 (char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype,
109 i, MCA_COLL_BASE_TAG_ALLTOALLV,
110 comm, MPI_STATUS_IGNORE);
111 if (MPI_SUCCESS != err) { goto error_hndl; }
112 }
113 }
114 }
115
116 error_hndl:
117
118 free (allocated_buffer);
119
120
121 return err;
122 }
123
124 int
125 ompi_coll_base_alltoallv_intra_pairwise(const void *sbuf, const int *scounts, const int *sdisps,
126 struct ompi_datatype_t *sdtype,
127 void* rbuf, const int *rcounts, const int *rdisps,
128 struct ompi_datatype_t *rdtype,
129 struct ompi_communicator_t *comm,
130 mca_coll_base_module_t *module)
131 {
132 int line = -1, err = 0, rank, size, step = 0, sendto, recvfrom;
133 void *psnd, *prcv;
134 ptrdiff_t sext, rext;
135
136 if (MPI_IN_PLACE == sbuf) {
137 return mca_coll_base_alltoallv_intra_basic_inplace (rbuf, rcounts, rdisps,
138 rdtype, comm, module);
139 }
140
141 size = ompi_comm_size(comm);
142 rank = ompi_comm_rank(comm);
143
144 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
145 "coll:base:alltoallv_intra_pairwise rank %d", rank));
146
147 ompi_datatype_type_extent(sdtype, &sext);
148 ompi_datatype_type_extent(rdtype, &rext);
149
150
151 for (step = 0; step < size; step++) {
152
153
154 sendto = (rank + step) % size;
155 recvfrom = (rank + size - step) % size;
156
157
158 psnd = (char*)sbuf + (ptrdiff_t)sdisps[sendto] * sext;
159 prcv = (char*)rbuf + (ptrdiff_t)rdisps[recvfrom] * rext;
160
161
162 err = ompi_coll_base_sendrecv( psnd, scounts[sendto], sdtype, sendto,
163 MCA_COLL_BASE_TAG_ALLTOALLV,
164 prcv, rcounts[recvfrom], rdtype, recvfrom,
165 MCA_COLL_BASE_TAG_ALLTOALLV,
166 comm, MPI_STATUS_IGNORE, rank);
167 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
168 }
169
170 return MPI_SUCCESS;
171
172 err_hndl:
173 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
174 "%s:%4d\tError occurred %d, rank %2d at step %d", __FILE__, line,
175 err, rank, step));
176 (void)line;
177 return err;
178 }
179
180
181
182
183
184
185
186
187
188 int
189 ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts, const int *sdisps,
190 struct ompi_datatype_t *sdtype,
191 void *rbuf, const int *rcounts, const int *rdisps,
192 struct ompi_datatype_t *rdtype,
193 struct ompi_communicator_t *comm,
194 mca_coll_base_module_t *module)
195 {
196 int i, size, rank, err, nreqs;
197 char *psnd, *prcv;
198 ptrdiff_t sext, rext;
199 ompi_request_t **preq, **reqs;
200 mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
201 mca_coll_base_comm_t *data = base_module->base_data;
202
203 if (MPI_IN_PLACE == sbuf) {
204 return mca_coll_base_alltoallv_intra_basic_inplace (rbuf, rcounts, rdisps,
205 rdtype, comm, module);
206 }
207
208 size = ompi_comm_size(comm);
209 rank = ompi_comm_rank(comm);
210
211 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
212 "coll:base:alltoallv_intra_basic_linear rank %d", rank));
213
214 ompi_datatype_type_extent(sdtype, &sext);
215 ompi_datatype_type_extent(rdtype, &rext);
216
217
218 psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[rank] * sext;
219 prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[rank] * rext;
220 if (0 != scounts[rank]) {
221 err = ompi_datatype_sndrcv(psnd, scounts[rank], sdtype,
222 prcv, rcounts[rank], rdtype);
223 if (MPI_SUCCESS != err) {
224 return err;
225 }
226 }
227
228
229 if (1 == size) {
230 return MPI_SUCCESS;
231 }
232
233
234 nreqs = 0;
235 reqs = preq = ompi_coll_base_comm_get_reqs(data, 2 * size);
236 if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl; }
237
238
239 for (i = 0; i < size; ++i) {
240 if (i == rank) {
241 continue;
242 }
243
244 ++nreqs;
245 prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[i] * rext;
246 err = MCA_PML_CALL(irecv_init(prcv, rcounts[i], rdtype,
247 i, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
248 preq++));
249 if (MPI_SUCCESS != err) { goto err_hndl; }
250 }
251
252
253 for (i = 0; i < size; ++i) {
254 if (i == rank) {
255 continue;
256 }
257
258 ++nreqs;
259 psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[i] * sext;
260 err = MCA_PML_CALL(isend_init(psnd, scounts[i], sdtype,
261 i, MCA_COLL_BASE_TAG_ALLTOALLV,
262 MCA_PML_BASE_SEND_STANDARD, comm,
263 preq++));
264 if (MPI_SUCCESS != err) { goto err_hndl; }
265 }
266
267
268 MCA_PML_CALL(start(nreqs, reqs));
269
270
271
272
273
274
275
276 err = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
277
278 err_hndl:
279
280 if (MPI_ERR_IN_STATUS == err) {
281 for( i = 0; i < nreqs; i++ ) {
282 if (MPI_REQUEST_NULL == reqs[i]) continue;
283 if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR) continue;
284 err = reqs[i]->req_status.MPI_ERROR;
285 break;
286 }
287 }
288
289 ompi_coll_base_free_reqs(reqs, nreqs);
290
291 return err;
292 }