This source file includes the following definitions.
- mca_common_ompio_file_write
- mca_common_ompio_file_write_at
- mca_common_ompio_file_iwrite
- mca_common_ompio_file_iwrite_at
- mca_common_ompio_file_write_all
- mca_common_ompio_file_write_at_all
- mca_common_ompio_file_iwrite_all
- mca_common_ompio_file_iwrite_at_all
- mca_common_ompio_build_io_array
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23
24 #include "ompi/communicator/communicator.h"
25 #include "ompi/info/info.h"
26 #include "ompi/file/file.h"
27 #include "ompi/mca/fcoll/fcoll.h"
28 #include "ompi/mca/fcoll/base/base.h"
29 #include "ompi/mca/fbtl/fbtl.h"
30 #include "ompi/mca/fbtl/base/base.h"
31
32 #include "common_ompio.h"
33 #include "common_ompio_request.h"
34 #include "common_ompio_buffer.h"
35 #include <unistd.h>
36 #include <math.h>
37
38 int mca_common_ompio_file_write (ompio_file_t *fh,
39 const void *buf,
40 int count,
41 struct ompi_datatype_t *datatype,
42 ompi_status_public_t *status)
43 {
44 int ret = OMPI_SUCCESS;
45 int index = 0;
46 int cycles = 0;
47
48 uint32_t iov_count = 0;
49 struct iovec *decoded_iov = NULL;
50 size_t bytes_per_cycle=0;
51 size_t total_bytes_written = 0;
52 size_t max_data=0, real_bytes_written=0;
53 ssize_t ret_code=0;
54 size_t spc=0;
55 int i = 0;
56 int j = 0;
57
58 if (fh->f_amode & MPI_MODE_RDONLY){
59
60 ret = MPI_ERR_READ_ONLY;
61 return ret;
62 }
63
64
65 if ( 0 == count ) {
66 if ( MPI_STATUS_IGNORE != status ) {
67 status->_ucount = 0;
68 }
69 return ret;
70 }
71
72 bool need_to_copy = false;
73
74 #if OPAL_CUDA_SUPPORT
75 int is_gpu, is_managed;
76 mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
77 if ( is_gpu && !is_managed ) {
78 need_to_copy = true;
79 }
80 #endif
81
82 if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
83 !(datatype == &ompi_mpi_byte.dt ||
84 datatype == &ompi_mpi_char.dt )) {
85
86
87
88
89
90
91 need_to_copy = true;
92 }
93
94 if ( need_to_copy ) {
95 size_t pos=0;
96 char *tbuf=NULL;
97 opal_convertor_t convertor;
98
99 OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
100 opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos );
101 opal_convertor_cleanup ( &convertor);
102 }
103 else {
104 mca_common_ompio_decode_datatype (fh,
105 datatype,
106 count,
107 buf,
108 &max_data,
109 fh->f_mem_convertor,
110 &decoded_iov,
111 &iov_count);
112 }
113
114 if ( 0 < max_data && 0 == fh->f_iov_count ) {
115 if ( MPI_STATUS_IGNORE != status ) {
116 status->_ucount = 0;
117 }
118 if (NULL != decoded_iov) {
119 free (decoded_iov);
120 decoded_iov = NULL;
121 }
122 return OMPI_SUCCESS;
123 }
124
125 if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size )) {
126 bytes_per_cycle = max_data;
127 }
128 else {
129 bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size);
130 }
131 cycles = ceil((double)max_data/bytes_per_cycle);
132
133 #if 0
134 printf ("Bytes per Cycle: %d Cycles: %d\n", bytes_per_cycle, cycles);
135 #endif
136
137 j = fh->f_index_in_file_view;
138 for (index = 0; index < cycles; index++) {
139 mca_common_ompio_build_io_array ( fh,
140 index,
141 cycles,
142 bytes_per_cycle,
143 max_data,
144 iov_count,
145 decoded_iov,
146 &i,
147 &j,
148 &total_bytes_written,
149 &spc,
150 &fh->f_io_array,
151 &fh->f_num_of_io_entries);
152
153 if (fh->f_num_of_io_entries) {
154 ret_code =fh->f_fbtl->fbtl_pwritev (fh);
155 if ( 0<= ret_code ) {
156 real_bytes_written+= (size_t)ret_code;
157 }
158 }
159
160 fh->f_num_of_io_entries = 0;
161 if (NULL != fh->f_io_array) {
162 free (fh->f_io_array);
163 fh->f_io_array = NULL;
164 }
165 }
166
167 if ( need_to_copy ) {
168 mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
169 }
170
171
172 if (NULL != decoded_iov) {
173 free (decoded_iov);
174 decoded_iov = NULL;
175 }
176
177 if ( MPI_STATUS_IGNORE != status ) {
178 status->_ucount = real_bytes_written;
179 }
180
181 return ret;
182 }
183
184 int mca_common_ompio_file_write_at (ompio_file_t *fh,
185 OMPI_MPI_OFFSET_TYPE offset,
186 const void *buf,
187 int count,
188 struct ompi_datatype_t *datatype,
189 ompi_status_public_t *status)
190 {
191 int ret = OMPI_SUCCESS;
192 OMPI_MPI_OFFSET_TYPE prev_offset;
193 mca_common_ompio_file_get_position (fh, &prev_offset );
194
195 mca_common_ompio_set_explicit_offset (fh, offset);
196 ret = mca_common_ompio_file_write (fh,
197 buf,
198 count,
199 datatype,
200 status);
201
202
203
204 mca_common_ompio_set_explicit_offset (fh, prev_offset );
205 return ret;
206 }
207
208 int mca_common_ompio_file_iwrite (ompio_file_t *fh,
209 const void *buf,
210 int count,
211 struct ompi_datatype_t *datatype,
212 ompi_request_t **request)
213 {
214 int ret = OMPI_SUCCESS;
215 mca_ompio_request_t *ompio_req=NULL;
216 size_t spc=0;
217
218 if (fh->f_amode & MPI_MODE_RDONLY){
219
220 ret = MPI_ERR_READ_ONLY;
221 return ret;
222 }
223
224 mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE);
225
226 if ( 0 == count ) {
227 ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
228 ompio_req->req_ompi.req_status._ucount = 0;
229 ompi_request_complete (&ompio_req->req_ompi, false);
230 *request = (ompi_request_t *) ompio_req;
231
232 return OMPI_SUCCESS;
233 }
234
235 if ( NULL != fh->f_fbtl->fbtl_ipwritev ) {
236
237
238 uint32_t iov_count = 0;
239 struct iovec *decoded_iov = NULL;
240 size_t max_data = 0;
241 size_t total_bytes_written =0;
242 int i = 0;
243 int j = 0;
244
245 bool need_to_copy = false;
246
247 #if OPAL_CUDA_SUPPORT
248 int is_gpu, is_managed;
249 mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
250 if ( is_gpu && !is_managed ) {
251 need_to_copy = true;
252 }
253 #endif
254
255 if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
256 !(datatype == &ompi_mpi_byte.dt ||
257 datatype == &ompi_mpi_char.dt )) {
258
259
260
261
262
263
264 need_to_copy = true;
265 }
266
267 if ( need_to_copy ) {
268 size_t pos=0;
269 char *tbuf=NULL;
270 opal_convertor_t convertor;
271
272 OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
273 opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos );
274 opal_convertor_cleanup (&convertor);
275
276 ompio_req->req_tbuf = tbuf;
277 ompio_req->req_size = max_data;
278 }
279 else {
280 mca_common_ompio_decode_datatype (fh,
281 datatype,
282 count,
283 buf,
284 &max_data,
285 fh->f_mem_convertor,
286 &decoded_iov,
287 &iov_count);
288 }
289
290 if ( 0 < max_data && 0 == fh->f_iov_count ) {
291 ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
292 ompio_req->req_ompi.req_status._ucount = 0;
293 ompi_request_complete (&ompio_req->req_ompi, false);
294 *request = (ompi_request_t *) ompio_req;
295 if (NULL != decoded_iov) {
296 free (decoded_iov);
297 decoded_iov = NULL;
298 }
299
300 return OMPI_SUCCESS;
301 }
302
303 j = fh->f_index_in_file_view;
304
305
306 mca_common_ompio_build_io_array ( fh,
307 0,
308 1,
309 max_data,
310 max_data,
311 iov_count,
312 decoded_iov,
313 &i,
314 &j,
315 &total_bytes_written,
316 &spc,
317 &fh->f_io_array,
318 &fh->f_num_of_io_entries);
319
320 if (fh->f_num_of_io_entries) {
321 fh->f_fbtl->fbtl_ipwritev (fh, (ompi_request_t *) ompio_req);
322 }
323
324 mca_common_ompio_register_progress ();
325
326 fh->f_num_of_io_entries = 0;
327 if (NULL != fh->f_io_array) {
328 free (fh->f_io_array);
329 fh->f_io_array = NULL;
330 }
331 if (NULL != decoded_iov) {
332 free (decoded_iov);
333 decoded_iov = NULL;
334 }
335 }
336 else {
337
338 ompi_status_public_t status;
339 ret = mca_common_ompio_file_write(fh,buf,count,datatype, &status);
340
341 ompio_req->req_ompi.req_status.MPI_ERROR = ret;
342 ompio_req->req_ompi.req_status._ucount = status._ucount;
343 ompi_request_complete (&ompio_req->req_ompi, false);
344 }
345
346 *request = (ompi_request_t *) ompio_req;
347 return ret;
348 }
349
350 int mca_common_ompio_file_iwrite_at (ompio_file_t *fh,
351 OMPI_MPI_OFFSET_TYPE offset,
352 const void *buf,
353 int count,
354 struct ompi_datatype_t *datatype,
355 ompi_request_t **request)
356 {
357 int ret = OMPI_SUCCESS;
358 OMPI_MPI_OFFSET_TYPE prev_offset;
359 mca_common_ompio_file_get_position (fh, &prev_offset );
360
361 mca_common_ompio_set_explicit_offset (fh, offset);
362 ret = mca_common_ompio_file_iwrite (fh,
363 buf,
364 count,
365 datatype,
366 request);
367
368
369
370
371
372
373
374
375
376 mca_common_ompio_set_explicit_offset (fh, prev_offset);
377
378 return ret;
379 }
380
381
382
383 int mca_common_ompio_file_write_all (ompio_file_t *fh,
384 const void *buf,
385 int count,
386 struct ompi_datatype_t *datatype,
387 ompi_status_public_t *status)
388 {
389 int ret = OMPI_SUCCESS;
390
391 if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
392 !(datatype == &ompi_mpi_byte.dt ||
393 datatype == &ompi_mpi_char.dt )) {
394
395
396
397
398
399
400
401
402
403
404
405
406
407 size_t pos=0, max_data=0;
408 char *tbuf=NULL;
409 opal_convertor_t convertor;
410 struct iovec *decoded_iov = NULL;
411 uint32_t iov_count = 0;
412
413 OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
414 opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos );
415 opal_convertor_cleanup ( &convertor);
416
417 ret = fh->f_fcoll->fcoll_file_write_all (fh,
418 decoded_iov->iov_base,
419 decoded_iov->iov_len,
420 MPI_BYTE,
421 status);
422
423
424 mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
425 if (NULL != decoded_iov) {
426 free (decoded_iov);
427 decoded_iov = NULL;
428 }
429 }
430 else {
431 ret = fh->f_fcoll->fcoll_file_write_all (fh,
432 buf,
433 count,
434 datatype,
435 status);
436 }
437 return ret;
438 }
439
440 int mca_common_ompio_file_write_at_all (ompio_file_t *fh,
441 OMPI_MPI_OFFSET_TYPE offset,
442 const void *buf,
443 int count,
444 struct ompi_datatype_t *datatype,
445 ompi_status_public_t *status)
446 {
447 int ret = OMPI_SUCCESS;
448 OMPI_MPI_OFFSET_TYPE prev_offset;
449 mca_common_ompio_file_get_position (fh, &prev_offset );
450
451 mca_common_ompio_set_explicit_offset (fh, offset);
452 ret = mca_common_ompio_file_write_all (fh,
453 buf,
454 count,
455 datatype,
456 status);
457
458 mca_common_ompio_set_explicit_offset (fh, prev_offset);
459 return ret;
460 }
461
462 int mca_common_ompio_file_iwrite_all (ompio_file_t *fp,
463 const void *buf,
464 int count,
465 struct ompi_datatype_t *datatype,
466 ompi_request_t **request)
467 {
468 int ret = OMPI_SUCCESS;
469
470 if ( NULL != fp->f_fcoll->fcoll_file_iwrite_all ) {
471 ret = fp->f_fcoll->fcoll_file_iwrite_all (fp,
472 buf,
473 count,
474 datatype,
475 request);
476 }
477 else {
478
479
480
481 ret = mca_common_ompio_file_iwrite ( fp, buf, count, datatype, request );
482 }
483
484 return ret;
485 }
486
487
488 int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp,
489 OMPI_MPI_OFFSET_TYPE offset,
490 const void *buf,
491 int count,
492 struct ompi_datatype_t *datatype,
493 ompi_request_t **request)
494 {
495
496 int ret = OMPI_SUCCESS;
497 OMPI_MPI_OFFSET_TYPE prev_offset;
498
499 mca_common_ompio_file_get_position (fp, &prev_offset );
500 mca_common_ompio_set_explicit_offset (fp, offset);
501
502 ret = mca_common_ompio_file_iwrite_all ( fp, buf, count, datatype, request );
503
504 mca_common_ompio_set_explicit_offset (fp, prev_offset);
505 return ret;
506 }
507
508
509
510
511
512
513 int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
514 size_t bytes_per_cycle, int max_data, uint32_t iov_count,
515 struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw,
516 size_t *spc, mca_common_ompio_io_array_t **io_array,
517 int *num_io_entries)
518 {
519 ptrdiff_t disp;
520 int block = 1;
521 size_t total_bytes_written = *tbw;
522 size_t bytes_to_write_in_cycle = 0;
523 size_t sum_previous_counts = *spc;
524
525
526 size_t sum_previous_length = 0;
527 int k = 0;
528 int i = *ii;
529 int j = *jj;
530 mca_common_ompio_io_array_t *f_io_array=NULL;
531 int f_num_io_entries=0;
532
533 sum_previous_length = fh->f_position_in_file_view;
534
535 if ((index == cycles-1) && (max_data % bytes_per_cycle)) {
536 bytes_to_write_in_cycle = max_data % bytes_per_cycle;
537 }
538 else {
539 bytes_to_write_in_cycle = bytes_per_cycle;
540 }
541
542 f_io_array = (mca_common_ompio_io_array_t *)malloc
543 (OMPIO_IOVEC_INITIAL_SIZE * sizeof (mca_common_ompio_io_array_t));
544 if (NULL == f_io_array) {
545 opal_output(1, "OUT OF MEMORY\n");
546 return OMPI_ERR_OUT_OF_RESOURCE;
547 }
548
549 while (bytes_to_write_in_cycle) {
550
551 if (OMPIO_IOVEC_INITIAL_SIZE*block <= k) {
552 block ++;
553 f_io_array = (mca_common_ompio_io_array_t *)realloc
554 (f_io_array, OMPIO_IOVEC_INITIAL_SIZE *
555 block * sizeof (mca_common_ompio_io_array_t));
556 if (NULL == f_io_array) {
557 opal_output(1, "OUT OF MEMORY\n");
558 return OMPI_ERR_OUT_OF_RESOURCE;
559 }
560 }
561
562 if (decoded_iov[i].iov_len -
563 (total_bytes_written - sum_previous_counts) <= 0) {
564 sum_previous_counts += decoded_iov[i].iov_len;
565 i = i + 1;
566 }
567
568 disp = (ptrdiff_t)decoded_iov[i].iov_base +
569 (total_bytes_written - sum_previous_counts);
570 f_io_array[k].memory_address = (IOVBASE_TYPE *)disp;
571
572 if (decoded_iov[i].iov_len -
573 (total_bytes_written - sum_previous_counts) >=
574 bytes_to_write_in_cycle) {
575 f_io_array[k].length = bytes_to_write_in_cycle;
576 }
577 else {
578 f_io_array[k].length = decoded_iov[i].iov_len -
579 (total_bytes_written - sum_previous_counts);
580 }
581
582 if (! (fh->f_flags & OMPIO_CONTIGUOUS_FVIEW)) {
583 if (fh->f_decoded_iov[j].iov_len -
584 (fh->f_total_bytes - sum_previous_length) <= 0) {
585 sum_previous_length += fh->f_decoded_iov[j].iov_len;
586 j = j + 1;
587 if (j == (int)fh->f_iov_count) {
588 j = 0;
589 sum_previous_length = 0;
590 fh->f_offset += fh->f_view_extent;
591 fh->f_position_in_file_view = sum_previous_length;
592 fh->f_index_in_file_view = j;
593 fh->f_total_bytes = 0;
594 }
595 }
596 }
597
598 disp = (ptrdiff_t)fh->f_decoded_iov[j].iov_base +
599 (fh->f_total_bytes - sum_previous_length);
600 f_io_array[k].offset = (IOVBASE_TYPE *)(intptr_t)(disp + fh->f_offset);
601
602 if (! (fh->f_flags & OMPIO_CONTIGUOUS_FVIEW)) {
603 if (fh->f_decoded_iov[j].iov_len -
604 (fh->f_total_bytes - sum_previous_length)
605 < f_io_array[k].length) {
606 f_io_array[k].length = fh->f_decoded_iov[j].iov_len -
607 (fh->f_total_bytes - sum_previous_length);
608 }
609 }
610
611 total_bytes_written += f_io_array[k].length;
612 fh->f_total_bytes += f_io_array[k].length;
613 bytes_to_write_in_cycle -= f_io_array[k].length;
614 k = k + 1;
615 }
616 fh->f_position_in_file_view = sum_previous_length;
617 fh->f_index_in_file_view = j;
618 f_num_io_entries = k;
619
620 #if 0
621 if (fh->f_rank == 0) {
622 int d;
623 printf("*************************** %d\n", f_num_io_entries);
624
625 for (d=0 ; d<f_num_of_entries ; d++) {
626 printf(" ADDRESS: %p OFFSET: %p LENGTH: %d prev_count=%ld prev_length=%ld\n",
627 f_io_array[d].memory_address,
628 f_io_array[d].offset,
629 f_io_array[d].length,
630 sum_previous_counts, sum_previous_length);
631 }
632 }
633 #endif
634 *ii = i;
635 *jj = j;
636 *tbw = total_bytes_written;
637 *spc = sum_previous_counts;
638 *io_array = f_io_array;
639 *num_io_entries = f_num_io_entries;
640
641 return OMPI_SUCCESS;
642 }
643