This source file includes the following definitions:
- ompi_comm_allreduce_pml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 #include "ompi_config.h"
18
19 #include "ompi/constants.h"
20 #include "ompi/op/op.h"
21 #include "ompi/datatype/ompi_datatype.h"
22 #include "ompi/communicator/communicator.h"
23 #include "opal/include/opal/sys/atomic.h"
24 #include "ompi/mca/pml/pml.h"
25 #include "ompi/patterns/net/netpatterns.h"
26 #include "ompi/mca/coll/base/coll_base_util.h"
27 #include "coll_ops.h"
28 #include "commpatterns.h"
29
30
31
32
33 OMPI_DECLSPEC int ompi_comm_allreduce_pml(void *sbuf, void *rbuf, int count,
34 ompi_datatype_t *dtype, int my_rank_in_group,
35 struct ompi_op_t *op, int n_peers,int *ranks_in_comm,
36 ompi_communicator_t *comm)
37 {
38
39 int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
40 int pair_rank,exchange,extra_rank;
41 netpatterns_pair_exchange_node_t my_exchange_node;
42 int count_processed,count_this_stripe;
43 size_t dt_size,dt_extent;
44 char scratch_bufers[2][MAX_TMP_BUFFER];
45 int send_buffer=0,recv_buffer=1;
46 char *sbuf_current, *rbuf_current;
47
48
49
50
51 rc = opal_datatype_type_size((opal_datatype_t *)dtype, &dt_size);
52 if( OMPI_SUCCESS != rc ) {
53 goto Error;
54 }
55 rc = ompi_datatype_type_extent(dtype, (ptrdiff_t *)&dt_extent);
56 if( OMPI_SUCCESS != rc ) {
57 goto Error;
58 }
59
60
61 if(1 == n_peers) {
62
63 rc=ompi_datatype_copy_content_same_ddt(dtype,count,
64 (char *)rbuf, (char *)sbuf);
65 if( OMPI_SUCCESS != rc ) {
66 goto Error;
67 }
68 return OMPI_SUCCESS;
69 }
70
71
72 n_dts_per_buffer=((int) MAX_TMP_BUFFER)/dt_extent;
73 if ( 0 == n_dts_per_buffer ) {
74 rc=OMPI_ERROR;
75 goto Error;
76 }
77
78
79 n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
80
81
82 memset(&my_exchange_node, 0, sizeof(netpatterns_pair_exchange_node_t));
83 rc = ompi_netpatterns_setup_recursive_doubling_tree_node(n_peers,
84 my_rank_in_group, &my_exchange_node);
85 if(OMPI_SUCCESS != rc){
86 return rc;
87 }
88
89 count_processed=0;
90
91
92
93 for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
94
95
96 count_this_stripe=n_dts_per_buffer;
97 if( count_processed + count_this_stripe > count )
98 count_this_stripe=count-count_processed;
99
100
101 sbuf_current=(char *)sbuf+count_processed*dt_extent;
102 rc=ompi_datatype_copy_content_same_ddt(dtype,count_this_stripe,
103 scratch_bufers[send_buffer], sbuf_current);
104 if( OMPI_SUCCESS != rc ) {
105 goto Error;
106 }
107
108
109 if(0 < my_exchange_node.n_extra_sources) {
110
111 if ( EXCHANGE_NODE == my_exchange_node.node_type ) {
112
113
114
115
116 extra_rank=my_exchange_node.rank_extra_source;
117 rc=MCA_PML_CALL(recv(scratch_bufers[recv_buffer],
118 count_this_stripe,dtype,ranks_in_comm[extra_rank],
119 -OMPI_COMMON_TAG_ALLREDUCE, comm,
120 MPI_STATUSES_IGNORE));
121 if( 0 > rc ) {
122 fprintf(stderr," first recv failed in ompi_comm_allreduce_pml \n");
123 fflush(stderr);
124 goto Error;
125 }
126
127
128
129 if( 0 < count_this_stripe ) {
130 ompi_op_reduce(op,
131 (void *)scratch_bufers[send_buffer],
132 (void *)scratch_bufers[recv_buffer],
133 count_this_stripe,dtype);
134 }
135
136
137 } else {
138
139
140
141
142 extra_rank=my_exchange_node.rank_extra_source;
143 rc=MCA_PML_CALL(send(scratch_bufers[send_buffer],
144 count_this_stripe,dtype,ranks_in_comm[extra_rank],
145 -OMPI_COMMON_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD,
146 comm));
147 if( 0 > rc ) {
148 fprintf(stderr," first send failed in ompi_comm_allreduce_pml \n");
149 fflush(stderr);
150 goto Error;
151 }
152 }
153
154
155
156
157
158
159 recv_buffer^=1;
160 send_buffer^=1;
161 }
162
163
164 for(exchange=0 ; exchange < my_exchange_node.n_exchanges ; exchange++) {
165
166
167 pair_rank=my_exchange_node.rank_exchanges[exchange];
168
169 rc=ompi_coll_base_sendrecv_actual(scratch_bufers[send_buffer],
170 count_this_stripe,dtype, ranks_in_comm[pair_rank],
171 -OMPI_COMMON_TAG_ALLREDUCE,
172 scratch_bufers[recv_buffer],
173 count_this_stripe,dtype,ranks_in_comm[pair_rank],
174 -OMPI_COMMON_TAG_ALLREDUCE,
175 comm, MPI_STATUS_IGNORE);
176 if( 0 > rc ) {
177 fprintf(stderr," irecv failed in ompi_comm_allreduce_pml at iterations %d \n",
178 exchange);
179 fflush(stderr);
180 goto Error;
181 }
182
183
184 if( 0 < count_this_stripe ) {
185 ompi_op_reduce(op,
186 (void *)scratch_bufers[send_buffer],
187 (void *)scratch_bufers[recv_buffer],
188 count_this_stripe,dtype);
189 }
190
191 recv_buffer^=1;
192 send_buffer^=1;
193
194 }
195
196
197 if(0 < my_exchange_node.n_extra_sources) {
198
199 if ( EXTRA_NODE == my_exchange_node.node_type ) {
200
201
202
203 extra_rank=my_exchange_node.rank_extra_source;
204 rc=MCA_PML_CALL(recv(scratch_bufers[recv_buffer],
205 count_this_stripe,dtype,ranks_in_comm[extra_rank],
206 -OMPI_COMMON_TAG_ALLREDUCE, comm,
207 MPI_STATUSES_IGNORE));
208 if( 0 > rc ) {
209 fprintf(stderr," last recv failed in ompi_comm_allreduce_pml \n");
210 fflush(stderr);
211 goto Error;
212 }
213
214 recv_buffer^=1;
215 send_buffer^=1;
216 } else {
217
218
219
220
221 extra_rank=my_exchange_node.rank_extra_source;
222 rc=MCA_PML_CALL(send((char *)scratch_bufers[send_buffer],
223 count_this_stripe,dtype,ranks_in_comm[extra_rank],
224 -OMPI_COMMON_TAG_ALLREDUCE, MCA_PML_BASE_SEND_STANDARD,
225 comm));
226 if( 0 > rc ) {
227 fprintf(stderr," last send failed in ompi_comm_allreduce_pml \n");
228 fflush(stderr);
229 goto Error;
230 }
231 }
232 }
233
234
235 rbuf_current = (char *) rbuf + count_processed * dt_size;
236 memcpy(rbuf_current,scratch_bufers[send_buffer], count_this_stripe*dt_size);
237
238
239 count_processed += count_this_stripe;
240 }
241
242 ompi_netpatterns_cleanup_recursive_doubling_tree_node(&my_exchange_node);
243
244
245 return OMPI_SUCCESS;
246
247 Error:
248 return rc;
249 }