This source file includes the following definitions:
- init
- hnp_push
- hnp_pull
- hnp_close
- hnp_complete
- finalize
- hnp_ft_event
- stdin_write_handler
- hnp_output
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 #include "orte_config.h"
  27 #include "opal/util/output.h"
  28 #include "orte/constants.h"
  29 
  30 #include <errno.h>
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif  
  34 #include <string.h>
  35 
  36 #ifdef HAVE_FCNTL_H
  37 #include <fcntl.h>
  38 #else
  39 #ifdef HAVE_SYS_FCNTL_H
  40 #include <sys/fcntl.h>
  41 #endif
  42 #endif
  43 
  44 #include "opal/mca/event/event.h"
  45 
  46 #include "orte/runtime/orte_globals.h"
  47 #include "orte/mca/errmgr/errmgr.h"
  48 #include "orte/mca/ess/ess.h"
  49 #include "orte/mca/rml/rml.h"
  50 #include "orte/util/name_fns.h"
  51 #include "orte/util/threads.h"
  52 #include "orte/mca/odls/odls_types.h"
  53 
  54 #include "orte/mca/iof/base/base.h"
  55 #include "orte/mca/iof/base/iof_base_setup.h"
  56 #include "iof_hnp.h"
  57 
  58 
  59 static void stdin_write_handler(int fd, short event, void *cbdata);
  60 
  61 
  62 static int init(void);
  63 
  64 static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
  65 
  66 static int hnp_pull(const orte_process_name_t* src_name,
  67                 orte_iof_tag_t src_tag,
  68                 int fd);
  69 
  70 static int hnp_close(const orte_process_name_t* peer,
  71                  orte_iof_tag_t source_tag);
  72 
  73 static int hnp_output(const orte_process_name_t* peer,
  74                       orte_iof_tag_t source_tag,
  75                       const char *msg);
  76 
  77 static void hnp_complete(const orte_job_t *jdata);
  78 
  79 static int finalize(void);
  80 
  81 static int hnp_ft_event(int state);
  82 
  83 
  84 
  85 
  86 
  87 
  88 
  89 orte_iof_base_module_t orte_iof_hnp_module = {
  90     .init = init,
  91     .push = hnp_push,
  92     .pull = hnp_pull,
  93     .close = hnp_close,
  94     .output = hnp_output,
  95     .complete = hnp_complete,
  96     .finalize = finalize,
  97     .ft_event = hnp_ft_event
  98 };
  99 
 100 
 101 static int init(void)
 102 {
 103     
 104 
 105 
 106     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 107                             ORTE_RML_TAG_IOF_HNP,
 108                             ORTE_RML_PERSISTENT,
 109                             orte_iof_hnp_recv,
 110                             NULL);
 111 
 112     OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
 113     mca_iof_hnp_component.stdinev = NULL;
 114 
 115     return ORTE_SUCCESS;
 116 }
 117 
 118 
 119 
 120 
 121 
 122 
 123 
 124 
 125 
 126 
 127 
 128 
 129 
 130 
 131 
 132 
 133 
 134 static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
 135 {
 136     orte_job_t *jdata;
 137     orte_proc_t *proc;
 138     orte_iof_proc_t *proct, *pptr;
 139     int flags, rc;
 140     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
 141 
 142     
 143     if (ORTE_VPID_INVALID == dst_name->vpid || fd < 0) {
 144         return ORTE_SUCCESS;
 145     }
 146 
 147     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 148                          "%s iof:hnp pushing fd %d for process %s",
 149                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 150                          fd, ORTE_NAME_PRINT(dst_name)));
 151 
 152     
 153     OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 154         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
 155             
 156             goto SETUP;
 157         }
 158     }
 159     
 160     proct = OBJ_NEW(orte_iof_proc_t);
 161     proct->name.jobid = dst_name->jobid;
 162     proct->name.vpid = dst_name->vpid;
 163     opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
 164 
 165   SETUP:
 166     if (!(src_tag & ORTE_IOF_STDIN)) {
 167         
 168 
 169 
 170         if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 171             opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 172                         __FILE__, __LINE__, errno);
 173         } else {
 174             flags |= O_NONBLOCK;
 175             fcntl(fd, F_SETFL, flags);
 176         }
 177         
 178         if (NULL == (jdata = orte_get_job_data_object(proct->name.jobid))) {
 179             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 180             return ORTE_ERR_NOT_FOUND;
 181         }
 182         
 183         if (src_tag & ORTE_IOF_STDOUT) {
 184             ORTE_IOF_READ_EVENT(&proct->revstdout, proct, fd, ORTE_IOF_STDOUT,
 185                                 orte_iof_hnp_read_local_handler, false);
 186         } else if (src_tag & ORTE_IOF_STDERR) {
 187             ORTE_IOF_READ_EVENT(&proct->revstderr, proct, fd, ORTE_IOF_STDERR,
 188                                 orte_iof_hnp_read_local_handler, false);
 189 #if OPAL_PMIX_V1
 190         } else if (src_tag & ORTE_IOF_STDDIAG) {
 191             ORTE_IOF_READ_EVENT(&proct->revstddiag, proct, fd, ORTE_IOF_STDDIAG,
 192                                 orte_iof_hnp_read_local_handler, false);
 193 #endif
 194         }
 195         
 196         if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct))) {
 197             ORTE_ERROR_LOG(rc);
 198             return rc;
 199         }
 200 
 201         
 202 
 203 
 204 
 205 
 206         if (NULL != proct->revstdout &&
 207 #if OPAL_PMIX_V1
 208            NULL != proct->revstddiag &&
 209 #endif
 210             (orte_iof_base.redirect_app_stderr_to_stdout || NULL != proct->revstderr)) {
 211             if (proct->copy) {
 212                 
 213 
 214                 OPAL_LIST_FOREACH(pptr, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 215                     if (dst_name->jobid == pptr->name.jobid &&
 216                         ORTE_VPID_WILDCARD == pptr->name.vpid &&
 217                         NULL != pptr->subscribers) {
 218                         OBJ_RETAIN(pptr->subscribers);
 219                         proct->subscribers = pptr->subscribers;
 220                         break;
 221                     }
 222                 }
 223             }
 224             ORTE_IOF_READ_ACTIVATE(proct->revstdout);
 225             if (!orte_iof_base.redirect_app_stderr_to_stdout) {
 226                 ORTE_IOF_READ_ACTIVATE(proct->revstderr);
 227             }
 228 #if OPAL_PMIX_V1
 229             if (NULL != proct->revstddiag) {
 230                 ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
 231             }
 232 #endif
 233        }
 234         return ORTE_SUCCESS;
 235     }
 236 
 237     
 238 
 239 
 240     if (ORTE_VPID_WILDCARD == dst_name->vpid) {
 241         
 242         ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
 243                              stdin_write_handler);
 244         proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
 245         proct->stdinev->daemon.vpid = ORTE_VPID_WILDCARD;
 246      } else {
 247         
 248         if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
 249             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 250             return ORTE_ERR_BAD_PARAM;
 251         }
 252         if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, dst_name->vpid))) {
 253             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 254             return ORTE_ERR_NOT_FOUND;
 255         }
 256         
 257         if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) {
 258             ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
 259                                  stdin_write_handler);
 260             proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
 261             proct->stdinev->daemon.vpid = proc->node->daemon->name.vpid;
 262         }
 263     }
 264 
 265     
 266     if (NULL == mca_iof_hnp_component.stdinev) {
 267         
 268 
 269 
 270 
 271 
 272 
 273 
 274 
 275 
 276 
 277         if (0 != fd) {
 278             if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 279                 opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 280                             __FILE__, __LINE__, errno);
 281             } else {
 282                 flags |= O_NONBLOCK;
 283                 fcntl(fd, F_SETFL, flags);
 284             }
 285         }
 286         if (isatty(fd)) {
 287             
 288 
 289 
 290 
 291 
 292 
 293 
 294             opal_event_signal_set(orte_event_base, &mca_iof_hnp_component.stdinsig,
 295                                   SIGCONT, orte_iof_hnp_stdin_cb,
 296                                   NULL);
 297 
 298             
 299 
 300 
 301 
 302 
 303             ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
 304                                 proct, fd, ORTE_IOF_STDIN,
 305                                 orte_iof_hnp_read_local_handler, false);
 306 
 307             
 308 
 309 
 310 
 311             if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
 312                 ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
 313             }
 314         } else {
 315             
 316 
 317 
 318             ORTE_IOF_READ_EVENT(&mca_iof_hnp_component.stdinev,
 319                                 proct, fd, ORTE_IOF_STDIN,
 320                                 orte_iof_hnp_read_local_handler, true);
 321         }
 322     }
 323     return ORTE_SUCCESS;
 324 }
 325 
 326 
 327 
 328 
 329 
 330 
 331 
 332 static int hnp_pull(const orte_process_name_t* dst_name,
 333                     orte_iof_tag_t src_tag,
 334                     int fd)
 335 {
 336     orte_iof_proc_t *proct;
 337     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
 338     int flags;
 339 
 340     
 341     if (ORTE_IOF_STDIN != src_tag) {
 342         return ORTE_ERR_NOT_SUPPORTED;
 343     }
 344 
 345     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 346                          "%s iof:hnp pulling fd %d for process %s",
 347                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 348                          fd, ORTE_NAME_PRINT(dst_name)));
 349 
 350     
 351 
 352 
 353     if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 354         opal_output(orte_iof_base_framework.framework_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 355                     __FILE__, __LINE__, errno);
 356     } else {
 357         flags |= O_NONBLOCK;
 358         fcntl(fd, F_SETFL, flags);
 359     }
 360 
 361     
 362     OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 363         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, dst_name)) {
 364             
 365             goto SETUP;
 366         }
 367     }
 368     
 369     proct = OBJ_NEW(orte_iof_proc_t);
 370     proct->name.jobid = dst_name->jobid;
 371     proct->name.vpid = dst_name->vpid;
 372     opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
 373 
 374   SETUP:
 375     ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
 376                          stdin_write_handler);
 377     proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
 378     proct->stdinev->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
 379 
 380     return ORTE_SUCCESS;
 381 }
 382 
 383 
 384 
 385 
 386 
 387 static int hnp_close(const orte_process_name_t* peer,
 388                      orte_iof_tag_t source_tag)
 389 {
 390     orte_iof_proc_t* proct;
 391     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
 392 
 393     OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 394         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
 395             if (ORTE_IOF_STDIN & source_tag) {
 396                 if (NULL != proct->stdinev) {
 397                     OBJ_RELEASE(proct->stdinev);
 398                 }
 399                 proct->stdinev = NULL;
 400             }
 401             if ((ORTE_IOF_STDOUT & source_tag) ||
 402                 (ORTE_IOF_STDMERGE & source_tag)) {
 403                 if (NULL != proct->revstdout) {
 404                     orte_iof_base_static_dump_output(proct->revstdout);
 405                     OBJ_RELEASE(proct->revstdout);
 406                 }
 407                 proct->revstdout = NULL;
 408             }
 409             if (ORTE_IOF_STDERR & source_tag) {
 410                 if (NULL != proct->revstderr) {
 411                     orte_iof_base_static_dump_output(proct->revstderr);
 412                     OBJ_RELEASE(proct->revstderr);
 413                 }
 414                 proct->revstderr = NULL;
 415             }
 416 #if OPAL_PMIX_V1
 417             if (ORTE_IOF_STDDIAG & source_tag) {
 418                 if (NULL != proct->revstddiag) {
 419                     orte_iof_base_static_dump_output(proct->revstddiag);
 420                     OBJ_RELEASE(proct->revstddiag);
 421                 }
 422                 proct->revstddiag = NULL;
 423             }
 424 #endif
 425             
 426             if (NULL == proct->stdinev &&
 427                 NULL == proct->revstdout &&
 428 #if OPAL_PMIX_V1
 429                 NULL == proct->revstddiag &&
 430 #endif
 431                 NULL == proct->revstderr) {
 432                 opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
 433                 OBJ_RELEASE(proct);
 434             }
 435             break;
 436         }
 437     }
 438     return ORTE_SUCCESS;
 439 }
 440 
 441 static void hnp_complete(const orte_job_t *jdata)
 442 {
 443     orte_iof_proc_t *proct, *next;
 444 
 445     
 446     OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 447         if (jdata->jobid == proct->name.jobid) {
 448             opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
 449             OBJ_RELEASE(proct);
 450         }
 451     }
 452 }
 453 
 454 static int finalize(void)
 455 {
 456     orte_iof_write_event_t *wev;
 457     orte_iof_proc_t *proct;
 458     bool dump;
 459     orte_iof_write_output_t *output;
 460     int num_written;
 461 
 462     
 463     wev = orte_iof_base.iof_write_stdout->wev;
 464     if (!opal_list_is_empty(&wev->outputs)) {
 465         dump = false;
 466         
 467         while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
 468             if (!dump) {
 469                 num_written = write(wev->fd, output->data, output->numbytes);
 470                 if (num_written < output->numbytes) {
 471                     
 472                     dump = true;
 473                 }
 474             }
 475             OBJ_RELEASE(output);
 476         }
 477     }
 478     if (!orte_xml_output) {
 479         
 480         wev = orte_iof_base.iof_write_stderr->wev;
 481         if (!opal_list_is_empty(&wev->outputs)) {
 482             dump = false;
 483             
 484             while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
 485                 if (!dump) {
 486                     num_written = write(wev->fd, output->data, output->numbytes);
 487                     if (num_written < output->numbytes) {
 488                         
 489                         dump = true;
 490                     }
 491                 }
 492                 OBJ_RELEASE(output);
 493             }
 494         }
 495     }
 496 
 497     
 498 
 499     while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_hnp_component.procs))) {
 500         if (NULL != proct->revstdout) {
 501             orte_iof_base_static_dump_output(proct->revstdout);
 502         }
 503         if (NULL != proct->revstderr) {
 504             orte_iof_base_static_dump_output(proct->revstderr);
 505         }
 506 #if OPAL_PMIX_V1
 507         if (NULL != proct->revstddiag) {
 508             orte_iof_base_static_dump_output(proct->revstddiag);
 509         }
 510 #endif
 511         OBJ_RELEASE(proct);
 512     }
 513     OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
 514 
 515     return ORTE_SUCCESS;
 516 }
 517 
 518 int hnp_ft_event(int state) {
 519     
 520 
 521 
 522     return ORTE_SUCCESS;
 523 }
 524 
 525 
 526 
 527 
 528 
 529 static void stdin_write_handler(int fd, short event, void *cbdata)
 530 {
 531     orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
 532     orte_iof_write_event_t *wev = sink->wev;
 533     opal_list_item_t *item;
 534     orte_iof_write_output_t *output;
 535     int num_written, total_written = 0;
 536 
 537     ORTE_ACQUIRE_OBJECT(sink);
 538 
 539     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 540                          "%s hnp:stdin:write:handler writing data to %d",
 541                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 542                          wev->fd));
 543 
 544     wev->pending = false;
 545 
 546     while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 547         output = (orte_iof_write_output_t*)item;
 548         
 549 
 550 
 551         if (orte_abnormal_term_ordered) {
 552             OBJ_RELEASE(output);
 553             continue;
 554         }
 555         if (0 == output->numbytes) {
 556             
 557 
 558 
 559             OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
 560                                  "%s iof:hnp closing fd %d on write event due to zero bytes output",
 561                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
 562             goto finish;
 563         }
 564         num_written = write(wev->fd, output->data, output->numbytes);
 565         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 566                              "%s hnp:stdin:write:handler wrote %d bytes",
 567                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 568                              num_written));
 569         if (num_written < 0) {
 570             if (EAGAIN == errno || EINTR == errno) {
 571                 
 572                 opal_list_prepend(&wev->outputs, item);
 573                 
 574 
 575 
 576                 goto re_enter;
 577             }
 578             
 579 
 580 
 581             OBJ_RELEASE(output);
 582             OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
 583                                  "%s iof:hnp closing fd %d on write event due to negative bytes written",
 584                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
 585             goto finish;
 586         } else if (num_written < output->numbytes) {
 587             OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 588                                  "%s hnp:stdin:write:handler incomplete write %d - adjusting data",
 589                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_written));
 590             
 591             memmove(output->data, &output->data[num_written], output->numbytes - num_written);
 592             
 593             opal_list_prepend(&wev->outputs, item);
 594             
 595 
 596 
 597             goto re_enter;
 598         }
 599         OBJ_RELEASE(output);
 600 
 601         total_written += num_written;
 602         if ((ORTE_IOF_SINK_BLOCKSIZE <= total_written) && wev->always_writable) {
 603             goto re_enter;
 604         }
 605     }
 606     goto check;
 607   re_enter:
 608     ORTE_IOF_SINK_ACTIVATE(wev);
 609   check:
 610     if (NULL != mca_iof_hnp_component.stdinev &&
 611         !orte_abnormal_term_ordered &&
 612         !mca_iof_hnp_component.stdinev->active) {
 613         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 614                             "read event is off - checking if okay to restart"));
 615         
 616 
 617 
 618 
 619 
 620 
 621 
 622 
 623 
 624 
 625         if (opal_list_get_size(&wev->outputs) < ORTE_IOF_MAX_INPUT_BUFFERS) {
 626             
 627             OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 628                                  "restarting read event"));
 629             ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
 630         }
 631     }
 632     if (sink->closed && 0 == opal_list_get_size(&wev->outputs)) {
 633         
 634         OBJ_RELEASE(sink);
 635     }
 636     return;
 637   finish:
 638     OBJ_RELEASE(wev);
 639     sink->wev = NULL;
 640     return;
 641 }
 642 
 643 static int hnp_output(const orte_process_name_t* peer,
 644                       orte_iof_tag_t source_tag,
 645                       const char *msg)
 646 {
 647     
 648     if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
 649         orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
 650     } else {
 651         orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
 652     }
 653 
 654     return ORTE_SUCCESS;
 655 }