This source file includes the following definitions.
- mca_coll_basic_allreduce_intra
- mca_coll_basic_allreduce_inter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23 #include "coll_basic.h"
24
25 #include "mpi.h"
26 #include "ompi/constants.h"
27 #include "ompi/datatype/ompi_datatype.h"
28 #include "ompi/op/op.h"
29 #include "ompi/mca/coll/coll.h"
30 #include "ompi/mca/coll/base/coll_tags.h"
31 #include "ompi/mca/coll/base/coll_base_util.h"
32 #include "coll_basic.h"
33 #include "ompi/mca/pml/pml.h"
34
35
36
37
38
39
40
41
42
43 int
44 mca_coll_basic_allreduce_intra(const void *sbuf, void *rbuf, int count,
45 struct ompi_datatype_t *dtype,
46 struct ompi_op_t *op,
47 struct ompi_communicator_t *comm,
48 mca_coll_base_module_t *module)
49 {
50 int err;
51
52
53
54 if (MPI_IN_PLACE == sbuf) {
55 if (0 == ompi_comm_rank(comm)) {
56 err = comm->c_coll->coll_reduce(MPI_IN_PLACE, rbuf, count, dtype, op, 0, comm, comm->c_coll->coll_reduce_module);
57 } else {
58 err = comm->c_coll->coll_reduce(rbuf, NULL, count, dtype, op, 0, comm, comm->c_coll->coll_reduce_module);
59 }
60 } else {
61 err = comm->c_coll->coll_reduce(sbuf, rbuf, count, dtype, op, 0, comm, comm->c_coll->coll_reduce_module);
62 }
63 if (MPI_SUCCESS != err) {
64 return err;
65 }
66
67 return comm->c_coll->coll_bcast(rbuf, count, dtype, 0, comm, comm->c_coll->coll_bcast_module);
68 }
69
70
71
72
73
74
75
76
77
78 int
79 mca_coll_basic_allreduce_inter(const void *sbuf, void *rbuf, int count,
80 struct ompi_datatype_t *dtype,
81 struct ompi_op_t *op,
82 struct ompi_communicator_t *comm,
83 mca_coll_base_module_t *module)
84 {
85 int err, i, rank, root = 0, rsize, line;
86 ptrdiff_t extent, dsize, gap;
87 char *tmpbuf = NULL, *pml_buffer = NULL;
88 ompi_request_t **reqs = NULL;
89
90 rank = ompi_comm_rank(comm);
91 rsize = ompi_comm_remote_size(comm);
92
93
94
95
96
97
98
99
100
101
102 if (rank == root) {
103 err = ompi_datatype_type_extent(dtype, &extent);
104 if (OMPI_SUCCESS != err) {
105 return OMPI_ERROR;
106 }
107 dsize = opal_datatype_span(&dtype->super, count, &gap);
108 tmpbuf = (char *) malloc(dsize);
109 if (NULL == tmpbuf) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto exit; }
110 pml_buffer = tmpbuf - gap;
111
112 if (rsize > 1) {
113 reqs = ompi_coll_base_comm_get_reqs(module->base_data, rsize - 1);
114 if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto exit; }
115 }
116
117
118 err = ompi_coll_base_sendrecv_actual(sbuf, count, dtype, 0,
119 MCA_COLL_BASE_TAG_ALLREDUCE,
120 rbuf, count, dtype, 0,
121 MCA_COLL_BASE_TAG_ALLREDUCE,
122 comm, MPI_STATUS_IGNORE);
123 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
124
125
126 for (i = 1; i < rsize; i++) {
127 err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
128 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
129 MPI_STATUS_IGNORE));
130 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
131
132
133 ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
134 }
135 } else {
136
137 err = MCA_PML_CALL(send(sbuf, count, dtype, root,
138 MCA_COLL_BASE_TAG_ALLREDUCE,
139 MCA_PML_BASE_SEND_STANDARD, comm));
140 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
141 }
142
143
144
145
146
147
148
149 if (rank == root) {
150
151 err = ompi_coll_base_sendrecv_actual(rbuf, count, dtype, 0,
152 MCA_COLL_BASE_TAG_ALLREDUCE,
153 pml_buffer, count, dtype, 0,
154 MCA_COLL_BASE_TAG_ALLREDUCE,
155 comm, MPI_STATUS_IGNORE);
156 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
157
158
159
160
161
162
163 if (rsize > 1) {
164 for (i = 1; i < rsize; i++) {
165 err = MCA_PML_CALL(isend(pml_buffer, count, dtype, i,
166 MCA_COLL_BASE_TAG_ALLREDUCE,
167 MCA_PML_BASE_SEND_STANDARD, comm,
168 &reqs[i - 1]));
169 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
170 }
171
172 err =
173 ompi_request_wait_all(rsize - 1, reqs,
174 MPI_STATUSES_IGNORE);
175 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
176 }
177 } else {
178 err = MCA_PML_CALL(recv(rbuf, count, dtype, root,
179 MCA_COLL_BASE_TAG_ALLREDUCE,
180 comm, MPI_STATUS_IGNORE));
181 if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
182 }
183
184 exit:
185 if( MPI_SUCCESS != err ) {
186 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"%s:%4d\tError occurred %d, rank %2d", __FILE__,
187 line, err, rank));
188 (void)line;
189 ompi_coll_base_free_reqs(reqs, rsize - 1);
190 }
191 if (NULL != tmpbuf) {
192 free(tmpbuf);
193 }
194
195 return err;
196 }