This source file includes the following definitions.
- mca_sharedfp_lockedfile_read
- mca_sharedfp_lockedfile_read_ordered
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_lockedfile.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30
31 int mca_sharedfp_lockedfile_read ( ompio_file_t *fh,
32 void *buf, int count, MPI_Datatype datatype, MPI_Status *status)
33 {
34 int ret = OMPI_SUCCESS;
35 OMPI_MPI_OFFSET_TYPE offset = 0;
36 long bytesRequested = 0;
37 size_t numofBytes;
38 struct mca_sharedfp_base_data_t *sh = NULL;
39
40 if ( fh->f_sharedfp_data == NULL ) {
41 if ( mca_sharedfp_lockedfile_verbose ) {
42 opal_output(ompi_sharedfp_base_framework.framework_output,
43 "sharedfp_lockedfile_read: module not initialized\n");
44 }
45 return OMPI_ERROR;
46 }
47
48
49 opal_datatype_type_size ( &datatype->super, &numofBytes);
50 bytesRequested = count * numofBytes;
51
52 if ( mca_sharedfp_lockedfile_verbose ) {
53 opal_output(ompi_sharedfp_base_framework.framework_output,
54 "sharedfp_lockedfile_read: Bytes Requested is %ld\n",bytesRequested);
55 }
56
57
58 sh = fh->f_sharedfp_data;
59
60
61 ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
62 offset /= fh->f_etype_size;
63
64 if (-1 != ret ) {
65 if ( mca_sharedfp_lockedfile_verbose ) {
66 opal_output(ompi_sharedfp_base_framework.framework_output,
67 "sharedfp_lockedfile_read: Offset received is %lld\n",offset);
68 }
69
70
71 ret = mca_common_ompio_file_read_at(fh,offset,buf,count,datatype,status);
72 }
73
74 return ret;
75 }
76
77 int mca_sharedfp_lockedfile_read_ordered (ompio_file_t *fh,
78 void *buf,
79 int count,
80 struct ompi_datatype_t *datatype,
81 ompi_status_public_t *status)
82 {
83 int ret = OMPI_SUCCESS;
84 OMPI_MPI_OFFSET_TYPE offset = 0;
85 long sendBuff = 0;
86 long *buff=NULL;
87 long offsetBuff;
88 OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
89 long bytesRequested = 0;
90 int recvcnt = 1, sendcnt = 1;
91 size_t numofBytes;
92 int rank, size, i;
93 struct mca_sharedfp_base_data_t *sh = NULL;
94
95 if ( fh->f_sharedfp_data == NULL){
96 opal_output(ompi_sharedfp_base_framework.framework_output,
97 "sharedfp_lockedfile_read_ordered: module not initialized\n");
98 return OMPI_ERROR;
99 }
100
101
102 sh = fh->f_sharedfp_data;
103
104
105 opal_datatype_type_size ( &datatype->super, &numofBytes );
106 sendBuff = count * numofBytes;
107
108
109 rank = ompi_comm_rank ( fh->f_comm );
110 size = ompi_comm_size ( fh->f_comm );
111
112 if ( 0 == rank ) {
113 buff = (long*)malloc(sizeof(long) * size);
114 if ( NULL == buff )
115 return OMPI_ERR_OUT_OF_RESOURCE;
116 }
117
118 ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
119 buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
120 fh->f_comm, fh->f_comm->c_coll->coll_gather_module );
121 if ( OMPI_SUCCESS != ret ) {
122 goto exit;
123 }
124
125
126
127
128 if ( 0 == rank ) {
129 for (i = 0; i < size ; i ++) {
130 bytesRequested += buff[i];
131 if ( mca_sharedfp_lockedfile_verbose ) {
132 opal_output(ompi_sharedfp_base_framework.framework_output,
133 "sharedfp_lockedfile_read_ordered: Bytes requested are %ld\n",bytesRequested);
134 }
135 }
136
137
138
139
140
141
142
143 ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
144 if( OMPI_SUCCESS != ret ){
145 goto exit;
146 }
147 if ( mca_sharedfp_lockedfile_verbose ) {
148 opal_output(ompi_sharedfp_base_framework.framework_output,
149 "sharedfp_lockedfile_read_ordered: Offset received is %lld\n",offsetReceived);
150 }
151 buff[0] += offsetReceived;
152
153 for (i = 1 ; i < size; i++) {
154 buff[i] += buff[i-1];
155 }
156 }
157
158
159 ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
160 &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
161 fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
162
163
164 offset = offsetBuff - sendBuff;
165 offset /= fh->f_etype_size;
166
167 if ( mca_sharedfp_lockedfile_verbose ) {
168 opal_output(ompi_sharedfp_base_framework.framework_output,
169 "sharedfp_lockedfile_read_ordered: Offset returned is %lld\n",offset);
170 }
171
172
173 ret = mca_common_ompio_file_read_at_all(fh,offset,buf,count,datatype,status);
174
175 exit:
176 if ( NULL != buff ) {
177 free ( buff );
178 }
179
180 return ret;
181 }