This source file includes the following definitions.
- dfs_open_cbfunc
- dfs_close_cbfunc
- dfs_size_cbfunc
- dfs_seek_cbfunc
- dfs_post_cbfunc
- dfs_getfm_cbfunc
- read_cbfunc
- main
1
2
3
4
5
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

#include "opal/util/output.h"
#include "opal/util/uri.h"
#include "opal/mca/event/event.h"

#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"

#include "orte/mca/dfs/dfs.h"
22
/* Completion flag: set to true before each asynchronous DFS request is
 * issued and cleared by the completion callbacks in this file; the
 * requester waits on it with ORTE_WAIT_FOR_COMPLETION. */
static bool active;
/* True while the chunked read loop should keep issuing reads; cleared by
 * read_cbfunc on a negative status, or on a short read when an expected
 * byte count was supplied. */
static bool read_active;
/* Running total of the byte counts reported to read_cbfunc. */
static int numread = 0;

/* Size of the read buffer and the number of bytes requested per read. */
#define READ_SIZE 500
/* Offset used by both the absolute and the relative seek requests. */
#define OFFSET_VALUE 313
29
30 static void dfs_open_cbfunc(int fd, void *cbdata)
31 {
32 int *remote_fd = (int*)cbdata;
33
34 opal_output(0, "GOT FD %d", fd);
35 *remote_fd = fd;
36 active = false;
37
38 }
39
40 static void dfs_close_cbfunc(int fd, void *cbdata)
41 {
42 opal_output(0, "CLOSE CONFIRMED");
43 active = false;
44 }
45
46 static void dfs_size_cbfunc(long size, void *cbdata)
47 {
48 opal_output(0, "GOT FILE SIZE %ld", size);
49 active = false;
50
51 }
52
53 static void dfs_seek_cbfunc(long offset, void *cbdata)
54 {
55 int *check = (int*)cbdata;
56
57 opal_output(0, "GOT FILE OFFSET %ld", offset);
58 active = false;
59 if (NULL != cbdata && offset != *check) {
60 exit(1);
61 }
62 }
63
64 static void dfs_post_cbfunc(void *cbdata)
65 {
66 opal_buffer_t *bo = (opal_buffer_t*)cbdata;
67
68 opal_output(0, "GOT POST CALLBACK");
69 active = false;
70 OBJ_RELEASE(bo);
71 }
72
73 static void dfs_getfm_cbfunc(opal_buffer_t *bo, void *cbdata)
74 {
75 opal_buffer_t *bptr = (opal_buffer_t*)cbdata;
76
77 opal_output(0, "GOT GETFM CALLBACK");
78 active = false;
79 opal_dss.copy_payload(bptr, bo);
80 }
81
82 static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
83 {
84 int *check = (int*)cbdata;
85
86 if (status < 0) {
87 read_active = false;
88 active = false;
89 return;
90 }
91 numread += status;
92
93 if (NULL != cbdata && status < *check) {
94 read_active = false;
95 opal_output(0, "EOF RECEIVED: read total of %d bytes", numread);
96 active = false;
97 return;
98 }
99 active = false;
100 }
101
102 int main(int argc, char* argv[])
103 {
104 int rc;
105 int fd;
106 char *uri, *host, *path;
107 uint8_t buffer[READ_SIZE];
108 opal_buffer_t *buf, *xfer;
109 int i, k, cnt;
110 int64_t i64, length, offset, partition;
111 int32_t n, nvpids, nentries;
112 orte_vpid_t vpid;
113
114 if (0 != (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) {
115 fprintf(stderr, "orte_dfs: couldn't init orte - error code %d\n", rc);
116 return rc;
117 }
118
119
120
121
122 if (1 == ORTE_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid)) {
123
124
125
126 if (1 == argc) {
127 fprintf(stderr, "Usage: orte_dfs <input-file> <host [optional]\n");
128 orte_finalize();
129 return 1;
130 }
131 if (3 == argc) {
132 host = strdup(argv[2]);
133 } else {
134 host = NULL;
135 }
136
137 if (NULL == (uri = opal_filename_to_uri(argv[1], host))) {
138 return 1;
139 }
140
141 active = true;
142 orte_dfs.open(uri, dfs_open_cbfunc, &fd);
143 ORTE_WAIT_FOR_COMPLETION(active);
144
145 if (fd < 0) {
146
147 return 1;
148 }
149
150 active = true;
151 orte_dfs.get_file_size(fd, dfs_size_cbfunc, NULL);
152 ORTE_WAIT_FOR_COMPLETION(active);
153
154 active = true;
155 read_active = true;
156 rc = 0;
157 numread = 0;
158 while (read_active) {
159 i = READ_SIZE;
160 active = true;
161 orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, &i);
162 ORTE_WAIT_FOR_COMPLETION(active);
163 rc++;
164 if (2 == rc) {
165 active = true;
166 i = OFFSET_VALUE;
167 opal_output(0, "%s execute absolute seek of %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OFFSET_VALUE);
168 orte_dfs.seek(fd, OFFSET_VALUE, SEEK_SET, dfs_seek_cbfunc, &i);
169 ORTE_WAIT_FOR_COMPLETION(active);
170 }
171 if (5 == rc) {
172 active = true;
173 i = OFFSET_VALUE;
174 opal_output(0, "%s execute relative seek of %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OFFSET_VALUE);
175 orte_dfs.seek(fd, OFFSET_VALUE, SEEK_CUR, dfs_seek_cbfunc, &i);
176 ORTE_WAIT_FOR_COMPLETION(active);
177 }
178 active = true;
179 }
180
181 active= true;
182 orte_dfs.close(fd, dfs_close_cbfunc, NULL);
183 ORTE_WAIT_FOR_COMPLETION(active);
184
185
186 for (i=0; i < 10; i++) {
187 buf = OBJ_NEW(opal_buffer_t);
188 opal_dss.pack(buf, &host, 1, OPAL_STRING);
189 opal_dss.pack(buf, &argv[1], 1, OPAL_STRING);
190 i64 = 100;
191 opal_dss.pack(buf, &i64, 1, OPAL_INT64);
192 i64 = i * 100;
193 opal_dss.pack(buf, &i64, 1, OPAL_INT64);
194 i64 = i;
195 opal_dss.pack(buf, &i64, 1, OPAL_INT64);
196 active = true;
197 orte_dfs.post_file_map(buf, dfs_post_cbfunc, buf);
198 ORTE_WAIT_FOR_COMPLETION(active);
199 }
200 } else {
201 opal_output(0, "PROC %s REPORTING IN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
202
203 active = true;
204 buf = OBJ_NEW(opal_buffer_t);
205 orte_dfs.get_file_map(ORTE_PROC_MY_NAME, dfs_getfm_cbfunc, buf);
206 ORTE_WAIT_FOR_COMPLETION(active);
207
208 opal_output(0, "%s RECVD %d BYTES IN FILE MAPS",
209 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used);
210
211
212 cnt = 1;
213 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nvpids, &cnt, OPAL_INT32))) {
214 ORTE_ERROR_LOG(rc);
215 return 1;
216 }
217
218 opal_output(0, "%s RECVD DATA FROM %d VPIDS",
219 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nvpids);
220
221
222 for (k=0; k < nvpids; k++) {
223 cnt = 1;
224 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &vpid, &cnt, ORTE_VPID))) {
225 ORTE_ERROR_LOG(rc);
226 break;
227 }
228 opal_output(0, "CHECKING VPID %s", ORTE_VPID_PRINT(vpid));
229 cnt = 1;
230 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nentries, &cnt, OPAL_INT32))) {
231 ORTE_ERROR_LOG(rc);
232 break;
233 }
234 opal_output(0, "%s RECVD %d ENTRIES IN THIS MAP",
235 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nentries);
236 cnt = 1;
237 for (i=0; i < nentries; i++) {
238 cnt = 1;
239 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &xfer, &cnt, OPAL_BUFFER))) {
240 ORTE_ERROR_LOG(rc);
241 break;
242 }
243 cnt = 1;
244 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &host, &cnt, OPAL_STRING))) {
245 ORTE_ERROR_LOG(rc);
246 break;
247 }
248 cnt = 1;
249 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &path, &cnt, OPAL_STRING))) {
250 ORTE_ERROR_LOG(rc);
251 break;
252 }
253 cnt = 1;
254 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &length, &cnt, OPAL_INT64))) {
255 ORTE_ERROR_LOG(rc);
256 break;
257 }
258 cnt = 1;
259 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &offset, &cnt, OPAL_INT64))) {
260 ORTE_ERROR_LOG(rc);
261 break;
262 }
263 cnt = 1;
264 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &partition, &cnt, OPAL_INT64))) {
265 ORTE_ERROR_LOG(rc);
266 break;
267 }
268 OBJ_RELEASE(xfer);
269 opal_output(0, "CHECKING PARTITION %d\n\thost %s\n\tpath %s\n\tlength: %d offset: %d",
270 (int)partition, (NULL == host) ? "NULL" : host, path, (int)length, (int)offset);
271 continue;
272
273 if (partition == (int64_t)ORTE_PROC_MY_NAME->vpid) {
274
275 if (NULL == (uri = opal_filename_to_uri(path, host))) {
276 return 1;
277 }
278
279 active = true;
280 orte_dfs.open(uri, dfs_open_cbfunc, &fd);
281 ORTE_WAIT_FOR_COMPLETION(active);
282
283 if (fd < 0) {
284
285 return 1;
286 }
287
288 active = true;
289 orte_dfs.seek(fd, offset, SEEK_SET, dfs_seek_cbfunc, NULL);
290 ORTE_WAIT_FOR_COMPLETION(active);
291
292 active = true;
293 numread = 0;
294 orte_dfs.read(fd, buffer, length, read_cbfunc, NULL);
295 ORTE_WAIT_FOR_COMPLETION(active);
296
297 opal_output(0, "%s successfully read %d bytes",
298 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numread);
299 active= true;
300 orte_dfs.close(fd, dfs_close_cbfunc, NULL);
301 ORTE_WAIT_FOR_COMPLETION(active);
302 goto complete;
303 }
304 }
305 }
306 }
307
308 complete:
309 orte_finalize();
310 return 0;
311 }