root/orte/mca/filem/raw/filem_raw_module.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. filem_session_dir
  2. raw_init
  3. raw_finalize
  4. xfer_complete
  5. recv_ack
  6. raw_preposition_files
  7. create_link
  8. raw_link_local_files
  9. send_chunk
  10. send_complete
  11. link_archive
  12. recv_files
  13. write_handler
  14. xfer_construct
  15. xfer_destruct
  16. out_construct
  17. out_destruct
  18. in_construct
  19. in_destruct
  20. output_construct

   1 /*
   2  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
   3  *                         All rights reserved
   4  * Copyright (c) 2013      Cisco Systems, Inc.  All rights reserved.
   5  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   6  * Copyright (c) 2015-2017 Research Organization for Information Science
   7  *                         and Technology (RIST). All rights reserved.
   8  * $COPYRIGHT$
   9  *
  10  * Additional copyrights may follow
  11  *
  12  * $HEADER$
  13  */
  14 
  15 /*
  16  *
  17  */
  18 
  19 #include "orte_config.h"
  20 #include "orte/constants.h"
  21 
  22 #include <string.h>
  23 #include <stdlib.h>
  24 #include <sys/types.h>
  25 #include <sys/stat.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif  /* HAVE_UNISTD_H */
  29 #ifdef HAVE_DIRENT_H
  30 #include <dirent.h>
  31 #endif  /* HAVE_DIRENT_H */
  32 #ifdef HAVE_FCNTL_H
  33 #include <fcntl.h>
  34 #endif
  35 
  36 #include "opal/class/opal_list.h"
  37 #include "opal/mca/event/event.h"
  38 #include "opal/dss/dss.h"
  39 
  40 #include "orte/util/show_help.h"
  41 #include "opal/util/argv.h"
  42 #include "opal/util/output.h"
  43 #include "opal/util/opal_environ.h"
  44 #include "opal/util/os_dirpath.h"
  45 #include "opal/util/os_path.h"
  46 #include "opal/util/path.h"
  47 #include "opal/util/basename.h"
  48 
  49 #include "orte/util/name_fns.h"
  50 #include "orte/util/proc_info.h"
  51 #include "orte/util/session_dir.h"
  52 #include "orte/util/threads.h"
  53 #include "orte/runtime/orte_globals.h"
  54 #include "orte/mca/errmgr/errmgr.h"
  55 #include "orte/mca/grpcomm/base/base.h"
  56 #include "orte/mca/rml/rml.h"
  57 
  58 #include "orte/mca/filem/filem.h"
  59 #include "orte/mca/filem/base/base.h"
  60 
  61 #include "filem_raw.h"
  62 
  63 static int raw_init(void);
  64 static int raw_finalize(void);
  65 static int raw_preposition_files(orte_job_t *jdata,
  66                                  orte_filem_completion_cbfunc_t cbfunc,
  67                                  void *cbdata);
  68 static int raw_link_local_files(orte_job_t *jdata,
  69                                 orte_app_context_t *app);
  70 
  71 orte_filem_base_module_t mca_filem_raw_module = {
  72     .filem_init = raw_init,
  73     .filem_finalize = raw_finalize,
  74     /* we don't use any of the following */
  75     .put = orte_filem_base_none_put,
  76     .put_nb = orte_filem_base_none_put_nb,
  77     .get = orte_filem_base_none_get,
  78     .get_nb = orte_filem_base_none_get_nb,
  79     .rm = orte_filem_base_none_rm,
  80     .rm_nb = orte_filem_base_none_rm_nb,
  81     .wait = orte_filem_base_none_wait,
  82     .wait_all = orte_filem_base_none_wait_all,
  83     /* now the APIs we *do* use */
  84     .preposition_files = raw_preposition_files,
  85     .link_local_files = raw_link_local_files
  86 };
  87 
  88 static opal_list_t outbound_files;
  89 static opal_list_t incoming_files;
  90 static opal_list_t positioned_files;
  91 
  92 static void send_chunk(int fd, short argc, void *cbdata);
  93 static void recv_files(int status, orte_process_name_t* sender,
  94                        opal_buffer_t* buffer, orte_rml_tag_t tag,
  95                        void* cbdata);
  96 static void recv_ack(int status, orte_process_name_t* sender,
  97                      opal_buffer_t* buffer, orte_rml_tag_t tag,
  98                      void* cbdata);
  99 static void write_handler(int fd, short event, void *cbdata);
 100 
 101 static char *filem_session_dir(void)
 102 {
 103     char *session_dir = orte_process_info.jobfam_session_dir;
 104     if( NULL == session_dir ){
 105         /* if no job family session dir was provided -
 106          * use the job session dir */
 107         session_dir = orte_process_info.job_session_dir;
 108     }
 109     return session_dir;
 110 }
 111 
 112 static int raw_init(void)
 113 {
 114     OBJ_CONSTRUCT(&incoming_files, opal_list_t);
 115 
 116     /* start a recv to catch any files sent to me */
 117     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 118                             ORTE_RML_TAG_FILEM_BASE,
 119                             ORTE_RML_PERSISTENT,
 120                             recv_files,
 121                             NULL);
 122 
 123     /* if I'm the HNP, start a recv to catch acks sent to me */
 124     if (ORTE_PROC_IS_HNP) {
 125         OBJ_CONSTRUCT(&outbound_files, opal_list_t);
 126         OBJ_CONSTRUCT(&positioned_files, opal_list_t);
 127         orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 128                                 ORTE_RML_TAG_FILEM_BASE_RESP,
 129                                 ORTE_RML_PERSISTENT,
 130                                 recv_ack,
 131                                 NULL);
 132     }
 133 
 134     return ORTE_SUCCESS;
 135 }
 136 
 137 static int raw_finalize(void)
 138 {
 139     opal_list_item_t *item;
 140 
 141     while (NULL != (item = opal_list_remove_first(&incoming_files))) {
 142         OBJ_RELEASE(item);
 143     }
 144     OBJ_DESTRUCT(&incoming_files);
 145 
 146     if (ORTE_PROC_IS_HNP) {
 147         while (NULL != (item = opal_list_remove_first(&outbound_files))) {
 148             OBJ_RELEASE(item);
 149         }
 150         OBJ_DESTRUCT(&outbound_files);
 151         while (NULL != (item = opal_list_remove_first(&positioned_files))) {
 152             OBJ_RELEASE(item);
 153         }
 154         OBJ_DESTRUCT(&positioned_files);
 155     }
 156 
 157     return ORTE_SUCCESS;
 158 }
 159 
 160 static void xfer_complete(int status, orte_filem_raw_xfer_t *xfer)
 161 {
 162     orte_filem_raw_outbound_t *outbound = xfer->outbound;
 163 
 164     /* transfer the status, if not success */
 165     if (ORTE_SUCCESS != status) {
 166         outbound->status = status;
 167     }
 168 
 169     /* this transfer is complete - remove it from list */
 170     opal_list_remove_item(&outbound->xfers, &xfer->super);
 171     /* add it to the list of files that have been positioned */
 172     opal_list_append(&positioned_files, &xfer->super);
 173 
 174     /* if the list is now empty, then the xfer is complete */
 175     if (0 == opal_list_get_size(&outbound->xfers)) {
 176         /* do the callback */
 177         if (NULL != outbound->cbfunc) {
 178             outbound->cbfunc(outbound->status, outbound->cbdata);
 179         }
 180         /* release the object */
 181         opal_list_remove_item(&outbound_files, &outbound->super);
 182         OBJ_RELEASE(outbound);
 183     }
 184 }
 185 
 186 static void recv_ack(int status, orte_process_name_t* sender,
 187                      opal_buffer_t* buffer, orte_rml_tag_t tag,
 188                      void* cbdata)
 189 {
 190     opal_list_item_t *item, *itm;
 191     orte_filem_raw_outbound_t *outbound;
 192     orte_filem_raw_xfer_t *xfer;
 193     char *file;
 194     int st, n, rc;
 195 
 196     /* unpack the file */
 197     n=1;
 198     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &file, &n, OPAL_STRING))) {
 199         ORTE_ERROR_LOG(rc);
 200         return;
 201     }
 202 
 203     /* unpack the status */
 204     n=1;
 205     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &st, &n, OPAL_INT))) {
 206         ORTE_ERROR_LOG(rc);
 207         return;
 208     }
 209 
 210     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 211                          "%s filem:raw: recvd ack from %s for file %s status %d",
 212                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 213                          ORTE_NAME_PRINT(sender), file, st));
 214 
 215     /* find the corresponding outbound object */
 216     for (item = opal_list_get_first(&outbound_files);
 217          item != opal_list_get_end(&outbound_files);
 218          item = opal_list_get_next(item)) {
 219         outbound = (orte_filem_raw_outbound_t*)item;
 220         for (itm = opal_list_get_first(&outbound->xfers);
 221              itm != opal_list_get_end(&outbound->xfers);
 222              itm = opal_list_get_next(itm)) {
 223             xfer = (orte_filem_raw_xfer_t*)itm;
 224             if (0 == strcmp(file, xfer->file)) {
 225                 /* if the status isn't success, record it */
 226                 if (0 != st) {
 227                     xfer->status = st;
 228                 }
 229                 /* track number of respondents */
 230                 xfer->nrecvd++;
 231                 /* if all daemons have responded, then this is complete */
 232                 if (xfer->nrecvd == orte_process_info.num_procs) {
 233                     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 234                                          "%s filem:raw: xfer complete for file %s status %d",
 235                                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 236                                          file, xfer->status));
 237                     xfer_complete(xfer->status, xfer);
 238                 }
 239                 free(file);
 240                 return;
 241             }
 242         }
 243     }
 244 }
 245 
 246 static int raw_preposition_files(orte_job_t *jdata,
 247                                  orte_filem_completion_cbfunc_t cbfunc,
 248                                  void *cbdata)
 249 {
 250     orte_app_context_t *app;
 251     opal_list_item_t *item, *itm, *itm2;
 252     orte_filem_base_file_set_t *fs;
 253     int fd;
 254     orte_filem_raw_xfer_t *xfer, *xptr;
 255     int flags, i, j;
 256     char **files=NULL;
 257     orte_filem_raw_outbound_t *outbound, *optr;
 258     char *cptr, *nxt, *filestring;
 259     opal_list_t fsets;
 260     bool already_sent;
 261 
 262     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 263                          "%s filem:raw: preposition files for job %s",
 264                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 265                          ORTE_JOBID_PRINT(jdata->jobid)));
 266 
 267     /* cycle across the app_contexts looking for files or
 268      * binaries to be prepositioned
 269      */
 270     OBJ_CONSTRUCT(&fsets, opal_list_t);
 271     for (i=0; i < jdata->apps->size; i++) {
 272         if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
 273             continue;
 274         }
 275         if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, NULL, OPAL_BOOL)) {
 276             /* add the executable to our list */
 277             OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 278                                  "%s filem:raw: preload executable %s",
 279                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 280                                  app->app));
 281             fs = OBJ_NEW(orte_filem_base_file_set_t);
 282             fs->local_target = strdup(app->app);
 283             fs->target_flag = ORTE_FILEM_TYPE_EXE;
 284             opal_list_append(&fsets, &fs->super);
 285             /* if we are preloading the binary, then the app must be in relative
 286              * syntax or we won't find it - the binary will be positioned in the
 287              * session dir, so ensure the app is relative to that location
 288              */
 289             cptr = opal_basename(app->app);
 290             free(app->app);
 291             opal_asprintf(&app->app, "./%s", cptr);
 292             free(app->argv[0]);
 293             app->argv[0] = strdup(app->app);
 294             fs->remote_target = strdup(app->app);
 295         }
 296         if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, (void**)&filestring, OPAL_STRING)) {
 297             files = opal_argv_split(filestring, ',');
 298             free(filestring);
 299             for (j=0; NULL != files[j]; j++) {
 300                 fs = OBJ_NEW(orte_filem_base_file_set_t);
 301                 fs->local_target = strdup(files[j]);
 302                 /* check any suffix for file type */
 303                 if (NULL != (cptr = strchr(files[j], '.'))) {
 304                     if (0 == strncmp(cptr, ".tar", 4)) {
 305                         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 306                                              "%s filem:raw: marking file %s as TAR",
 307                                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 308                                              files[j]));
 309                         fs->target_flag = ORTE_FILEM_TYPE_TAR;
 310                     } else if (0 == strncmp(cptr, ".bz", 3)) {
 311                         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 312                                              "%s filem:raw: marking file %s as BZIP",
 313                                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 314                                              files[j]));
 315                         fs->target_flag = ORTE_FILEM_TYPE_BZIP;
 316                     } else if (0 == strncmp(cptr, ".gz", 3)) {
 317                         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 318                                              "%s filem:raw: marking file %s as GZIP",
 319                                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 320                                              files[j]));
 321                         fs->target_flag = ORTE_FILEM_TYPE_GZIP;
 322                     } else {
 323                         fs->target_flag = ORTE_FILEM_TYPE_FILE;
 324                     }
 325                 } else {
 326                     fs->target_flag = ORTE_FILEM_TYPE_FILE;
 327                 }
 328                 /* if we are flattening directory trees, then the
 329                  * remote path is just the basename file name
 330                  */
 331                 if (orte_filem_raw_flatten_trees) {
 332                     fs->remote_target = opal_basename(files[j]);
 333                 } else {
 334                     /* if this was an absolute path, then we need
 335                      * to convert it to a relative path - we do not
 336                      * allow positioning of files to absolute locations
 337                      * due to the potential for unintentional overwriting
 338                      * of files
 339                      */
 340                     if (opal_path_is_absolute(files[j])) {
 341                         fs->remote_target = strdup(&files[j][1]);
 342                     } else {
 343                         fs->remote_target = strdup(files[j]);
 344                     }
 345                 }
 346                 opal_list_append(&fsets, &fs->super);
 347                 /* prep the filename for matching on the remote
 348                  * end by stripping any leading '.' directories to avoid
 349                  * stepping above the session dir location - all
 350                  * files will be relative to that point. Ensure
 351                  * we *don't* mistakenly strip the dot from a
 352                  * filename that starts with one
 353                  */
 354                 cptr = fs->remote_target;
 355                 nxt = cptr;
 356                 nxt++;
 357                 while ('\0' != *cptr) {
 358                     if ('.' == *cptr) {
 359                         /* have to check the next character to
 360                          * see if it's a dotfile or not
 361                          */
 362                         if ('.' == *nxt || '/' == *nxt) {
 363                             cptr = nxt;
 364                             nxt++;
 365                         } else {
 366                             /* if the next character isn't a dot
 367                              * or a slash, then this is a dot-file
 368                              * and we need to leave it alone
 369                              */
 370                             break;
 371                         }
 372                     } else if ('/' == *cptr) {
 373                         /* move to the next character */
 374                         cptr = nxt;
 375                         nxt++;
 376                     } else {
 377                         /* the character isn't a dot or a slash,
 378                          * so this is the beginning of the filename
 379                          */
 380                         break;
 381                     }
 382                 }
 383                 free(files[j]);
 384                 files[j] = strdup(cptr);
 385             }
 386             /* replace the app's file list with the revised one so we
 387              * can find them on the remote end
 388              */
 389             filestring = opal_argv_join(files, ',');
 390             orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, ORTE_ATTR_GLOBAL, filestring, OPAL_STRING);
 391             /* cleanup for the next app */
 392             opal_argv_free(files);
 393             free(filestring);
 394         }
 395     }
 396     if (0 == opal_list_get_size(&fsets)) {
 397         /* nothing to preposition */
 398         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 399                              "%s filem:raw: nothing to preposition",
 400                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 401         if (NULL != cbfunc) {
 402             cbfunc(ORTE_SUCCESS, cbdata);
 403         }
 404         OBJ_DESTRUCT(&fsets);
 405         return ORTE_SUCCESS;
 406     }
 407 
 408     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 409                          "%s filem:raw: found %d files to position",
 410                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 411                          (int)opal_list_get_size(&fsets)));
 412 
 413     /* track the outbound file sets */
 414     outbound = OBJ_NEW(orte_filem_raw_outbound_t);
 415     outbound->cbfunc = cbfunc;
 416     outbound->cbdata = cbdata;
 417     opal_list_append(&outbound_files, &outbound->super);
 418 
 419     /* only the HNP should ever call this function - loop thru the
 420      * fileset and initiate xcast transfer of each file to every
 421      * daemon
 422      */
 423     while (NULL != (item = opal_list_remove_first(&fsets))) {
 424         fs = (orte_filem_base_file_set_t*)item;
 425         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 426                              "%s filem:raw: checking prepositioning of file %s",
 427                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 428                              fs->local_target));
 429 
 430         /* have we already sent this file? */
 431         already_sent = false;
 432         for (itm = opal_list_get_first(&positioned_files);
 433              !already_sent && itm != opal_list_get_end(&positioned_files);
 434              itm = opal_list_get_next(itm)) {
 435             xptr = (orte_filem_raw_xfer_t*)itm;
 436             if (0 == strcmp(fs->local_target, xptr->src)) {
 437                 already_sent = true;
 438             }
 439         }
 440         if (already_sent) {
 441             /* no need to send it again */
 442             OPAL_OUTPUT_VERBOSE((3, orte_filem_base_framework.framework_output,
 443                                  "%s filem:raw: file %s is already in position - ignoring",
 444                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
 445             OBJ_RELEASE(item);
 446             continue;
 447         }
 448         /* also have to check if this file is already in the process
 449          * of being transferred, or was included multiple times
 450          * for transfer
 451          */
 452         for (itm = opal_list_get_first(&outbound_files);
 453              !already_sent && itm != opal_list_get_end(&outbound_files);
 454              itm = opal_list_get_next(itm)) {
 455             optr = (orte_filem_raw_outbound_t*)itm;
 456             for (itm2 = opal_list_get_first(&optr->xfers);
 457                  itm2 != opal_list_get_end(&optr->xfers);
 458                  itm2 = opal_list_get_next(itm2)) {
 459                 xptr = (orte_filem_raw_xfer_t*)itm2;
 460                 if (0 == strcmp(fs->local_target, xptr->src)) {
 461                     already_sent = true;
 462                 }
 463             }
 464         }
 465         if (already_sent) {
 466             /* no need to send it again */
 467             OPAL_OUTPUT_VERBOSE((3, orte_filem_base_framework.framework_output,
 468                                  "%s filem:raw: file %s is already queued for output - ignoring",
 469                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
 470             OBJ_RELEASE(item);
 471             continue;
 472         }
 473 
 474         /* attempt to open the specified file */
 475         if (0 > (fd = open(fs->local_target, O_RDONLY))) {
 476             opal_output(0, "%s CANNOT ACCESS FILE %s",
 477                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target);
 478             OBJ_RELEASE(item);
 479             opal_list_remove_item(&outbound_files, &outbound->super);
 480             OBJ_RELEASE(outbound);
 481             return ORTE_ERROR;
 482         }
 483         /* set the flags to non-blocking */
 484         if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 485             opal_output(orte_filem_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 486                         __FILE__, __LINE__, errno);
 487         } else {
 488             flags |= O_NONBLOCK;
 489             if (fcntl(fd, F_SETFL, flags) < 0) {
 490                 opal_output(orte_filem_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 491                             __FILE__, __LINE__, errno);
 492             }
 493         }
 494         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 495                              "%s filem:raw: setting up to position file %s",
 496                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
 497         xfer = OBJ_NEW(orte_filem_raw_xfer_t);
 498         /* save the source so we can avoid duplicate transfers */
 499         xfer->src = strdup(fs->local_target);
 500         /* strip any leading '.' directories to avoid
 501          * stepping above the session dir location - all
 502          * files will be relative to that point. Ensure
 503          * we *don't* mistakenly strip the dot from a
 504          * filename that starts with one
 505          */
 506         cptr = fs->remote_target;
 507         nxt = cptr;
 508         nxt++;
 509         while ('\0' != *cptr) {
 510             if ('.' == *cptr) {
 511                 /* have to check the next character to
 512                  * see if it's a dotfile or not
 513                  */
 514                 if ('.' == *nxt || '/' == *nxt) {
 515                     cptr = nxt;
 516                     nxt++;
 517                 } else {
 518                     /* if the next character isn't a dot
 519                      * or a slash, then this is a dot-file
 520                      * and we need to leave it alone
 521                      */
 522                     break;
 523                 }
 524             } else if ('/' == *cptr) {
 525                 /* move to the next character */
 526                 cptr = nxt;
 527                 nxt++;
 528             } else {
 529                 /* the character isn't a dot or a slash,
 530                  * so this is the beginning of the filename
 531                  */
 532                 break;
 533             }
 534         }
 535         xfer->file = strdup(cptr);
 536         xfer->type = fs->target_flag;
 537         xfer->app_idx = fs->app_idx;
 538         xfer->outbound = outbound;
 539         opal_list_append(&outbound->xfers, &xfer->super);
 540         opal_event_set(orte_event_base, &xfer->ev, fd, OPAL_EV_READ, send_chunk, xfer);
 541         opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
 542         xfer->pending = true;
 543         ORTE_POST_OBJECT(xfer);
 544         opal_event_add(&xfer->ev, 0);
 545         OBJ_RELEASE(item);
 546     }
 547     OBJ_DESTRUCT(&fsets);
 548 
 549     /* check to see if anything remains to be sent - if everything
 550      * is a duplicate, then the list will be empty
 551      */
 552     if (0 == opal_list_get_size(&outbound->xfers)) {
 553         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 554                              "%s filem:raw: all duplicate files - no positioning reqd",
 555                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 556         opal_list_remove_item(&outbound_files, &outbound->super);
 557         OBJ_RELEASE(outbound);
 558         if (NULL != cbfunc) {
 559             cbfunc(ORTE_SUCCESS, cbdata);
 560         }
 561         return ORTE_SUCCESS;
 562     }
 563 
 564     if (0 < opal_output_get_verbosity(orte_filem_base_framework.framework_output)) {
 565         opal_output(0, "%s Files to be positioned:", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 566         for (itm2 = opal_list_get_first(&outbound->xfers);
 567              itm2 != opal_list_get_end(&outbound->xfers);
 568              itm2 = opal_list_get_next(itm2)) {
 569             xptr = (orte_filem_raw_xfer_t*)itm2;
 570             opal_output(0, "%s\t%s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xptr->src);
 571         }
 572     }
 573 
 574     return ORTE_SUCCESS;
 575 }
 576 
 577 static int create_link(char *my_dir, char *path,
 578                        char *link_pt)
 579 {
 580     char *mypath, *fullname, *basedir;
 581     struct stat buf;
 582     int rc = ORTE_SUCCESS;
 583 
 584     /* form the full source path name */
 585     mypath = opal_os_path(false, my_dir, link_pt, NULL);
 586     /* form the full target path name */
 587     fullname = opal_os_path(false, path, link_pt, NULL);
 588     /* there may have been multiple files placed under the
 589      * same directory, so check for existence first
 590      */
 591     if (0 != stat(fullname, &buf)) {
 592         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 593                              "%s filem:raw: creating symlink to %s\n\tmypath: %s\n\tlink: %s",
 594                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), link_pt,
 595                              mypath, fullname));
 596         /* create any required path to the link location */
 597         basedir = opal_dirname(fullname);
 598         if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(basedir, S_IRWXU))) {
 599             ORTE_ERROR_LOG(rc);
 600             opal_output(0, "%s Failed to symlink %s to %s",
 601                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mypath, fullname);
 602             free(basedir);
 603             free(mypath);
 604             free(fullname);
 605             return rc;
 606         }
 607         free(basedir);
 608         /* do the symlink */
 609         if (0 != symlink(mypath, fullname)) {
 610             opal_output(0, "%s Failed to symlink %s to %s",
 611                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mypath, fullname);
 612             rc = ORTE_ERROR;
 613         }
 614     }
 615     free(mypath);
 616     free(fullname);
 617     return rc;
 618 }
 619 
 620 static int raw_link_local_files(orte_job_t *jdata,
 621                                 orte_app_context_t *app)
 622 {
 623     char *session_dir, *path=NULL;
 624     orte_proc_t *proc;
 625     int i, j, rc;
 626     orte_filem_raw_incoming_t *inbnd;
 627     opal_list_item_t *item;
 628     char **files=NULL, *bname, *filestring;
 629 
 630     /* check my jobfam session directory for files I have received and
 631      * symlink them to the proc-level session directory of each
 632      * local process in the job
 633      *
 634      * TODO: @rhc - please check that I've correctly interpret your
 635      *  intention here
 636      */
 637     session_dir = filem_session_dir();
 638     if( NULL == session_dir){
 639         /* we were unable to find any suitable directory */
 640         rc = ORTE_ERR_BAD_PARAM;
 641         ORTE_ERROR_LOG(rc);
 642         return rc;
 643     }
 644 
 645     /* get the list of files this app wants */
 646     if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, (void**)&filestring, OPAL_STRING)) {
 647         files = opal_argv_split(filestring, ',');
 648         free(filestring);
 649     }
 650     if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, NULL, OPAL_BOOL)) {
 651         /* add the app itself to the list */
 652         bname = opal_basename(app->app);
 653         opal_argv_append_nosize(&files, bname);
 654         free(bname);
 655     }
 656 
 657     /* if there are no files to link, then ignore this */
 658     if (NULL == files) {
 659         return ORTE_SUCCESS;
 660     }
 661 
 662     for (i=0; i < orte_local_children->size; i++) {
 663         if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 664             continue;
 665         }
 666         OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 667                              "%s filem:raw: working symlinks for proc %s",
 668                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 669                              ORTE_NAME_PRINT(&proc->name)));
 670         if (proc->name.jobid != jdata->jobid) {
 671             OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 672                                  "%s filem:raw: proc %s not part of job %s",
 673                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 674                                  ORTE_NAME_PRINT(&proc->name),
 675                                  ORTE_JOBID_PRINT(jdata->jobid)));
 676             continue;
 677         }
 678         if (proc->app_idx != app->idx) {
 679             OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 680                                  "%s filem:raw: proc %s not part of app_idx %d",
 681                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 682                                  ORTE_NAME_PRINT(&proc->name),
 683                                  (int)app->idx));
 684             continue;
 685         }
 686         /* ignore children we have already handled */
 687         if (ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_ALIVE) ||
 688             (ORTE_PROC_STATE_INIT != proc->state &&
 689              ORTE_PROC_STATE_RESTART != proc->state)) {
 690             continue;
 691         }
 692 
 693         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 694                              "%s filem:raw: creating symlinks for %s",
 695                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 696                              ORTE_NAME_PRINT(&proc->name)));
 697 
 698         /* get the session dir name in absolute form */
 699         path = orte_process_info.proc_session_dir;
 700 
 701         /* create it, if it doesn't already exist */
 702         if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(path, S_IRWXU))) {
 703             ORTE_ERROR_LOG(rc);
 704             /* doesn't exist with correct permissions, and/or we can't
 705              * create it - either way, we are done
 706              */
 707             free(files);
 708             return rc;
 709         }
 710 
 711         /* cycle thru the incoming files */
 712         for (item = opal_list_get_first(&incoming_files);
 713              item != opal_list_get_end(&incoming_files);
 714              item = opal_list_get_next(item)) {
 715             inbnd = (orte_filem_raw_incoming_t*)item;
 716             OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 717                                  "%s filem:raw: checking file %s",
 718                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inbnd->file));
 719 
 720             /* is this file for this app_context? */
 721             for (j=0; NULL != files[j]; j++) {
 722                 if (0 == strcmp(inbnd->file, files[j])) {
 723                     /* this must be one of the files we are to link against */
 724                     if (NULL != inbnd->link_pts) {
 725                         OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 726                                              "%s filem:raw: creating links for file %s",
 727                                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 728                                              inbnd->file));
 729                         /* cycle thru the link points and create symlinks to them */
 730                         for (j=0; NULL != inbnd->link_pts[j]; j++) {
 731                             if (ORTE_SUCCESS != (rc = create_link(session_dir, path, inbnd->link_pts[j]))) {
 732                                 ORTE_ERROR_LOG(rc);
 733                                 free(files);
 734                                 return rc;
 735                             }
 736                         }
 737                     } else {
 738                         OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 739                                              "%s filem:raw: file %s has no link points",
 740                                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 741                                              inbnd->file));
 742                     }
 743                     break;
 744                 }
 745             }
 746         }
 747     }
 748     opal_argv_free(files);
 749     return ORTE_SUCCESS;
 750 }
 751 
 752 static void send_chunk(int fd, short argc, void *cbdata)
 753 {
 754     orte_filem_raw_xfer_t *rev = (orte_filem_raw_xfer_t*)cbdata;
 755     unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
 756     int32_t numbytes;
 757     int rc;
 758     opal_buffer_t chunk;
 759     orte_grpcomm_signature_t *sig;
 760 
 761     ORTE_ACQUIRE_OBJECT(rev);
 762 
 763     /* flag that event has fired */
 764     rev->pending = false;
 765 
 766     /* read up to the fragment size */
 767     numbytes = read(fd, data, sizeof(data));
 768 
 769     if (numbytes < 0) {
 770         /* either we have a connection error or it was a non-blocking read */
 771 
 772         /* non-blocking, retry */
 773         if (EAGAIN == errno || EINTR == errno) {
 774             ORTE_POST_OBJECT(rev);
 775             opal_event_add(&rev->ev, 0);
 776             return;
 777         }
 778 
 779         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 780                              "%s filem:raw:read error on file %s",
 781                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rev->file));
 782 
 783         /* Un-recoverable error. Allow the code to flow as usual in order to
 784          * to send the zero bytes message up the stream, and then close the
 785          * file descriptor and delete the event.
 786          */
 787         numbytes = 0;
 788     }
 789 
 790     /* if job termination has been ordered, just ignore the
 791      * data and delete the read event
 792      */
 793     if (orte_job_term_ordered) {
 794         OBJ_RELEASE(rev);
 795         return;
 796     }
 797 
 798     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 799                          "%s filem:raw:read handler sending chunk %d of %d bytes for file %s",
 800                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 801                          rev->nchunk, numbytes, rev->file));
 802 
 803     /* package it for transmission */
 804     OBJ_CONSTRUCT(&chunk, opal_buffer_t);
 805     if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->file, 1, OPAL_STRING))) {
 806         ORTE_ERROR_LOG(rc);
 807         close(fd);
 808         return;
 809     }
 810     if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->nchunk, 1, OPAL_INT32))) {
 811         ORTE_ERROR_LOG(rc);
 812         close(fd);
 813         return;
 814     }
 815     if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, data, numbytes, OPAL_BYTE))) {
 816         ORTE_ERROR_LOG(rc);
 817         close(fd);
 818         return;
 819     }
 820     /* if it is the first chunk, then add file type and index of the app */
 821     if (0 == rev->nchunk) {
 822         if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->type, 1, OPAL_INT32))) {
 823             ORTE_ERROR_LOG(rc);
 824             close(fd);
 825             return;
 826         }
 827     }
 828 
 829     /* goes to all daemons */
 830     sig = OBJ_NEW(orte_grpcomm_signature_t);
 831     sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
 832     sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
 833     sig->signature[0].vpid = ORTE_VPID_WILDCARD;
 834     if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_FILEM_BASE, &chunk))) {
 835         ORTE_ERROR_LOG(rc);
 836         close(fd);
 837         return;
 838     }
 839     OBJ_DESTRUCT(&chunk);
 840     OBJ_RELEASE(sig);
 841     rev->nchunk++;
 842 
 843     /* if num_bytes was zero, then we need to terminate the event
 844      * and close the file descriptor
 845      */
 846     if (0 == numbytes) {
 847         close(fd);
 848         return;
 849     } else {
 850         /* restart the read event */
 851         rev->pending = true;
 852         ORTE_POST_OBJECT(rev);
 853         opal_event_add(&rev->ev, 0);
 854     }
 855 }
 856 
 857 static void send_complete(char *file, int status)
 858 {
 859     opal_buffer_t *buf;
 860     int rc;
 861 
 862     buf = OBJ_NEW(opal_buffer_t);
 863     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &file, 1, OPAL_STRING))) {
 864         ORTE_ERROR_LOG(rc);
 865         OBJ_RELEASE(buf);
 866         return;
 867     }
 868     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
 869         ORTE_ERROR_LOG(rc);
 870         OBJ_RELEASE(buf);
 871         return;
 872     }
 873     if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
 874                                           ORTE_RML_TAG_FILEM_BASE_RESP,
 875                                           orte_rml_send_callback, NULL))) {
 876         ORTE_ERROR_LOG(rc);
 877         OBJ_RELEASE(buf);
 878     }
 879 }
 880 
 881 /* This is a little tricky as the name of the archive doesn't
 882  * necessarily have anything to do with the paths inside it -
 883  * so we have to first query the archive to retrieve that info
 884  */
 885 static int link_archive(orte_filem_raw_incoming_t *inbnd)
 886 {
 887     FILE *fp;
 888     char *cmd;
 889     char path[MAXPATHLEN];
 890 
 891     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 892                          "%s filem:raw: identifying links for archive %s",
 893                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 894                          inbnd->fullpath));
 895 
 896     opal_asprintf(&cmd, "tar tf %s", inbnd->fullpath);
 897     fp = popen(cmd, "r");
 898     free(cmd);
 899     if (NULL == fp) {
 900         ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
 901         return ORTE_ERR_FILE_OPEN_FAILURE;
 902     }
 903     /* because app_contexts might share part or all of a
 904      * directory tree, but link to different files, we
 905      * have to link to each individual file
 906      */
 907     while (fgets(path, sizeof(path), fp) != NULL) {
 908         OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 909                              "%s filem:raw: path %s",
 910                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 911                              path));
 912         /* protect against an empty result */
 913         if (0 == strlen(path)) {
 914             continue;
 915         }
 916         /* trim the trailing cr */
 917         path[strlen(path)-1] = '\0';
 918         /* ignore directories */
 919         if ('/' == path[strlen(path)-1]) {
 920             OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 921                                  "%s filem:raw: path %s is a directory - ignoring it",
 922                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 923                                  path));
 924             continue;
 925         }
 926         /* ignore specific useless directory trees */
 927         if (NULL != strstr(path, ".deps")) {
 928             OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 929                                  "%s filem:raw: path %s includes .deps - ignoring it",
 930                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 931                                  path));
 932             continue;
 933         }
 934         OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
 935                              "%s filem:raw: adding path %s to link points",
 936                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 937                              path));
 938         opal_argv_append_nosize(&inbnd->link_pts, path);
 939     }
 940     /* close */
 941     pclose(fp);
 942     return ORTE_SUCCESS;
 943 }
 944 
 945 static void recv_files(int status, orte_process_name_t* sender,
 946                        opal_buffer_t* buffer, orte_rml_tag_t tag,
 947                        void* cbdata)
 948 {
 949     char *file, *session_dir;
 950     int32_t nchunk, n, nbytes;
 951     unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
 952     int rc;
 953     orte_filem_raw_output_t *output;
 954     orte_filem_raw_incoming_t *ptr, *incoming;
 955     opal_list_item_t *item;
 956     int32_t type;
 957     char *cptr;
 958 
 959     /* unpack the data */
 960     n=1;
 961     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &file, &n, OPAL_STRING))) {
 962         ORTE_ERROR_LOG(rc);
 963         send_complete(NULL, rc);
 964         return;
 965     }
 966     n=1;
 967     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &nchunk, &n, OPAL_INT32))) {
 968         ORTE_ERROR_LOG(rc);
 969         send_complete(file, rc);
 970         free(file);
 971         return;
 972     }
 973     /* if the chunk number is < 0, then this is an EOF message */
 974     if (nchunk < 0) {
 975         /* just set nbytes to zero so we close the fd */
 976         nbytes = 0;
 977     } else {
 978         nbytes=ORTE_FILEM_RAW_CHUNK_MAX;
 979         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, data, &nbytes, OPAL_BYTE))) {
 980             ORTE_ERROR_LOG(rc);
 981             send_complete(file, rc);
 982             free(file);
 983             return;
 984         }
 985     }
 986     /* if the chunk is 0, then additional info should be present */
 987     if (0 == nchunk) {
 988         n=1;
 989         if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &type, &n, OPAL_INT32))) {
 990             ORTE_ERROR_LOG(rc);
 991             send_complete(file, rc);
 992             free(file);
 993             return;
 994         }
 995     }
 996 
 997     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
 998                          "%s filem:raw: received chunk %d for file %s containing %d bytes",
 999                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1000                          nchunk, file, nbytes));
