This source file includes the following definitions.
- mca_sharedfp_sm_iwrite
- mca_sharedfp_sm_write_ordered_begin
- mca_sharedfp_sm_write_ordered_end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_sm.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30
31 int mca_sharedfp_sm_iwrite(ompio_file_t *fh,
32 const void *buf,
33 int count,
34 ompi_datatype_t *datatype,
35 MPI_Request * request)
36 {
37 int ret = OMPI_SUCCESS;
38 OMPI_MPI_OFFSET_TYPE offset = 0;
39 long bytesRequested = 0;
40 size_t numofBytes;
41
42 if( NULL == fh->f_sharedfp_data){
43 opal_output(ompi_sharedfp_base_framework.framework_output,
44 "sharedfp_sm_iwrite - module not initialized\n");
45 return OMPI_ERROR;
46 }
47
48
49 opal_datatype_type_size ( &datatype->super, &numofBytes);
50 bytesRequested = count * numofBytes;
51
52 if ( mca_sharedfp_sm_verbose ) {
53 opal_output(ompi_sharedfp_base_framework.framework_output,
54 "sharedfp_sm_iwrite: Bytes Requested is %ld\n",bytesRequested);
55 }
56
57 ret = mca_sharedfp_sm_request_position(fh,bytesRequested,&offset);
58 offset /= fh->f_etype_size;
59
60 if ( -1 != ret ) {
61 if ( mca_sharedfp_sm_verbose ) {
62 opal_output(ompi_sharedfp_base_framework.framework_output,
63 "sharedfp_sm_iwrite: Offset received is %lld\n",offset);
64 }
65
66 ret = mca_common_ompio_file_iwrite_at(fh,offset,buf,count,datatype,request);
67 }
68
69 return ret;
70
71 }
72
73 int mca_sharedfp_sm_write_ordered_begin(ompio_file_t *fh,
74 const void *buf,
75 int count,
76 struct ompi_datatype_t *datatype)
77 {
78 int ret = OMPI_SUCCESS;
79 OMPI_MPI_OFFSET_TYPE offset = 0;
80 long sendBuff = 0;
81 long *buff=NULL;
82 long offsetBuff;
83 OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
84 long bytesRequested = 0;
85 int recvcnt = 1, sendcnt = 1;
86 size_t numofBytes;
87 int i;
88
89 if ( NULL == fh->f_sharedfp_data){
90 opal_output(ompi_sharedfp_base_framework.framework_output,
91 "sharedfp_sm_write_ordered_begin: module not initialized\n");
92 return OMPI_ERROR;
93 }
94
95 if ( true == fh->f_split_coll_in_use ) {
96 opal_output(0, "Only one split collective I/O operation allowed per file "
97 "handle at any given point in time!\n");
98 return MPI_ERR_REQUEST;
99 }
100
101
102 opal_datatype_type_size ( &datatype->super, &numofBytes);
103 sendBuff = count * numofBytes;
104
105 if ( 0 == fh->f_rank ) {
106 buff = (long*)malloc(sizeof(long) * fh->f_size);
107 if ( NULL == buff )
108 return OMPI_ERR_OUT_OF_RESOURCE;
109 }
110
111 ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
112 buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
113 fh->f_comm, fh->f_comm->c_coll->coll_gather_module );
114 if( OMPI_SUCCESS != ret){
115 goto exit;
116 }
117
118
119
120
121 if ( 0 == fh->f_rank ) {
122 for (i = 0; i < fh->f_size ; i ++) {
123 bytesRequested += buff[i];
124 if ( mca_sharedfp_sm_verbose ) {
125 opal_output(ompi_sharedfp_base_framework.framework_output,
126 "mca_sharedfp_sm_write_ordered_begin: Bytes requested are %ld\n",
127 bytesRequested);
128 }
129 }
130
131
132
133
134
135
136
137 ret = mca_sharedfp_sm_request_position(fh,bytesRequested,&offsetReceived);
138 if( OMPI_SUCCESS != ret){
139 goto exit;
140 }
141 if ( mca_sharedfp_sm_verbose ) {
142 opal_output(ompi_sharedfp_base_framework.framework_output,
143 "mca_sharedfp_sm_write_ordered_begin: Offset received is %lld\n",offsetReceived);
144 }
145
146 buff[0] += offsetReceived;
147 for (i = 1 ; i < fh->f_size; i++) {
148 buff[i] += buff[i-1];
149 }
150 }
151
152
153 ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
154 &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
155 fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
156 if( OMPI_SUCCESS != ret){
157 goto exit;
158 }
159
160
161 offset = offsetBuff - sendBuff;
162 offset /= fh->f_etype_size;
163
164 if ( mca_sharedfp_sm_verbose ) {
165 opal_output(ompi_sharedfp_base_framework.framework_output,
166 "mca_sharedfp_sm_write_ordered_begin: Offset returned is %lld\n",offset);
167 }
168
169
170 ret = mca_common_ompio_file_iwrite_at_all(fh,offset,buf,count,datatype,
171 &fh->f_split_coll_req);
172 fh->f_split_coll_in_use = true;
173
174 exit:
175 if ( NULL != buff ) {
176 free ( buff );
177 }
178
179 return ret;
180 }
181
182
183 int mca_sharedfp_sm_write_ordered_end(ompio_file_t *fh,
184 const void *buf,
185 ompi_status_public_t *status)
186 {
187 int ret = OMPI_SUCCESS;
188 ret = ompi_request_wait ( &fh->f_split_coll_req, status );
189
190
191 fh->f_split_coll_in_use = false;
192 return ret;
193 }