This source file includes the following definitions:
- mca_fcoll_two_phase_domain_partition
- mca_fcoll_two_phase_calc_aggregator
- mca_fcoll_two_phase_calc_others_requests
- mca_fcoll_two_phase_calc_my_requests
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 #include "ompi_config.h"
26 #include "fcoll_two_phase.h"
27
28 #include "mpi.h"
29 #include "ompi/constants.h"
30 #include "ompi/mca/fcoll/fcoll.h"
31 #include "ompi/mca/common/ompio/common_ompio.h"
32 #include "ompi/mca/io/io.h"
33 #include "opal/mca/base/base.h"
34 #include "math.h"
35 #include "ompi/mca/pml/pml.h"
36 #include <unistd.h>
37
38
39
40
41
42
43
44 int mca_fcoll_two_phase_domain_partition (ompio_file_t *fh,
45 OMPI_MPI_OFFSET_TYPE *start_offsets,
46 OMPI_MPI_OFFSET_TYPE *end_offsets,
47 OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr,
48 OMPI_MPI_OFFSET_TYPE **fd_st_ptr,
49 OMPI_MPI_OFFSET_TYPE **fd_end_ptr,
50 int min_fd_size,
51 OMPI_MPI_OFFSET_TYPE *fd_size_ptr,
52 int striping_unit,
53 int nprocs_for_coll){
54
55 OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start=NULL, *fd_end=NULL, fd_size;
56 int i;
57
58 min_st_offset = start_offsets[0];
59 max_end_offset = end_offsets[0];
60
61 for (i=0; i< fh->f_size; i++){
62 min_st_offset = OMPIO_MIN(min_st_offset, start_offsets[i]);
63 max_end_offset = OMPIO_MAX(max_end_offset, end_offsets[i]);
64
65 }
66
67 fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1)/nprocs_for_coll;
68
69 if (fd_size < min_fd_size)
70 fd_size = min_fd_size;
71
72 *fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *)
73 malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
74
75 if ( NULL == *fd_st_ptr ) {
76 return OMPI_ERR_OUT_OF_RESOURCE;
77 }
78
79 *fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *)
80 malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
81
82 if ( NULL == *fd_end_ptr ) {
83 return OMPI_ERR_OUT_OF_RESOURCE;
84 }
85
86
87 fd_start = *fd_st_ptr;
88 fd_end = *fd_end_ptr;
89
90
91 if (striping_unit > 0){
92
93 int rem_front, rem_back;
94 OMPI_MPI_OFFSET_TYPE end_off;
95
96 fd_start[0] = min_st_offset;
97 end_off = fd_start[0] + fd_size;
98 rem_front = end_off % striping_unit;
99 rem_back = striping_unit - rem_front;
100 if (rem_front < rem_back)
101 end_off -= rem_front;
102 else
103 end_off += rem_back;
104 fd_end[0] = end_off - 1;
105
106
107 for (i=1; i<nprocs_for_coll; i++) {
108 fd_start[i] = fd_end[i-1] + 1;
109 end_off = min_st_offset + fd_size * (i+1);
110 rem_front = end_off % striping_unit;
111 rem_back = striping_unit - rem_front;
112 if (rem_front < rem_back)
113 end_off -= rem_front;
114 else
115 end_off += rem_back;
116 fd_end[i] = end_off - 1;
117 }
118 fd_end[nprocs_for_coll-1] = max_end_offset;
119 }
120 else{
121 fd_start[0] = min_st_offset;
122 fd_end[0] = min_st_offset + fd_size - 1;
123
124 for (i=1; i<nprocs_for_coll; i++) {
125 fd_start[i] = fd_end[i-1] + 1;
126 fd_end[i] = fd_start[i] + fd_size - 1;
127 }
128
129 }
130
131 for (i=0; i<nprocs_for_coll; i++) {
132 if (fd_start[i] > max_end_offset)
133 fd_start[i] = fd_end[i] = -1;
134 if (fd_end[i] > max_end_offset)
135 fd_end[i] = max_end_offset;
136 }
137
138 *fd_size_ptr = fd_size;
139 *min_st_offset_ptr = min_st_offset;
140
141 return OMPI_SUCCESS;
142 }
143
144
145
146 int mca_fcoll_two_phase_calc_aggregator(ompio_file_t *fh,
147 OMPI_MPI_OFFSET_TYPE off,
148 OMPI_MPI_OFFSET_TYPE min_off,
149 OMPI_MPI_OFFSET_TYPE *len,
150 OMPI_MPI_OFFSET_TYPE fd_size,
151 OMPI_MPI_OFFSET_TYPE *fd_start,
152 OMPI_MPI_OFFSET_TYPE *fd_end,
153 int striping_unit,
154 int num_aggregators,
155 int *aggregator_list)
156 {
157
158
159 int rank_index, rank;
160 OMPI_MPI_OFFSET_TYPE avail_bytes;
161
162 rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
163
164 if (striping_unit > 0){
165 rank_index = 0;
166 while (off > fd_end[rank_index]) rank_index++;
167 }
168
169
170 if (rank_index >= num_aggregators || rank_index < 0) {
171 fprintf(stderr,
172 "Error in ompi_io_ompio_calcl_aggregator():");
173 fprintf(stderr,
174 "rank_index(%d) >= num_aggregators(%d)fd_size=%lld off=%lld\n",
175 rank_index,num_aggregators,fd_size,off);
176 ompi_mpi_abort(&ompi_mpi_comm_world.comm, 1);
177 }
178
179
180 avail_bytes = fd_end[rank_index] + 1 - off;
181 if (avail_bytes < *len){
182 *len = avail_bytes;
183 }
184
185 rank = aggregator_list[rank_index];
186
187 #if 0
188 printf("rank : %d, rank_index : %d\n",rank, rank_index);
189 #endif
190
191 return rank;
192 }
193
194 int mca_fcoll_two_phase_calc_others_requests(ompio_file_t *fh,
195 int count_my_req_procs,
196 int *count_my_req_per_proc,
197 mca_common_ompio_access_array_t *my_req,
198 int *count_others_req_procs_ptr,
199 mca_common_ompio_access_array_t **others_req_ptr)
200 {
201
202
203 int *count_others_req_per_proc=NULL, count_others_req_procs;
204 int i,j, ret=OMPI_SUCCESS;
205 MPI_Request *requests=NULL;
206 mca_common_ompio_access_array_t *others_req=NULL;
207
208 count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int));
209
210 if ( NULL == count_others_req_per_proc ) {
211 return OMPI_ERR_OUT_OF_RESOURCE;
212 }
213
214
215 ret = fh->f_comm->c_coll->coll_alltoall (count_my_req_per_proc,
216 1,
217 MPI_INT,
218 count_others_req_per_proc,
219 1,
220 MPI_INT,
221 fh->f_comm,
222 fh->f_comm->c_coll->coll_alltoall_module);
223 if ( OMPI_SUCCESS != ret ) {
224 return ret;
225 }
226
227 #if 0
228 for( i = 0; i< fh->f_size; i++){
229 printf("my: %d, others: %d\n",count_my_req_per_proc[i],
230 count_others_req_per_proc[i]);
231
232 }
233 #endif
234
235 *others_req_ptr = (mca_common_ompio_access_array_t *) malloc
236 (fh->f_size*sizeof(mca_common_ompio_access_array_t));
237 others_req = *others_req_ptr;
238
239 count_others_req_procs = 0;
240 for (i=0; i<fh->f_size; i++) {
241 if (count_others_req_per_proc[i]) {
242 others_req[i].count = count_others_req_per_proc[i];
243 others_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
244 malloc(count_others_req_per_proc[i]*sizeof(OMPI_MPI_OFFSET_TYPE));
245 others_req[i].lens = (int *)
246 malloc(count_others_req_per_proc[i]*sizeof(int));
247 others_req[i].mem_ptrs = (MPI_Aint *)
248 malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
249 count_others_req_procs++;
250 }
251 else
252 others_req[i].count = 0;
253 }
254
255
256 requests = (MPI_Request *)
257 malloc(1+2*(count_my_req_procs+count_others_req_procs)*
258 sizeof(MPI_Request));
259
260 if ( NULL == requests ) {
261 ret = OMPI_ERR_OUT_OF_RESOURCE;
262 goto exit;
263 }
264
265 j = 0;
266 for (i=0; i<fh->f_size; i++){
267 if (others_req[i].count){
268 ret = MCA_PML_CALL(irecv(others_req[i].offsets,
269 others_req[i].count,
270 OMPI_OFFSET_DATATYPE,
271 i,
272 i+fh->f_rank,
273 fh->f_comm,
274 &requests[j]));
275 if ( OMPI_SUCCESS != ret ) {
276 goto exit;
277 }
278
279 j++;
280
281 ret = MCA_PML_CALL(irecv(others_req[i].lens,
282 others_req[i].count,
283 MPI_INT,
284 i,
285 i+fh->f_rank+1,
286 fh->f_comm,
287 &requests[j]));
288 if ( OMPI_SUCCESS != ret ) {
289 goto exit;
290 }
291
292 j++;
293 }
294 }
295
296
297 for (i=0; i < fh->f_size; i++) {
298 if (my_req[i].count) {
299 ret = MCA_PML_CALL(isend(my_req[i].offsets,
300 my_req[i].count,
301 OMPI_OFFSET_DATATYPE,
302 i,
303 i+fh->f_rank,
304 MCA_PML_BASE_SEND_STANDARD,
305 fh->f_comm,
306 &requests[j]));
307 if ( OMPI_SUCCESS != ret ) {
308 goto exit;
309 }
310
311 j++;
312 ret = MCA_PML_CALL(isend(my_req[i].lens,
313 my_req[i].count,
314 MPI_INT,
315 i,
316 i+fh->f_rank+1,
317 MCA_PML_BASE_SEND_STANDARD,
318 fh->f_comm,
319 &requests[j]));
320 if ( OMPI_SUCCESS != ret ) {
321 goto exit;
322 }
323
324 j++;
325 }
326 }
327
328 if (j) {
329 ret = ompi_request_wait_all ( j, requests, MPI_STATUSES_IGNORE );
330 if ( OMPI_SUCCESS != ret ) {
331 return ret;
332 }
333 }
334
335 *count_others_req_procs_ptr = count_others_req_procs;
336
337 exit:
338 if ( NULL != requests ) {
339 free(requests);
340 }
341 if ( NULL != count_others_req_per_proc ) {
342 free(count_others_req_per_proc);
343 }
344
345
346 return ret;
347 }
348
349
350 int mca_fcoll_two_phase_calc_my_requests (ompio_file_t *fh,
351 struct iovec *offset_len,
352 int contig_access_count,
353 OMPI_MPI_OFFSET_TYPE min_st_offset,
354 OMPI_MPI_OFFSET_TYPE *fd_start,
355 OMPI_MPI_OFFSET_TYPE *fd_end,
356 OMPI_MPI_OFFSET_TYPE fd_size,
357 int *count_my_req_procs_ptr,
358 int **count_my_req_per_proc_ptr,
359 mca_common_ompio_access_array_t **my_req_ptr,
360 size_t **buf_indices,
361 int striping_unit,
362 int num_aggregators,
363 int *aggregator_list)
364 {
365 int ret = MPI_SUCCESS;
366 int *count_my_req_per_proc, count_my_req_procs;
367 size_t *buf_idx = NULL;
368 int i, l, proc;
369 OMPI_MPI_OFFSET_TYPE fd_len, rem_len, curr_idx, off;
370 mca_common_ompio_access_array_t *my_req = NULL;
371
372
373 *count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int));
374
375 if ( NULL == *count_my_req_per_proc_ptr ){
376 return OMPI_ERR_OUT_OF_RESOURCE;
377 }
378
379 count_my_req_per_proc = *count_my_req_per_proc_ptr;
380
381 for (i=0;i<fh->f_size;i++){
382 count_my_req_per_proc[i] = 0;
383 }
384
385 buf_idx = (size_t *) malloc (fh->f_size * sizeof(size_t));
386
387 if ( NULL == buf_idx ){
388 return OMPI_ERR_OUT_OF_RESOURCE;
389 }
390
391 for (i=0; i < fh->f_size; i++) buf_idx[i] = -1;
392
393 for (i=0;i<contig_access_count; i++){
394
395 if (offset_len[i].iov_len==0)
396 continue;
397 off = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_len[i].iov_base;
398 fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
399 proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset, &fd_len, fd_size,
400 fd_start, fd_end, striping_unit, num_aggregators,aggregator_list);
401 count_my_req_per_proc[proc]++;
402 rem_len = offset_len[i].iov_len - fd_len;
403
404 while (rem_len != 0) {
405 off += fd_len;
406 fd_len = rem_len;
407 proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset, &fd_len,
408 fd_size, fd_start, fd_end, striping_unit,
409 num_aggregators, aggregator_list);
410
411 count_my_req_per_proc[proc]++;
412 rem_len -= fd_len;
413 }
414
415 }
416
417
418 *my_req_ptr = (mca_common_ompio_access_array_t *)
419 malloc (fh->f_size * sizeof(mca_common_ompio_access_array_t));
420 if ( NULL == *my_req_ptr ) {
421 ret = OMPI_ERR_OUT_OF_RESOURCE;
422 goto err_exit;
423 }
424 my_req = *my_req_ptr;
425
426 count_my_req_procs = 0;
427 for (i = 0; i < fh->f_size; i++){
428 if(count_my_req_per_proc[i]) {
429 my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
430 malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE));
431
432 if ( NULL == my_req[i].offsets ) {
433 ret = OMPI_ERR_OUT_OF_RESOURCE;
434 goto err_exit;
435 }
436
437 my_req[i].lens = (int *)
438 malloc(count_my_req_per_proc[i] * sizeof(int));
439
440 if ( NULL == my_req[i].lens ) {
441 ret = OMPI_ERR_OUT_OF_RESOURCE;
442 goto err_exit;
443 }
444 count_my_req_procs++;
445 }
446 my_req[i].count = 0;
447 }
448 curr_idx = 0;
449 for (i=0; i<contig_access_count; i++) {
450 if ((int)offset_len[i].iov_len == 0)
451 continue;
452 off = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_len[i].iov_base;
453 fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
454 proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset, &fd_len,
455 fd_size, fd_start, fd_end,
456 striping_unit, num_aggregators,
457 aggregator_list);
458 if (buf_idx[proc] == (size_t) -1){
459 buf_idx[proc] = (int) curr_idx;
460 }
461 l = my_req[proc].count;
462 curr_idx += fd_len;
463 rem_len = offset_len[i].iov_len - fd_len;
464 my_req[proc].offsets[l] = off;
465 my_req[proc].lens[l] = (int)fd_len;
466 my_req[proc].count++;
467
468 while (rem_len != 0) {
469 off += fd_len;
470 fd_len = rem_len;
471 proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset,
472 &fd_len, fd_size, fd_start,
473 fd_end, striping_unit,
474 num_aggregators,
475 aggregator_list);
476
477 if (buf_idx[proc] == (size_t) -1){
478 buf_idx[proc] = (int) curr_idx;
479 }
480
481 l = my_req[proc].count;
482 curr_idx += fd_len;
483 rem_len -= fd_len;
484
485 my_req[proc].offsets[l] = off;
486 my_req[proc].lens[l] = (int) fd_len;
487 my_req[proc].count++;
488
489 }
490
491 }
492
493 #if 0
494 for (i=0; i<fh->f_size; i++) {
495 if (count_my_req_per_proc[i] > 0) {
496 fprintf(stdout, "data needed from %d (count = %d):\n", i,
497 my_req[i].count);
498 for (l=0; l < my_req[i].count; l++) {
499 fprintf(stdout, " %d: off[%d] = %lld, len[%d] = %d\n", fh->f_rank, l,
500 my_req[i].offsets[l], l, my_req[i].lens[l]);
501 }
502 fprintf(stdout, "%d: buf_idx[%d] = 0x%x\n", fh->f_rank, i, buf_idx[i]);
503 }
504 }
505 #endif
506
507
508 *count_my_req_procs_ptr = count_my_req_procs;
509 *buf_indices = buf_idx;
510
511 return ret;
512 err_exit:
513 if (NULL != my_req) {
514 for (i = 0; i < fh->f_size; i++) {
515 if (NULL != my_req[i].offsets) {
516 free(my_req[i].offsets);
517 }
518 if (NULL != my_req[i].lens) {
519 free(my_req[i].lens);
520 }
521 }
522 }
523 if (NULL != buf_idx) {
524 free(buf_idx);
525 }
526 return ret;
527 }
528