This source file includes the following definitions.
- mca_sharedfp_individual_collaborate_data
- mca_sharedfp_individual_get_timestamps_and_reclengths
- mca_sharedfp_individual_create_buff
- mca_sharedfp_individual_sort_timestamps
- mca_sharedfp_individual_assign_globaloffset
- mca_sharedfp_individual_getoffset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "sharedfp_individual.h"
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/sharedfp/sharedfp.h"
29 #include "ompi/mca/sharedfp/base/base.h"
30 #include "ompi/mca/common/ompio/common_ompio.h"
31
32 #include <stdlib.h>
33 #include <stdio.h>
34
35 int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh, ompio_file_t *ompio_fh)
36 {
37 int ret = OMPI_SUCCESS;
38 mca_sharedfp_individual_header_record *headnode = NULL;
39 char *buff=NULL;
40 int nodesoneachprocess = 0;
41 int idx=0,i=0,j=0, l=0;
42 int *ranks = NULL;
43 double *timestampbuff = NULL;
44 OMPI_MPI_OFFSET_TYPE *offsetbuff = NULL;
45 int *countbuff = NULL;
46 int *displ = NULL;
47 double *ind_ts = NULL;
48 long *ind_recordlength = NULL;
49 OMPI_MPI_OFFSET_TYPE *local_off = NULL;
50 int totalnodes = 0;
51 ompi_status_public_t status;
52 int recordlength=0;
53
54 headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
55 if ( NULL == headnode) {
56 opal_output(0, "sharedfp_individual_collaborate_data: headnode is NULL but file is open\n");
57 return OMPI_ERROR;
58 }
59
60
61
62
63 nodesoneachprocess = headnode->numofrecordsonfile + headnode->numofrecords;
64
65 if ( mca_sharedfp_individual_verbose ) {
66 opal_output(ompi_sharedfp_base_framework.framework_output,
67 "Nodes of each process = %d\n",nodesoneachprocess);
68 }
69
70 countbuff = (int*)malloc(ompio_fh->f_size * sizeof(int));
71 if ( NULL == countbuff ) {
72 return OMPI_ERR_OUT_OF_RESOURCE;
73 }
74
75 displ = (int*)malloc(sizeof(int) * ompio_fh->f_size);
76 if ( NULL == displ ) {
77 ret = OMPI_ERR_OUT_OF_RESOURCE;
78 goto exit;
79 }
80
81
82
83 ret = mca_sharedfp_individual_get_timestamps_and_reclengths ( &ind_ts, &ind_recordlength,
84 &local_off, sh );
85 if ( OMPI_SUCCESS != ret ) {
86 goto exit;
87 }
88
89 ret = ompio_fh->f_comm->c_coll->coll_allgather ( &nodesoneachprocess,
90 1,
91 MPI_INT,
92 countbuff,
93 1,
94 MPI_INT,
95 ompio_fh->f_comm,
96 ompio_fh->f_comm->c_coll->coll_allgather_module );
97
98 if ( OMPI_SUCCESS != ret ) {
99 goto exit;
100 }
101
102
103 if ( mca_sharedfp_individual_verbose) {
104 for (i = 0; i < ompio_fh->f_size ; i++) {
105 opal_output(ompi_sharedfp_base_framework.framework_output,"sharedfp_individual_collaborate_data: "
106 "Countbuff[%d] = %d\n", i, countbuff[i]);
107 }
108 }
109
110 if ( 0 == nodesoneachprocess ) {
111 ind_ts[0] = 0;
112 ind_recordlength[0] = 0;
113 local_off[0] = 0;
114 }
115
116 for(i = 0; i < ompio_fh->f_size; i++) {
117 displ[i] = totalnodes;
118 if ( mca_sharedfp_individual_verbose ) {
119 opal_output(ompi_sharedfp_base_framework.framework_output,
120 "sharedfp_individual_collaborate_data: displ[%d] = %d\n",i,displ[i]);
121 }
122 totalnodes = totalnodes + countbuff[i];
123 }
124
125 if (totalnodes <= 0 ) {
126 goto exit;
127 }
128
129 ranks = (int *) malloc ( totalnodes * sizeof(int));
130 if ( NULL == ranks ) {
131 ret = OMPI_ERR_OUT_OF_RESOURCE;
132 goto exit;
133 }
134 for ( l=0, i=0; i< ompio_fh->f_size; i++ ) {
135 for ( j=0; j< countbuff[i]; j++ ) {
136 ranks[l++]=i;
137 }
138 }
139
140 ret = mca_sharedfp_individual_create_buff ( ×tampbuff, &offsetbuff, totalnodes, ompio_fh->f_size);
141 if ( OMPI_SUCCESS != ret ) {
142 goto exit;
143 }
144
145 ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_ts,
146 countbuff[ompio_fh->f_rank],
147 MPI_DOUBLE,
148 timestampbuff,
149 countbuff,
150 displ,
151 MPI_DOUBLE,
152 ompio_fh->f_comm,
153 ompio_fh->f_comm->c_coll->coll_allgatherv_module );
154 if ( OMPI_SUCCESS != ret ) {
155 goto exit;
156 }
157
158 ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_recordlength,
159 countbuff[ompio_fh->f_rank],
160 OMPI_OFFSET_DATATYPE,
161 offsetbuff,
162 countbuff,
163 displ,
164 OMPI_OFFSET_DATATYPE,
165 ompio_fh->f_comm,
166 ompio_fh->f_comm->c_coll->coll_allgatherv_module );
167 if ( OMPI_SUCCESS != ret ) {
168 goto exit;
169 }
170
171 ret = mca_sharedfp_individual_sort_timestamps(×tampbuff, &offsetbuff, &ranks, totalnodes);
172 if ( OMPI_SUCCESS != ret ) {
173 goto exit;
174 }
175
176 sh->global_offset = mca_sharedfp_individual_assign_globaloffset ( &offsetbuff, totalnodes, sh);
177
178 recordlength = ind_recordlength[0] * 1.2;
179 buff = (char * ) malloc( recordlength );
180 if ( NULL == buff ) {
181 ret = OMPI_ERR_OUT_OF_RESOURCE;
182 goto exit;
183 }
184
185 for (i = 0; i < nodesoneachprocess ; i++) {
186 if ( ind_recordlength[i] > recordlength ) {
187 recordlength = ind_recordlength[i] * 1.2;
188 buff = (char *) realloc ( buff, recordlength );
189 if ( NULL == buff ) {
190 ret = OMPI_ERR_OUT_OF_RESOURCE;
191 goto exit;
192 }
193 }
194
195
196 ret = mca_common_ompio_file_read_at ( headnode->datafilehandle,
197 local_off[i], buff, ind_recordlength[i],
198 MPI_BYTE, &status);
199 if ( OMPI_SUCCESS != ret ) {
200 goto exit;
201 }
202
203 idx = mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff, ranks, ompio_fh->f_rank, totalnodes);
204
205 if ( mca_sharedfp_individual_verbose ) {
206 opal_output(ompi_sharedfp_base_framework.framework_output,
207 "sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file at position"
208 "%lld (%d)\n", ompio_fh->f_rank, ind_recordlength[i], offsetbuff[idx], idx);
209 }
210
211
212 ret = mca_common_ompio_file_write_at( ompio_fh, offsetbuff[idx], buff,
213 ind_recordlength[i], MPI_BYTE, &status);
214 if ( OMPI_SUCCESS != ret ) {
215 goto exit;
216 }
217
218 }
219
220 exit:
221 if ( NULL != countbuff ) {
222 free ( countbuff );
223 }
224 if ( NULL != displ ) {
225 free ( displ );
226 }
227
228 if( NULL != timestampbuff ){
229 free ( timestampbuff );
230 }
231 if ( NULL != offsetbuff ){
232 free ( offsetbuff );
233 }
234 if ( NULL != ind_ts ) {
235 free ( ind_ts );
236 }
237 if ( NULL != ind_recordlength ) {
238 free ( ind_recordlength );
239 }
240 if ( NULL != local_off ) {
241 free ( local_off );
242 }
243 if ( NULL != buff ) {
244 free ( buff );
245 }
246 if ( NULL != ranks ) {
247 free ( ranks );
248 }
249
250 return ret;
251 }
252
253
254 int mca_sharedfp_individual_get_timestamps_and_reclengths ( double **buff, long **rec_length,
255 MPI_Offset **offbuff,struct mca_sharedfp_base_data_t *sh)
256 {
257 int num = 0, i= 0, ctr = 0;
258 int ret=OMPI_SUCCESS;
259 mca_sharedfp_individual_metadata_node *currnode;
260 mca_sharedfp_individual_header_record *headnode;
261 OMPI_MPI_OFFSET_TYPE metaoffset = 0;
262 struct mca_sharedfp_individual_record2 rec;
263 MPI_Status status;
264
265 headnode = (mca_sharedfp_individual_header_record*)(sh->selected_module_data);
266 num = ( headnode->numofrecords + headnode->numofrecordsonfile);
267 currnode = headnode->next;
268
269 if ( mca_sharedfp_individual_verbose ) {
270 opal_output(ompi_sharedfp_base_framework.framework_output,"Num is %d\n",num);
271 }
272
273 if ( 0 == num ) {
274 *buff = (double*) malloc ( sizeof ( double ));
275 *rec_length = (long *) malloc ( sizeof ( long ));
276 *offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) );
277 if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
278 ret = OMPI_ERR_OUT_OF_RESOURCE;
279 goto exit;
280 }
281 }
282 else {
283 *buff = (double* ) malloc(sizeof ( double) * num);
284 *rec_length = (long *) malloc(sizeof ( long) * num);
285 *offbuff = (OMPI_MPI_OFFSET_TYPE *) malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * num);
286 if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
287 ret = OMPI_ERR_OUT_OF_RESOURCE;
288 goto exit;
289 }
290 }
291
292 if ( mca_sharedfp_individual_verbose ) {
293 opal_output(ompi_sharedfp_base_framework.framework_output,
294 "sharedfp_individual_get_timestamps_and_reclengths: Numofrecords on file %d\n",
295 headnode->numofrecordsonfile);
296 }
297
298 if (headnode->numofrecordsonfile > 0) {
299 metaoffset = headnode->metafile_start_offset;
300 ctr = 0;
301 for (i = 0; i < headnode->numofrecordsonfile ; i++) {
302
303 ret = mca_common_ompio_file_read_at(headnode->metadatafilehandle,metaoffset,
304 &rec, 32, MPI_BYTE,&status);
305 if ( OMPI_SUCCESS != ret ) {
306 goto exit;
307 }
308
309 *(*rec_length + ctr) = rec.recordlength;
310 *(*buff + ctr) = rec.timestamp;
311 *(*offbuff + ctr) = rec.localposition;
312
313 metaoffset = metaoffset + sizeof(struct mca_sharedfp_individual_record2);
314
315 if ( mca_sharedfp_individual_verbose ) {
316 opal_output(ompi_sharedfp_base_framework.framework_output,
317 "sharedfp_individual_get_timestamps_and_reclengths: Ctr = %d\n",ctr);
318 }
319 ctr++;
320 }
321
322 headnode->numofrecordsonfile = 0;
323 headnode->metafile_start_offset = metaoffset;
324
325 }
326
327
328 currnode = headnode->next;
329 while (currnode) {
330 if ( mca_sharedfp_individual_verbose ) {
331 opal_output(ompi_sharedfp_base_framework.framework_output,"Ctr = %d\n",ctr);
332 }
333
334
335
336 *(*rec_length + ctr) = currnode->recordlength;
337 *(*buff + ctr) = currnode->timestamp;
338 *(*offbuff + ctr) = currnode->localposition;
339
340 ctr = ctr + 1;
341
342 headnode->next = currnode->next;
343 if ( mca_sharedfp_individual_verbose ) {
344 opal_output(ompi_sharedfp_base_framework.framework_output,
345 "sharedfp_individual_get_timestamps_and_reclengths: node deleted from the metadatalinked list\n");
346 }
347 free(currnode);
348 currnode = headnode->next;
349
350 }
351
352
353
354 headnode->numofrecords = 0;
355
356 exit:
357
358 return ret;
359 }
360
361 int mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totalnodes, int size)
362 {
363
364 if ( totalnodes) {
365 *off = (OMPI_MPI_OFFSET_TYPE *) malloc ( totalnodes * sizeof(OMPI_MPI_OFFSET_TYPE));
366 if ( NULL == *off ) {
367 return OMPI_ERR_OUT_OF_RESOURCE;
368 }
369
370 *ts = (double *) malloc ( totalnodes * sizeof(double) );
371 if (NULL == *ts ) {
372 return OMPI_ERR_OUT_OF_RESOURCE;
373 }
374
375 }
376
377 return OMPI_SUCCESS;
378 }
379
380
381 int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int **ranks, int totalnodes)
382 {
383
384 int i = 0;
385 int j = 0;
386 int flag = 1;
387 double tempts = 0.0;
388 OMPI_MPI_OFFSET_TYPE tempoffset = 0;
389 int temprank = 0;
390
391 for (i= 1; (i <= totalnodes)&&(flag) ; i++) {
392 flag = 0;
393 for (j = 0; j < (totalnodes - 1); j++) {
394 if ( *(*ts + j + 1) < *(*ts + j )) {
395
396 tempts = *(*ts + j );
397 *(*ts + j) = *(*ts + j + 1);
398 *(*ts + j + 1) = tempts;
399
400
401 tempoffset = *(*off + j);
402 *(*off + j) = *(*off + j + 1);
403 *(*off + j + 1) = tempoffset;
404
405
406 temprank = *(*ranks + j);
407 *(*ranks + j) = *(*ranks + j + 1);
408 *(*ranks + j + 1) = temprank;
409
410 flag = 1;
411 }
412 }
413
414 }
415
416 return OMPI_SUCCESS;
417 }
418
419
420 MPI_Offset mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,int totalnodes,
421 struct mca_sharedfp_base_data_t *sh)
422 {
423 int i = 0;
424 OMPI_MPI_OFFSET_TYPE temp = 0,prevoffset = 0;
425 OMPI_MPI_OFFSET_TYPE global_offset = 0;
426
427 for (i = 0; i < totalnodes; i++) {
428 temp = *(*offsetbuff + i);
429
430 if (i == 0) {
431 *(*offsetbuff + i ) = sh->global_offset;
432 }
433 else {
434 *(*offsetbuff + i) = *(*offsetbuff + i - 1) + prevoffset;
435 }
436 prevoffset = temp;
437 }
438 global_offset = *(*offsetbuff + i - 1) + prevoffset;
439
440 return global_offset;
441 }
442
443
/*
 * Find the position of one record in the gathered record list.
 *
 * A record is identified by its timestamp together with the rank that
 * produced it. The timestamp is compared for exact equality, which matches
 * values that were copied unchanged into ts.
 *
 * @param timestamp   timestamp of the record to look up
 * @param ts          timestamps of all records
 * @param ranks       ranks[i] is the rank owning record i
 * @param myrank      rank owning the record to look up
 * @param totalnodes  number of entries in ts and ranks
 *
 * @return index of the first matching entry, or -1 if there is none
 *         (including totalnodes <= 0, in which case the arrays are not read).
 */
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes)
{
    int i = 0;

    for (i = 0; i < totalnodes; i++) {
        if (ts[i] == timestamp && ranks[i] == myrank) {
            return i;
        }
    }

    return -1;
}