This source file includes the following definitions:
- ADIOI_GEN_IwriteStridedColl
- ADIOI_GEN_IwriteStridedColl_inter
- ADIOI_GEN_IwriteStridedColl_indio
- ADIOI_GEN_IwriteStridedColl_exch
- ADIOI_GEN_IwriteStridedColl_bcast
- ADIOI_GEN_IwriteStridedColl_free
- ADIOI_GEN_IwriteStridedColl_fini
- ADIOI_Iexch_and_write
- ADIOI_Iexch_and_write_l1_begin
- ADIOI_Iexch_and_write_l1_body
- ADIOI_Iexch_and_write_l1_end
- ADIOI_Iexch_and_write_reset
- ADIOI_Iexch_and_write_l2_begin
- ADIOI_Iexch_and_write_l2_end
- ADIOI_Iexch_and_write_fini
- ADIOI_W_Iexchange_data
- ADIOI_W_Iexchange_data_hole
- ADIOI_W_Iexchange_data_send
- ADIOI_W_Iexchange_data_wait
- ADIOI_W_Iexchange_data_fini
- ADIOI_GEN_iwc_query_fn
- ADIOI_GEN_iwc_free_fn
- ADIOI_GEN_iwc_poll_fn
- ADIOI_GEN_iwc_wait_fn
1
2
3
4
5
6
7 #include "adio.h"
8 #include "adio_extern.h"
9 #include "mpiu_greq.h"
10 #include "mpioimpl.h"
11
12 #ifdef AGGREGATION_PROFILE
13 #include "mpe.h"
14 #endif
15
16 #ifdef HAVE_MPI_GREQUEST_EXTENSIONS
17
18
19 struct ADIOI_GEN_IwriteStridedColl_vars {
20
21 MPI_Request req_offset[2];
22 MPI_Request req_ind_io;
23 MPI_Request req_err;
24
25
26 ADIO_File fd;
27 const void *buf;
28 int count;
29 MPI_Datatype datatype;
30 int file_ptr_type;
31 ADIO_Offset offset;
32
33
34 ADIOI_Access *my_req;
35
36
37
38 ADIOI_Access *others_req;
39
40
41
42 int nprocs;
43 int nprocs_for_coll;
44 int myrank;
45 int contig_access_count;
46 int interleave_count;
47 int buftype_is_contig;
48 int *count_my_req_per_proc;
49 int count_my_req_procs;
50 int count_others_req_procs;
51 ADIO_Offset start_offset;
52 ADIO_Offset end_offset;
53 ADIO_Offset orig_fp;
54 ADIO_Offset fd_size;
55 ADIO_Offset min_st_offset;
56 ADIO_Offset *offset_list;
57 ADIO_Offset *st_offsets;
58 ADIO_Offset *fd_start;
59 ADIO_Offset *fd_end;
60 ADIO_Offset *end_offsets;
61 int *buf_idx;
62 ADIO_Offset *len_list;
63 int old_error;
64 int tmp_error;
65 int error_code;
66 };
67
68
69 struct ADIOI_Iexch_and_write_vars {
70
71 MPI_Request req1;
72 MPI_Request req3;
73
74
75 ADIO_File fd;
76 void *buf;
77 MPI_Datatype datatype;
78 int nprocs;
79 int myrank;
80 ADIOI_Access *others_req;
81 ADIO_Offset *offset_list;
82 ADIO_Offset *len_list;
83 int contig_access_count;
84 ADIO_Offset min_st_offset;
85 ADIO_Offset fd_size;
86 ADIO_Offset *fd_start;
87 ADIO_Offset *fd_end;
88 int *buf_idx;
89
90
91
92 ADIO_Offset size;
93 int hole;
94 int m;
95 int ntimes;
96 int max_ntimes;
97 int buftype_is_contig;
98 ADIO_Offset st_loc;
99 ADIO_Offset end_loc;
100 ADIO_Offset off;
101 ADIO_Offset done;
102 char *write_buf;
103 int *curr_offlen_ptr;
104 int *count;
105 int *send_size;
106 int *recv_size;
107 int *partial_recv;
108 int *sent_to_proc;
109 int *start_pos;
110 int *send_buf_idx;
111 int *curr_to_proc;
112 int *done_to_proc;
113 ADIOI_Flatlist_node *flat_buf;
114 MPI_Aint buftype_extent;
115 int coll_bufsize;
116
117
118 void (*next_fn)(ADIOI_NBC_Request *, int *);
119 };
120
121
122 struct ADIOI_W_Iexchange_data_vars {
123
124 MPI_Request req1;
125 MPI_Request req2;
126 MPI_Request *req3;
127
128
129 ADIO_File fd;
130 void *buf;
131 char *write_buf;
132 ADIOI_Flatlist_node *flat_buf;
133 ADIO_Offset *offset_list;
134 ADIO_Offset *len_list;
135 int *send_size;
136 int *recv_size;
137 ADIO_Offset off;
138 int size;
139 int *count;
140 int *start_pos;
141 int *partial_recv;
142 int *sent_to_proc;
143 int nprocs;
144 int myrank;
145 int buftype_is_contig;
146 int contig_access_count;
147 ADIO_Offset min_st_offset;
148 ADIO_Offset fd_size;
149 ADIO_Offset *fd_start;
150 ADIO_Offset *fd_end;
151 ADIOI_Access *others_req;
152 int *send_buf_idx;
153 int *curr_to_proc;
154 int *done_to_proc;
155 int *hole;
156 int iter;
157 MPI_Aint buftype_extent;
158 int *buf_idx;
159
160
161 int nprocs_recv;
162 int nprocs_send;
163 int err;
164 char **send_buf;
165 MPI_Request *requests;
166 MPI_Request *send_req;
167 MPI_Datatype *recv_types;
168 int sum;
169 ADIO_Offset *srt_off;
170
171
172 void (*next_fn)(ADIOI_NBC_Request *, int *);
173 };
174
175
176 void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
177 *flat_buf, char **send_buf, ADIO_Offset
178 *offset_list, ADIO_Offset *len_list, int *send_size,
179 MPI_Request *requests, int *sent_to_proc,
180 int nprocs, int myrank,
181 int contig_access_count, ADIO_Offset
182 min_st_offset, ADIO_Offset fd_size,
183 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
184 int *send_buf_idx, int *curr_to_proc,
185 int *done_to_proc, int iter,
186 MPI_Aint buftype_extent);
187 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
188 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
189 int nprocs, int nprocs_recv, int total_elements);
190
191
192
193 static void ADIOI_GEN_IwriteStridedColl_inter(ADIOI_NBC_Request *, int *);
194 static void ADIOI_GEN_IwriteStridedColl_indio(ADIOI_NBC_Request *, int *);
195 static void ADIOI_GEN_IwriteStridedColl_exch(ADIOI_NBC_Request *, int *);
196 static void ADIOI_GEN_IwriteStridedColl_bcast(ADIOI_NBC_Request *, int *);
197 static void ADIOI_GEN_IwriteStridedColl_free(ADIOI_NBC_Request *, int *);
198 static void ADIOI_GEN_IwriteStridedColl_fini(ADIOI_NBC_Request *, int *);
199
200 static void ADIOI_Iexch_and_write(ADIOI_NBC_Request *, int *);
201 static void ADIOI_Iexch_and_write_l1_begin(ADIOI_NBC_Request *, int *);
202 static void ADIOI_Iexch_and_write_l1_body(ADIOI_NBC_Request *, int *);
203 static void ADIOI_Iexch_and_write_l1_end(ADIOI_NBC_Request *, int *);
204 static void ADIOI_Iexch_and_write_reset(ADIOI_NBC_Request *, int *);
205 static void ADIOI_Iexch_and_write_l2_begin(ADIOI_NBC_Request *, int *);
206 static void ADIOI_Iexch_and_write_l2_end(ADIOI_NBC_Request *, int *);
207 static void ADIOI_Iexch_and_write_fini(ADIOI_NBC_Request *, int *);
208
209 static void ADIOI_W_Iexchange_data(ADIOI_NBC_Request *, int *);
210 static void ADIOI_W_Iexchange_data_hole(ADIOI_NBC_Request *, int *);
211 static void ADIOI_W_Iexchange_data_send(ADIOI_NBC_Request *, int *);
212 static void ADIOI_W_Iexchange_data_wait(ADIOI_NBC_Request *, int *);
213 static void ADIOI_W_Iexchange_data_fini(ADIOI_NBC_Request *, int *);
214
215 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
216 static int ADIOI_GEN_iwc_query_fn(void *extra_state, MPI_Status *status);
217 static int ADIOI_GEN_iwc_free_fn(void *extra_state);
218 static int ADIOI_GEN_iwc_poll_fn(void *extra_state, MPI_Status *status);
219 static int ADIOI_GEN_iwc_wait_fn(int count, void **array_of_states,
220 double timeout, MPI_Status *status);
221
222
223
224 void ADIOI_GEN_IwriteStridedColl(ADIO_File fd, const void *buf, int count,
225 MPI_Datatype datatype, int file_ptr_type,
226 ADIO_Offset offset, MPI_Request *request,
227 int *error_code)
228 {
229
230
231
232
233
234
235 ADIOI_NBC_Request *nbc_req = NULL;
236 ADIOI_GEN_IwriteStridedColl_vars *vars = NULL;
237 int nprocs, myrank;
238
239 #if 0
240
241 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
242
243
244 ADIOI_IOIstridedColl(fd, (char *) buf, count, ADIOI_WRITE, datatype,
245 file_ptr_type, offset, request, error_code);
246 return;
247 }
248 #endif
249
250
251 nbc_req = (ADIOI_NBC_Request *)ADIOI_Calloc(1, sizeof(ADIOI_NBC_Request));
252 nbc_req->rdwr = ADIOI_WRITE;
253
254
255 if (ADIOI_GEN_greq_class == 0) {
256 MPIX_Grequest_class_create(ADIOI_GEN_iwc_query_fn,
257 ADIOI_GEN_iwc_free_fn, MPIU_Greq_cancel_fn,
258 ADIOI_GEN_iwc_poll_fn, ADIOI_GEN_iwc_wait_fn,
259 &ADIOI_GEN_greq_class);
260 }
261 MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, nbc_req, request);
262 memcpy(&nbc_req->req, request, sizeof(MPI_Request));
263
264
265 vars = (ADIOI_GEN_IwriteStridedColl_vars *)ADIOI_Calloc(
266 1, sizeof(ADIOI_GEN_IwriteStridedColl_vars));
267 nbc_req->data.wr.wsc_vars = vars;
268
269
270 vars->fd = fd;
271 vars->buf = buf;
272 vars->count = count;
273 vars->datatype = datatype;
274 vars->file_ptr_type = file_ptr_type;
275 vars->offset = offset;
276
277 MPI_Comm_size(fd->comm, &nprocs);
278 MPI_Comm_rank(fd->comm, &myrank);
279 vars->nprocs = nprocs;
280 vars->myrank = myrank;
281
282
283
284
285 vars->nprocs_for_coll = fd->hints->cb_nodes;
286 vars->orig_fp = fd->fp_ind;
287
288
289 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
290
291
292
293
294
295
296 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
297 &vars->offset_list, &vars->len_list,
298 &vars->start_offset, &vars->end_offset,
299 &vars->contig_access_count);
300
301
302
303
304
305 vars->st_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
306 vars->end_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
307
308 *error_code = MPI_Iallgather(&vars->start_offset, 1, ADIO_OFFSET,
309 vars->st_offsets, 1, ADIO_OFFSET,
310 fd->comm, &vars->req_offset[0]);
311 if (*error_code != MPI_SUCCESS) return;
312 *error_code = MPI_Iallgather(&vars->end_offset, 1, ADIO_OFFSET,
313 vars->end_offsets, 1, ADIO_OFFSET,
314 fd->comm, &vars->req_offset[1]);
315
316 nbc_req->data.wr.state = ADIOI_IWC_STATE_GEN_IWRITESTRIDEDCOLL;
317 return;
318 }
319
320 ADIOI_GEN_IwriteStridedColl_indio(nbc_req, error_code);
321 }
322
323 static void ADIOI_GEN_IwriteStridedColl_inter(ADIOI_NBC_Request *nbc_req,
324 int *error_code)
325 {
326 ADIOI_GEN_IwriteStridedColl_vars *vars = nbc_req->data.wr.wsc_vars;
327 int nprocs = vars->nprocs;
328 ADIO_Offset *st_offsets = vars->st_offsets;
329 ADIO_Offset *end_offsets = vars->end_offsets;
330 int i, interleave_count = 0;
331
332
333 for (i = 1; i < nprocs; i++)
334 if ((st_offsets[i] < end_offsets[i-1]) &&
335 (st_offsets[i] <= end_offsets[i]))
336 interleave_count++;
337
338
339
340 vars->interleave_count = interleave_count;
341
342 ADIOI_GEN_IwriteStridedColl_indio(nbc_req, error_code);
343 }
344
345 static void ADIOI_GEN_IwriteStridedColl_indio(ADIOI_NBC_Request *nbc_req,
346 int *error_code)
347 {
348 ADIOI_GEN_IwriteStridedColl_vars *vars = nbc_req->data.wr.wsc_vars;
349 ADIOI_Icalc_others_req_vars *cor_vars = NULL;
350 ADIO_File fd = vars->fd;
351 const void *buf;
352 int count, file_ptr_type;
353 MPI_Datatype datatype = vars->datatype;
354 ADIO_Offset offset;
355 int filetype_is_contig;
356 ADIO_Offset off;
357 int nprocs;
358
359 ADIOI_Datatype_iscontig(datatype, &vars->buftype_is_contig);
360
361 if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
362 (!vars->interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
363 {
364 buf = vars->buf;
365 count = vars->count;
366 file_ptr_type = vars->file_ptr_type;
367 offset = vars->offset;
368
369
370 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
371 ADIOI_Free(vars->offset_list);
372 ADIOI_Free(vars->len_list);
373 ADIOI_Free(vars->st_offsets);
374 ADIOI_Free(vars->end_offsets);
375 }
376
377 fd->fp_ind = vars->orig_fp;
378 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
379
380 #if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
381
382
383
384
385 MPI_Status status;
386 if (vars->buftype_is_contig && filetype_is_contig) {
387 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
388 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
389 ADIO_WriteContig(fd, buf, count, datatype,
390 ADIO_EXPLICIT_OFFSET,
391 off, &status, error_code);
392 }
393 else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
394 0, &status, error_code);
395 }
396 else {
397 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
398 offset, &status, error_code);
399 }
400 ADIOI_GEN_IwriteStridedColl_fini(nbc_req, error_code);
401 #else
402 if (vars->buftype_is_contig && filetype_is_contig) {
403 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
404 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
405 ADIO_IwriteContig(fd, buf, count, datatype,
406 ADIO_EXPLICIT_OFFSET,
407 off, &vars->req_ind_io, error_code);
408 }
409 else ADIO_IwriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
410 0, &vars->req_ind_io, error_code);
411 }
412 else {
413 ADIO_IwriteStrided(fd, buf, count, datatype, file_ptr_type,
414 offset, &vars->req_ind_io, error_code);
415 }
416
417 nbc_req->data.wr.state = ADIOI_IWC_STATE_GEN_IWRITESTRIDEDCOLL_INDIO;
418 #endif
419 return;
420 }
421
422 nprocs = vars->nprocs;
423
424
425
426
427
428 ADIOI_Calc_file_domains(vars->st_offsets, vars->end_offsets, nprocs,
429 vars->nprocs_for_coll, &vars->min_st_offset,
430 &vars->fd_start, &vars->fd_end,
431 fd->hints->min_fdomain_size, &vars->fd_size,
432 fd->hints->striping_unit);
433
434
435
436
437 ADIOI_Calc_my_req(fd, vars->offset_list, vars->len_list,
438 vars->contig_access_count, vars->min_st_offset,
439 vars->fd_start, vars->fd_end, vars->fd_size,
440 nprocs, &vars->count_my_req_procs,
441 &vars->count_my_req_per_proc, &vars->my_req,
442 &vars->buf_idx);
443
444
445
446
447
448
449
450
451 cor_vars = (ADIOI_Icalc_others_req_vars *)ADIOI_Calloc(
452 1, sizeof(ADIOI_Icalc_others_req_vars));
453 nbc_req->cor_vars = cor_vars;
454 cor_vars->fd = vars->fd;
455 cor_vars->count_my_req_procs = vars->count_my_req_procs;
456 cor_vars->count_my_req_per_proc = vars->count_my_req_per_proc;
457 cor_vars->my_req = vars->my_req;
458 cor_vars->nprocs = vars->nprocs;
459 cor_vars->myrank = vars->myrank;
460 cor_vars->count_others_req_procs_ptr = &vars->count_others_req_procs;
461 cor_vars->others_req_ptr = &vars->others_req;
462 cor_vars->next_fn = ADIOI_GEN_IwriteStridedColl_exch;
463
464 ADIOI_Icalc_others_req(nbc_req, error_code);
465 }
466
467 static void ADIOI_GEN_IwriteStridedColl_exch(ADIOI_NBC_Request *nbc_req,
468 int *error_code)
469 {
470 ADIOI_GEN_IwriteStridedColl_vars *vars = nbc_req->data.wr.wsc_vars;
471 ADIOI_Iexch_and_write_vars *eaw_vars = NULL;
472 ADIOI_Access *my_req = vars->my_req;
473 int nprocs = vars->nprocs;
474 int i;
475
476 ADIOI_Free(vars->count_my_req_per_proc);
477 for (i = 0; i < nprocs; i++) {
478 if (my_req[i].count) {
479 ADIOI_Free(my_req[i].offsets);
480 ADIOI_Free(my_req[i].lens);
481 }
482 }
483 ADIOI_Free(my_req);
484
485
486
487 eaw_vars = (ADIOI_Iexch_and_write_vars *)ADIOI_Calloc(
488 1, sizeof(ADIOI_Iexch_and_write_vars));
489 nbc_req->data.wr.eaw_vars = eaw_vars;
490 eaw_vars->fd = vars->fd;
491 eaw_vars->buf = (char *)vars->buf;
492 eaw_vars->datatype = vars->datatype;
493 eaw_vars->nprocs = vars->nprocs;
494 eaw_vars->myrank = vars->myrank;
495 eaw_vars->others_req = vars->others_req;
496 eaw_vars->offset_list = vars->offset_list;
497 eaw_vars->len_list = vars->len_list;
498 eaw_vars->contig_access_count = vars->contig_access_count;
499 eaw_vars->min_st_offset = vars->min_st_offset;
500 eaw_vars->fd_size = vars->fd_size;
501 eaw_vars->fd_start = vars->fd_start;
502 eaw_vars->fd_end = vars->fd_end;
503 eaw_vars->buf_idx = vars->buf_idx;
504 eaw_vars->next_fn = ADIOI_GEN_IwriteStridedColl_bcast;
505
506 ADIOI_Iexch_and_write(nbc_req, error_code);
507 }
508
509 static void ADIOI_GEN_IwriteStridedColl_bcast(ADIOI_NBC_Request *nbc_req,
510 int *error_code)
511 {
512 ADIOI_GEN_IwriteStridedColl_vars *vars = nbc_req->data.wr.wsc_vars;
513 ADIO_File fd = vars->fd;
514
515
516
517
518
519
520
521
522
523
524
525
526 vars->old_error = *error_code;
527 if (*error_code != MPI_SUCCESS) *error_code = MPI_ERR_IO;
528
529
530
531 #ifdef ADIOI_MPE_LOGGING
532 MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
533 #endif
534 vars->error_code = *error_code;
535 if (fd->hints->cb_nodes == 1) {
536 *error_code = MPI_Ibcast(&vars->error_code, 1, MPI_INT,
537 fd->hints->ranklist[0], fd->comm,
538 &vars->req_err);
539 } else {
540 vars->tmp_error = *error_code;
541 *error_code = MPI_Iallreduce(&vars->tmp_error, &vars->error_code, 1,
542 MPI_INT, MPI_MAX, fd->comm,
543 &vars->req_err);
544 }
545
546 nbc_req->data.wr.state = ADIOI_IWC_STATE_GEN_IWRITESTRIDEDCOLL_BCAST;
547 }
548
549 static void ADIOI_GEN_IwriteStridedColl_free(ADIOI_NBC_Request *nbc_req,
550 int *error_code)
551 {
552 ADIOI_GEN_IwriteStridedColl_vars *vars = nbc_req->data.wr.wsc_vars;
553 ADIO_File fd = vars->fd;
554 MPI_Datatype datatype = vars->datatype;
555 ADIOI_Access *others_req = vars->others_req;
556 int nprocs = vars->nprocs;
557 int old_error = vars->old_error;
558 int i;
559
560 #ifdef ADIOI_MPE_LOGGING
561 MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
562 #endif
563 #ifdef AGGREGATION_PROFILE
564 MPE_Log_event(5012, 0, NULL);
565 #endif
566
567 if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
568 *error_code = old_error;
569
570
571 if (!vars->buftype_is_contig) ADIOI_Delete_flattened(datatype);
572
573
574 for (i = 0; i < nprocs; i++) {
575 if (others_req[i].count) {
576 ADIOI_Free(others_req[i].offsets);
577 ADIOI_Free(others_req[i].lens);
578 ADIOI_Free(others_req[i].mem_ptrs);
579 }
580 }
581 ADIOI_Free(others_req);
582
583 ADIOI_Free(vars->buf_idx);
584 ADIOI_Free(vars->offset_list);
585 ADIOI_Free(vars->len_list);
586 ADIOI_Free(vars->st_offsets);
587 ADIOI_Free(vars->end_offsets);
588 ADIOI_Free(vars->fd_start);
589 ADIOI_Free(vars->fd_end);
590
591 fd->fp_sys_posn = -1;
592 #ifdef AGGREGATION_PROFILE
593 MPE_Log_event (5013, 0, NULL);
594 #endif
595
596 ADIOI_GEN_IwriteStridedColl_fini(nbc_req, error_code);
597 }
598
599 static void ADIOI_GEN_IwriteStridedColl_fini(ADIOI_NBC_Request *nbc_req,
600 int *error_code)
601 {
602 ADIOI_GEN_IwriteStridedColl_vars *vars = nbc_req->data.wr.wsc_vars;
603 MPI_Count size;
604
605
606
607 MPI_Type_size_x(vars->datatype, &size);
608 nbc_req->nbytes = size * vars->count;
609
610
611 if (nbc_req->data.wr.wsc_vars) {
612 ADIOI_Free(nbc_req->data.wr.wsc_vars);
613 nbc_req->data.wr.wsc_vars = NULL;
614 }
615
616
617 *error_code = MPI_Grequest_complete(nbc_req->req);
618 nbc_req->data.wr.state = ADIOI_IWC_STATE_COMPLETE;
619 }
620
621
622 static void ADIOI_Iexch_and_write(ADIOI_NBC_Request *nbc_req, int *error_code)
623 {
624 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
625 ADIO_File fd = vars->fd;
626 MPI_Datatype datatype = vars->datatype;
627 int nprocs = vars->nprocs;
628 ADIOI_Access *others_req = vars->others_req;
629 MPI_Aint lb;
630
631
632
633
634
635
636
637
638
639
640 int i, j;
641 ADIO_Offset st_loc = -1, end_loc = -1;
642 int info_flag, coll_bufsize;
643 char *value;
644
645 *error_code = MPI_SUCCESS;
646
647
648
649
650
651
652 value = (char *)ADIOI_Malloc((MPI_MAX_INFO_VAL+1) * sizeof(char));
653 ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
654 &info_flag);
655 coll_bufsize = atoi(value);
656 vars->coll_bufsize = coll_bufsize;
657 ADIOI_Free(value);
658
659 for (i = 0; i < nprocs; i++) {
660 if (others_req[i].count) {
661 st_loc = others_req[i].offsets[0];
662 end_loc = others_req[i].offsets[0];
663 break;
664 }
665 }
666
667 for (i = 0; i < nprocs; i++)
668 for (j = 0; j < others_req[i].count; j++) {
669 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
670 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
671 + others_req[i].lens[j] - 1));
672 }
673
674 vars->st_loc = st_loc;
675 vars->end_loc = end_loc;
676
677
678
679 vars->ntimes = (int)((end_loc - st_loc + coll_bufsize) / coll_bufsize);
680
681 if ((st_loc==-1) && (end_loc==-1)) {
682 vars->ntimes = 0;
683 }
684
685 *error_code = MPI_Iallreduce(&vars->ntimes, &vars->max_ntimes, 1, MPI_INT,
686 MPI_MAX, fd->comm, &vars->req1);
687
688 vars->write_buf = fd->io_buf;
689
690 vars->curr_offlen_ptr = (int *)ADIOI_Calloc(nprocs, sizeof(int));
691
692
693 vars->count = (int *)ADIOI_Malloc(nprocs*sizeof(int));
694
695
696
697 vars->partial_recv = (int *)ADIOI_Calloc(nprocs, sizeof(int));
698
699
700
701
702 vars->send_size = (int *)ADIOI_Malloc(nprocs*sizeof(int));
703
704
705
706 vars->recv_size = (int *)ADIOI_Malloc(nprocs*sizeof(int));
707
708
709 vars->sent_to_proc = (int *)ADIOI_Calloc(nprocs, sizeof(int));
710
711
712
713 vars->send_buf_idx = (int *)ADIOI_Malloc(nprocs*sizeof(int));
714 vars->curr_to_proc = (int *)ADIOI_Malloc(nprocs*sizeof(int));
715 vars->done_to_proc = (int *)ADIOI_Malloc(nprocs*sizeof(int));
716
717
718 vars->start_pos = (int *)ADIOI_Malloc(nprocs*sizeof(int));
719
720
721
722 ADIOI_Datatype_iscontig(datatype, &vars->buftype_is_contig);
723 if (!vars->buftype_is_contig) {
724 vars->flat_buf = ADIOI_Flatten_and_find(datatype);
725 }
726 MPI_Type_get_extent(datatype, &lb, &vars->buftype_extent);
727
728
729
730
731
732
733
734
735
736
737
738
739
740 vars->done = 0;
741 vars->off = st_loc;
742
743
744 nbc_req->data.wr.state = ADIOI_IWC_STATE_IEXCH_AND_WRITE;
745 }
746
747 static void ADIOI_Iexch_and_write_l1_begin(ADIOI_NBC_Request *nbc_req,
748 int *error_code)
749 {
750 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
751 int nprocs;
752 ADIOI_Access *others_req;
753
754 int i, j;
755 ADIO_Offset off, req_off;
756 char *write_buf;
757 int *curr_offlen_ptr, *count, req_len, *recv_size;
758 int *partial_recv, *start_pos;
759 ADIO_Offset size;
760 static char myname[] = "ADIOI_IEXCH_AND_WRITE_L1_BEGIN";
761
762 ADIOI_W_Iexchange_data_vars *wed_vars = NULL;
763
764
765 if (vars->m >= vars->ntimes) {
766 ADIOI_Iexch_and_write_reset(nbc_req, error_code);
767 return;
768 }
769
770 nprocs = vars->nprocs;
771 others_req = vars->others_req;
772
773 off = vars->off;
774 write_buf = vars->write_buf;
775 curr_offlen_ptr = vars->curr_offlen_ptr;
776 count = vars->count;
777 recv_size = vars->recv_size;
778 partial_recv = vars->partial_recv;
779 start_pos = vars->start_pos;
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798 for (i = 0; i < nprocs; i++) count[i] = recv_size[i] = 0;
799
800 size = ADIOI_MIN((unsigned)vars->coll_bufsize,
801 vars->end_loc - vars->st_loc + 1 - vars->done);
802 vars->size = size;
803
804 for (i = 0; i < nprocs; i++) {
805 if (others_req[i].count) {
806 start_pos[i] = curr_offlen_ptr[i];
807 for (j = curr_offlen_ptr[i]; j < others_req[i].count; j++) {
808 if (partial_recv[i]) {
809
810
811 req_off = others_req[i].offsets[j] +
812 partial_recv[i];
813 req_len = others_req[i].lens[j] -
814 partial_recv[i];
815 partial_recv[i] = 0;
816
817 others_req[i].offsets[j] = req_off;
818 others_req[i].lens[j] = req_len;
819 }
820 else {
821 req_off = others_req[i].offsets[j];
822 req_len = others_req[i].lens[j];
823 }
824 if (req_off < off + size) {
825 count[i]++;
826 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
827 MPI_Get_address(write_buf + req_off - off,
828 &(others_req[i].mem_ptrs[j]));
829 ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
830 recv_size[i] += (int)(ADIOI_MIN(off + size - req_off,
831 (unsigned)req_len));
832
833 if (off+size-req_off < (unsigned)req_len)
834 {
835 partial_recv[i] = (int)(off + size - req_off);
836
837
838 if ((j+1 < others_req[i].count) &&
839 (others_req[i].offsets[j+1] < off+size))
840 {
841 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
842 MPIR_ERR_RECOVERABLE,
843 myname,
844 __LINE__,
845 MPI_ERR_ARG,
846 "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
847
848
849
850 }
851
852 break;
853 }
854 }
855 else break;
856 }
857 curr_offlen_ptr[i] = j;
858 }
859 }
860
861
862 wed_vars = (ADIOI_W_Iexchange_data_vars *)ADIOI_Calloc(
863 1, sizeof(ADIOI_W_Iexchange_data_vars));
864 nbc_req->data.wr.wed_vars = wed_vars;
865
866 wed_vars->fd = vars->fd;
867 wed_vars->buf = vars->buf;
868 wed_vars->write_buf = vars->write_buf;
869 wed_vars->flat_buf = vars->flat_buf;
870 wed_vars->offset_list = vars->offset_list;
871 wed_vars->len_list = vars->len_list;
872 wed_vars->send_size = vars->send_size;
873 wed_vars->recv_size = vars->recv_size;
874 wed_vars->off = vars->off;
875 wed_vars->size = vars->size;
876 wed_vars->count = vars->count;
877 wed_vars->start_pos = vars->start_pos;
878 wed_vars->partial_recv = vars->partial_recv;
879 wed_vars->sent_to_proc = vars->sent_to_proc;
880 wed_vars->nprocs = vars->nprocs;
881 wed_vars->myrank = vars->myrank;
882 wed_vars->buftype_is_contig = vars->buftype_is_contig;
883 wed_vars->contig_access_count = vars->contig_access_count;
884 wed_vars->min_st_offset = vars->min_st_offset;
885 wed_vars->fd_size = vars->fd_size;
886 wed_vars->fd_start = vars->fd_start;
887 wed_vars->fd_end = vars->fd_end;
888 wed_vars->others_req = vars->others_req;
889 wed_vars->send_buf_idx = vars->send_buf_idx;
890 wed_vars->curr_to_proc = vars->curr_to_proc;
891 wed_vars->done_to_proc = vars->done_to_proc;
892 wed_vars->hole = &vars->hole;
893 wed_vars->iter = vars->m;
894 wed_vars->buftype_extent = vars->buftype_extent;
895 wed_vars->buf_idx = vars->buf_idx;
896 wed_vars->next_fn = ADIOI_Iexch_and_write_l1_body;
897
898 ADIOI_W_Iexchange_data(nbc_req, error_code);
899 }
900
901 static void ADIOI_Iexch_and_write_l1_body(ADIOI_NBC_Request *nbc_req,
902 int *error_code)
903 {
904 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
905 ADIO_File fd = vars->fd;
906 int nprocs = vars->nprocs;
907 ADIO_Offset size = vars->size;
908 char *write_buf = vars->write_buf;
909 int *count = vars->count;
910 int flag, i;
911
912 flag = 0;
913 for (i = 0; i < nprocs; i++)
914 if (count[i]) flag = 1;
915
916 if (flag) {
917 ADIOI_Assert(size == (int)size);
918 #if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
919 MPI_Status status;
920 ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE,
921 ADIO_EXPLICIT_OFFSET, vars->off, &status,
922 error_code);
923 #else
924 ADIO_IwriteContig(fd, write_buf, (int)size, MPI_BYTE,
925 ADIO_EXPLICIT_OFFSET, vars->off, &vars->req3,
926 error_code);
927
928 nbc_req->data.wr.state = ADIOI_IWC_STATE_IEXCH_AND_WRITE_L1_BODY;
929 return;
930 #endif
931 }
932
933 ADIOI_Iexch_and_write_l1_end(nbc_req, error_code);
934 }
935
936 static void ADIOI_Iexch_and_write_l1_end(ADIOI_NBC_Request *nbc_req,
937 int *error_code)
938 {
939 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
940 ADIO_Offset size = vars->size;
941
942 vars->off += size;
943 vars->done += size;
944
945
946 vars->m++;
947 ADIOI_Iexch_and_write_l1_begin(nbc_req, error_code);
948 }
949
950 static void ADIOI_Iexch_and_write_reset(ADIOI_NBC_Request *nbc_req,
951 int *error_code)
952 {
953 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
954 int nprocs = vars->nprocs;
955 int *count = vars->count;
956 int *recv_size = vars->recv_size;
957 int i;
958
959 for (i = 0; i < nprocs; i++) count[i] = recv_size[i] = 0;
960
961 vars->m = vars->ntimes;
962 ADIOI_Iexch_and_write_l2_begin(nbc_req, error_code);
963 }
964
965 static void ADIOI_Iexch_and_write_l2_begin(ADIOI_NBC_Request *nbc_req,
966 int *error_code)
967 {
968 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
969 ADIO_Offset size = vars->size;
970 ADIOI_W_Iexchange_data_vars *wed_vars = NULL;
971
972
973 if (vars->m >= vars->max_ntimes) {
974 ADIOI_Iexch_and_write_fini(nbc_req, error_code);
975 return;
976 }
977
978 ADIOI_Assert(size == (int)size);
979
980
981 wed_vars = (ADIOI_W_Iexchange_data_vars *)ADIOI_Calloc(
982 1, sizeof(ADIOI_W_Iexchange_data_vars));
983 nbc_req->data.wr.wed_vars = wed_vars;
984
985 wed_vars->fd = vars->fd;
986 wed_vars->buf = vars->buf;
987 wed_vars->write_buf = vars->write_buf;
988 wed_vars->flat_buf = vars->flat_buf;
989 wed_vars->offset_list = vars->offset_list;
990 wed_vars->len_list = vars->len_list;
991 wed_vars->send_size = vars->send_size;
992 wed_vars->recv_size = vars->recv_size;
993 wed_vars->off = vars->off;
994 wed_vars->size = (int)vars->size;
995 wed_vars->count = vars->count;
996 wed_vars->start_pos = vars->start_pos;
997 wed_vars->partial_recv = vars->partial_recv;
998 wed_vars->sent_to_proc = vars->sent_to_proc;
999 wed_vars->nprocs = vars->nprocs;
1000 wed_vars->myrank = vars->myrank;
1001 wed_vars->buftype_is_contig = vars->buftype_is_contig;
1002 wed_vars->contig_access_count = vars->contig_access_count;
1003 wed_vars->min_st_offset = vars->min_st_offset;
1004 wed_vars->fd_size = vars->fd_size;
1005 wed_vars->fd_start = vars->fd_start;
1006 wed_vars->fd_end = vars->fd_end;
1007 wed_vars->others_req = vars->others_req;
1008 wed_vars->send_buf_idx = vars->send_buf_idx;
1009 wed_vars->curr_to_proc = vars->curr_to_proc;
1010 wed_vars->done_to_proc = vars->done_to_proc;
1011 wed_vars->hole = &vars->hole;
1012 wed_vars->iter = vars->m;
1013 wed_vars->buftype_extent = vars->buftype_extent;
1014 wed_vars->buf_idx = vars->buf_idx;
1015 wed_vars->next_fn = ADIOI_Iexch_and_write_l2_end;
1016
1017
1018 ADIOI_W_Iexchange_data(nbc_req, error_code);
1019 }
1020
1021 static void ADIOI_Iexch_and_write_l2_end(ADIOI_NBC_Request *nbc_req,
1022 int *error_code)
1023 {
1024 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
1025
1026 vars->m++;
1027 ADIOI_Iexch_and_write_l2_begin(nbc_req, error_code);
1028 }
1029
1030 static void ADIOI_Iexch_and_write_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
1031 {
1032 ADIOI_Iexch_and_write_vars *vars = nbc_req->data.wr.eaw_vars;
1033 void (*next_fn)(ADIOI_NBC_Request *, int *);
1034
1035 ADIOI_Free(vars->curr_offlen_ptr);
1036 ADIOI_Free(vars->count);
1037 ADIOI_Free(vars->partial_recv);
1038 ADIOI_Free(vars->send_size);
1039 ADIOI_Free(vars->recv_size);
1040 ADIOI_Free(vars->sent_to_proc);
1041 ADIOI_Free(vars->start_pos);
1042 ADIOI_Free(vars->send_buf_idx);
1043 ADIOI_Free(vars->curr_to_proc);
1044 ADIOI_Free(vars->done_to_proc);
1045
1046 next_fn = vars->next_fn;
1047
1048
1049 ADIOI_Free(nbc_req->data.wr.eaw_vars);
1050 nbc_req->data.wr.eaw_vars = NULL;
1051
1052
1053 next_fn(nbc_req, error_code);
1054 }
1055
1056
1057 static void ADIOI_W_Iexchange_data(ADIOI_NBC_Request *nbc_req, int *error_code)
1058 {
1059 ADIOI_W_Iexchange_data_vars *vars = nbc_req->data.wr.wed_vars;
1060
1061
1062
1063
1064 *error_code = MPI_Ialltoall(vars->recv_size, 1, MPI_INT, vars->send_size, 1,
1065 MPI_INT, vars->fd->comm, &vars->req1);
1066
1067 nbc_req->data.wr.state = ADIOI_IWC_STATE_W_IEXCHANGE_DATA;
1068 }
1069
1070 static void ADIOI_W_Iexchange_data_hole(ADIOI_NBC_Request *nbc_req,
1071 int *error_code)
1072 {
1073 ADIOI_W_Iexchange_data_vars *vars = nbc_req->data.wr.wed_vars;
1074 ADIO_File fd = vars->fd;
1075 int *recv_size = vars->recv_size;
1076 ADIO_Offset off = vars->off;
1077 int size = vars->size;
1078 int *count = vars->count;
1079 int *start_pos = vars->start_pos;
1080 int *partial_recv = vars->partial_recv;
1081 int nprocs = vars->nprocs;
1082 ADIOI_Access *others_req = vars->others_req;
1083 int *hole = vars->hole;
1084
1085 int i, j, k, *tmp_len, nprocs_recv;
1086 MPI_Datatype *recv_types;
1087 int *srt_len = NULL, sum;
1088 ADIO_Offset *srt_off = NULL;
1089
1090
1091
1092 nprocs_recv = 0;
1093 for (i = 0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
1094 vars->nprocs_recv = nprocs_recv;
1095
1096 recv_types = (MPI_Datatype *)
1097 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
1098 vars->recv_types = recv_types;
1099
1100
1101 tmp_len = (int *)ADIOI_Malloc(nprocs*sizeof(int));
1102 j = 0;
1103 for (i = 0; i < nprocs; i++) {
1104 if (recv_size[i]) {
1105
1106 if (partial_recv[i]) {
1107 k = start_pos[i] + count[i] - 1;
1108 tmp_len[i] = others_req[i].lens[k];
1109 others_req[i].lens[k] = partial_recv[i];
1110 }
1111 ADIOI_Type_create_hindexed_x(count[i],
1112 &(others_req[i].lens[start_pos[i]]),
1113 &(others_req[i].mem_ptrs[start_pos[i]]),
1114 MPI_BYTE, recv_types+j);
1115
1116 MPI_Type_commit(recv_types+j);
1117 j++;
1118 }
1119 }
1120
1121
1122
1123
1124
1125 sum = 0;
1126 for (i = 0; i < nprocs; i++) sum += count[i];
1127
1128
1129 if (sum) {
1130 srt_off = (ADIO_Offset *)ADIOI_Malloc(sum*sizeof(ADIO_Offset));
1131 srt_len = (int *)ADIOI_Malloc(sum*sizeof(int));
1132
1133 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
1134 nprocs, nprocs_recv, sum);
1135 }
1136
1137
1138 for (i = 0; i < nprocs; i++)
1139 if (partial_recv[i]) {
1140 k = start_pos[i] + count[i] - 1;
1141 others_req[i].lens[k] = tmp_len[i];
1142 }
1143 ADIOI_Free(tmp_len);
1144
1145
1146
1147
1148
1149
1150
1151
1152 *hole = 0;
1153 if (sum) {
1154 if (off != srt_off[0])
1155 *hole = 1;
1156 else {
1157 for (i = 1; i < sum; i++) {
1158 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
1159
1160 int new_len = (int)srt_off[i] + srt_len[i] - (int)srt_off[0];
1161 if (new_len > srt_len[0]) srt_len[0] = new_len;
1162 }
1163 else
1164 break;
1165 }
1166 if (i < sum || size != srt_len[0])
1167 *hole = 1;
1168 }
1169
1170 ADIOI_Free(srt_off);
1171 ADIOI_Free(srt_len);
1172 }
1173
1174 if (nprocs_recv) {
1175 if (*hole) {
1176 ADIO_IreadContig(fd, vars->write_buf, size, MPI_BYTE,
1177 ADIO_EXPLICIT_OFFSET, off, &vars->req2,
1178 &vars->err);
1179 nbc_req->data.wr.state = ADIOI_IWC_STATE_W_IEXCHANGE_DATA_HOLE;
1180 return;
1181 }
1182 }
1183
1184 ADIOI_W_Iexchange_data_send(nbc_req, error_code);
1185 }
1186
1187 static void ADIOI_W_Iexchange_data_send(ADIOI_NBC_Request *nbc_req,
1188 int *error_code)
1189 {
1190 ADIOI_W_Iexchange_data_vars *vars = nbc_req->data.wr.wed_vars;
1191 ADIO_File fd = vars->fd;
1192 void *buf = vars->buf;
1193 int *send_size = vars->send_size;
1194 int *recv_size = vars->recv_size;
1195 int nprocs = vars->nprocs;
1196 int myrank = vars->myrank;
1197 int iter = vars->iter;
1198 int *buf_idx = vars->buf_idx;
1199
1200 int nprocs_recv = vars->nprocs_recv;
1201 MPI_Datatype *recv_types = vars->recv_types;
1202
1203 int i, j;
1204 int nprocs_send;
1205 char **send_buf = NULL;
1206
1207 nprocs_send = 0;
1208 for (i = 0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
1209 vars->nprocs_send = nprocs_send;
1210
1211 if (fd->atomicity) {
1212
1213 vars->requests = (MPI_Request *)
1214 ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request));
1215 vars->send_req = vars->requests;
1216 }
1217 else {
1218 vars->requests = (MPI_Request *)
1219 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
1220
1221
1222
1223 j = 0;
1224 for (i = 0; i < nprocs; i++) {
1225 if (recv_size[i]) {
1226 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
1227 fd->comm, vars->requests+j);
1228 j++;
1229 }
1230 }
1231 vars->send_req = vars->requests + nprocs_recv;
1232 }
1233
1234
1235
1236
1237 #ifdef AGGREGATION_PROFILE
1238 MPE_Log_event (5032, 0, NULL);
1239 #endif
1240 if (vars->buftype_is_contig) {
1241 j = 0;
1242 for (i = 0; i < nprocs; i++)
1243 if (send_size[i]) {
1244 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
1245 MPI_BYTE, i, myrank+i+100*iter, fd->comm,
1246 vars->send_req+j);
1247 j++;
1248 buf_idx[i] += send_size[i];
1249 }
1250 }
1251 else if (nprocs_send) {
1252
1253 send_buf = (char **)ADIOI_Malloc(nprocs*sizeof(char*));
1254 vars->send_buf = send_buf;
1255 for (i = 0; i < nprocs; i++)
1256 if (send_size[i])
1257 send_buf[i] = (char *)ADIOI_Malloc(send_size[i]);
1258
1259 ADIOI_Fill_send_buffer(fd, buf, vars->flat_buf, send_buf,
1260 vars->offset_list, vars->len_list, send_size,
1261 vars->send_req,
1262 vars->sent_to_proc, nprocs, myrank,
1263 vars->contig_access_count,
1264 vars->min_st_offset, vars->fd_size,
1265 vars->fd_start, vars->fd_end,
1266 vars->send_buf_idx, vars->curr_to_proc,
1267 vars->done_to_proc, iter,
1268 vars->buftype_extent);
1269
1270 }
1271
1272 if (fd->atomicity) {
1273 vars->req3 = (MPI_Request *)
1274 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Request));
1275
1276
1277
1278 j = 0;
1279 for (i = 0; i < nprocs; i++) {
1280 if (recv_size[i]) {
1281 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
1282 fd->comm, vars->req3+j);
1283 j++;
1284 }
1285 }
1286
1287 nbc_req->data.wr.state = ADIOI_IWC_STATE_W_IEXCHANGE_DATA_SEND;
1288 return;
1289 }
1290
1291 ADIOI_W_Iexchange_data_wait(nbc_req, error_code);
1292 }
1293
1294 static void ADIOI_W_Iexchange_data_wait(ADIOI_NBC_Request *nbc_req,
1295 int *error_code)
1296 {
1297 ADIOI_W_Iexchange_data_vars *vars = nbc_req->data.wr.wed_vars;
1298 ADIO_File fd = vars->fd;
1299 int nprocs_send = vars->nprocs_send;
1300 int nprocs_recv = vars->nprocs_recv;
1301 MPI_Datatype *recv_types = vars->recv_types;
1302 int i;
1303
1304 for (i = 0; i < nprocs_recv; i++) MPI_Type_free(recv_types+i);
1305 ADIOI_Free(recv_types);
1306
1307 i= 0;
1308 if (fd->atomicity) {
1309
1310 MPI_Testall(nprocs_send, vars->send_req, &i, MPI_STATUSES_IGNORE);
1311 }
1312 else {
1313 MPI_Testall(nprocs_send+nprocs_recv, vars->requests, &i,
1314 MPI_STATUSES_IGNORE);
1315 }
1316
1317 if (i) {
1318 ADIOI_W_Iexchange_data_fini(nbc_req, error_code);
1319 } else {
1320 nbc_req->data.wr.state = ADIOI_IWC_STATE_W_IEXCHANGE_DATA_WAIT;
1321 }
1322 }
1323
1324 static void ADIOI_W_Iexchange_data_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
1325 {
1326 ADIOI_W_Iexchange_data_vars *vars = nbc_req->data.wr.wed_vars;
1327 void (*next_fn)(ADIOI_NBC_Request *, int *);
1328 ADIO_File fd = vars->fd;
1329 int *send_size = vars->send_size;
1330 int nprocs = vars->nprocs;
1331 char **send_buf = vars->send_buf;
1332 int i;
1333
1334 if (fd->atomicity) ADIOI_Free(vars->req3);
1335
1336 #ifdef AGGREGATION_PROFILE
1337 MPE_Log_event (5033, 0, NULL);
1338 #endif
1339 ADIOI_Free(vars->requests);
1340 if (!vars->buftype_is_contig && vars->nprocs_send) {
1341 for (i = 0; i < nprocs; i++)
1342 if (send_size[i]) ADIOI_Free(send_buf[i]);
1343 ADIOI_Free(send_buf);
1344 }
1345
1346 next_fn = vars->next_fn;
1347
1348
1349 ADIOI_Free(vars);
1350 nbc_req->data.wr.wed_vars = NULL;
1351
1352
1353 next_fn(nbc_req, error_code);
1354 }
1355
1356
1357 static int ADIOI_GEN_iwc_query_fn(void *extra_state, MPI_Status *status)
1358 {
1359 ADIOI_NBC_Request *nbc_req;
1360
1361 nbc_req = (ADIOI_NBC_Request *)extra_state;
1362
1363 MPI_Status_set_elements_x(status, MPI_BYTE, nbc_req->nbytes);
1364
1365
1366 MPI_Status_set_cancelled(status, 0);
1367
1368
1369 status->MPI_SOURCE = MPI_UNDEFINED;
1370
1371 status->MPI_TAG = MPI_UNDEFINED;
1372
1373
1374 return MPI_SUCCESS;
1375 }
1376
1377 static int ADIOI_GEN_iwc_free_fn(void *extra_state)
1378 {
1379 ADIOI_NBC_Request *nbc_req;
1380
1381 nbc_req = (ADIOI_NBC_Request *)extra_state;
1382 ADIOI_Free(nbc_req);
1383
1384 return MPI_SUCCESS;
1385 }
1386
1387 static int ADIOI_GEN_iwc_poll_fn(void *extra_state, MPI_Status *status)
1388 {
1389 ADIOI_NBC_Request *nbc_req;
1390 ADIOI_GEN_IwriteStridedColl_vars *wsc_vars = NULL;
1391 ADIOI_Icalc_others_req_vars *cor_vars = NULL;
1392 ADIOI_Iexch_and_write_vars *eaw_vars = NULL;
1393 ADIOI_W_Iexchange_data_vars *wed_vars = NULL;
1394 int errcode = MPI_SUCCESS;
1395 int flag;
1396
1397 nbc_req = (ADIOI_NBC_Request *)extra_state;
1398
1399 switch (nbc_req->data.wr.state) {
1400 case ADIOI_IWC_STATE_GEN_IWRITESTRIDEDCOLL:
1401 wsc_vars = nbc_req->data.wr.wsc_vars;
1402 errcode = MPI_Testall(2, wsc_vars->req_offset, &flag,
1403 MPI_STATUSES_IGNORE);
1404 if (errcode == MPI_SUCCESS && flag) {
1405 ADIOI_GEN_IwriteStridedColl_inter(nbc_req, &errcode);
1406 }
1407 break;
1408
1409 case ADIOI_IWC_STATE_GEN_IWRITESTRIDEDCOLL_INDIO:
1410 wsc_vars = nbc_req->data.wr.wsc_vars;
1411 errcode = MPI_Test(&wsc_vars->req_ind_io, &flag, MPI_STATUS_IGNORE);
1412 if (errcode == MPI_SUCCESS && flag) {
1413
1414 ADIOI_GEN_IwriteStridedColl_fini(nbc_req, &errcode);
1415 }
1416 break;
1417
1418 case ADIOI_IWC_STATE_GEN_IWRITESTRIDEDCOLL_BCAST:
1419 wsc_vars = nbc_req->data.wr.wsc_vars;
1420 errcode = MPI_Test(&wsc_vars->req_err, &flag, MPI_STATUS_IGNORE);
1421 if (errcode == MPI_SUCCESS && flag) {
1422 errcode = wsc_vars->error_code;
1423 ADIOI_GEN_IwriteStridedColl_free(nbc_req, &errcode);
1424 }
1425 break;
1426
1427 case ADIOI_IWC_STATE_ICALC_OTHERS_REQ:
1428 cor_vars = nbc_req->cor_vars;
1429 errcode = MPI_Test(&cor_vars->req1, &flag, MPI_STATUS_IGNORE);
1430 if (errcode == MPI_SUCCESS && flag) {
1431 ADIOI_Icalc_others_req_main(nbc_req, &errcode);
1432 }
1433 break;
1434
1435 case ADIOI_IWC_STATE_ICALC_OTHERS_REQ_MAIN:
1436 cor_vars = nbc_req->cor_vars;
1437 if (cor_vars->num_req2) {
1438 errcode = MPI_Testall(cor_vars->num_req2, cor_vars->req2,
1439 &flag, MPI_STATUSES_IGNORE);
1440 if (errcode == MPI_SUCCESS && flag) {
1441 ADIOI_Icalc_others_req_fini(nbc_req, &errcode);
1442 }
1443 } else {
1444 ADIOI_Icalc_others_req_fini(nbc_req, &errcode);
1445 }
1446 break;
1447
1448 case ADIOI_IWC_STATE_IEXCH_AND_WRITE:
1449 eaw_vars = nbc_req->data.wr.eaw_vars;
1450 errcode = MPI_Test(&eaw_vars->req1, &flag, MPI_STATUS_IGNORE);
1451 if (errcode == MPI_SUCCESS && flag) {
1452 eaw_vars->m = 0;
1453 ADIOI_Iexch_and_write_l1_begin(nbc_req, &errcode);
1454 }
1455 break;
1456
1457 case ADIOI_IWC_STATE_IEXCH_AND_WRITE_L1_BODY:
1458 eaw_vars = nbc_req->data.wr.eaw_vars;
1459 errcode = MPI_Test(&eaw_vars->req3, &flag, MPI_STATUS_IGNORE);
1460 if (errcode == MPI_SUCCESS && flag) {
1461 ADIOI_Iexch_and_write_l1_end(nbc_req, &errcode);
1462 }
1463 break;
1464
1465 case ADIOI_IWC_STATE_W_IEXCHANGE_DATA:
1466 wed_vars = nbc_req->data.wr.wed_vars;
1467 errcode = MPI_Test(&wed_vars->req1, &flag, MPI_STATUS_IGNORE);
1468 if (errcode == MPI_SUCCESS && flag) {
1469 ADIOI_W_Iexchange_data_hole(nbc_req, &errcode);
1470 }
1471 break;
1472
1473 case ADIOI_IWC_STATE_W_IEXCHANGE_DATA_HOLE:
1474 wed_vars = nbc_req->data.wr.wed_vars;
1475 errcode = MPI_Test(&wed_vars->req2, &flag, MPI_STATUSES_IGNORE);
1476 if (errcode == MPI_SUCCESS && flag) {
1477
1478 if (wed_vars->err != MPI_SUCCESS) {
1479 errcode = MPIO_Err_create_code(wed_vars->err,
1480 MPIR_ERR_RECOVERABLE,
1481 "ADIOI_W_EXCHANGE_DATA",
1482 __LINE__, MPI_ERR_IO,
1483 "**ioRMWrdwr", 0);
1484 break;;
1485 }
1486
1487 ADIOI_W_Iexchange_data_send(nbc_req, &errcode);
1488 }
1489 break;
1490
1491 case ADIOI_IWC_STATE_W_IEXCHANGE_DATA_SEND:
1492 wed_vars = nbc_req->data.wr.wed_vars;
1493 errcode = MPI_Testall(wed_vars->nprocs_recv, wed_vars->req3,
1494 &flag, MPI_STATUSES_IGNORE);
1495 if (errcode == MPI_SUCCESS && flag) {
1496 ADIOI_W_Iexchange_data_wait(nbc_req, &errcode);
1497 }
1498 break;
1499
1500 case ADIOI_IWC_STATE_W_IEXCHANGE_DATA_WAIT:
1501 wed_vars = nbc_req->data.wr.wed_vars;
1502 if (wed_vars->fd->atomicity) {
1503
1504 errcode = MPI_Testall(wed_vars->nprocs_send, wed_vars->send_req,
1505 &flag, MPI_STATUSES_IGNORE);
1506 } else {
1507 errcode = MPI_Testall(wed_vars->nprocs_send +
1508 wed_vars->nprocs_recv,
1509 wed_vars->requests,
1510 &flag, MPI_STATUSES_IGNORE);
1511 }
1512 if (errcode == MPI_SUCCESS && flag) {
1513 ADIOI_W_Iexchange_data_fini(nbc_req, &errcode);
1514 }
1515 break;
1516
1517 default:
1518 break;
1519 }
1520
1521
1522 if (errcode != MPI_SUCCESS) {
1523 errcode = MPIO_Err_create_code(MPI_SUCCESS,
1524 MPIR_ERR_RECOVERABLE,
1525 "ADIOI_GEN_iwc_poll_fn", __LINE__,
1526 MPI_ERR_IO, "**mpi_grequest_complete",
1527 0);
1528 }
1529
1530
1531 return errcode;
1532 }
1533
1534
1535 static int ADIOI_GEN_iwc_wait_fn(int count, void **array_of_states,
1536 double timeout, MPI_Status *status)
1537 {
1538 int i, errcode = MPI_SUCCESS;
1539 double starttime;
1540 ADIOI_NBC_Request **nbc_reqlist;
1541
1542 nbc_reqlist = (ADIOI_NBC_Request **)array_of_states;
1543
1544 starttime = MPI_Wtime();
1545 for (i = 0; i < count ; i++) {
1546 while (nbc_reqlist[i]->data.wr.state != ADIOI_IWC_STATE_COMPLETE) {
1547 errcode = ADIOI_GEN_iwc_poll_fn(nbc_reqlist[i], MPI_STATUS_IGNORE);
1548
1549 if (errcode != MPI_SUCCESS) {
1550 errcode = MPIO_Err_create_code(MPI_SUCCESS,
1551 MPIR_ERR_RECOVERABLE,
1552 "ADIOI_GEN_iwc_wait_fn",
1553 __LINE__, MPI_ERR_IO,
1554 "**mpi_grequest_complete", 0);
1555 }
1556
1557
1558 if ((timeout > 0) && (timeout < (MPI_Wtime() - starttime)))
1559 goto fn_exit;
1560
1561
1562
1563 MPIR_Ext_cs_yield();
1564 }
1565 }
1566
1567 fn_exit:
1568 return errcode;
1569 }
1570
1571 #endif