This source file includes the following definitions.
- mca_sharedfp_lockedfile_write
- mca_sharedfp_lockedfile_write_ordered
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_lockedfile.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30
31 int mca_sharedfp_lockedfile_write (ompio_file_t *fh,
32 const void *buf,
33 int count,
34 struct ompi_datatype_t *datatype,
35 ompi_status_public_t *status)
36 {
37 OMPI_MPI_OFFSET_TYPE offset = 0;
38 long bytesRequested = 0;
39 size_t numofBytes;
40 struct mca_sharedfp_base_data_t *sh = NULL;
41 int ret = OMPI_SUCCESS;
42
43 if ( NULL == fh->f_sharedfp_data ){
44 opal_output(ompi_sharedfp_base_framework.framework_output,
45 "sharedfp_lockedfile_write - framework not initialized\n");
46 return OMPI_ERROR;
47 }
48
49
50 opal_datatype_type_size( &datatype->super, &numofBytes);
51 bytesRequested = count * numofBytes;
52 if ( mca_sharedfp_lockedfile_verbose ) {
53 opal_output(ompi_sharedfp_base_framework.framework_output,
54 "sharedfp_lockedfile_write: Bytes Requested is %ld\n",bytesRequested);
55 }
56
57
58 sh = fh->f_sharedfp_data;
59
60
61 ret = mca_sharedfp_lockedfile_request_position ( sh, bytesRequested, &offset);
62 offset /= fh->f_etype_size;
63
64 if (-1 != ret ) {
65 if ( mca_sharedfp_lockedfile_verbose ) {
66 opal_output(ompi_sharedfp_base_framework.framework_output,
67 "sharedfp_lockedfile_write: Offset received is %lld\n",offset);
68 }
69
70 ret = mca_common_ompio_file_write_at ( fh, offset, buf, count, datatype, status);
71 }
72
73 return ret;
74 }
75
76 int mca_sharedfp_lockedfile_write_ordered (ompio_file_t *fh,
77 const void *buf,
78 int count,
79 struct ompi_datatype_t *datatype,
80 ompi_status_public_t *status)
81 {
82 int ret = OMPI_SUCCESS;
83 OMPI_MPI_OFFSET_TYPE offset = 0;
84 long sendBuff = 0;
85 long *buff=NULL;
86 long offsetBuff;
87 OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
88 long bytesRequested = 0;
89 int recvcnt = 1, sendcnt = 1;
90 size_t numofBytes;
91 int rank, size, i;
92
93 struct mca_sharedfp_base_data_t *sh = NULL;
94
95 if( NULL == fh->f_sharedfp_data ) {
96 opal_output(ompi_sharedfp_base_framework.framework_output,
97 "sharedfp_lockedfile_write_ordered - framework not initialized\n");
98 return OMPI_ERROR;
99 }
100
101
102 sh = fh->f_sharedfp_data;
103
104
105 opal_datatype_type_size ( &datatype->super, &numofBytes);
106 sendBuff = count * numofBytes;
107
108
109 rank = ompi_comm_rank ( fh->f_comm );
110 size = ompi_comm_size ( fh->f_comm );
111
112 if ( 0 == rank ) {
113 buff = (long*) malloc (sizeof(long) * size);
114 if ( NULL == buff ) {
115 return OMPI_ERR_OUT_OF_RESOURCE;
116 }
117 }
118
119 ret = fh->f_comm->c_coll->coll_gather ( &sendBuff,
120 sendcnt,
121 OMPI_OFFSET_DATATYPE,
122 buff,
123 recvcnt,
124 OMPI_OFFSET_DATATYPE,
125 0,
126 fh->f_comm,
127 fh->f_comm->c_coll->coll_gather_module );
128 if ( OMPI_SUCCESS != ret ) {
129 goto exit;
130 }
131
132
133
134
135 if (rank == 0) {
136 for ( i = 0; i < size ; i ++) {
137 bytesRequested += buff[i];
138 if ( mca_sharedfp_lockedfile_verbose ) {
139 opal_output(ompi_sharedfp_base_framework.framework_output,
140 "sharedfp_lockedfile_write_ordered: Bytes requested are %ld\n",bytesRequested);
141 }
142 }
143
144
145
146
147
148
149
150 ret = mca_sharedfp_lockedfile_request_position(sh, bytesRequested,&offsetReceived);
151 if ( OMPI_SUCCESS != ret ){
152 goto exit;
153 }
154 if ( mca_sharedfp_lockedfile_verbose ) {
155 opal_output(ompi_sharedfp_base_framework.framework_output,
156 "sharedfp_lockedfile_write_ordered: Offset received is %lld\n",offsetReceived);
157 }
158 buff[0] += offsetReceived;
159 for (i = 1 ; i < size; i++) {
160 buff[i] += buff[i-1];
161 }
162 }
163
164
165 ret = fh->f_comm->c_coll->coll_scatter ( buff,
166 sendcnt,
167 OMPI_OFFSET_DATATYPE,
168 &offsetBuff,
169 recvcnt,
170 OMPI_OFFSET_DATATYPE,
171 0,
172 fh->f_comm,
173 fh->f_comm->c_coll->coll_scatter_module );
174 if ( OMPI_SUCCESS != ret ) {
175 goto exit;
176 }
177
178
179 offset = offsetBuff - sendBuff;
180 offset /= fh->f_etype_size;
181
182 if ( mca_sharedfp_lockedfile_verbose ) {
183 opal_output(ompi_sharedfp_base_framework.framework_output,
184 "sharedfp_lockedfile_write_ordered: Offset returned is %lld\n",offset);
185 }
186
187
188 ret = mca_common_ompio_file_write_at_all(fh,offset,buf,count,datatype,status);
189
190 exit:
191 if ( NULL != buff ) {
192 free ( buff);
193 }
194
195 return ret;
196 }