This source file includes following definitions.
- mca_fbtl_posix_ipwritev
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23 #include "fbtl_posix.h"
24
25 #include <unistd.h>
26 #include <sys/uio.h>
27 #if HAVE_AIO_H
28 #include <aio.h>
29 #endif
30
31 #include "mpi.h"
32 #include "ompi/constants.h"
33 #include "ompi/mca/fbtl/fbtl.h"
34
35 ssize_t mca_fbtl_posix_ipwritev (ompio_file_t *fh,
36 ompi_request_t *request)
37 {
38 #if defined(FBTL_POSIX_HAVE_AIO)
39 mca_fbtl_posix_request_data_t *data;
40 mca_ompio_request_t *req = (mca_ompio_request_t *) request;
41 int i=0, ret;
42 off_t start_offset, end_offset, total_length;
43
44 data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
45 if ( NULL == data ) {
46 opal_output (1,"could not allocate memory\n");
47 return 0;
48 }
49
50 data->aio_req_count = fh->f_num_of_io_entries;
51 data->aio_open_reqs = fh->f_num_of_io_entries;
52 data->aio_req_type = FBTL_POSIX_WRITE;
53 data->aio_req_chunks = fbtl_posix_max_aio_active_reqs;
54 data->aio_total_len = 0;
55 data->aio_reqs = (struct aiocb *) malloc (sizeof(struct aiocb) *
56 fh->f_num_of_io_entries);
57 if (NULL == data->aio_reqs) {
58 opal_output(1, "OUT OF MEMORY\n");
59 free(data);
60 return 0;
61 }
62
63 data->aio_req_status = (int *) malloc (sizeof(int) * fh->f_num_of_io_entries);
64 if (NULL == data->aio_req_status) {
65 opal_output(1, "OUT OF MEMORY\n");
66 free(data->aio_reqs);
67 free(data);
68 return 0;
69 }
70 data->aio_fh = fh;
71
72 for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
73 data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
74 fh->f_io_array[i].offset;
75 data->aio_reqs[i].aio_buf = fh->f_io_array[i].memory_address;
76 data->aio_reqs[i].aio_nbytes = fh->f_io_array[i].length;
77 data->aio_reqs[i].aio_fildes = fh->fd;
78 data->aio_reqs[i].aio_reqprio = 0;
79 data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
80 data->aio_req_status[i] = EINPROGRESS;
81 }
82
83 data->aio_first_active_req = 0;
84 if ( data->aio_req_count > data->aio_req_chunks ) {
85 data->aio_last_active_req = data->aio_req_chunks;
86 }
87 else {
88 data->aio_last_active_req = data->aio_req_count;
89 }
90
91 start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
92 end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
93 total_length = (end_offset - start_offset);
94 ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
95 if ( 0 < ret ) {
96 opal_output(1, "mca_fbtl_posix_ipwritev: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
97 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
98 free(data->aio_reqs);
99 free(data->aio_req_status);
100 free(data);
101 return OMPI_ERROR;
102 }
103
104 for (i=0; i < data->aio_last_active_req; i++) {
105 if (-1 == aio_write(&data->aio_reqs[i])) {
106 opal_output(1, "mca_fbtl_posix_ipwritev: error in aio_write(): %s", strerror(errno));
107 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
108 free(data->aio_req_status);
109 free(data->aio_reqs);
110 free(data);
111 return OMPI_ERROR;
112 }
113 }
114
115 req->req_data = data;
116 req->req_progress_fn = mca_fbtl_posix_progress;
117 req->req_free_fn = mca_fbtl_posix_request_free;
118 #endif
119 return OMPI_SUCCESS;
120 }