This source file includes following definitions.
- mca_sharedfp_lockedfile_iread
- mca_sharedfp_lockedfile_read_ordered_begin
- mca_sharedfp_lockedfile_read_ordered_end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_lockedfile.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30 #include "ompi/mca/common/ompio/common_ompio.h"
31
32 int mca_sharedfp_lockedfile_iread(ompio_file_t *fh,
33 void *buf,
34 int count,
35 ompi_datatype_t *datatype,
36 MPI_Request * request)
37 {
38 int ret = OMPI_SUCCESS;
39 OMPI_MPI_OFFSET_TYPE offset = 0;
40 long bytesRequested = 0;
41 size_t numofBytes;
42 struct mca_sharedfp_base_data_t *sh = NULL;
43
44 if ( NULL == fh->f_sharedfp_data ) {
45 opal_output(ompi_sharedfp_base_framework.framework_output,
46 "sharedfp_lockedfile_iread: module not initialized\n");
47 return OMPI_ERROR;
48 }
49
50
51 opal_datatype_type_size ( &datatype->super, &numofBytes);
52 bytesRequested = count * numofBytes;
53
54 if ( mca_sharedfp_lockedfile_verbose ) {
55 opal_output(ompi_sharedfp_base_framework.framework_output,
56 "sharedfp_lockedfile_iread - Bytes Requested is %ld\n",bytesRequested);
57 }
58
59
60
61 sh = fh->f_sharedfp_data;
62
63
64 ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
65 offset /= fh->f_etype_size;
66
67 if ( -1 != ret ) {
68 if ( mca_sharedfp_lockedfile_verbose ) {
69 opal_output(ompi_sharedfp_base_framework.framework_output,
70 "sharedfp_lockedfile_iread - Offset received is %lld\n",offset);
71 }
72
73
74 ret = mca_common_ompio_file_iread_at(fh,offset,buf,count,datatype,request);
75 }
76
77 return ret;
78 }
79
80 int mca_sharedfp_lockedfile_read_ordered_begin(ompio_file_t *fh,
81 void *buf,
82 int count,
83 struct ompi_datatype_t *datatype)
84 {
85 int ret = OMPI_SUCCESS;
86 OMPI_MPI_OFFSET_TYPE offset = 0;
87 long sendBuff = 0;
88 long *buff=NULL;
89 long offsetBuff;
90 OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
91 long bytesRequested = 0;
92 int recvcnt = 1, sendcnt = 1;
93 size_t numofBytes;
94 int rank, size, i;
95 struct mca_sharedfp_base_data_t *sh = NULL;
96
97 if(fh->f_sharedfp_data==NULL){
98 opal_output(ompi_sharedfp_base_framework.framework_output,
99 "sharedfp_lockedfile_read_ordered_begin: module not initialized\n");
100 return OMPI_ERROR;
101 }
102
103
104 if ( true == fh->f_split_coll_in_use ) {
105 opal_output(ompi_sharedfp_base_framework.framework_output,
106 "Only one split collective I/O operation allowed per file handle at any "
107 "given point in time!\n");
108 return MPI_ERR_REQUEST;
109 }
110
111
112 sh = fh->f_sharedfp_data;
113
114
115 opal_datatype_type_size ( &datatype->super, &numofBytes);
116 sendBuff = count * numofBytes;
117
118
119 rank = ompi_comm_rank ( fh->f_comm );
120 size = ompi_comm_size ( fh->f_comm );
121
122 if ( 0 == rank ) {
123 buff = (long*) malloc (sizeof(long) * size);
124 if ( NULL == buff ) {
125 return OMPI_ERR_OUT_OF_RESOURCE;
126 }
127 }
128
129 ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE, buff, recvcnt,
130 OMPI_OFFSET_DATATYPE, 0, fh->f_comm,
131 fh->f_comm->c_coll->coll_gather_module );
132 if ( OMPI_SUCCESS != ret ) {
133 goto exit;
134 }
135
136
137
138
139 if (rank == 0) {
140 for ( i = 0; i < size ; i ++) {
141 bytesRequested += buff[i];
142 if ( mca_sharedfp_lockedfile_verbose ) {
143 opal_output(ompi_sharedfp_base_framework.framework_output,
144 "sharedfp_lockedfile_read_ordered_begin: Bytes requested are %ld\n",bytesRequested);
145 }
146 }
147
148
149
150
151
152
153
154 ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
155 if ( OMPI_SUCCESS != ret ){
156 goto exit;
157 }
158 if ( mca_sharedfp_lockedfile_verbose ) {
159 opal_output(ompi_sharedfp_base_framework.framework_output,
160 "sharedfp_lockedfile_read_ordered_begin: Offset received is %lld\n",offsetReceived);
161 }
162 buff[0] += offsetReceived;
163 for (i = 1 ; i < size; i++) {
164 buff[i] += buff[i-1];
165 }
166 }
167
168
169 ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
170 &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
171 fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
172 if ( OMPI_SUCCESS != ret ) {
173 goto exit;
174 }
175
176
177 offset = offsetBuff - sendBuff;
178 offset /= fh->f_etype_size;
179
180 if ( mca_sharedfp_lockedfile_verbose ) {
181 opal_output(ompi_sharedfp_base_framework.framework_output,
182 "sharedfp_lockedfile_read_ordered_begin: Offset returned is %lld\n",offset);
183 }
184
185 ret = mca_common_ompio_file_iread_at_all ( fh, offset, buf, count, datatype, &fh->f_split_coll_req );
186 fh->f_split_coll_in_use = true;
187
188 exit:
189 if ( NULL != buff ) {
190 free ( buff);
191 }
192
193 return ret;
194 }
195
196
197 int mca_sharedfp_lockedfile_read_ordered_end(ompio_file_t *fh,
198 void *buf,
199 ompi_status_public_t *status)
200 {
201 int ret = OMPI_SUCCESS;
202 ret = ompi_request_wait ( &fh->f_split_coll_req, status );
203
204
205 fh->f_split_coll_in_use = false;
206 return ret;
207 }