This source file includes the following definitions.
- ompi_coll_base_exscan_intra_linear
- ompi_coll_base_exscan_intra_recursivedoubling
1
2
3
4
5
6
7
8
9
10
11
12
13
14 #include "ompi_config.h"
15
16 #include "mpi.h"
17 #include "ompi/constants.h"
18 #include "ompi/datatype/ompi_datatype.h"
19 #include "ompi/communicator/communicator.h"
20 #include "ompi/mca/coll/coll.h"
21 #include "ompi/mca/coll/base/coll_base_functions.h"
22 #include "ompi/mca/coll/base/coll_tags.h"
23 #include "ompi/mca/coll/base/coll_base_util.h"
24 #include "ompi/mca/pml/pml.h"
25 #include "ompi/op/op.h"
26
27
28
29
30
31
32
33
34 int
35 ompi_coll_base_exscan_intra_linear(const void *sbuf, void *rbuf, int count,
36 struct ompi_datatype_t *dtype,
37 struct ompi_op_t *op,
38 struct ompi_communicator_t *comm,
39 mca_coll_base_module_t *module)
40 {
41 int size, rank, err;
42 ptrdiff_t dsize, gap;
43 char *free_buffer = NULL;
44 char *reduce_buffer = NULL;
45
46 rank = ompi_comm_rank(comm);
47 size = ompi_comm_size(comm);
48
49
50
51 if (MPI_IN_PLACE == sbuf) {
52 sbuf = rbuf;
53 }
54
55
56
57 if (0 == rank) {
58 return MCA_PML_CALL(send(sbuf, count, dtype, rank + 1,
59 MCA_COLL_BASE_TAG_EXSCAN,
60 MCA_PML_BASE_SEND_STANDARD, comm));
61 }
62
63
64
65 else if ((size - 1) == rank) {
66 return MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
67 MCA_COLL_BASE_TAG_EXSCAN, comm,
68 MPI_STATUS_IGNORE));
69 }
70
71
72
73
74
75
76 dsize = opal_datatype_span(&dtype->super, count, &gap);
77
78 free_buffer = (char*)malloc(dsize);
79 if (NULL == free_buffer) {
80 return OMPI_ERR_OUT_OF_RESOURCE;
81 }
82 reduce_buffer = free_buffer - gap;
83 err = ompi_datatype_copy_content_same_ddt(dtype, count,
84 reduce_buffer, (char*)sbuf);
85
86
87 err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
88 MCA_COLL_BASE_TAG_EXSCAN, comm, MPI_STATUS_IGNORE));
89 if (MPI_SUCCESS != err) {
90 goto error;
91 }
92
93
94
95 ompi_op_reduce(op, rbuf, reduce_buffer, count, dtype);
96
97
98 err = MCA_PML_CALL(send(reduce_buffer, count, dtype, rank + 1,
99 MCA_COLL_BASE_TAG_EXSCAN,
100 MCA_PML_BASE_SEND_STANDARD, comm));
101
102 error:
103 free(free_buffer);
104
105
106 return err;
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 int ompi_coll_base_exscan_intra_recursivedoubling(
143 const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype,
144 struct ompi_op_t *op, struct ompi_communicator_t *comm,
145 mca_coll_base_module_t *module)
146 {
147 int err = MPI_SUCCESS;
148 char *tmpsend_raw = NULL, *tmprecv_raw = NULL;
149 int comm_size = ompi_comm_size(comm);
150 int rank = ompi_comm_rank(comm);
151
152 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:exscan_intra_recursivedoubling: rank %d/%d",
153 rank, comm_size));
154 if (count == 0)
155 return MPI_SUCCESS;
156 if (comm_size < 2)
157 return MPI_SUCCESS;
158
159 ptrdiff_t dsize, gap;
160 dsize = opal_datatype_span(&datatype->super, count, &gap);
161 tmpsend_raw = malloc(dsize);
162 tmprecv_raw = malloc(dsize);
163 if (NULL == tmpsend_raw || NULL == tmprecv_raw) {
164 err = OMPI_ERR_OUT_OF_RESOURCE;
165 goto cleanup_and_return;
166 }
167 char *psend = tmpsend_raw - gap;
168 char *precv = tmprecv_raw - gap;
169 if (sendbuf != MPI_IN_PLACE) {
170 err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, (char *)sendbuf);
171 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
172 } else {
173 err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf);
174 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
175 }
176 int is_commute = ompi_op_is_commute(op);
177 int is_first_block = 1;
178
179 for (int mask = 1; mask < comm_size; mask <<= 1) {
180 int remote = rank ^ mask;
181 if (remote < comm_size) {
182 err = ompi_coll_base_sendrecv(psend, count, datatype, remote,
183 MCA_COLL_BASE_TAG_EXSCAN,
184 precv, count, datatype, remote,
185 MCA_COLL_BASE_TAG_EXSCAN, comm,
186 MPI_STATUS_IGNORE, rank);
187 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
188
189 if (rank > remote) {
190
191 if (is_first_block) {
192 err = ompi_datatype_copy_content_same_ddt(datatype, count,
193 recvbuf, precv);
194 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
195 is_first_block = 0;
196 } else {
197
198 ompi_op_reduce(op, precv, recvbuf, count, datatype);
199 }
200
201 ompi_op_reduce(op, precv, psend, count, datatype);
202 } else {
203 if (is_commute) {
204
205 ompi_op_reduce(op, precv, psend, count, datatype);
206 } else {
207
208 ompi_op_reduce(op, psend, precv, count, datatype);
209 char *tmp = psend;
210 psend = precv;
211 precv = tmp;
212 }
213 }
214 }
215 }
216
217 cleanup_and_return:
218 if (NULL != tmpsend_raw)
219 free(tmpsend_raw);
220 if (NULL != tmprecv_raw)
221 free(tmprecv_raw);
222 return err;
223 }