This source file includes following definitions.
- ADIOI_Calc_aggregator
- ADIOI_Calc_file_domains
- ADIOI_Calc_my_req
- ADIOI_Calc_others_req
- ADIOI_Icalc_others_req
- ADIOI_Icalc_others_req_main
- ADIOI_Icalc_others_req_fini
   1 
   2 
   3 
   4 
   5 
   6 
   7 #include "adio.h"
   8 #include "adio_extern.h"
   9 
  10 #ifdef AGGREGATION_PROFILE
  11 #include "mpe.h"
  12 #endif
  13 
  14 #undef AGG_DEBUG
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 
  32 
  33 
  34 
  35 
  36 
  37 
  38 
  39 
  40 
  41 
  42 
  43 
  44 
  45 
  46 
  47 
  48 
  49 
  50 
  51 
  52 
  53 
  54 
  55 
  56 
  57 
  58 
  59 
  60 
  61 
  62 
  63 
  64 
  65 
  66 
  67 
  68 
  69 
  70 
  71 
  72 
  73 int ADIOI_Calc_aggregator(ADIO_File fd,
  74                          ADIO_Offset off, 
  75                          ADIO_Offset min_off, 
  76                          ADIO_Offset *len, 
  77                          ADIO_Offset fd_size,
  78                          ADIO_Offset *fd_start,
  79                          ADIO_Offset *fd_end)
  80 {
  81     int rank_index, rank;
  82     ADIO_Offset avail_bytes;
  83 
  84     ADIOI_UNREFERENCED_ARG(fd_start);
  85 
  86     
  87     rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
  88 
  89     if (fd->hints->striping_unit > 0) {
  90         
  91 
  92 
  93 
  94         rank_index = 0;
  95         while (off > fd_end[rank_index]) rank_index++;
  96     }
  97 
  98     
  99 
 100 
 101     if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
 102         FPRINTF(stderr, "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
 103                         rank_index,fd->hints->cb_nodes,fd_size,off);
 104         MPI_Abort(MPI_COMM_WORLD, 1);
 105     }
 106 
 107     
 108 
 109 
 110 
 111 
 112 
 113 
 114     avail_bytes = fd_end[rank_index] + 1 - off;
 115     if (avail_bytes < *len) {
 116         
 117         *len = avail_bytes;
 118     }
 119 
 120     
 121     
 122     rank = fd->hints->ranklist[rank_index];
 123 
 124     return rank;
 125 }
 126 
 127 void ADIOI_Calc_file_domains(ADIO_Offset *st_offsets, ADIO_Offset
 128                              *end_offsets, int nprocs, int nprocs_for_coll,
 129                              ADIO_Offset *min_st_offset_ptr,
 130                              ADIO_Offset **fd_start_ptr, ADIO_Offset 
 131                              **fd_end_ptr, int min_fd_size, 
 132                              ADIO_Offset *fd_size_ptr,
 133                              int striping_unit)
 134 {
 135 
 136 
 137 
 138 
 139     ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
 140     int i;
 141 
 142 #ifdef AGGREGATION_PROFILE
 143     MPE_Log_event (5004, 0, NULL);
 144 #endif
 145 
 146 #ifdef AGG_DEBUG
 147     FPRINTF(stderr, "ADIOI_Calc_file_domains: %d aggregator(s)\n", 
 148             nprocs_for_coll);
 149 #endif
 150 
 151 
 152 
 153     min_st_offset = st_offsets[0];
 154     max_end_offset = end_offsets[0];
 155 
 156     for (i=1; i<nprocs; i++) {
 157         min_st_offset = ADIOI_MIN(min_st_offset, st_offsets[i]);
 158         max_end_offset = ADIOI_MAX(max_end_offset, end_offsets[i]);
 159     }
 160 
 161 
 162 
 163 
 164 
 165  
 166     fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll -
 167                1)/nprocs_for_coll; 
 168     
 169 
 170     
 171 
 172 
 173 
 174 
 175     if (fd_size < min_fd_size)
 176         fd_size = min_fd_size;
 177 
 178     *fd_start_ptr = (ADIO_Offset *)
 179         ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset)); 
 180     *fd_end_ptr = (ADIO_Offset *)
 181         ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset)); 
 182 
 183     fd_start = *fd_start_ptr;
 184     fd_end = *fd_end_ptr;
 185 
 186     
 187 
 188 
 189     if (striping_unit > 0) {
 190         ADIO_Offset end_off;
 191         int         rem_front, rem_back;
 192 
 193         
 194         fd_start[0] = min_st_offset;
 195         end_off     = fd_start[0] + fd_size;
 196         rem_front   = end_off % striping_unit;
 197         rem_back    = striping_unit - rem_front;
 198         if (rem_front < rem_back) 
 199                 end_off -= rem_front;
 200         else                      
 201                 end_off += rem_back;
 202         fd_end[0] = end_off - 1;
 203     
 204         
 205         for (i=1; i<nprocs_for_coll; i++) {
 206             fd_start[i] = fd_end[i-1] + 1;
 207             end_off     = min_st_offset + fd_size * (i+1);
 208             rem_front   = end_off % striping_unit;
 209             rem_back    = striping_unit - rem_front;
 210             if (rem_front < rem_back) 
 211                     end_off -= rem_front;
 212             else                      
 213                     end_off += rem_back;
 214             fd_end[i] = end_off - 1;
 215         }
 216         fd_end[nprocs_for_coll-1] = max_end_offset;
 217     }
 218     else { 
 219         fd_start[0] = min_st_offset;
 220         fd_end[0] = min_st_offset + fd_size - 1;
 221 
 222         for (i=1; i<nprocs_for_coll; i++) {
 223             fd_start[i] = fd_end[i-1] + 1;
 224             fd_end[i] = fd_start[i] + fd_size - 1;
 225         }
 226     }
 227 
 228 
 229 
 230 
 231 
 232 
 233 
 234     for (i=0; i<nprocs_for_coll; i++) {
 235         if (fd_start[i] > max_end_offset)
 236             fd_start[i] = fd_end[i] = -1;
 237         if (fd_end[i] > max_end_offset)
 238             fd_end[i] = max_end_offset;
 239     }
 240 
 241     *fd_size_ptr = fd_size;
 242     *min_st_offset_ptr = min_st_offset;
 243 
 244 #ifdef AGGREGATION_PROFILE
 245     MPE_Log_event (5005, 0, NULL);
 246 #endif
 247 }
 248 
 249 
 250 
 251 
 252 
 253 
 254 void ADIOI_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ADIO_Offset *len_list, 
 255                        int contig_access_count, ADIO_Offset 
 256                        min_st_offset, ADIO_Offset *fd_start,
 257                        ADIO_Offset *fd_end, ADIO_Offset fd_size,
 258                        int nprocs,
 259                        int *count_my_req_procs_ptr,
 260                        int **count_my_req_per_proc_ptr,
 261                        ADIOI_Access **my_req_ptr,
 262                        int **buf_idx_ptr)
 263 
 264 
 265 {
 266     int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
 267     int i, l, proc;
 268     ADIO_Offset fd_len, rem_len, curr_idx, off;
 269     ADIOI_Access *my_req;
 270 
 271 #ifdef AGGREGATION_PROFILE
 272     MPE_Log_event (5024, 0, NULL);
 273 #endif
 274 
 275     *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs,sizeof(int)); 
 276     count_my_req_per_proc = *count_my_req_per_proc_ptr;
 277 
 278 
 279 
 280 
 281 
 282     buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 283 
 284 
 285 
 286 
 287    
 288     
 289     for (i=0; i < nprocs; i++) buf_idx[i] = -1;
 290 
 291     
 292 
 293 
 294     for (i=0; i < contig_access_count; i++) {
 295         
 296 
 297         if (len_list[i] == 0) 
 298                 continue;
 299         off = offset_list[i];
 300         fd_len = len_list[i];
 301         
 302 
 303 
 304 
 305 
 306         proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, 
 307                                      fd_start, fd_end);
 308         count_my_req_per_proc[proc]++;
 309 
 310         
 311 
 312 
 313 
 314         rem_len = len_list[i] - fd_len;
 315 
 316         while (rem_len != 0) {
 317             off += fd_len; 
 318             fd_len = rem_len; 
 319             proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, 
 320                                          fd_size, fd_start, fd_end);
 321 
 322             count_my_req_per_proc[proc]++;
 323             rem_len -= fd_len; 
 324         }
 325     }
 326 
 327 
 328 
 329     *my_req_ptr = (ADIOI_Access *)
 330         ADIOI_Malloc(nprocs*sizeof(ADIOI_Access)); 
 331     my_req = *my_req_ptr;
 332 
 333     count_my_req_procs = 0;
 334     for (i=0; i < nprocs; i++) {
 335         if (count_my_req_per_proc[i]) {
 336             my_req[i].offsets = (ADIO_Offset *)
 337                 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
 338             my_req[i].lens =
 339                 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
 340             count_my_req_procs++;
 341         }           
 342         my_req[i].count = 0;  
 343 
 344     }
 345 
 346 
 347     curr_idx = 0;
 348     for (i=0; i<contig_access_count; i++) { 
 349         
 350 
 351         if (len_list[i] == 0)
 352                 continue;
 353         off = offset_list[i];
 354         fd_len = len_list[i];
 355         proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, 
 356                                      fd_start, fd_end);
 357 
 358         
 359         if (buf_idx[proc] == -1) 
 360   {
 361     ADIOI_Assert(curr_idx == (int) curr_idx);
 362     buf_idx[proc] = (int) curr_idx;
 363   }
 364 
 365         l = my_req[proc].count;
 366         curr_idx += fd_len; 
 367 
 368         rem_len = len_list[i] - fd_len;
 369 
 370         
 371 
 372 
 373 
 374 
 375         my_req[proc].offsets[l] = off;
 376         my_req[proc].lens[l] = fd_len;
 377         my_req[proc].count++;
 378 
 379         while (rem_len != 0) {
 380             off += fd_len;
 381             fd_len = rem_len;
 382             proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, 
 383                                          fd_size, fd_start, fd_end);
 384 
 385             if (buf_idx[proc] == -1) 
 386       {
 387         ADIOI_Assert(curr_idx == (int) curr_idx);
 388         buf_idx[proc] = (int) curr_idx;
 389       }
 390 
 391             l = my_req[proc].count;
 392             curr_idx += fd_len;
 393             rem_len -= fd_len;
 394 
 395             my_req[proc].offsets[l] = off;
 396             my_req[proc].lens[l] = fd_len;
 397             my_req[proc].count++;
 398         }
 399     }
 400 
 401 #ifdef AGG_DEBUG
 402     for (i=0; i<nprocs; i++) {
 403         if (count_my_req_per_proc[i] > 0) {
 404             FPRINTF(stdout, "data needed from %d (count = %d):\n", i, 
 405                     my_req[i].count);
 406             for (l=0; l < my_req[i].count; l++) {
 407                 FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n", l,
 408                         my_req[i].offsets[l], l, my_req[i].lens[l]);
 409             }
 410         FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
 411         }
 412     }
 413 #endif
 414 
 415     *count_my_req_procs_ptr = count_my_req_procs;
 416     *buf_idx_ptr = buf_idx;
 417 #ifdef AGGREGATION_PROFILE
 418     MPE_Log_event (5025, 0, NULL);
 419 #endif
 420 }
 421 
 422 
 423 
 424 void ADIOI_Calc_others_req(ADIO_File fd, int count_my_req_procs, 
 425                                 int *count_my_req_per_proc,
 426                                 ADIOI_Access *my_req, 
 427                                 int nprocs, int myrank,
 428                                 int *count_others_req_procs_ptr,
 429                                 ADIOI_Access **others_req_ptr)  
 430 {
 431 
 432 
 433 
 434 
 435 
 436 
 437 
 438 
 439     int *count_others_req_per_proc, count_others_req_procs;
 440     int i, j;
 441     MPI_Request *requests;
 442     MPI_Status *statuses;
 443     ADIOI_Access *others_req;
 444 
 445 
 446 #ifdef AGGREGATION_PROFILE
 447     MPE_Log_event (5026, 0, NULL);
 448 #endif
 449     count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 450 
 451     MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
 452                  count_others_req_per_proc, 1, MPI_INT, fd->comm);
 453 
 454     *others_req_ptr = (ADIOI_Access *)
 455         ADIOI_Malloc(nprocs*sizeof(ADIOI_Access)); 
 456     others_req = *others_req_ptr;
 457 
 458     count_others_req_procs = 0;
 459     for (i=0; i<nprocs; i++) {
 460         if (count_others_req_per_proc[i]) {
 461             others_req[i].count = count_others_req_per_proc[i];
 462             others_req[i].offsets = (ADIO_Offset *)
 463                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
 464             others_req[i].lens =
 465                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
 466             others_req[i].mem_ptrs = (MPI_Aint *)
 467                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint)); 
 468             count_others_req_procs++;
 469         }
 470         else others_req[i].count = 0;
 471     }
 472     
 473 
 474 
 475     requests = (MPI_Request *)
 476         ADIOI_Malloc(1+2*(count_my_req_procs+count_others_req_procs)*sizeof(MPI_Request)); 
 477 
 478 
 479     j = 0;
 480     for (i=0; i<nprocs; i++) {
 481         if (others_req[i].count) {
 482             MPI_Irecv(others_req[i].offsets, others_req[i].count, 
 483                       ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
 484             j++;
 485             MPI_Irecv(others_req[i].lens, others_req[i].count, 
 486                       ADIO_OFFSET, i, i+myrank+1, fd->comm, &requests[j]);
 487             j++;
 488         }
 489     }
 490 
 491     for (i=0; i < nprocs; i++) {
 492         if (my_req[i].count) {
 493             MPI_Isend(my_req[i].offsets, my_req[i].count, 
 494                       ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
 495             j++;
 496             MPI_Isend(my_req[i].lens, my_req[i].count, 
 497                       ADIO_OFFSET, i, i+myrank+1, fd->comm, &requests[j]);
 498             j++;
 499         }
 500     }
 501 
 502     if (j) {
 503         statuses = (MPI_Status *) ADIOI_Malloc(j * sizeof(MPI_Status));
 504         MPI_Waitall(j, requests, statuses);
 505         ADIOI_Free(statuses);
 506     }
 507 
 508     ADIOI_Free(requests);
 509     ADIOI_Free(count_others_req_per_proc);
 510 
 511     *count_others_req_procs_ptr = count_others_req_procs;
 512 #ifdef AGGREGATION_PROFILE
 513     MPE_Log_event (5027, 0, NULL);
 514 #endif
 515 }
 516 
 517 
 518 
 519 
 520 
 521 void ADIOI_Icalc_others_req(ADIOI_NBC_Request *nbc_req, int *error_code)
 522 {
 523     ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
 524 
 525     
 526 
 527 
 528     
 529 #ifdef AGGREGATION_PROFILE
 530     MPE_Log_event(5026, 0, NULL);
 531 #endif
 532     vars->count_others_req_per_proc =
 533         (int *)ADIOI_Malloc(vars->nprocs * sizeof(int));
 534 
 535     *error_code = MPI_Ialltoall(vars->count_my_req_per_proc, 1, MPI_INT,
 536             vars->count_others_req_per_proc, 1, MPI_INT, vars->fd->comm,
 537             &vars->req1);
 538 
 539     if (nbc_req->rdwr == ADIOI_READ) {
 540         nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ;
 541     } else {
 542         ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
 543         nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ;
 544     }
 545 }
 546 
 547 void ADIOI_Icalc_others_req_main(ADIOI_NBC_Request *nbc_req, int *error_code)
 548 {
 549     ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
 550     ADIO_File fd = vars->fd;
 551     int count_my_req_procs = vars->count_my_req_procs;
 552     ADIOI_Access *my_req = vars->my_req;
 553     int nprocs = vars->nprocs;
 554     int myrank = vars->myrank;
 555     ADIOI_Access **others_req_ptr = vars->others_req_ptr;
 556 
 557     
 558 
 559 
 560     
 561 
 562 
 563 
 564 
 565     int *count_others_req_per_proc = vars->count_others_req_per_proc;
 566     int count_others_req_procs;
 567     int i, j;
 568     ADIOI_Access *others_req;
 569 
 570     *others_req_ptr = (ADIOI_Access *)ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
 571     others_req = *others_req_ptr;
 572 
 573     count_others_req_procs = 0;
 574     for (i = 0; i < nprocs; i++) {
 575         if (count_others_req_per_proc[i]) {
 576             others_req[i].count = count_others_req_per_proc[i];
 577             others_req[i].offsets = (ADIO_Offset *)
 578                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
 579             others_req[i].lens =
 580                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
 581             others_req[i].mem_ptrs = (MPI_Aint *)
 582                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
 583             count_others_req_procs++;
 584         }
 585         else others_req[i].count = 0;
 586     }
 587     vars->count_others_req_procs = count_others_req_procs;
 588 
 589     
 590 
 591     vars->req2 = (MPI_Request *)
 592         ADIOI_Malloc(1+2*(count_my_req_procs+count_others_req_procs)
 593                      *sizeof(MPI_Request));
 594     
 595 
 596     j = 0;
 597     for (i = 0; i < nprocs; i++) {
 598         if (others_req[i].count) {
 599             MPI_Irecv(others_req[i].offsets, others_req[i].count,
 600                     ADIO_OFFSET, i, i+myrank, fd->comm, &vars->req2[j]);
 601             j++;
 602             MPI_Irecv(others_req[i].lens, others_req[i].count,
 603                     ADIO_OFFSET, i, i+myrank+1, fd->comm, &vars->req2[j]);
 604             j++;
 605         }
 606     }
 607 
 608     for (i=0; i < nprocs; i++) {
 609         if (my_req[i].count) {
 610             MPI_Isend(my_req[i].offsets, my_req[i].count,
 611                     ADIO_OFFSET, i, i+myrank, fd->comm, &vars->req2[j]);
 612             j++;
 613             MPI_Isend(my_req[i].lens, my_req[i].count,
 614                     ADIO_OFFSET, i, i+myrank+1, fd->comm, &vars->req2[j]);
 615             j++;
 616         }
 617     }
 618 
 619     
 620     vars->num_req2 = j;
 621 
 622     if (nbc_req->rdwr == ADIOI_READ) {
 623         nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN;
 624     } else {
 625         ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
 626         nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ_MAIN;
 627     }
 628 }
 629 
 630 void ADIOI_Icalc_others_req_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
 631 {
 632     ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
 633     void (*next_fn)(ADIOI_NBC_Request *, int *);
 634 
 635     ADIOI_Free(vars->req2);
 636     ADIOI_Free(vars->count_others_req_per_proc);
 637 
 638     *vars->count_others_req_procs_ptr = vars->count_others_req_procs;
 639 #ifdef AGGREGATION_PROFILE
 640     MPE_Log_event(5027, 0, NULL);
 641 #endif
 642     
 643 
 644     next_fn = vars->next_fn;
 645 
 646     
 647     ADIOI_Free(vars);
 648     nbc_req->cor_vars = NULL;
 649 
 650     
 651     next_fn(nbc_req, error_code);
 652 }
 653