This source file includes following definitions.
- mca_sharedfp_lockedfile_iwrite
- mca_sharedfp_lockedfile_write_ordered_begin
- mca_sharedfp_lockedfile_write_ordered_end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_lockedfile.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30 #include "ompi/mca/common/ompio/common_ompio.h"
31
32 int mca_sharedfp_lockedfile_iwrite(ompio_file_t *fh,
33 const void *buf,
34 int count,
35 ompi_datatype_t *datatype,
36 MPI_Request * request)
37 {
38 int ret = OMPI_SUCCESS;
39 OMPI_MPI_OFFSET_TYPE offset = 0;
40 long bytesRequested = 0;
41 size_t numofBytes;
42 struct mca_sharedfp_base_data_t *sh = NULL;
43
44 if(fh->f_sharedfp_data==NULL){
45 opal_output(ompi_sharedfp_base_framework.framework_output,
46 "sharedfp_lockedfile_iwrite: module not initialized \n");
47 return OMPI_ERROR;
48 }
49
50
51 opal_datatype_type_size ( &datatype->super, &numofBytes);
52 bytesRequested = count * numofBytes;
53 if ( mca_sharedfp_lockedfile_verbose ) {
54 opal_output(ompi_sharedfp_base_framework.framework_output,
55 "sharedfp_lockedfile_iwrite: Bytes Requested is %ld\n",bytesRequested);
56 }
57
58
59 sh = fh->f_sharedfp_data;
60
61
62 ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
63 offset /= fh->f_etype_size;
64
65 if ( -1 != ret) {
66 if ( mca_sharedfp_lockedfile_verbose ) {
67 opal_output(ompi_sharedfp_base_framework.framework_output,
68 "sharedfp_lockedfile_iwrite: Offset received is %lld\n",offset);
69 }
70
71
72 ret = mca_common_ompio_file_iwrite_at(fh,offset,buf,count,datatype,request);
73 }
74
75 return ret;
76 }
77
78 int mca_sharedfp_lockedfile_write_ordered_begin(ompio_file_t *fh,
79 const void *buf,
80 int count,
81 struct ompi_datatype_t *datatype)
82 {
83 int ret = OMPI_SUCCESS;
84 OMPI_MPI_OFFSET_TYPE offset = 0;
85 long sendBuff = 0;
86 long *buff=NULL;
87 long offsetBuff;
88 OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
89 long bytesRequested = 0;
90 int recvcnt = 1, sendcnt = 1;
91 size_t numofBytes;
92 int rank, size, i;
93 struct mca_sharedfp_base_data_t *sh = NULL;
94
95 if(fh->f_sharedfp_data==NULL){
96 opal_output(ompi_sharedfp_base_framework.framework_output,
97 "sharedfp_lockedfile_write_ordered_begin: module not initialized \n");
98 return OMPI_ERROR;
99 }
100
101
102 if ( true == fh->f_split_coll_in_use ) {
103 opal_output(0, "Only one split collective I/O operation allowed per file handle at "
104 "any given point in time!\n");
105 return MPI_ERR_REQUEST;
106 }
107
108
109 sh = fh->f_sharedfp_data;
110
111
112 opal_datatype_type_size ( &datatype->super, &numofBytes);
113 sendBuff = count * numofBytes;
114
115
116 rank = ompi_comm_rank ( fh->f_comm );
117 size = ompi_comm_size ( fh->f_comm );
118
119 if ( 0 == rank ) {
120 buff = (long*) malloc (sizeof(long) * size);
121 if ( NULL == buff ) {
122 return OMPI_ERR_OUT_OF_RESOURCE;
123 }
124 }
125
126 ret = fh->f_comm->c_coll->coll_gather ( &sendBuff,
127 sendcnt,
128 OMPI_OFFSET_DATATYPE,
129 buff,
130 recvcnt,
131 OMPI_OFFSET_DATATYPE,
132 0,
133 fh->f_comm,
134 fh->f_comm->c_coll->coll_gather_module );
135 if ( OMPI_SUCCESS != ret ) {
136 goto exit;
137 }
138
139
140
141
142 if (rank == 0) {
143 for ( i = 0; i < size ; i ++) {
144 bytesRequested += buff[i];
145 if ( mca_sharedfp_lockedfile_verbose ) {
146 opal_output(ompi_sharedfp_base_framework.framework_output,
147 "sharedfp_lockedfile_write_ordered_begin: Bytes requested are %ld\n",bytesRequested);
148 }
149 }
150
151
152
153
154
155
156
157 ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
158 if ( OMPI_SUCCESS != ret ){
159 goto exit;
160 }
161 if ( mca_sharedfp_lockedfile_verbose ) {
162 opal_output(ompi_sharedfp_base_framework.framework_output,
163 "sharedfp_lockedfile_write_ordered_begin: Offset received is %lld\n",offsetReceived);
164 }
165 buff[0] += offsetReceived;
166 for (i = 1 ; i < size; i++) {
167 buff[i] += buff[i-1];
168 }
169 }
170
171
172 ret = fh->f_comm->c_coll->coll_scatter ( buff,
173 sendcnt,
174 OMPI_OFFSET_DATATYPE,
175 &offsetBuff,
176 recvcnt,
177 OMPI_OFFSET_DATATYPE,
178 0,
179 fh->f_comm,
180 fh->f_comm->c_coll->coll_scatter_module );
181 if ( OMPI_SUCCESS != ret ) {
182 goto exit;
183 }
184
185
186 offset = offsetBuff - sendBuff;
187 offset /= fh->f_etype_size;
188
189 if ( mca_sharedfp_lockedfile_verbose ) {
190 opal_output(ompi_sharedfp_base_framework.framework_output,
191 "sharedfp_lockedfile_write_ordered_begin: Offset returned is %lld\n",offset);
192 }
193
194 ret = mca_common_ompio_file_iwrite_at_all ( fh, offset, buf, count, datatype, &fh->f_split_coll_req );
195 fh->f_split_coll_in_use = true;
196
197 exit:
198 if ( NULL != buff ) {
199 free ( buff);
200 }
201
202 return ret;
203 }
204
205
206
207 int mca_sharedfp_lockedfile_write_ordered_end(ompio_file_t *fh,
208 const void *buf,
209 ompi_status_public_t *status)
210 {
211 int ret = OMPI_SUCCESS;
212 ret = ompi_request_wait ( &fh->f_split_coll_req, status );
213
214
215 fh->f_split_coll_in_use = false;
216 return ret;
217 }