This source file includes following definitions.
- mca_sharedfp_individual_iwrite
- mca_sharedfp_individual_write_ordered_begin
- mca_sharedfp_individual_write_ordered_end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_individual.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30
31 int mca_sharedfp_individual_iwrite(ompio_file_t *fh,
32 const void *buf,
33 int count,
34 ompi_datatype_t *datatype,
35 MPI_Request * request)
36 {
37 int ret = OMPI_SUCCESS;
38 size_t numofbytes = 0;
39 OMPI_MPI_OFFSET_TYPE totalbytes = 0;
40 mca_sharedfp_individual_header_record *headnode = NULL;
41 struct mca_sharedfp_base_data_t *sh = NULL;
42
43 if(fh->f_sharedfp_data==NULL){
44 opal_output(ompi_sharedfp_base_framework.framework_output,
45 "mca_sharedfp_individual_iwrite: module not initialized \n");
46 return OMPI_ERROR;
47 }
48 mca_sharedfp_individual_usage_counter++;
49
50
51 opal_datatype_type_size ( &datatype->super, &numofbytes);
52 totalbytes = count * numofbytes;
53
54 sh = fh->f_sharedfp_data;
55
56 headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
57 if ( NULL == headnode) {
58 opal_output (0, "sharedfp_individual_iwrite: headnode is NULL but file is open\n");
59 return OMPI_ERROR;
60 }
61
62
63 ret = mca_sharedfp_individual_insert_metadata(OMPI_FILE_WRITE_SHARED,totalbytes,sh);
64
65
66
67 ret = mca_common_ompio_file_iwrite_at ( headnode->datafilehandle, headnode->datafile_offset,
68 buf, count, datatype, request);
69 if ( OMPI_SUCCESS != ret ) {
70 opal_output(0,"sharedfp_individual_iwrite: Error while iwriting the datafile \n");
71 return ret;
72 }
73
74
75 headnode->datafile_offset = headnode->datafile_offset + totalbytes;
76
77 return ret;
78 }
79
80 int mca_sharedfp_individual_write_ordered_begin(ompio_file_t *fh,
81 const void *buf,
82 int count,
83 struct ompi_datatype_t *datatype)
84 {
85 int ret = OMPI_SUCCESS;
86 int i = 0;
87 size_t numofbytes = 0;
88 size_t totalbytes = 0;
89 OMPI_MPI_OFFSET_TYPE *offbuff=NULL;
90 OMPI_MPI_OFFSET_TYPE global_offset = 0;
91 OMPI_MPI_OFFSET_TYPE prev_offset = 0;
92 OMPI_MPI_OFFSET_TYPE temp = 0, offset = 0;
93 mca_sharedfp_individual_header_record *headnode = NULL;
94 struct mca_sharedfp_base_data_t *sh = NULL;
95
96 if(fh->f_sharedfp_data==NULL){
97 opal_output(ompi_sharedfp_base_framework.framework_output,
98 "sharedfp_individual_write_ordered_begin - module not initialized\n");
99 return OMPI_ERROR;
100 }
101
102 if ( true == fh->f_split_coll_in_use ) {
103 opal_output(0, "Only one split collective I/O operation allowed per file handle "
104 "at any given point in time!\n");
105 return MPI_ERR_REQUEST;
106 }
107 mca_sharedfp_individual_usage_counter++;
108
109
110 sh = fh->f_sharedfp_data;
111
112
113 opal_datatype_type_size ( &datatype->super, &numofbytes);
114 totalbytes = count * numofbytes;
115
116 headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
117 if ( NULL == headnode) {
118 opal_output (0, "sharedfp_individual_write_ordered_begin: headnode is NULL but file is open\n");
119 return OMPI_ERROR;
120 }
121
122
123 ret = mca_sharedfp_individual_collaborate_data ( sh, fh );
124 if ( OMPI_SUCCESS != ret) {
125 return ret;
126 }
127
128 if ( 0 == fh->f_rank ) {
129 offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * fh->f_size);
130 if (NULL == offbuff ) {
131 return OMPI_ERR_OUT_OF_RESOURCE;
132 }
133 }
134
135
136 ret = fh->f_comm->c_coll->coll_gather ( &totalbytes,
137 1,
138 OMPI_OFFSET_DATATYPE,
139 offbuff,
140 1,
141 OMPI_OFFSET_DATATYPE,
142 0,
143 fh->f_comm,
144 fh->f_comm->c_coll->coll_gather_module );
145
146 if ( OMPI_SUCCESS != ret ) {
147 opal_output(0,"sharedfp_individual_write_ordered_begin: Error in gatherring offsets \n");
148 goto exit;
149 }
150
151 if ( 0 == fh->f_rank ) {
152 prev_offset = offbuff[0];
153 offbuff[0] = sh->global_offset;
154
155 for (i = 1; i < fh->f_size ; i++){
156 temp = offbuff[i];
157 offbuff[i] = offbuff[i - 1] + prev_offset;
158 prev_offset = temp;
159 }
160
161 for (i = 0; i < fh->f_size; i++){
162 global_offset = offbuff[fh->f_size - 1] + prev_offset;
163 }
164 }
165
166
167
168 ret = fh->f_comm->c_coll->coll_scatter ( offbuff,
169 1,
170 OMPI_OFFSET_DATATYPE,
171 &offset,
172 1,
173 OMPI_OFFSET_DATATYPE,
174 0,
175 fh->f_comm,
176 fh->f_comm->c_coll->coll_scatter_module );
177 if ( OMPI_SUCCESS != ret ) {
178 opal_output(0,"sharedfp_individual_write_ordered_begin: Error in scattering offsets \n");
179 goto exit;
180 }
181
182 ret = fh->f_comm->c_coll->coll_bcast ( &global_offset,
183 1,
184 OMPI_OFFSET_DATATYPE,
185 0,
186 fh->f_comm,
187 fh->f_comm->c_coll->coll_bcast_module );
188 if ( OMPI_SUCCESS != ret ) {
189 opal_output(0,"sharedfp_individual_write_ordered_begin: Error while bcasting global offset \n");
190 goto exit;
191 }
192
193 sh->global_offset = global_offset;
194
195
196 ret = mca_common_ompio_file_iwrite_at_all(fh, offset, buf, count, datatype,
197 &fh->f_split_coll_req);
198 fh->f_split_coll_in_use = true;
199 if ( OMPI_SUCCESS != ret ) {
200 opal_output(0,"sharedfp_individual_write_ordered_begin: Error while writing the datafile \n");
201 }
202
203 exit:
204 if ( NULL != offbuff ) {
205 free ( offbuff);
206 }
207
208 return ret;
209 }
210
211 int mca_sharedfp_individual_write_ordered_end(ompio_file_t *fh,
212 const void *buf,
213 ompi_status_public_t *status)
214 {
215 int ret = OMPI_SUCCESS;
216 ret = ompi_request_wait ( &fh->f_split_coll_req, status );
217
218
219 fh->f_split_coll_in_use = false;
220 return ret;
221 }