This source file includes following definitions.
- mca_common_ompio_file_read
- mca_common_ompio_file_read_at
- mca_common_ompio_file_iread
- mca_common_ompio_file_iread_at
- mca_common_ompio_file_read_all
- mca_common_ompio_file_read_at_all
- mca_common_ompio_file_iread_all
- mca_common_ompio_file_iread_at_all
- mca_common_ompio_set_explicit_offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23
24 #include "ompi/communicator/communicator.h"
25 #include "ompi/info/info.h"
26 #include "ompi/file/file.h"
27 #include "ompi/mca/fs/fs.h"
28 #include "ompi/mca/fs/base/base.h"
29 #include "ompi/mca/fcoll/fcoll.h"
30 #include "ompi/mca/fcoll/base/base.h"
31 #include "ompi/mca/fbtl/fbtl.h"
32 #include "ompi/mca/fbtl/base/base.h"
33
34 #include "common_ompio.h"
35 #include "common_ompio_request.h"
36 #include "common_ompio_buffer.h"
37 #include <unistd.h>
38 #include <math.h>
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 int mca_common_ompio_file_read (ompio_file_t *fh,
57 void *buf,
58 int count,
59 struct ompi_datatype_t *datatype,
60 ompi_status_public_t *status)
61 {
62 int ret = OMPI_SUCCESS;
63
64 size_t total_bytes_read = 0;
65 size_t bytes_per_cycle = 0;
66 int index = 0;
67 int cycles = 0;
68
69 uint32_t iov_count = 0;
70 struct iovec *decoded_iov = NULL;
71
72 size_t max_data=0, real_bytes_read=0;
73 size_t spc=0;
74 ssize_t ret_code=0;
75 int i = 0;
76 int j = 0;
77
78 if (fh->f_amode & MPI_MODE_WRONLY){
79
80 ret = MPI_ERR_ACCESS;
81 return ret;
82 }
83
84 if ( 0 == count ) {
85 if ( MPI_STATUS_IGNORE != status ) {
86 status->_ucount = 0;
87 }
88 return ret;
89 }
90
91 bool need_to_copy = false;
92 opal_convertor_t convertor;
93 #if OPAL_CUDA_SUPPORT
94 int is_gpu, is_managed;
95 mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
96 if ( is_gpu && !is_managed ) {
97 need_to_copy = true;
98 }
99 #endif
100
101 if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
102 !(datatype == &ompi_mpi_byte.dt ||
103 datatype == &ompi_mpi_char.dt )) {
104
105
106
107
108
109
110 need_to_copy = true;
111 }
112
113 if ( need_to_copy ) {
114 char *tbuf=NULL;
115
116 OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
117 }
118 else {
119 mca_common_ompio_decode_datatype (fh,
120 datatype,
121 count,
122 buf,
123 &max_data,
124 fh->f_mem_convertor,
125 &decoded_iov,
126 &iov_count);
127 }
128
129 if ( 0 < max_data && 0 == fh->f_iov_count ) {
130 if ( MPI_STATUS_IGNORE != status ) {
131 status->_ucount = 0;
132 }
133 if (NULL != decoded_iov) {
134 free (decoded_iov);
135 decoded_iov = NULL;
136 }
137 return OMPI_SUCCESS;
138 }
139
140 if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size )) {
141 bytes_per_cycle = max_data;
142 }
143 else {
144 bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size);
145 }
146 cycles = ceil((double)max_data/bytes_per_cycle);
147
148 #if 0
149 printf ("Bytes per Cycle: %d Cycles: %d max_data:%d \n",bytes_per_cycle, cycles, max_data);
150 #endif
151
152 j = fh->f_index_in_file_view;
153
154 for (index = 0; index < cycles; index++) {
155
156 mca_common_ompio_build_io_array ( fh,
157 index,
158 cycles,
159 bytes_per_cycle,
160 max_data,
161 iov_count,
162 decoded_iov,
163 &i,
164 &j,
165 &total_bytes_read,
166 &spc,
167 &fh->f_io_array,
168 &fh->f_num_of_io_entries);
169
170 if (fh->f_num_of_io_entries) {
171 ret_code = fh->f_fbtl->fbtl_preadv (fh);
172 if ( 0<= ret_code ) {
173 real_bytes_read+=(size_t)ret_code;
174 }
175 }
176
177 fh->f_num_of_io_entries = 0;
178 if (NULL != fh->f_io_array) {
179 free (fh->f_io_array);
180 fh->f_io_array = NULL;
181 }
182 }
183
184 if ( need_to_copy ) {
185 size_t pos=0;
186
187 opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
188 opal_convertor_cleanup (&convertor);
189 mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
190 }
191
192 if (NULL != decoded_iov) {
193 free (decoded_iov);
194 decoded_iov = NULL;
195 }
196
197 if ( MPI_STATUS_IGNORE != status ) {
198 status->_ucount = real_bytes_read;
199 }
200
201 return ret;
202 }
203
204 int mca_common_ompio_file_read_at (ompio_file_t *fh,
205 OMPI_MPI_OFFSET_TYPE offset,
206 void *buf,
207 int count,
208 struct ompi_datatype_t *datatype,
209 ompi_status_public_t * status)
210 {
211 int ret = OMPI_SUCCESS;
212 OMPI_MPI_OFFSET_TYPE prev_offset;
213
214 mca_common_ompio_file_get_position (fh, &prev_offset );
215
216 mca_common_ompio_set_explicit_offset (fh, offset);
217 ret = mca_common_ompio_file_read (fh,
218 buf,
219 count,
220 datatype,
221 status);
222
223
224
225
226 mca_common_ompio_set_explicit_offset (fh, prev_offset);
227
228 return ret;
229 }
230
231
232 int mca_common_ompio_file_iread (ompio_file_t *fh,
233 void *buf,
234 int count,
235 struct ompi_datatype_t *datatype,
236 ompi_request_t **request)
237 {
238 int ret = OMPI_SUCCESS;
239 mca_ompio_request_t *ompio_req=NULL;
240 size_t spc=0;
241
242 if (fh->f_amode & MPI_MODE_WRONLY){
243
244 ret = MPI_ERR_ACCESS;
245 return ret;
246 }
247
248 mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ);
249
250 if ( 0 == count ) {
251 ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
252 ompio_req->req_ompi.req_status._ucount = 0;
253 ompi_request_complete (&ompio_req->req_ompi, false);
254 *request = (ompi_request_t *) ompio_req;
255
256 return OMPI_SUCCESS;
257 }
258
259 if ( NULL != fh->f_fbtl->fbtl_ipreadv ) {
260
261
262 size_t total_bytes_read = 0;
263 uint32_t iov_count = 0;
264 struct iovec *decoded_iov = NULL;
265
266 size_t max_data = 0;
267 int i = 0;
268 int j = 0;
269
270 bool need_to_copy = false;
271
272 #if OPAL_CUDA_SUPPORT
273 int is_gpu, is_managed;
274 mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
275 if ( is_gpu && !is_managed ) {
276 need_to_copy = true;
277 }
278 #endif
279
280 if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
281 !(datatype == &ompi_mpi_byte.dt ||
282 datatype == &ompi_mpi_char.dt )) {
283
284
285
286
287
288
289 need_to_copy = true;
290 }
291
292 if ( need_to_copy ) {
293 char *tbuf=NULL;
294
295 OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count);
296
297 ompio_req->req_tbuf = tbuf;
298 ompio_req->req_size = max_data;
299 }
300 else {
301 mca_common_ompio_decode_datatype (fh,
302 datatype,
303 count,
304 buf,
305 &max_data,
306 fh->f_mem_convertor,
307 &decoded_iov,
308 &iov_count);
309 }
310
311 if ( 0 < max_data && 0 == fh->f_iov_count ) {
312 ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
313 ompio_req->req_ompi.req_status._ucount = 0;
314 ompi_request_complete (&ompio_req->req_ompi, false);
315 *request = (ompi_request_t *) ompio_req;
316 if (NULL != decoded_iov) {
317 free (decoded_iov);
318 decoded_iov = NULL;
319 }
320
321 return OMPI_SUCCESS;
322 }
323
324
325 j = fh->f_index_in_file_view;
326
327 mca_common_ompio_build_io_array ( fh,
328 0,
329 1,
330 max_data,
331 max_data,
332 iov_count,
333 decoded_iov,
334 &i,
335 &j,
336 &total_bytes_read,
337 &spc,
338 &fh->f_io_array,
339 &fh->f_num_of_io_entries);
340
341 if (fh->f_num_of_io_entries) {
342 fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
343 }
344
345 mca_common_ompio_register_progress ();
346
347 fh->f_num_of_io_entries = 0;
348 if (NULL != fh->f_io_array) {
349 free (fh->f_io_array);
350 fh->f_io_array = NULL;
351 }
352
353 if (NULL != decoded_iov) {
354 free (decoded_iov);
355 decoded_iov = NULL;
356 }
357 }
358 else {
359
360 ompi_status_public_t status;
361 ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status);
362
363 ompio_req->req_ompi.req_status.MPI_ERROR = ret;
364 ompio_req->req_ompi.req_status._ucount = status._ucount;
365 ompi_request_complete (&ompio_req->req_ompi, false);
366 }
367
368 *request = (ompi_request_t *) ompio_req;
369 return ret;
370 }
371
372
373 int mca_common_ompio_file_iread_at (ompio_file_t *fh,
374 OMPI_MPI_OFFSET_TYPE offset,
375 void *buf,
376 int count,
377 struct ompi_datatype_t *datatype,
378 ompi_request_t **request)
379 {
380 int ret = OMPI_SUCCESS;
381 OMPI_MPI_OFFSET_TYPE prev_offset;
382 mca_common_ompio_file_get_position (fh, &prev_offset );
383
384 mca_common_ompio_set_explicit_offset (fh, offset);
385 ret = mca_common_ompio_file_iread (fh,
386 buf,
387 count,
388 datatype,
389 request);
390
391
392
393
394
395
396
397
398
399 mca_common_ompio_set_explicit_offset (fh, prev_offset);
400
401 return ret;
402 }
403
404
405
406 int mca_common_ompio_file_read_all (ompio_file_t *fh,
407 void *buf,
408 int count,
409 struct ompi_datatype_t *datatype,
410 ompi_status_public_t * status)
411 {
412 int ret = OMPI_SUCCESS;
413
414
415 if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
416 !(datatype == &ompi_mpi_byte.dt ||
417 datatype == &ompi_mpi_char.dt )) {
418
419
420
421
422
423
424
425
426
427
428
429
430
431 size_t pos=0, max_data=0;
432 char *tbuf=NULL;
433 opal_convertor_t convertor;
434 struct iovec *decoded_iov = NULL;
435 uint32_t iov_count = 0;
436
437 OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
438 ret = fh->f_fcoll->fcoll_file_read_all (fh,
439 decoded_iov->iov_base,
440 decoded_iov->iov_len,
441 MPI_BYTE,
442 status);
443 opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
444
445 opal_convertor_cleanup (&convertor);
446 mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
447 if (NULL != decoded_iov) {
448 free (decoded_iov);
449 decoded_iov = NULL;
450 }
451 }
452 else {
453 ret = fh->f_fcoll->fcoll_file_read_all (fh,
454 buf,
455 count,
456 datatype,
457 status);
458 }
459 return ret;
460 }
461
462 int mca_common_ompio_file_read_at_all (ompio_file_t *fh,
463 OMPI_MPI_OFFSET_TYPE offset,
464 void *buf,
465 int count,
466 struct ompi_datatype_t *datatype,
467 ompi_status_public_t * status)
468 {
469 int ret = OMPI_SUCCESS;
470 OMPI_MPI_OFFSET_TYPE prev_offset;
471 mca_common_ompio_file_get_position (fh, &prev_offset );
472
473 mca_common_ompio_set_explicit_offset (fh, offset);
474 ret = mca_common_ompio_file_read_all (fh,
475 buf,
476 count,
477 datatype,
478 status);
479
480 mca_common_ompio_set_explicit_offset (fh, prev_offset);
481 return ret;
482 }
483
484 int mca_common_ompio_file_iread_all (ompio_file_t *fp,
485 void *buf,
486 int count,
487 struct ompi_datatype_t *datatype,
488 ompi_request_t **request)
489 {
490 int ret = OMPI_SUCCESS;
491
492 if ( NULL != fp->f_fcoll->fcoll_file_iread_all ) {
493 ret = fp->f_fcoll->fcoll_file_iread_all (fp,
494 buf,
495 count,
496 datatype,
497 request);
498 }
499 else {
500
501
502
503 ret = mca_common_ompio_file_iread ( fp, buf, count, datatype, request );
504 }
505
506 return ret;
507 }
508
509 int mca_common_ompio_file_iread_at_all (ompio_file_t *fp,
510 OMPI_MPI_OFFSET_TYPE offset,
511 void *buf,
512 int count,
513 struct ompi_datatype_t *datatype,
514 ompi_request_t **request)
515 {
516 int ret = OMPI_SUCCESS;
517 OMPI_MPI_OFFSET_TYPE prev_offset;
518
519 mca_common_ompio_file_get_position (fp, &prev_offset );
520 mca_common_ompio_set_explicit_offset (fp, offset);
521
522 ret = mca_common_ompio_file_iread_all (fp,
523 buf,
524 count,
525 datatype,
526 request);
527
528 mca_common_ompio_set_explicit_offset (fp, prev_offset);
529 return ret;
530 }
531
532
533 int mca_common_ompio_set_explicit_offset (ompio_file_t *fh,
534 OMPI_MPI_OFFSET_TYPE offset)
535 {
536 int i = 0;
537 int k = 0;
538
539 if ( fh->f_view_size > 0 ) {
540
541 fh->f_offset = (fh->f_view_extent *
542 ((offset*fh->f_etype_size) / fh->f_view_size)) + fh->f_disp;
543
544
545
546 fh->f_total_bytes = (offset*fh->f_etype_size) % fh->f_view_size;
547 i = fh->f_total_bytes;
548
549
550
551
552 fh->f_index_in_file_view = 0;
553 fh->f_position_in_file_view = 0;
554
555
556
557 k = fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
558 while (i >= k) {
559 fh->f_position_in_file_view = k;
560 fh->f_index_in_file_view++;
561 k += fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
562 }
563 }
564
565 return OMPI_SUCCESS;
566 }