1001 
1002     /* do we already have this file on our list of incoming? */
1003     incoming = NULL;
1004     for (item = opal_list_get_first(&incoming_files);
1005          item != opal_list_get_end(&incoming_files);
1006          item = opal_list_get_next(item)) {
1007         ptr = (orte_filem_raw_incoming_t*)item;
1008         if (0 == strcmp(file, ptr->file)) {
1009             incoming = ptr;
1010             break;
1011         }
1012     }
1013     if (NULL == incoming) {
1014         /* nope - add it */
1015         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1016                              "%s filem:raw: adding file %s to incoming list",
1017                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), file));
1018         incoming = OBJ_NEW(orte_filem_raw_incoming_t);
1019         incoming->file = strdup(file);
1020         incoming->type = type;
1021         opal_list_append(&incoming_files, &incoming->super);
1022     }
1023 
1024     /* if this is the first chunk, we need to open the file descriptor */
1025     if (0 == nchunk) {
1026         /* separate out the top-level directory of the target */
1027         char *tmp;
1028         tmp = strdup(file);
1029         if (NULL != (cptr = strchr(tmp, '/'))) {
1030             *cptr = '\0';
1031         }
1032         /* save it */
1033         incoming->top = strdup(tmp);
1034         free(tmp);
1035         /* define the full path to where we will put it */
1036         session_dir = filem_session_dir();
1037 
1038         incoming->fullpath = opal_os_path(false, session_dir, file, NULL);
1039 
1040         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1041                              "%s filem:raw: opening target file %s",
1042                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), incoming->fullpath));
1043         /* create the path to the target, if not already existing */
1044         tmp = opal_dirname(incoming->fullpath);
1045         if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(tmp, S_IRWXU))) {
1046             ORTE_ERROR_LOG(rc);
1047             send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
1048             free(file);
1049             free(tmp);
1050             OBJ_RELEASE(incoming);
1051             return;
1052         }
1053         /* open the file descriptor for writing */
1054         if (ORTE_FILEM_TYPE_EXE == type) {
1055             if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU))) {
1056                 opal_output(0, "%s CANNOT CREATE FILE %s",
1057                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1058                             incoming->fullpath);
1059                 send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
1060                 free(file);
1061                 free(tmp);
1062                 return;
1063             }
1064         } else {
1065             if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR))) {
1066                 opal_output(0, "%s CANNOT CREATE FILE %s",
1067                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1068                             incoming->fullpath);
1069                 send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
1070                 free(file);
1071                 free(tmp);
1072                 return;
1073             }
1074         }
1075         free(tmp);
1076         opal_event_set(orte_event_base, &incoming->ev, incoming->fd,
1077                        OPAL_EV_WRITE, write_handler, incoming);
1078         opal_event_set_priority(&incoming->ev, ORTE_MSG_PRI);
1079     }
1080     /* create an output object for this data */
1081     output = OBJ_NEW(orte_filem_raw_output_t);
1082     if (0 < nbytes) {
1083         /* don't copy 0 bytes - we just need to pass
1084          * the zero bytes so the fd can be closed
1085          * after it writes everything out
1086          */
1087         memcpy(output->data, data, nbytes);
1088     }
1089     output->numbytes = nbytes;
1090 
1091     /* add this data to the write list for this fd */
1092     opal_list_append(&incoming->outputs, &output->super);
1093 
1094     if (!incoming->pending) {
1095         /* add the event */
1096         incoming->pending = true;
1097         ORTE_POST_OBJECT(incoming);
1098         opal_event_add(&incoming->ev, 0);
1099     }
1100 
1101     /* cleanup */
1102     free(file);
1103 }
1104 
1105 
1106 static void write_handler(int fd, short event, void *cbdata)
1107 {
1108     orte_filem_raw_incoming_t *sink = (orte_filem_raw_incoming_t*)cbdata;
1109     opal_list_item_t *item;
1110     orte_filem_raw_output_t *output;
1111     int num_written;
1112     char *dirname, *cmd;
1113     char homedir[MAXPATHLEN];
1114     int rc;
1115 
1116     ORTE_ACQUIRE_OBJECT(sink);
1117 
1118     OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1119                          "%s write:handler writing data to %d",
1120                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1121                          sink->fd));
1122 
1123     /* note that the event is off */
1124     sink->pending = false;
1125 
1126     while (NULL != (item = opal_list_remove_first(&sink->outputs))) {
1127         output = (orte_filem_raw_output_t*)item;
1128         if (0 == output->numbytes) {
1129             /* indicates we are to close this stream */
1130             OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1131                                  "%s write:handler zero bytes - reporting complete for file %s",
1132                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1133                                  sink->file));
1134             /* close the file descriptor */
1135             close(sink->fd);
1136             sink->fd = -1;
1137             if (ORTE_FILEM_TYPE_FILE == sink->type ||
1138                 ORTE_FILEM_TYPE_EXE == sink->type) {
1139                 /* just link to the top as this will be the
1140                  * name we will want in each proc's session dir
1141                  */
1142                 opal_argv_append_nosize(&sink->link_pts, sink->top);
1143                 send_complete(sink->file, ORTE_SUCCESS);
1144             } else {
1145                 /* unarchive the file */
1146                 if (ORTE_FILEM_TYPE_TAR == sink->type) {
1147                     opal_asprintf(&cmd, "tar xf %s", sink->file);
1148                 } else if (ORTE_FILEM_TYPE_BZIP == sink->type) {
1149                     opal_asprintf(&cmd, "tar xjf %s", sink->file);
1150                 } else if (ORTE_FILEM_TYPE_GZIP == sink->type) {
1151                     opal_asprintf(&cmd, "tar xzf %s", sink->file);
1152                 } else {
1153                     ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1154                     send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1155                     return;
1156                 }
1157                 if (NULL == getcwd(homedir, sizeof(homedir))) {
1158                     ORTE_ERROR_LOG(ORTE_ERROR);
1159                     send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1160                     return;
1161                 }
1162                 dirname = opal_dirname(sink->fullpath);
1163                 if (0 != chdir(dirname)) {
1164                     ORTE_ERROR_LOG(ORTE_ERROR);
1165                     send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1166                     return;
1167                 }
1168                 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1169                                      "%s write:handler unarchiving file %s with cmd: %s",
1170                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1171                                      sink->file, cmd));
1172                 if (0 != system(cmd)) {
1173                     ORTE_ERROR_LOG(ORTE_ERROR);
1174                     send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1175                     return;
1176                 }
1177                 if (0 != chdir(homedir)) {
1178                     ORTE_ERROR_LOG(ORTE_ERROR);
1179                     send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1180                     return;
1181                 }
1182                 free(dirname);
1183                 free(cmd);
1184                 /* setup the link points */
1185                 if (ORTE_SUCCESS != (rc = link_archive(sink))) {
1186                     ORTE_ERROR_LOG(rc);
1187                     send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1188                 } else {
1189                     send_complete(sink->file, ORTE_SUCCESS);
1190                 }
1191             }
1192             return;
1193         }
1194         num_written = write(sink->fd, output->data, output->numbytes);
1195         OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1196                              "%s write:handler wrote %d bytes to file %s",
1197                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1198                              num_written, sink->file));
1199         if (num_written < 0) {
1200             if (EAGAIN == errno || EINTR == errno) {
1201                 /* push this item back on the front of the list */
1202                 opal_list_prepend(&sink->outputs, item);
1203                 /* leave the write event running so it will call us again
1204                  * when the fd is ready.
1205                  */
1206                 sink->pending = true;
1207                 ORTE_POST_OBJECT(sink);
1208                 opal_event_add(&sink->ev, 0);
1209                 return;
1210             }
1211             /* otherwise, something bad happened so all we can do is abort
1212              * this attempt
1213              */
1214             OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1215                                  "%s write:handler error on write for file %s: %s",
1216                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1217                                  sink->file, strerror(errno)));
1218             OBJ_RELEASE(output);
1219             opal_list_remove_item(&incoming_files, &sink->super);
1220             send_complete(sink->file, OPAL_ERR_FILE_WRITE_FAILURE);
1221             OBJ_RELEASE(sink);
1222             return;
1223         } else if (num_written < output->numbytes) {
1224             /* incomplete write - adjust data to avoid duplicate output */
1225             memmove(output->data, &output->data[num_written], output->numbytes - num_written);
1226             /* push this item back on the front of the list */
1227             opal_list_prepend(&sink->outputs, item);
1228             /* leave the write event running so it will call us again
1229              * when the fd is ready
1230              */
1231             sink->pending = true;
1232             ORTE_POST_OBJECT(sink);
1233             opal_event_add(&sink->ev, 0);
1234             return;
1235         }
1236         OBJ_RELEASE(output);
1237     }
1238 }
1239 
1240 static void xfer_construct(orte_filem_raw_xfer_t *ptr)
1241 {
1242     ptr->outbound = NULL;
1243     ptr->app_idx = 0;
1244     ptr->pending = false;
1245     ptr->src = NULL;
1246     ptr->file = NULL;
1247     ptr->nchunk = 0;
1248     ptr->status = ORTE_SUCCESS;
1249     ptr->nrecvd = 0;
1250 }
1251 static void xfer_destruct(orte_filem_raw_xfer_t *ptr)
1252 {
1253     if (ptr->pending) {
1254         opal_event_del(&ptr->ev);
1255     }
1256     if (NULL != ptr->src) {
1257         free(ptr->src);
1258     }
1259     if (NULL != ptr->file) {
1260         free(ptr->file);
1261     }
1262 }
1263 OBJ_CLASS_INSTANCE(orte_filem_raw_xfer_t,
1264                    opal_list_item_t,
1265                    xfer_construct, xfer_destruct);
1266 
1267 static void out_construct(orte_filem_raw_outbound_t *ptr)
1268 {
1269     OBJ_CONSTRUCT(&ptr->xfers, opal_list_t);
1270     ptr->status = ORTE_SUCCESS;
1271     ptr->cbfunc = NULL;
1272     ptr->cbdata = NULL;
1273 }
1274 static void out_destruct(orte_filem_raw_outbound_t *ptr)
1275 {
1276     opal_list_item_t *item;
1277 
1278     while (NULL != (item = opal_list_remove_first(&ptr->xfers))) {
1279         OBJ_RELEASE(item);
1280     }
1281     OBJ_DESTRUCT(&ptr->xfers);
1282 }
1283 OBJ_CLASS_INSTANCE(orte_filem_raw_outbound_t,
1284                    opal_list_item_t,
1285                    out_construct, out_destruct);
1286 
1287 static void in_construct(orte_filem_raw_incoming_t *ptr)
1288 {
1289     ptr->app_idx = 0;
1290     ptr->pending = false;
1291     ptr->fd = -1;
1292     ptr->file = NULL;
1293     ptr->top = NULL;
1294     ptr->fullpath = NULL;
1295     ptr->link_pts = NULL;
1296     OBJ_CONSTRUCT(&ptr->outputs, opal_list_t);
1297 }
1298 static void in_destruct(orte_filem_raw_incoming_t *ptr)
1299 {
1300     opal_list_item_t *item;
1301 
1302     if (ptr->pending) {
1303         opal_event_del(&ptr->ev);
1304     }
1305     if (0 <= ptr->fd) {
1306         close(ptr->fd);
1307     }
1308     if (NULL != ptr->file) {
1309         free(ptr->file);
1310     }
1311     if (NULL != ptr->top) {
1312         free(ptr->top);
1313     }
1314     if (NULL != ptr->fullpath) {
1315         free(ptr->fullpath);
1316     }
1317     opal_argv_free(ptr->link_pts);
1318     while (NULL != (item = opal_list_remove_first(&ptr->outputs))) {
1319         OBJ_RELEASE(item);
1320     }
1321     OBJ_DESTRUCT(&ptr->outputs);
1322 }
1323 OBJ_CLASS_INSTANCE(orte_filem_raw_incoming_t,
1324                    opal_list_item_t,
1325                    in_construct, in_destruct);
1326 
1327 static void output_construct(orte_filem_raw_output_t *ptr)
1328 {
1329     ptr->numbytes = 0;
1330 }
1331 OBJ_CLASS_INSTANCE(orte_filem_raw_output_t,
1332                    opal_list_item_t,
1333                    output_construct, NULL);

/* [<][>][^][v][top][bottom][index][help] */