This source file includes the following definitions.
- filem_session_dir
- raw_init
- raw_finalize
- xfer_complete
- recv_ack
- raw_preposition_files
- create_link
- raw_link_local_files
- send_chunk
- send_complete
- link_archive
- recv_files
- write_handler
- xfer_construct
- xfer_destruct
- out_construct
- out_destruct
- in_construct
- in_destruct
- output_construct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include "orte_config.h"
20 #include "orte/constants.h"
21
22 #include <string.h>
23 #include <stdlib.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #ifdef HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29 #ifdef HAVE_DIRENT_H
30 #include <dirent.h>
31 #endif
32 #ifdef HAVE_FCNTL_H
33 #include <fcntl.h>
34 #endif
35
36 #include "opal/class/opal_list.h"
37 #include "opal/mca/event/event.h"
38 #include "opal/dss/dss.h"
39
40 #include "orte/util/show_help.h"
41 #include "opal/util/argv.h"
42 #include "opal/util/output.h"
43 #include "opal/util/opal_environ.h"
44 #include "opal/util/os_dirpath.h"
45 #include "opal/util/os_path.h"
46 #include "opal/util/path.h"
47 #include "opal/util/basename.h"
48
49 #include "orte/util/name_fns.h"
50 #include "orte/util/proc_info.h"
51 #include "orte/util/session_dir.h"
52 #include "orte/util/threads.h"
53 #include "orte/runtime/orte_globals.h"
54 #include "orte/mca/errmgr/errmgr.h"
55 #include "orte/mca/grpcomm/base/base.h"
56 #include "orte/mca/rml/rml.h"
57
58 #include "orte/mca/filem/filem.h"
59 #include "orte/mca/filem/base/base.h"
60
61 #include "filem_raw.h"
62
63 static int raw_init(void);
64 static int raw_finalize(void);
65 static int raw_preposition_files(orte_job_t *jdata,
66 orte_filem_completion_cbfunc_t cbfunc,
67 void *cbdata);
68 static int raw_link_local_files(orte_job_t *jdata,
69 orte_app_context_t *app);
70
71 orte_filem_base_module_t mca_filem_raw_module = {
72 .filem_init = raw_init,
73 .filem_finalize = raw_finalize,
74
75 .put = orte_filem_base_none_put,
76 .put_nb = orte_filem_base_none_put_nb,
77 .get = orte_filem_base_none_get,
78 .get_nb = orte_filem_base_none_get_nb,
79 .rm = orte_filem_base_none_rm,
80 .rm_nb = orte_filem_base_none_rm_nb,
81 .wait = orte_filem_base_none_wait,
82 .wait_all = orte_filem_base_none_wait_all,
83
84 .preposition_files = raw_preposition_files,
85 .link_local_files = raw_link_local_files
86 };
87
88 static opal_list_t outbound_files;
89 static opal_list_t incoming_files;
90 static opal_list_t positioned_files;
91
92 static void send_chunk(int fd, short argc, void *cbdata);
93 static void recv_files(int status, orte_process_name_t* sender,
94 opal_buffer_t* buffer, orte_rml_tag_t tag,
95 void* cbdata);
96 static void recv_ack(int status, orte_process_name_t* sender,
97 opal_buffer_t* buffer, orte_rml_tag_t tag,
98 void* cbdata);
99 static void write_handler(int fd, short event, void *cbdata);
100
101 static char *filem_session_dir(void)
102 {
103 char *session_dir = orte_process_info.jobfam_session_dir;
104 if( NULL == session_dir ){
105
106
107 session_dir = orte_process_info.job_session_dir;
108 }
109 return session_dir;
110 }
111
112 static int raw_init(void)
113 {
114 OBJ_CONSTRUCT(&incoming_files, opal_list_t);
115
116
117 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
118 ORTE_RML_TAG_FILEM_BASE,
119 ORTE_RML_PERSISTENT,
120 recv_files,
121 NULL);
122
123
124 if (ORTE_PROC_IS_HNP) {
125 OBJ_CONSTRUCT(&outbound_files, opal_list_t);
126 OBJ_CONSTRUCT(&positioned_files, opal_list_t);
127 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
128 ORTE_RML_TAG_FILEM_BASE_RESP,
129 ORTE_RML_PERSISTENT,
130 recv_ack,
131 NULL);
132 }
133
134 return ORTE_SUCCESS;
135 }
136
137 static int raw_finalize(void)
138 {
139 opal_list_item_t *item;
140
141 while (NULL != (item = opal_list_remove_first(&incoming_files))) {
142 OBJ_RELEASE(item);
143 }
144 OBJ_DESTRUCT(&incoming_files);
145
146 if (ORTE_PROC_IS_HNP) {
147 while (NULL != (item = opal_list_remove_first(&outbound_files))) {
148 OBJ_RELEASE(item);
149 }
150 OBJ_DESTRUCT(&outbound_files);
151 while (NULL != (item = opal_list_remove_first(&positioned_files))) {
152 OBJ_RELEASE(item);
153 }
154 OBJ_DESTRUCT(&positioned_files);
155 }
156
157 return ORTE_SUCCESS;
158 }
159
160 static void xfer_complete(int status, orte_filem_raw_xfer_t *xfer)
161 {
162 orte_filem_raw_outbound_t *outbound = xfer->outbound;
163
164
165 if (ORTE_SUCCESS != status) {
166 outbound->status = status;
167 }
168
169
170 opal_list_remove_item(&outbound->xfers, &xfer->super);
171
172 opal_list_append(&positioned_files, &xfer->super);
173
174
175 if (0 == opal_list_get_size(&outbound->xfers)) {
176
177 if (NULL != outbound->cbfunc) {
178 outbound->cbfunc(outbound->status, outbound->cbdata);
179 }
180
181 opal_list_remove_item(&outbound_files, &outbound->super);
182 OBJ_RELEASE(outbound);
183 }
184 }
185
186 static void recv_ack(int status, orte_process_name_t* sender,
187 opal_buffer_t* buffer, orte_rml_tag_t tag,
188 void* cbdata)
189 {
190 opal_list_item_t *item, *itm;
191 orte_filem_raw_outbound_t *outbound;
192 orte_filem_raw_xfer_t *xfer;
193 char *file;
194 int st, n, rc;
195
196
197 n=1;
198 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &file, &n, OPAL_STRING))) {
199 ORTE_ERROR_LOG(rc);
200 return;
201 }
202
203
204 n=1;
205 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &st, &n, OPAL_INT))) {
206 ORTE_ERROR_LOG(rc);
207 return;
208 }
209
210 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
211 "%s filem:raw: recvd ack from %s for file %s status %d",
212 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
213 ORTE_NAME_PRINT(sender), file, st));
214
215
216 for (item = opal_list_get_first(&outbound_files);
217 item != opal_list_get_end(&outbound_files);
218 item = opal_list_get_next(item)) {
219 outbound = (orte_filem_raw_outbound_t*)item;
220 for (itm = opal_list_get_first(&outbound->xfers);
221 itm != opal_list_get_end(&outbound->xfers);
222 itm = opal_list_get_next(itm)) {
223 xfer = (orte_filem_raw_xfer_t*)itm;
224 if (0 == strcmp(file, xfer->file)) {
225
226 if (0 != st) {
227 xfer->status = st;
228 }
229
230 xfer->nrecvd++;
231
232 if (xfer->nrecvd == orte_process_info.num_procs) {
233 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
234 "%s filem:raw: xfer complete for file %s status %d",
235 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
236 file, xfer->status));
237 xfer_complete(xfer->status, xfer);
238 }
239 free(file);
240 return;
241 }
242 }
243 }
244 }
245
246 static int raw_preposition_files(orte_job_t *jdata,
247 orte_filem_completion_cbfunc_t cbfunc,
248 void *cbdata)
249 {
250 orte_app_context_t *app;
251 opal_list_item_t *item, *itm, *itm2;
252 orte_filem_base_file_set_t *fs;
253 int fd;
254 orte_filem_raw_xfer_t *xfer, *xptr;
255 int flags, i, j;
256 char **files=NULL;
257 orte_filem_raw_outbound_t *outbound, *optr;
258 char *cptr, *nxt, *filestring;
259 opal_list_t fsets;
260 bool already_sent;
261
262 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
263 "%s filem:raw: preposition files for job %s",
264 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
265 ORTE_JOBID_PRINT(jdata->jobid)));
266
267
268
269
270 OBJ_CONSTRUCT(&fsets, opal_list_t);
271 for (i=0; i < jdata->apps->size; i++) {
272 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
273 continue;
274 }
275 if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, NULL, OPAL_BOOL)) {
276
277 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
278 "%s filem:raw: preload executable %s",
279 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
280 app->app));
281 fs = OBJ_NEW(orte_filem_base_file_set_t);
282 fs->local_target = strdup(app->app);
283 fs->target_flag = ORTE_FILEM_TYPE_EXE;
284 opal_list_append(&fsets, &fs->super);
285
286
287
288
289 cptr = opal_basename(app->app);
290 free(app->app);
291 opal_asprintf(&app->app, "./%s", cptr);
292 free(app->argv[0]);
293 app->argv[0] = strdup(app->app);
294 fs->remote_target = strdup(app->app);
295 }
296 if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, (void**)&filestring, OPAL_STRING)) {
297 files = opal_argv_split(filestring, ',');
298 free(filestring);
299 for (j=0; NULL != files[j]; j++) {
300 fs = OBJ_NEW(orte_filem_base_file_set_t);
301 fs->local_target = strdup(files[j]);
302
303 if (NULL != (cptr = strchr(files[j], '.'))) {
304 if (0 == strncmp(cptr, ".tar", 4)) {
305 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
306 "%s filem:raw: marking file %s as TAR",
307 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
308 files[j]));
309 fs->target_flag = ORTE_FILEM_TYPE_TAR;
310 } else if (0 == strncmp(cptr, ".bz", 3)) {
311 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
312 "%s filem:raw: marking file %s as BZIP",
313 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
314 files[j]));
315 fs->target_flag = ORTE_FILEM_TYPE_BZIP;
316 } else if (0 == strncmp(cptr, ".gz", 3)) {
317 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
318 "%s filem:raw: marking file %s as GZIP",
319 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
320 files[j]));
321 fs->target_flag = ORTE_FILEM_TYPE_GZIP;
322 } else {
323 fs->target_flag = ORTE_FILEM_TYPE_FILE;
324 }
325 } else {
326 fs->target_flag = ORTE_FILEM_TYPE_FILE;
327 }
328
329
330
331 if (orte_filem_raw_flatten_trees) {
332 fs->remote_target = opal_basename(files[j]);
333 } else {
334
335
336
337
338
339
340 if (opal_path_is_absolute(files[j])) {
341 fs->remote_target = strdup(&files[j][1]);
342 } else {
343 fs->remote_target = strdup(files[j]);
344 }
345 }
346 opal_list_append(&fsets, &fs->super);
347
348
349
350
351
352
353
354 cptr = fs->remote_target;
355 nxt = cptr;
356 nxt++;
357 while ('\0' != *cptr) {
358 if ('.' == *cptr) {
359
360
361
362 if ('.' == *nxt || '/' == *nxt) {
363 cptr = nxt;
364 nxt++;
365 } else {
366
367
368
369
370 break;
371 }
372 } else if ('/' == *cptr) {
373
374 cptr = nxt;
375 nxt++;
376 } else {
377
378
379
380 break;
381 }
382 }
383 free(files[j]);
384 files[j] = strdup(cptr);
385 }
386
387
388
389 filestring = opal_argv_join(files, ',');
390 orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, ORTE_ATTR_GLOBAL, filestring, OPAL_STRING);
391
392 opal_argv_free(files);
393 free(filestring);
394 }
395 }
396 if (0 == opal_list_get_size(&fsets)) {
397
398 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
399 "%s filem:raw: nothing to preposition",
400 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
401 if (NULL != cbfunc) {
402 cbfunc(ORTE_SUCCESS, cbdata);
403 }
404 OBJ_DESTRUCT(&fsets);
405 return ORTE_SUCCESS;
406 }
407
408 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
409 "%s filem:raw: found %d files to position",
410 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
411 (int)opal_list_get_size(&fsets)));
412
413
414 outbound = OBJ_NEW(orte_filem_raw_outbound_t);
415 outbound->cbfunc = cbfunc;
416 outbound->cbdata = cbdata;
417 opal_list_append(&outbound_files, &outbound->super);
418
419
420
421
422
423 while (NULL != (item = opal_list_remove_first(&fsets))) {
424 fs = (orte_filem_base_file_set_t*)item;
425 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
426 "%s filem:raw: checking prepositioning of file %s",
427 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
428 fs->local_target));
429
430
431 already_sent = false;
432 for (itm = opal_list_get_first(&positioned_files);
433 !already_sent && itm != opal_list_get_end(&positioned_files);
434 itm = opal_list_get_next(itm)) {
435 xptr = (orte_filem_raw_xfer_t*)itm;
436 if (0 == strcmp(fs->local_target, xptr->src)) {
437 already_sent = true;
438 }
439 }
440 if (already_sent) {
441
442 OPAL_OUTPUT_VERBOSE((3, orte_filem_base_framework.framework_output,
443 "%s filem:raw: file %s is already in position - ignoring",
444 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
445 OBJ_RELEASE(item);
446 continue;
447 }
448
449
450
451
452 for (itm = opal_list_get_first(&outbound_files);
453 !already_sent && itm != opal_list_get_end(&outbound_files);
454 itm = opal_list_get_next(itm)) {
455 optr = (orte_filem_raw_outbound_t*)itm;
456 for (itm2 = opal_list_get_first(&optr->xfers);
457 itm2 != opal_list_get_end(&optr->xfers);
458 itm2 = opal_list_get_next(itm2)) {
459 xptr = (orte_filem_raw_xfer_t*)itm2;
460 if (0 == strcmp(fs->local_target, xptr->src)) {
461 already_sent = true;
462 }
463 }
464 }
465 if (already_sent) {
466
467 OPAL_OUTPUT_VERBOSE((3, orte_filem_base_framework.framework_output,
468 "%s filem:raw: file %s is already queued for output - ignoring",
469 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
470 OBJ_RELEASE(item);
471 continue;
472 }
473
474
475 if (0 > (fd = open(fs->local_target, O_RDONLY))) {
476 opal_output(0, "%s CANNOT ACCESS FILE %s",
477 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target);
478 OBJ_RELEASE(item);
479 opal_list_remove_item(&outbound_files, &outbound->super);
480 OBJ_RELEASE(outbound);
481 return ORTE_ERROR;
482 }
483
484 if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
485 opal_output(orte_filem_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
486 __FILE__, __LINE__, errno);
487 } else {
488 flags |= O_NONBLOCK;
489 if (fcntl(fd, F_SETFL, flags) < 0) {
490 opal_output(orte_filem_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
491 __FILE__, __LINE__, errno);
492 }
493 }
494 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
495 "%s filem:raw: setting up to position file %s",
496 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fs->local_target));
497 xfer = OBJ_NEW(orte_filem_raw_xfer_t);
498
499 xfer->src = strdup(fs->local_target);
500
501
502
503
504
505
506 cptr = fs->remote_target;
507 nxt = cptr;
508 nxt++;
509 while ('\0' != *cptr) {
510 if ('.' == *cptr) {
511
512
513
514 if ('.' == *nxt || '/' == *nxt) {
515 cptr = nxt;
516 nxt++;
517 } else {
518
519
520
521
522 break;
523 }
524 } else if ('/' == *cptr) {
525
526 cptr = nxt;
527 nxt++;
528 } else {
529
530
531
532 break;
533 }
534 }
535 xfer->file = strdup(cptr);
536 xfer->type = fs->target_flag;
537 xfer->app_idx = fs->app_idx;
538 xfer->outbound = outbound;
539 opal_list_append(&outbound->xfers, &xfer->super);
540 opal_event_set(orte_event_base, &xfer->ev, fd, OPAL_EV_READ, send_chunk, xfer);
541 opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
542 xfer->pending = true;
543 ORTE_POST_OBJECT(xfer);
544 opal_event_add(&xfer->ev, 0);
545 OBJ_RELEASE(item);
546 }
547 OBJ_DESTRUCT(&fsets);
548
549
550
551
552 if (0 == opal_list_get_size(&outbound->xfers)) {
553 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
554 "%s filem:raw: all duplicate files - no positioning reqd",
555 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
556 opal_list_remove_item(&outbound_files, &outbound->super);
557 OBJ_RELEASE(outbound);
558 if (NULL != cbfunc) {
559 cbfunc(ORTE_SUCCESS, cbdata);
560 }
561 return ORTE_SUCCESS;
562 }
563
564 if (0 < opal_output_get_verbosity(orte_filem_base_framework.framework_output)) {
565 opal_output(0, "%s Files to be positioned:", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
566 for (itm2 = opal_list_get_first(&outbound->xfers);
567 itm2 != opal_list_get_end(&outbound->xfers);
568 itm2 = opal_list_get_next(itm2)) {
569 xptr = (orte_filem_raw_xfer_t*)itm2;
570 opal_output(0, "%s\t%s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xptr->src);
571 }
572 }
573
574 return ORTE_SUCCESS;
575 }
576
577 static int create_link(char *my_dir, char *path,
578 char *link_pt)
579 {
580 char *mypath, *fullname, *basedir;
581 struct stat buf;
582 int rc = ORTE_SUCCESS;
583
584
585 mypath = opal_os_path(false, my_dir, link_pt, NULL);
586
587 fullname = opal_os_path(false, path, link_pt, NULL);
588
589
590
591 if (0 != stat(fullname, &buf)) {
592 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
593 "%s filem:raw: creating symlink to %s\n\tmypath: %s\n\tlink: %s",
594 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), link_pt,
595 mypath, fullname));
596
597 basedir = opal_dirname(fullname);
598 if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(basedir, S_IRWXU))) {
599 ORTE_ERROR_LOG(rc);
600 opal_output(0, "%s Failed to symlink %s to %s",
601 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mypath, fullname);
602 free(basedir);
603 free(mypath);
604 free(fullname);
605 return rc;
606 }
607 free(basedir);
608
609 if (0 != symlink(mypath, fullname)) {
610 opal_output(0, "%s Failed to symlink %s to %s",
611 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mypath, fullname);
612 rc = ORTE_ERROR;
613 }
614 }
615 free(mypath);
616 free(fullname);
617 return rc;
618 }
619
620 static int raw_link_local_files(orte_job_t *jdata,
621 orte_app_context_t *app)
622 {
623 char *session_dir, *path=NULL;
624 orte_proc_t *proc;
625 int i, j, rc;
626 orte_filem_raw_incoming_t *inbnd;
627 opal_list_item_t *item;
628 char **files=NULL, *bname, *filestring;
629
630
631
632
633
634
635
636
637 session_dir = filem_session_dir();
638 if( NULL == session_dir){
639
640 rc = ORTE_ERR_BAD_PARAM;
641 ORTE_ERROR_LOG(rc);
642 return rc;
643 }
644
645
646 if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, (void**)&filestring, OPAL_STRING)) {
647 files = opal_argv_split(filestring, ',');
648 free(filestring);
649 }
650 if (orte_get_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, NULL, OPAL_BOOL)) {
651
652 bname = opal_basename(app->app);
653 opal_argv_append_nosize(&files, bname);
654 free(bname);
655 }
656
657
658 if (NULL == files) {
659 return ORTE_SUCCESS;
660 }
661
662 for (i=0; i < orte_local_children->size; i++) {
663 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
664 continue;
665 }
666 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
667 "%s filem:raw: working symlinks for proc %s",
668 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
669 ORTE_NAME_PRINT(&proc->name)));
670 if (proc->name.jobid != jdata->jobid) {
671 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
672 "%s filem:raw: proc %s not part of job %s",
673 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
674 ORTE_NAME_PRINT(&proc->name),
675 ORTE_JOBID_PRINT(jdata->jobid)));
676 continue;
677 }
678 if (proc->app_idx != app->idx) {
679 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
680 "%s filem:raw: proc %s not part of app_idx %d",
681 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
682 ORTE_NAME_PRINT(&proc->name),
683 (int)app->idx));
684 continue;
685 }
686
687 if (ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_ALIVE) ||
688 (ORTE_PROC_STATE_INIT != proc->state &&
689 ORTE_PROC_STATE_RESTART != proc->state)) {
690 continue;
691 }
692
693 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
694 "%s filem:raw: creating symlinks for %s",
695 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
696 ORTE_NAME_PRINT(&proc->name)));
697
698
699 path = orte_process_info.proc_session_dir;
700
701
702 if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(path, S_IRWXU))) {
703 ORTE_ERROR_LOG(rc);
704
705
706
707 free(files);
708 return rc;
709 }
710
711
712 for (item = opal_list_get_first(&incoming_files);
713 item != opal_list_get_end(&incoming_files);
714 item = opal_list_get_next(item)) {
715 inbnd = (orte_filem_raw_incoming_t*)item;
716 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
717 "%s filem:raw: checking file %s",
718 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inbnd->file));
719
720
721 for (j=0; NULL != files[j]; j++) {
722 if (0 == strcmp(inbnd->file, files[j])) {
723
724 if (NULL != inbnd->link_pts) {
725 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
726 "%s filem:raw: creating links for file %s",
727 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
728 inbnd->file));
729
730 for (j=0; NULL != inbnd->link_pts[j]; j++) {
731 if (ORTE_SUCCESS != (rc = create_link(session_dir, path, inbnd->link_pts[j]))) {
732 ORTE_ERROR_LOG(rc);
733 free(files);
734 return rc;
735 }
736 }
737 } else {
738 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
739 "%s filem:raw: file %s has no link points",
740 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
741 inbnd->file));
742 }
743 break;
744 }
745 }
746 }
747 }
748 opal_argv_free(files);
749 return ORTE_SUCCESS;
750 }
751
752 static void send_chunk(int fd, short argc, void *cbdata)
753 {
754 orte_filem_raw_xfer_t *rev = (orte_filem_raw_xfer_t*)cbdata;
755 unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
756 int32_t numbytes;
757 int rc;
758 opal_buffer_t chunk;
759 orte_grpcomm_signature_t *sig;
760
761 ORTE_ACQUIRE_OBJECT(rev);
762
763
764 rev->pending = false;
765
766
767 numbytes = read(fd, data, sizeof(data));
768
769 if (numbytes < 0) {
770
771
772
773 if (EAGAIN == errno || EINTR == errno) {
774 ORTE_POST_OBJECT(rev);
775 opal_event_add(&rev->ev, 0);
776 return;
777 }
778
779 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
780 "%s filem:raw:read error on file %s",
781 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rev->file));
782
783
784
785
786
787 numbytes = 0;
788 }
789
790
791
792
793 if (orte_job_term_ordered) {
794 OBJ_RELEASE(rev);
795 return;
796 }
797
798 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
799 "%s filem:raw:read handler sending chunk %d of %d bytes for file %s",
800 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
801 rev->nchunk, numbytes, rev->file));
802
803
804 OBJ_CONSTRUCT(&chunk, opal_buffer_t);
805 if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->file, 1, OPAL_STRING))) {
806 ORTE_ERROR_LOG(rc);
807 close(fd);
808 return;
809 }
810 if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->nchunk, 1, OPAL_INT32))) {
811 ORTE_ERROR_LOG(rc);
812 close(fd);
813 return;
814 }
815 if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, data, numbytes, OPAL_BYTE))) {
816 ORTE_ERROR_LOG(rc);
817 close(fd);
818 return;
819 }
820
821 if (0 == rev->nchunk) {
822 if (OPAL_SUCCESS != (rc = opal_dss.pack(&chunk, &rev->type, 1, OPAL_INT32))) {
823 ORTE_ERROR_LOG(rc);
824 close(fd);
825 return;
826 }
827 }
828
829
830 sig = OBJ_NEW(orte_grpcomm_signature_t);
831 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
832 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
833 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
834 if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_FILEM_BASE, &chunk))) {
835 ORTE_ERROR_LOG(rc);
836 close(fd);
837 return;
838 }
839 OBJ_DESTRUCT(&chunk);
840 OBJ_RELEASE(sig);
841 rev->nchunk++;
842
843
844
845
846 if (0 == numbytes) {
847 close(fd);
848 return;
849 } else {
850
851 rev->pending = true;
852 ORTE_POST_OBJECT(rev);
853 opal_event_add(&rev->ev, 0);
854 }
855 }
856
857 static void send_complete(char *file, int status)
858 {
859 opal_buffer_t *buf;
860 int rc;
861
862 buf = OBJ_NEW(opal_buffer_t);
863 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &file, 1, OPAL_STRING))) {
864 ORTE_ERROR_LOG(rc);
865 OBJ_RELEASE(buf);
866 return;
867 }
868 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
869 ORTE_ERROR_LOG(rc);
870 OBJ_RELEASE(buf);
871 return;
872 }
873 if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
874 ORTE_RML_TAG_FILEM_BASE_RESP,
875 orte_rml_send_callback, NULL))) {
876 ORTE_ERROR_LOG(rc);
877 OBJ_RELEASE(buf);
878 }
879 }
880
881
882
883
884
885 static int link_archive(orte_filem_raw_incoming_t *inbnd)
886 {
887 FILE *fp;
888 char *cmd;
889 char path[MAXPATHLEN];
890
891 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
892 "%s filem:raw: identifying links for archive %s",
893 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
894 inbnd->fullpath));
895
896 opal_asprintf(&cmd, "tar tf %s", inbnd->fullpath);
897 fp = popen(cmd, "r");
898 free(cmd);
899 if (NULL == fp) {
900 ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
901 return ORTE_ERR_FILE_OPEN_FAILURE;
902 }
903
904
905
906
907 while (fgets(path, sizeof(path), fp) != NULL) {
908 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
909 "%s filem:raw: path %s",
910 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
911 path));
912
913 if (0 == strlen(path)) {
914 continue;
915 }
916
917 path[strlen(path)-1] = '\0';
918
919 if ('/' == path[strlen(path)-1]) {
920 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
921 "%s filem:raw: path %s is a directory - ignoring it",
922 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
923 path));
924 continue;
925 }
926
927 if (NULL != strstr(path, ".deps")) {
928 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
929 "%s filem:raw: path %s includes .deps - ignoring it",
930 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
931 path));
932 continue;
933 }
934 OPAL_OUTPUT_VERBOSE((10, orte_filem_base_framework.framework_output,
935 "%s filem:raw: adding path %s to link points",
936 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
937 path));
938 opal_argv_append_nosize(&inbnd->link_pts, path);
939 }
940
941 pclose(fp);
942 return ORTE_SUCCESS;
943 }
944
945 static void recv_files(int status, orte_process_name_t* sender,
946 opal_buffer_t* buffer, orte_rml_tag_t tag,
947 void* cbdata)
948 {
949 char *file, *session_dir;
950 int32_t nchunk, n, nbytes;
951 unsigned char data[ORTE_FILEM_RAW_CHUNK_MAX];
952 int rc;
953 orte_filem_raw_output_t *output;
954 orte_filem_raw_incoming_t *ptr, *incoming;
955 opal_list_item_t *item;
956 int32_t type;
957 char *cptr;
958
959
960 n=1;
961 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &file, &n, OPAL_STRING))) {
962 ORTE_ERROR_LOG(rc);
963 send_complete(NULL, rc);
964 return;
965 }
966 n=1;
967 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &nchunk, &n, OPAL_INT32))) {
968 ORTE_ERROR_LOG(rc);
969 send_complete(file, rc);
970 free(file);
971 return;
972 }
973
974 if (nchunk < 0) {
975
976 nbytes = 0;
977 } else {
978 nbytes=ORTE_FILEM_RAW_CHUNK_MAX;
979 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, data, &nbytes, OPAL_BYTE))) {
980 ORTE_ERROR_LOG(rc);
981 send_complete(file, rc);
982 free(file);
983 return;
984 }
985 }
986
987 if (0 == nchunk) {
988 n=1;
989 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &type, &n, OPAL_INT32))) {
990 ORTE_ERROR_LOG(rc);
991 send_complete(file, rc);
992 free(file);
993 return;
994 }
995 }
996
997 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
998 "%s filem:raw: received chunk %d for file %s containing %d bytes",
999 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1000 nchunk, file, nbytes));
1001
1002
1003 incoming = NULL;
1004 for (item = opal_list_get_first(&incoming_files);
1005 item != opal_list_get_end(&incoming_files);
1006 item = opal_list_get_next(item)) {
1007 ptr = (orte_filem_raw_incoming_t*)item;
1008 if (0 == strcmp(file, ptr->file)) {
1009 incoming = ptr;
1010 break;
1011 }
1012 }
1013 if (NULL == incoming) {
1014
1015 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1016 "%s filem:raw: adding file %s to incoming list",
1017 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), file));
1018 incoming = OBJ_NEW(orte_filem_raw_incoming_t);
1019 incoming->file = strdup(file);
1020 incoming->type = type;
1021 opal_list_append(&incoming_files, &incoming->super);
1022 }
1023
1024
1025 if (0 == nchunk) {
1026
1027 char *tmp;
1028 tmp = strdup(file);
1029 if (NULL != (cptr = strchr(tmp, '/'))) {
1030 *cptr = '\0';
1031 }
1032
1033 incoming->top = strdup(tmp);
1034 free(tmp);
1035
1036 session_dir = filem_session_dir();
1037
1038 incoming->fullpath = opal_os_path(false, session_dir, file, NULL);
1039
1040 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1041 "%s filem:raw: opening target file %s",
1042 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), incoming->fullpath));
1043
1044 tmp = opal_dirname(incoming->fullpath);
1045 if (OPAL_SUCCESS != (rc = opal_os_dirpath_create(tmp, S_IRWXU))) {
1046 ORTE_ERROR_LOG(rc);
1047 send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
1048 free(file);
1049 free(tmp);
1050 OBJ_RELEASE(incoming);
1051 return;
1052 }
1053
1054 if (ORTE_FILEM_TYPE_EXE == type) {
1055 if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU))) {
1056 opal_output(0, "%s CANNOT CREATE FILE %s",
1057 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1058 incoming->fullpath);
1059 send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
1060 free(file);
1061 free(tmp);
1062 return;
1063 }
1064 } else {
1065 if (0 > (incoming->fd = open(incoming->fullpath, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR))) {
1066 opal_output(0, "%s CANNOT CREATE FILE %s",
1067 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1068 incoming->fullpath);
1069 send_complete(file, ORTE_ERR_FILE_WRITE_FAILURE);
1070 free(file);
1071 free(tmp);
1072 return;
1073 }
1074 }
1075 free(tmp);
1076 opal_event_set(orte_event_base, &incoming->ev, incoming->fd,
1077 OPAL_EV_WRITE, write_handler, incoming);
1078 opal_event_set_priority(&incoming->ev, ORTE_MSG_PRI);
1079 }
1080
1081 output = OBJ_NEW(orte_filem_raw_output_t);
1082 if (0 < nbytes) {
1083
1084
1085
1086
1087 memcpy(output->data, data, nbytes);
1088 }
1089 output->numbytes = nbytes;
1090
1091
1092 opal_list_append(&incoming->outputs, &output->super);
1093
1094 if (!incoming->pending) {
1095
1096 incoming->pending = true;
1097 ORTE_POST_OBJECT(incoming);
1098 opal_event_add(&incoming->ev, 0);
1099 }
1100
1101
1102 free(file);
1103 }
1104
1105
1106 static void write_handler(int fd, short event, void *cbdata)
1107 {
1108 orte_filem_raw_incoming_t *sink = (orte_filem_raw_incoming_t*)cbdata;
1109 opal_list_item_t *item;
1110 orte_filem_raw_output_t *output;
1111 int num_written;
1112 char *dirname, *cmd;
1113 char homedir[MAXPATHLEN];
1114 int rc;
1115
1116 ORTE_ACQUIRE_OBJECT(sink);
1117
1118 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1119 "%s write:handler writing data to %d",
1120 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1121 sink->fd));
1122
1123
1124 sink->pending = false;
1125
1126 while (NULL != (item = opal_list_remove_first(&sink->outputs))) {
1127 output = (orte_filem_raw_output_t*)item;
1128 if (0 == output->numbytes) {
1129
1130 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1131 "%s write:handler zero bytes - reporting complete for file %s",
1132 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1133 sink->file));
1134
1135 close(sink->fd);
1136 sink->fd = -1;
1137 if (ORTE_FILEM_TYPE_FILE == sink->type ||
1138 ORTE_FILEM_TYPE_EXE == sink->type) {
1139
1140
1141
1142 opal_argv_append_nosize(&sink->link_pts, sink->top);
1143 send_complete(sink->file, ORTE_SUCCESS);
1144 } else {
1145
1146 if (ORTE_FILEM_TYPE_TAR == sink->type) {
1147 opal_asprintf(&cmd, "tar xf %s", sink->file);
1148 } else if (ORTE_FILEM_TYPE_BZIP == sink->type) {
1149 opal_asprintf(&cmd, "tar xjf %s", sink->file);
1150 } else if (ORTE_FILEM_TYPE_GZIP == sink->type) {
1151 opal_asprintf(&cmd, "tar xzf %s", sink->file);
1152 } else {
1153 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1154 send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1155 return;
1156 }
1157 if (NULL == getcwd(homedir, sizeof(homedir))) {
1158 ORTE_ERROR_LOG(ORTE_ERROR);
1159 send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1160 return;
1161 }
1162 dirname = opal_dirname(sink->fullpath);
1163 if (0 != chdir(dirname)) {
1164 ORTE_ERROR_LOG(ORTE_ERROR);
1165 send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1166 return;
1167 }
1168 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1169 "%s write:handler unarchiving file %s with cmd: %s",
1170 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1171 sink->file, cmd));
1172 if (0 != system(cmd)) {
1173 ORTE_ERROR_LOG(ORTE_ERROR);
1174 send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1175 return;
1176 }
1177 if (0 != chdir(homedir)) {
1178 ORTE_ERROR_LOG(ORTE_ERROR);
1179 send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1180 return;
1181 }
1182 free(dirname);
1183 free(cmd);
1184
1185 if (ORTE_SUCCESS != (rc = link_archive(sink))) {
1186 ORTE_ERROR_LOG(rc);
1187 send_complete(sink->file, ORTE_ERR_FILE_WRITE_FAILURE);
1188 } else {
1189 send_complete(sink->file, ORTE_SUCCESS);
1190 }
1191 }
1192 return;
1193 }
1194 num_written = write(sink->fd, output->data, output->numbytes);
1195 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1196 "%s write:handler wrote %d bytes to file %s",
1197 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1198 num_written, sink->file));
1199 if (num_written < 0) {
1200 if (EAGAIN == errno || EINTR == errno) {
1201
1202 opal_list_prepend(&sink->outputs, item);
1203
1204
1205
1206 sink->pending = true;
1207 ORTE_POST_OBJECT(sink);
1208 opal_event_add(&sink->ev, 0);
1209 return;
1210 }
1211
1212
1213
1214 OPAL_OUTPUT_VERBOSE((1, orte_filem_base_framework.framework_output,
1215 "%s write:handler error on write for file %s: %s",
1216 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1217 sink->file, strerror(errno)));
1218 OBJ_RELEASE(output);
1219 opal_list_remove_item(&incoming_files, &sink->super);
1220 send_complete(sink->file, OPAL_ERR_FILE_WRITE_FAILURE);
1221 OBJ_RELEASE(sink);
1222 return;
1223 } else if (num_written < output->numbytes) {
1224
1225 memmove(output->data, &output->data[num_written], output->numbytes - num_written);
1226
1227 opal_list_prepend(&sink->outputs, item);
1228
1229
1230
1231 sink->pending = true;
1232 ORTE_POST_OBJECT(sink);
1233 opal_event_add(&sink->ev, 0);
1234 return;
1235 }
1236 OBJ_RELEASE(output);
1237 }
1238 }
1239
1240 static void xfer_construct(orte_filem_raw_xfer_t *ptr)
1241 {
1242 ptr->outbound = NULL;
1243 ptr->app_idx = 0;
1244 ptr->pending = false;
1245 ptr->src = NULL;
1246 ptr->file = NULL;
1247 ptr->nchunk = 0;
1248 ptr->status = ORTE_SUCCESS;
1249 ptr->nrecvd = 0;
1250 }
1251 static void xfer_destruct(orte_filem_raw_xfer_t *ptr)
1252 {
1253 if (ptr->pending) {
1254 opal_event_del(&ptr->ev);
1255 }
1256 if (NULL != ptr->src) {
1257 free(ptr->src);
1258 }
1259 if (NULL != ptr->file) {
1260 free(ptr->file);
1261 }
1262 }
1263 OBJ_CLASS_INSTANCE(orte_filem_raw_xfer_t,
1264 opal_list_item_t,
1265 xfer_construct, xfer_destruct);
1266
1267 static void out_construct(orte_filem_raw_outbound_t *ptr)
1268 {
1269 OBJ_CONSTRUCT(&ptr->xfers, opal_list_t);
1270 ptr->status = ORTE_SUCCESS;
1271 ptr->cbfunc = NULL;
1272 ptr->cbdata = NULL;
1273 }
1274 static void out_destruct(orte_filem_raw_outbound_t *ptr)
1275 {
1276 opal_list_item_t *item;
1277
1278 while (NULL != (item = opal_list_remove_first(&ptr->xfers))) {
1279 OBJ_RELEASE(item);
1280 }
1281 OBJ_DESTRUCT(&ptr->xfers);
1282 }
1283 OBJ_CLASS_INSTANCE(orte_filem_raw_outbound_t,
1284 opal_list_item_t,
1285 out_construct, out_destruct);
1286
1287 static void in_construct(orte_filem_raw_incoming_t *ptr)
1288 {
1289 ptr->app_idx = 0;
1290 ptr->pending = false;
1291 ptr->fd = -1;
1292 ptr->file = NULL;
1293 ptr->top = NULL;
1294 ptr->fullpath = NULL;
1295 ptr->link_pts = NULL;
1296 OBJ_CONSTRUCT(&ptr->outputs, opal_list_t);
1297 }
1298 static void in_destruct(orte_filem_raw_incoming_t *ptr)
1299 {
1300 opal_list_item_t *item;
1301
1302 if (ptr->pending) {
1303 opal_event_del(&ptr->ev);
1304 }
1305 if (0 <= ptr->fd) {
1306 close(ptr->fd);
1307 }
1308 if (NULL != ptr->file) {
1309 free(ptr->file);
1310 }
1311 if (NULL != ptr->top) {
1312 free(ptr->top);
1313 }
1314 if (NULL != ptr->fullpath) {
1315 free(ptr->fullpath);
1316 }
1317 opal_argv_free(ptr->link_pts);
1318 while (NULL != (item = opal_list_remove_first(&ptr->outputs))) {
1319 OBJ_RELEASE(item);
1320 }
1321 OBJ_DESTRUCT(&ptr->outputs);
1322 }
1323 OBJ_CLASS_INSTANCE(orte_filem_raw_incoming_t,
1324 opal_list_item_t,
1325 in_construct, in_destruct);
1326
1327 static void output_construct(orte_filem_raw_output_t *ptr)
1328 {
1329 ptr->numbytes = 0;
1330 }
1331 OBJ_CLASS_INSTANCE(orte_filem_raw_output_t,
1332 opal_list_item_t,
1333 output_construct, NULL);