root/orte/test/system/orte_dfs.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. dfs_open_cbfunc
  2. dfs_close_cbfunc
  3. dfs_size_cbfunc
  4. dfs_seek_cbfunc
  5. dfs_post_cbfunc
  6. dfs_getfm_cbfunc
  7. read_cbfunc
  8. main

   1 /* -*- C -*-
   2  *
   3  * $HEADER$
   4  *
   5  */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

#include "opal/util/output.h"
#include "opal/util/uri.h"
#include "opal/mca/event/event.h"

#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"

#include "orte/mca/dfs/dfs.h"
  22 
   23 static bool active;      /* set true by main before each DFS request; every callback in this file clears it */
   24 static bool read_active; /* keeps main's read loop running; read_cbfunc clears it on error or short read */
   25 static int numread = 0;  /* running total of the byte counts reported to read_cbfunc */
   26 
   27 #define READ_SIZE 500         /* size of main's read buffer and the byte count requested per read */
   28 #define OFFSET_VALUE   313    /* offset passed to the absolute and relative seek requests in main */
  29 
  30 static void dfs_open_cbfunc(int fd, void *cbdata)
  31 {
  32     int *remote_fd = (int*)cbdata;
  33 
  34     opal_output(0, "GOT FD %d", fd);
  35     *remote_fd = fd;
  36     active = false;
  37 
  38 }
  39 
  40 static void dfs_close_cbfunc(int fd, void *cbdata)
  41 {
  42     opal_output(0, "CLOSE CONFIRMED");
  43     active = false;
  44 }
  45 
  46 static void dfs_size_cbfunc(long size, void *cbdata)
  47 {
  48     opal_output(0, "GOT FILE SIZE %ld", size);
  49     active = false;
  50 
  51 }
  52 
  53 static void dfs_seek_cbfunc(long offset, void *cbdata)
  54 {
  55     int *check = (int*)cbdata;
  56 
  57     opal_output(0, "GOT FILE OFFSET %ld", offset);
  58     active = false;
  59     if (NULL != cbdata && offset != *check) {
  60         exit(1);
  61     }
  62 }
  63 
  64 static void dfs_post_cbfunc(void *cbdata)
  65 {
  66     opal_buffer_t *bo = (opal_buffer_t*)cbdata;
  67 
  68     opal_output(0, "GOT POST CALLBACK");
  69     active = false;
  70     OBJ_RELEASE(bo);
  71 }
  72 
  73 static void dfs_getfm_cbfunc(opal_buffer_t *bo, void *cbdata)
  74 {
  75     opal_buffer_t *bptr = (opal_buffer_t*)cbdata;
  76 
  77     opal_output(0, "GOT GETFM CALLBACK");
  78     active = false;
  79     opal_dss.copy_payload(bptr, bo);
  80 }
  81 
  82 static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
  83 {
  84     int *check = (int*)cbdata;
  85 
  86     if (status < 0) {
  87         read_active = false;
  88         active = false;
  89         return;
  90     }
  91     numread += status;
  92 
  93     if (NULL != cbdata && status < *check) {
  94         read_active = false;
  95         opal_output(0, "EOF RECEIVED: read total of %d bytes", numread);
  96         active = false;
  97         return;
  98     }
  99     active = false;
 100 }
 101 
 102 int main(int argc, char* argv[])
 103 {
 104     int rc;
 105     int fd;
 106     char *uri, *host, *path;
 107     uint8_t buffer[READ_SIZE];
 108     opal_buffer_t *buf, *xfer;
 109     int i, k, cnt;
 110     int64_t i64, length, offset, partition;
 111     int32_t n, nvpids, nentries;
 112     orte_vpid_t vpid;
 113 
 114     if (0 != (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) {
 115         fprintf(stderr, "orte_dfs: couldn't init orte - error code %d\n", rc);
 116         return rc;
 117     }
 118 
 119     /* if I am part of an initial job, then test my basic
 120      * API operations
 121      */
 122     if (1 == ORTE_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid)) {
 123         /* user must provide a file to be read - the contents
 124          * of the file will be output to stdout
 125          */
 126         if (1 == argc) {
 127             fprintf(stderr, "Usage: orte_dfs <input-file> <host [optional]\n");
 128             orte_finalize();
 129             return 1;
 130         }
 131         if (3 == argc) {
 132             host = strdup(argv[2]);
 133         } else {
 134             host = NULL;
 135         }
 136 
 137         if (NULL == (uri = opal_filename_to_uri(argv[1], host))) {
 138             return 1;
 139         }
 140 
 141         active = true;
 142         orte_dfs.open(uri, dfs_open_cbfunc, &fd);
 143         ORTE_WAIT_FOR_COMPLETION(active);
 144 
 145         if (fd < 0) {
 146             /* hit an error */
 147             return 1;
 148         }
 149 
 150         active = true;
 151         orte_dfs.get_file_size(fd, dfs_size_cbfunc, NULL);
 152         ORTE_WAIT_FOR_COMPLETION(active);
 153 
 154         active = true;
 155         read_active = true;
 156         rc = 0;
 157         numread = 0;
 158         while (read_active) {
 159             i = READ_SIZE;
 160             active = true;
 161             orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, &i);
 162             ORTE_WAIT_FOR_COMPLETION(active);
 163             rc++;
 164             if (2 == rc) {
 165                 active = true;
 166                 i = OFFSET_VALUE;
 167                 opal_output(0, "%s execute absolute seek of %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OFFSET_VALUE);
 168                 orte_dfs.seek(fd, OFFSET_VALUE, SEEK_SET, dfs_seek_cbfunc, &i);
 169                 ORTE_WAIT_FOR_COMPLETION(active);
 170             }
 171             if (5 == rc) {
 172                 active = true;
 173                 i = OFFSET_VALUE;
 174                 opal_output(0, "%s execute relative seek of %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OFFSET_VALUE);
 175                 orte_dfs.seek(fd, OFFSET_VALUE, SEEK_CUR, dfs_seek_cbfunc, &i);
 176                 ORTE_WAIT_FOR_COMPLETION(active);
 177             }
 178             active = true;
 179         }
 180 
 181         active= true;
 182         orte_dfs.close(fd, dfs_close_cbfunc, NULL);
 183         ORTE_WAIT_FOR_COMPLETION(active);
 184 
 185         /* construct a file map to pass to our successor */
 186         for (i=0; i < 10; i++) {
 187             buf = OBJ_NEW(opal_buffer_t);
 188             opal_dss.pack(buf, &host, 1, OPAL_STRING);
 189             opal_dss.pack(buf, &argv[1], 1, OPAL_STRING);
 190             i64 = 100; /* assign 100 bytes to this partition */
 191             opal_dss.pack(buf, &i64, 1, OPAL_INT64);
 192             i64 = i * 100;  /* space things out */
 193             opal_dss.pack(buf, &i64, 1, OPAL_INT64);
 194             i64 = i;  /* set the partition */
 195             opal_dss.pack(buf, &i64, 1, OPAL_INT64);
 196             active = true;
 197             orte_dfs.post_file_map(buf, dfs_post_cbfunc, buf);
 198             ORTE_WAIT_FOR_COMPLETION(active);
 199         }
 200     } else {
 201         opal_output(0, "PROC %s REPORTING IN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 202         /* retrieve any file maps from our predecessor */
 203         active = true;
 204         buf = OBJ_NEW(opal_buffer_t);
 205         orte_dfs.get_file_map(ORTE_PROC_MY_NAME, dfs_getfm_cbfunc, buf);
 206         ORTE_WAIT_FOR_COMPLETION(active);
 207 
 208         opal_output(0, "%s RECVD %d BYTES IN FILE MAPS",
 209                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buf->bytes_used);
 210 
 211         /* retrieve the number of vpids in the map */
 212         cnt = 1;
 213         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nvpids, &cnt, OPAL_INT32))) {
 214             ORTE_ERROR_LOG(rc);
 215             return 1;
 216         }
 217 
 218         opal_output(0, "%s RECVD DATA FROM %d VPIDS",
 219                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nvpids);
 220 
 221         /* find a partition for us */
 222         for (k=0; k < nvpids; k++) {
 223             cnt = 1;
 224             if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &vpid, &cnt, ORTE_VPID))) {
 225                 ORTE_ERROR_LOG(rc);
 226                 break;
 227             }
 228             opal_output(0, "CHECKING VPID %s", ORTE_VPID_PRINT(vpid));
 229             cnt = 1;
 230             if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nentries, &cnt, OPAL_INT32))) {
 231                 ORTE_ERROR_LOG(rc);
 232                 break;
 233             }
 234             opal_output(0, "%s RECVD %d ENTRIES IN THIS MAP",
 235                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nentries);
 236             cnt = 1;
 237             for (i=0; i < nentries; i++) {
 238                 cnt = 1;
 239                 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &xfer, &cnt, OPAL_BUFFER))) {
 240                     ORTE_ERROR_LOG(rc);
 241                     break;
 242                 }
 243                 cnt = 1;
 244                 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &host, &cnt, OPAL_STRING))) {
 245                     ORTE_ERROR_LOG(rc);
 246                     break;
 247                 }
 248                 cnt = 1;
 249                 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &path, &cnt, OPAL_STRING))) {
 250                     ORTE_ERROR_LOG(rc);
 251                     break;
 252                 }
 253                 cnt = 1;
 254                 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &length, &cnt, OPAL_INT64))) {
 255                     ORTE_ERROR_LOG(rc);
 256                     break;
 257                 }
 258                 cnt = 1;
 259                 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &offset, &cnt, OPAL_INT64))) {
 260                     ORTE_ERROR_LOG(rc);
 261                     break;
 262                 }
 263                 cnt = 1;
 264                 if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &partition, &cnt, OPAL_INT64))) {
 265                     ORTE_ERROR_LOG(rc);
 266                     break;
 267                 }
 268                 OBJ_RELEASE(xfer);
 269                 opal_output(0, "CHECKING PARTITION %d\n\thost %s\n\tpath %s\n\tlength: %d offset: %d",
 270                             (int)partition, (NULL == host) ? "NULL" : host, path, (int)length, (int)offset);
 271                 continue;
 272                 /* if this is my partition, use the file data */
 273                 if (partition == (int64_t)ORTE_PROC_MY_NAME->vpid) {
 274                     /* open the file */
 275                     if (NULL == (uri = opal_filename_to_uri(path, host))) {
 276                         return 1;
 277                     }
 278 
 279                     active = true;
 280                     orte_dfs.open(uri, dfs_open_cbfunc, &fd);
 281                     ORTE_WAIT_FOR_COMPLETION(active);
 282 
 283                     if (fd < 0) {
 284                         /* hit an error */
 285                         return 1;
 286                     }
 287                     /* position it */
 288                     active = true;
 289                     orte_dfs.seek(fd, offset, SEEK_SET, dfs_seek_cbfunc, NULL);
 290                     ORTE_WAIT_FOR_COMPLETION(active);
 291                     /* read it */
 292                     active = true;
 293                     numread = 0;
 294                     orte_dfs.read(fd, buffer, length, read_cbfunc, NULL);
 295                     ORTE_WAIT_FOR_COMPLETION(active);
 296 
 297                     opal_output(0, "%s successfully read %d bytes",
 298                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numread);
 299                     active= true;
 300                     orte_dfs.close(fd, dfs_close_cbfunc, NULL);
 301                     ORTE_WAIT_FOR_COMPLETION(active);
 302                     goto complete;
 303                 }
 304             }
 305         }
 306     }
 307 
 308  complete:
 309     orte_finalize();
 310     return 0;
 311 }

/* [<][>][^][v][top][bottom][index][help] */