This source file includes the following definitions.
- mca_sharedfp_individual_write
- mca_sharedfp_individual_write_ordered
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_individual.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30
31 int mca_sharedfp_individual_write (ompio_file_t *fh,
32 const void *buf,
33 int count,
34 struct ompi_datatype_t *datatype,
35 ompi_status_public_t *status)
36 {
37 int ret = OMPI_SUCCESS;
38 size_t numofbytes = 0;
39 size_t totalbytes = 0;
40 mca_sharedfp_individual_header_record *headnode = NULL;
41 struct mca_sharedfp_base_data_t *sh = NULL;
42
43 if ( NULL == fh->f_sharedfp_data ) {
44 opal_output(ompi_sharedfp_base_framework.framework_output,
45 "sharedfp_individual_write: module not initialized \n");
46 return OMPI_ERROR;
47 }
48 mca_sharedfp_individual_usage_counter++;
49
50
51 opal_datatype_type_size ( &datatype->super, &numofbytes);
52 totalbytes = count * numofbytes;
53
54
55 sh = fh->f_sharedfp_data;
56 headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
57
58 if (headnode) {
59
60 mca_sharedfp_individual_insert_metadata(OMPI_FILE_WRITE_SHARED, totalbytes, sh);
61
62
63 ret = mca_common_ompio_file_write_at ( headnode->datafilehandle,
64 headnode->datafile_offset,
65 buf, count, datatype, status);
66 if ( OMPI_SUCCESS != ret ) {
67 opal_output(0,"mca_sharedfp_individual_write: Error while writing the datafile \n");
68 return -1;
69 }
70
71
72 headnode->datafile_offset = headnode->datafile_offset + totalbytes;
73 }
74
75 return ret;
76 }
77
78 int mca_sharedfp_individual_write_ordered (ompio_file_t *fh,
79 const void *buf,
80 int count,
81 struct ompi_datatype_t *datatype,
82 ompi_status_public_t *status)
83 {
84 int ret = OMPI_SUCCESS;
85 int i = 0;
86 size_t numofbytes = 0;
87 size_t totalbytes = 0;
88 OMPI_MPI_OFFSET_TYPE *offbuff=NULL;
89 OMPI_MPI_OFFSET_TYPE global_offset = 0;
90 OMPI_MPI_OFFSET_TYPE prev_offset = 0;
91 OMPI_MPI_OFFSET_TYPE temp = 0, offset = 0;
92 mca_sharedfp_individual_header_record *headnode = NULL;
93 struct mca_sharedfp_base_data_t *sh = NULL;
94
95
96 if(fh->f_sharedfp_data==NULL){
97 opal_output(ompi_sharedfp_base_framework.framework_output,
98 "sharedfp_individual_write_ordered: module not initialized \n");
99 return OMPI_ERROR;
100 }
101
102 mca_sharedfp_individual_usage_counter++;
103
104
105 sh = fh->f_sharedfp_data;
106
107
108 opal_datatype_type_size ( &datatype->super, &numofbytes);
109 totalbytes = count * numofbytes;
110
111 headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
112 if ( NULL == headnode) {
113 opal_output (0, "sharedfp_individual_write_ordered: headnode is NULL but file is open\n");
114 return OMPI_ERROR;
115 }
116
117
118 ret = mca_sharedfp_individual_collaborate_data ( sh, fh );
119 if ( OMPI_SUCCESS != ret) {
120 return ret;
121 }
122
123 if ( 0 == fh->f_rank ) {
124 offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * fh->f_size);
125 if (NULL == offbuff ) {
126 return OMPI_ERR_OUT_OF_RESOURCE;
127 }
128 }
129
130
131 ret = fh->f_comm->c_coll->coll_gather ( &totalbytes,
132 1,
133 OMPI_OFFSET_DATATYPE,
134 offbuff,
135 1,
136 OMPI_OFFSET_DATATYPE,
137 0,
138 fh->f_comm,
139 fh->f_comm->c_coll->coll_gather_module );
140
141 if ( OMPI_SUCCESS != ret ) {
142 opal_output(0,"sharedfp_individual_write_ordered: Error in gathering offsets \n");
143 goto exit;
144 }
145
146 if ( 0 == fh->f_rank ) {
147 prev_offset = offbuff[0];
148 offbuff[0] = sh->global_offset;
149
150 for (i = 1; i < fh->f_size ; i++){
151 temp = offbuff[i];
152 offbuff[i] = offbuff[i - 1] + prev_offset;
153 prev_offset = temp;
154 }
155
156 for (i = 0; i < fh->f_size; i++){
157 global_offset = offbuff[fh->f_size - 1] + prev_offset;
158 }
159 }
160
161
162
163 ret = fh->f_comm->c_coll->coll_scatter ( offbuff,
164 1,
165 OMPI_OFFSET_DATATYPE,
166 &offset,
167 1,
168 OMPI_OFFSET_DATATYPE,
169 0,
170 fh->f_comm,
171 fh->f_comm->c_coll->coll_scatter_module );
172 if ( OMPI_SUCCESS != ret ) {
173 opal_output(0,"sharedfp_individual_write_ordered: Error in scattering offsets \n");
174 goto exit;
175 }
176
177 ret = fh->f_comm->c_coll->coll_bcast ( &global_offset,
178 1,
179 OMPI_OFFSET_DATATYPE,
180 0,
181 fh->f_comm,
182 fh->f_comm->c_coll->coll_bcast_module );
183 if ( OMPI_SUCCESS != ret ) {
184 opal_output(0,"sharedfp_individual_write_ordered: Error while bcasting global offset \n");
185 goto exit;
186 }
187
188 sh->global_offset = global_offset;
189
190
191 ret = mca_common_ompio_file_write_at_all(fh, offset, buf,count,datatype,status);
192 if ( OMPI_SUCCESS != ret ) {
193 opal_output(0,"sharedfp_individual_write_ordered: Error while writing the datafile \n");
194 }
195
196 exit:
197 if ( NULL != offbuff ) {
198 free ( offbuff);
199 }
200
201 return ret;
202 }