This source file includes the following definitions.
- ompi_coll_base_scan_intra_linear
- ompi_coll_base_scan_intra_recursivedoubling
1
2
3
4
5
6
7
8
9
10
11
12
13
14 #include "ompi_config.h"
15
16 #include "mpi.h"
17 #include "ompi/constants.h"
18 #include "ompi/datatype/ompi_datatype.h"
19 #include "ompi/communicator/communicator.h"
20 #include "ompi/mca/coll/coll.h"
21 #include "ompi/mca/coll/base/coll_base_functions.h"
22 #include "ompi/mca/coll/base/coll_tags.h"
23 #include "ompi/mca/coll/base/coll_base_util.h"
24 #include "ompi/mca/pml/pml.h"
25 #include "ompi/op/op.h"
26
27
28
29
30
31
32
33
34 int
35 ompi_coll_base_scan_intra_linear(const void *sbuf, void *rbuf, int count,
36 struct ompi_datatype_t *dtype,
37 struct ompi_op_t *op,
38 struct ompi_communicator_t *comm,
39 mca_coll_base_module_t *module)
40 {
41 int size, rank, err;
42 ptrdiff_t dsize, gap;
43 char *free_buffer = NULL;
44 char *pml_buffer = NULL;
45
46
47
48 rank = ompi_comm_rank(comm);
49 size = ompi_comm_size(comm);
50
51
52
53 if (0 == rank) {
54 if (MPI_IN_PLACE != sbuf) {
55 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
56 if (MPI_SUCCESS != err) {
57 return err;
58 }
59 }
60 }
61
62
63
64 else {
65
66
67
68
69 dsize = opal_datatype_span(&dtype->super, count, &gap);
70 free_buffer = malloc(dsize);
71 if (NULL == free_buffer) {
72 return OMPI_ERR_OUT_OF_RESOURCE;
73 }
74 pml_buffer = free_buffer - gap;
75
76
77
78 if (MPI_IN_PLACE != sbuf) {
79 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
80 if (MPI_SUCCESS != err) {
81 if (NULL != free_buffer) {
82 free(free_buffer);
83 }
84 return err;
85 }
86 }
87
88
89
90 err = MCA_PML_CALL(recv(pml_buffer, count, dtype,
91 rank - 1, MCA_COLL_BASE_TAG_SCAN, comm,
92 MPI_STATUS_IGNORE));
93 if (MPI_SUCCESS != err) {
94 if (NULL != free_buffer) {
95 free(free_buffer);
96 }
97 return err;
98 }
99
100
101
102 ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
103
104
105
106 if (NULL != free_buffer) {
107 free(free_buffer);
108 }
109 }
110
111
112
113 if (rank < (size - 1)) {
114 return MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
115 MCA_COLL_BASE_TAG_SCAN,
116 MCA_PML_BASE_SEND_STANDARD, comm));
117 }
118
119
120
121 return MPI_SUCCESS;
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 int ompi_coll_base_scan_intra_recursivedoubling(
158 const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype,
159 struct ompi_op_t *op, struct ompi_communicator_t *comm,
160 mca_coll_base_module_t *module)
161 {
162 int err = MPI_SUCCESS;
163 char *tmpsend_raw = NULL, *tmprecv_raw = NULL;
164 int comm_size = ompi_comm_size(comm);
165 int rank = ompi_comm_rank(comm);
166
167 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
168 "coll:base:scan_intra_recursivedoubling: rank %d/%d",
169 rank, comm_size));
170 if (count == 0)
171 return MPI_SUCCESS;
172
173 if (sendbuf != MPI_IN_PLACE) {
174 err = ompi_datatype_copy_content_same_ddt(datatype, count, recvbuf, (char *)sendbuf);
175 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
176 }
177 if (comm_size < 2)
178 return MPI_SUCCESS;
179
180 ptrdiff_t dsize, gap;
181 dsize = opal_datatype_span(&datatype->super, count, &gap);
182 tmpsend_raw = malloc(dsize);
183 tmprecv_raw = malloc(dsize);
184 if (NULL == tmpsend_raw || NULL == tmprecv_raw) {
185 err = OMPI_ERR_OUT_OF_RESOURCE;
186 goto cleanup_and_return;
187 }
188 char *psend = tmpsend_raw - gap;
189 char *precv = tmprecv_raw - gap;
190 err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf);
191 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
192 int is_commute = ompi_op_is_commute(op);
193
194 for (int mask = 1; mask < comm_size; mask <<= 1) {
195 int remote = rank ^ mask;
196 if (remote < comm_size) {
197 err = ompi_coll_base_sendrecv(psend, count, datatype, remote,
198 MCA_COLL_BASE_TAG_SCAN,
199 precv, count, datatype, remote,
200 MCA_COLL_BASE_TAG_SCAN, comm,
201 MPI_STATUS_IGNORE, rank);
202 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
203
204 if (rank > remote) {
205
206 ompi_op_reduce(op, precv, recvbuf, count, datatype);
207
208 ompi_op_reduce(op, precv, psend, count, datatype);
209 } else {
210 if (is_commute) {
211
212 ompi_op_reduce(op, precv, psend, count, datatype);
213 } else {
214
215 ompi_op_reduce(op, psend, precv, count, datatype);
216 char *tmp = psend;
217 psend = precv;
218 precv = tmp;
219 }
220 }
221 }
222 }
223
224 cleanup_and_return:
225 if (NULL != tmpsend_raw)
226 free(tmpsend_raw);
227 if (NULL != tmprecv_raw)
228 free(tmprecv_raw);
229 return err;
230 }