This source file includes the following definitions.
- ompi_comm_allgather_pml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 #include "ompi_config.h"
18
19 #include "ompi/constants.h"
20 #include "ompi/op/op.h"
21 #include "ompi/datatype/ompi_datatype.h"
22 #include "ompi/communicator/communicator.h"
23 #include "opal/include/opal/sys/atomic.h"
24 #include "ompi/mca/pml/pml.h"
25 #include "ompi/patterns/net/netpatterns.h"
26 #include "coll_ops.h"
27
28
29
30
31 OMPI_DECLSPEC int ompi_comm_allgather_pml(void *src_buf, void *dest_buf, int count,
32 ompi_datatype_t *dtype, int my_rank_in_group,
33 int n_peers, int *ranks_in_comm,ompi_communicator_t *comm)
34 {
35
36 int rc=OMPI_SUCCESS,msg_cnt;
37 int pair_rank,exchange,extra_rank, n_extra_nodes,n_extra;
38 int proc_block,extra_start,extra_end,iovec_len;
39 int remote_data_start_rank,remote_data_end_rank;
40 int local_data_start_rank;
41 netpatterns_pair_exchange_node_t my_exchange_node;
42 size_t message_extent,current_data_extent,current_data_count;
43 size_t dt_size;
44 ptrdiff_t dt_extent;
45 char *src_buf_current;
46 char *dest_buf_current;
47 struct iovec send_iov[2] = {{0,0},{0,0}},
48 recv_iov[2] = {{0,0},{0,0}};
49 ompi_request_t *requests[4];
50
51
52
53
54 rc = ompi_datatype_type_size(dtype, &dt_size);
55 if( OMPI_SUCCESS != rc ) {
56 goto Error;
57 }
58
59 rc = ompi_datatype_type_extent(dtype, &dt_extent);
60 if( OMPI_SUCCESS != rc ) {
61 goto Error;
62 }
63 message_extent = dt_extent*count;
64
65
66 rc=ompi_datatype_copy_content_same_ddt(dtype,count,
67 (char *)dest_buf+my_rank_in_group*message_extent,
68 (char *)src_buf);
69 if( OMPI_SUCCESS != rc ) {
70 goto Error;
71 }
72
73
74 if(1 == n_peers) {
75 return OMPI_SUCCESS;
76 }
77
78
79 memset(&my_exchange_node, 0, sizeof(netpatterns_pair_exchange_node_t));
80 rc = ompi_netpatterns_setup_recursive_doubling_tree_node(n_peers,
81 my_rank_in_group, &my_exchange_node);
82 if(OMPI_SUCCESS != rc){
83 return rc;
84 }
85
86 n_extra_nodes=n_peers-my_exchange_node.n_largest_pow_2;
87
88
89 if(0 < my_exchange_node.n_extra_sources) {
90
91 if ( EXCHANGE_NODE == my_exchange_node.node_type ) {
92
93
94
95
96
97 extra_rank=my_exchange_node.rank_extra_source;
98
99
100 dest_buf_current=(char *)dest_buf+message_extent*extra_rank;
101 rc=MCA_PML_CALL(recv(dest_buf_current,
102 count,dtype,ranks_in_comm[extra_rank],
103 -OMPI_COMMON_TAG_ALLREDUCE,
104 comm, MPI_STATUSES_IGNORE));
105 if( 0 > rc ) {
106 goto Error;
107 }
108
109 } else {
110
111
112
113
114 extra_rank=my_exchange_node.rank_extra_source;
115 src_buf_current=(char *)src_buf;
116 rc=MCA_PML_CALL(send(src_buf_current,
117 count,dtype,ranks_in_comm[extra_rank],
118 -OMPI_COMMON_TAG_ALLREDUCE,
119 MCA_PML_BASE_SEND_STANDARD,
120 comm));
121 if( 0 > rc ) {
122 goto Error;
123 }
124 }
125 }
126
127 current_data_extent=message_extent;
128 current_data_count=count;
129 src_buf_current=(char *)dest_buf+my_rank_in_group*message_extent;
130 proc_block=1;
131 local_data_start_rank=my_rank_in_group;
132
133 for(exchange=0 ; exchange < my_exchange_node.n_exchanges ; exchange++) {
134
135
136 pair_rank=my_exchange_node.rank_exchanges[exchange];
137 msg_cnt=0;
138
139
140
141
142
143 if(pair_rank > my_rank_in_group ){
144 recv_iov[0].iov_base=src_buf_current+current_data_extent;
145 recv_iov[0].iov_len=current_data_extent;
146 iovec_len=1;
147 remote_data_start_rank=local_data_start_rank+proc_block;
148 remote_data_end_rank=remote_data_start_rank+proc_block-1;
149 } else {
150 recv_iov[0].iov_base=src_buf_current-current_data_extent;
151 recv_iov[0].iov_len=current_data_extent;
152 iovec_len=1;
153 remote_data_start_rank=local_data_start_rank-proc_block;
154 remote_data_end_rank=remote_data_start_rank+proc_block-1;
155 }
156
157 if(remote_data_start_rank<n_extra_nodes) {
158
159
160 extra_start=remote_data_start_rank;
161 extra_end=remote_data_end_rank;
162 if(extra_end >= n_extra_nodes ) {
163
164
165
166 extra_end=n_extra_nodes-1;
167 }
168
169 n_extra=extra_end-extra_start+1;
170
171 recv_iov[1].iov_base=(char *)dest_buf+
172 (extra_start+my_exchange_node.n_largest_pow_2)*message_extent;
173 recv_iov[1].iov_len=n_extra*count;
174 iovec_len=2;
175 }
176
177 rc=MCA_PML_CALL(irecv(recv_iov[0].iov_base,
178 current_data_count,dtype,ranks_in_comm[pair_rank],
179 -OMPI_COMMON_TAG_ALLREDUCE,
180 comm,&(requests[msg_cnt])));
181 if( 0 > rc ) {
182 goto Error;
183 }
184 msg_cnt++;
185
186 if(iovec_len > 1 ) {
187 rc=MCA_PML_CALL(irecv(recv_iov[1].iov_base,
188 recv_iov[1].iov_len,dtype,ranks_in_comm[pair_rank],
189 -OMPI_COMMON_TAG_ALLREDUCE,
190 comm,&(requests[msg_cnt])));
191 if( 0 > rc ) {
192 goto Error;
193 }
194 msg_cnt++;
195 }
196
197
198 send_iov[0].iov_base=src_buf_current;
199 send_iov[0].iov_len=current_data_extent;
200 iovec_len=1;
201
202 if(local_data_start_rank<n_extra_nodes) {
203
204
205 extra_start=local_data_start_rank;
206 extra_end=extra_start+proc_block-1;
207 if(extra_end >= n_extra_nodes ) {
208
209
210
211 extra_end=n_extra_nodes-1;
212 }
213
214 n_extra=extra_end-extra_start+1;
215
216 send_iov[1].iov_base=(char *)dest_buf+
217 (extra_start+my_exchange_node.n_largest_pow_2)*message_extent;
218 send_iov[1].iov_len=n_extra*count;
219 iovec_len=2;
220 }
221
222 rc=MCA_PML_CALL(isend(send_iov[0].iov_base,
223 current_data_count,dtype,ranks_in_comm[pair_rank],
224 -OMPI_COMMON_TAG_ALLREDUCE,MCA_PML_BASE_SEND_STANDARD,
225 comm,&(requests[msg_cnt])));
226 if( 0 > rc ) {
227 goto Error;
228 }
229 msg_cnt++;
230 if( iovec_len > 1 ) {
231 rc=MCA_PML_CALL(isend(send_iov[1].iov_base,
232 send_iov[1].iov_len,dtype,ranks_in_comm[pair_rank],
233 -OMPI_COMMON_TAG_ALLREDUCE,MCA_PML_BASE_SEND_STANDARD,
234 comm,&(requests[msg_cnt])));
235 if( 0 > rc ) {
236 goto Error;
237 }
238 msg_cnt++;
239 }
240
241
242 if(pair_rank < my_rank_in_group ){
243 src_buf_current-=current_data_extent;
244 local_data_start_rank-=proc_block;
245 }
246 proc_block*=2;
247 current_data_extent*=2;
248 current_data_count*=2;
249
250
251 ompi_request_wait_all(msg_cnt,requests,MPI_STATUSES_IGNORE);
252 }
253
254
255 if(0 < my_exchange_node.n_extra_sources) {
256
257 if ( EXTRA_NODE == my_exchange_node.node_type ) {
258
259
260
261 extra_rank=my_exchange_node.rank_extra_source;
262
263 rc=MCA_PML_CALL(recv(dest_buf,
264 count*n_peers,dtype,ranks_in_comm[extra_rank],
265 -OMPI_COMMON_TAG_ALLREDUCE,
266 comm,MPI_STATUSES_IGNORE));
267 if(0 > rc ) {
268 goto Error;
269 }
270 } else {
271
272
273
274
275 extra_rank=my_exchange_node.rank_extra_source;
276 rc=MCA_PML_CALL(send(dest_buf,
277 count*n_peers,dtype,ranks_in_comm[extra_rank],
278 -OMPI_COMMON_TAG_ALLREDUCE,
279 MCA_PML_BASE_SEND_STANDARD,
280 comm));
281 if( 0 > rc ) {
282 goto Error;
283 }
284 }
285 }
286
287 ompi_netpatterns_cleanup_recursive_doubling_tree_node(&my_exchange_node);
288
289
290 return OMPI_SUCCESS;
291
292 Error:
293 return rc;
294 }