root/opal/mca/pmix/pmix4x/pmix/src/common/pmix_iof.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. msgcbfunc
  2. PMIx_IOF_pull
  3. stdincbfunc
  4. PMIx_IOF_push
  5. pmix_iof_write_output
  6. pmix_iof_static_dump_output
  7. pmix_iof_write_handler
  8. pmix_iof_stdin_check
  9. pmix_iof_stdin_cb
  10. iof_stdin_cbfunc
  11. pmix_iof_read_local_handler
  12. iof_sink_construct
  13. iof_sink_destruct
  14. iof_read_event_construct
  15. iof_read_event_destruct
  16. iof_write_event_construct
  17. iof_write_event_destruct

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2016      Mellanox Technologies, Inc.
   5  *                         All rights reserved.
   6  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
   7  * Copyright (c) 2019      Research Organization for Information Science
   8  *                         and Technology (RIST).  All rights reserved.
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 #include <src/include/pmix_config.h>
  16 
  17 #ifdef HAVE_FCNTL_H
  18 #include <fcntl.h>
  19 #else
  20 #ifdef HAVE_SYS_FCNTL_H
  21 #include <sys/fcntl.h>
  22 #endif
  23 #endif
  24 
  25 #include <src/include/pmix_stdint.h>
  26 #include <src/include/pmix_socket_errno.h>
  27 
  28 #include <pmix.h>
  29 #include <pmix_common.h>
  30 #include <pmix_server.h>
  31 #include <pmix_rename.h>
  32 
  33 #include "src/threads/threads.h"
  34 #include "src/util/argv.h"
  35 #include "src/util/error.h"
  36 #include "src/util/name_fns.h"
  37 #include "src/util/output.h"
  38 #include "src/mca/bfrops/bfrops.h"
  39 #include "src/mca/ptl/ptl.h"
  40 
  41 #include "src/client/pmix_client_ops.h"
  42 #include "src/server/pmix_server_ops.h"
  43 #include "src/include/pmix_globals.h"
  44 
  45 static void msgcbfunc(struct pmix_peer_t *peer,
  46                        pmix_ptl_hdr_t *hdr,
  47                        pmix_buffer_t *buf, void *cbdata)
  48 {
  49     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
  50     int32_t m;
  51     pmix_status_t rc, status;
  52 
  53     /* unpack the return status */
  54     m=1;
  55     PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &m, PMIX_STATUS);
  56     if (PMIX_SUCCESS == rc && PMIX_SUCCESS == status) {
  57         /* store the request on our list - we are in an event, and
  58          * so this is safe */
  59         pmix_list_append(&pmix_globals.iof_requests, &cd->iofreq->super);
  60     } else if (PMIX_SUCCESS != rc) {
  61         status = rc;
  62         PMIX_RELEASE(cd->iofreq);
  63     }
  64 
  65     pmix_output_verbose(2, pmix_client_globals.iof_output,
  66                         "pmix:iof_register returned status %s", PMIx_Error_string(status));
  67 
  68     if (NULL != cd->cbfunc.opcbfn) {
  69         cd->cbfunc.opcbfn(status, cd->cbdata);
  70     }
  71     PMIX_RELEASE(cd);
  72 }
  73 
  74 PMIX_EXPORT pmix_status_t PMIx_IOF_pull(const pmix_proc_t procs[], size_t nprocs,
  75                                         const pmix_info_t directives[], size_t ndirs,
  76                                         pmix_iof_channel_t channel, pmix_iof_cbfunc_t cbfunc,
  77                                         pmix_hdlr_reg_cbfunc_t regcbfunc, void *regcbdata)
  78 {
  79     pmix_shift_caddy_t *cd;
  80     pmix_cmd_t cmd = PMIX_IOF_PULL_CMD;
  81     pmix_buffer_t *msg;
  82     pmix_status_t rc;
  83 
  84     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
  85 
  86     pmix_output_verbose(2, pmix_client_globals.iof_output,
  87                         "pmix:iof_register");
  88 
  89     if (pmix_globals.init_cntr <= 0) {
  90         PMIX_RELEASE_THREAD(&pmix_global_lock);
  91         return PMIX_ERR_INIT;
  92     }
  93 
  94     /* if we are a server, we cannot do this */
  95     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
  96         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
  97         PMIX_RELEASE_THREAD(&pmix_global_lock);
  98         return PMIX_ERR_NOT_SUPPORTED;
  99     }
 100 
 101     /* we don't allow stdin to flow thru this path */
 102     if (PMIX_FWD_STDIN_CHANNEL & channel) {
 103         PMIX_RELEASE_THREAD(&pmix_global_lock);
 104         return PMIX_ERR_NOT_SUPPORTED;
 105     }
 106 
 107     /* if we aren't connected, don't attempt to send */
 108     if (!pmix_globals.connected) {
 109         PMIX_RELEASE_THREAD(&pmix_global_lock);
 110         return PMIX_ERR_UNREACH;
 111     }
 112     PMIX_RELEASE_THREAD(&pmix_global_lock);
 113 
 114     /* send this request to the server */
 115     cd = PMIX_NEW(pmix_shift_caddy_t);
 116     if (NULL == cd) {
 117         return PMIX_ERR_NOMEM;
 118     }
 119     cd->cbfunc.hdlrregcbfn = regcbfunc;
 120     cd->cbdata = regcbdata;
 121     /* setup the request item */
 122     cd->iofreq = PMIX_NEW(pmix_iof_req_t);
 123     if (NULL == cd->iofreq) {
 124         PMIX_RELEASE(cd);
 125         return PMIX_ERR_NOMEM;
 126     }
 127     /* retain the channels and cbfunc */
 128     cd->iofreq->channels = channel;
 129     cd->iofreq->cbfunc = cbfunc;
 130     /* we don't need the source specifications - only the
 131     * server cares as it will filter against them */
 132 
 133     /* setup the registration cmd */
 134     msg = PMIX_NEW(pmix_buffer_t);
 135     if (NULL == msg) {
 136         PMIX_RELEASE(cd->iofreq);
 137         PMIX_RELEASE(cd);
 138         return PMIX_ERR_NOMEM;
 139     }
 140     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 141                      msg, &cmd, 1, PMIX_COMMAND);
 142     if (PMIX_SUCCESS != rc) {
 143         PMIX_ERROR_LOG(rc);
 144         goto cleanup;
 145     }
 146     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 147                      msg, &nprocs, 1, PMIX_SIZE);
 148     if (PMIX_SUCCESS != rc) {
 149         PMIX_ERROR_LOG(rc);
 150         goto cleanup;
 151     }
 152     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 153                      msg, procs, nprocs, PMIX_PROC);
 154     if (PMIX_SUCCESS != rc) {
 155         PMIX_ERROR_LOG(rc);
 156         goto cleanup;
 157     }
 158     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 159                      msg, &ndirs, 1, PMIX_SIZE);
 160     if (PMIX_SUCCESS != rc) {
 161         PMIX_ERROR_LOG(rc);
 162         goto cleanup;
 163     }
 164     if (0 < ndirs) {
 165         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 166                          msg, directives, ndirs, PMIX_INFO);
 167         if (PMIX_SUCCESS != rc) {
 168             PMIX_ERROR_LOG(rc);
 169             goto cleanup;
 170         }
 171     }
 172     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 173                      msg, &channel, 1, PMIX_IOF_CHANNEL);
 174     if (PMIX_SUCCESS != rc) {
 175         PMIX_ERROR_LOG(rc);
 176         goto cleanup;
 177     }
 178 
 179     pmix_output_verbose(2, pmix_client_globals.iof_output,
 180                         "pmix:iof_request sending to server");
 181     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 182                        msg, msgcbfunc, (void*)cd);
 183 
 184   cleanup:
 185     if (PMIX_SUCCESS != rc) {
 186         PMIX_ERROR_LOG(rc);
 187         PMIX_RELEASE(msg);
 188         PMIX_RELEASE(cd->iofreq);
 189         PMIX_RELEASE(cd);
 190     }
 191     return rc;
 192 }
 193 
 194 typedef struct {
 195     pmix_op_cbfunc_t cbfunc;
 196     void *cbdata;
 197 } pmix_ltcaddy_t;
 198 
 199 static pmix_event_t stdinsig_ev;
 200 static pmix_iof_read_event_t *stdinev = NULL;
 201 
 202 static void stdincbfunc(struct pmix_peer_t *peer,
 203                         pmix_ptl_hdr_t *hdr,
 204                         pmix_buffer_t *buf, void *cbdata)
 205 {
 206     pmix_ltcaddy_t *cd = (pmix_ltcaddy_t*)cbdata;
 207     int cnt;
 208     pmix_status_t rc, status;
 209 
 210     /* a zero-byte buffer indicates that this recv is being
 211      * completed due to a lost connection */
 212     if (PMIX_BUFFER_IS_EMPTY(buf)) {
 213         /* release the caller */
 214         if (NULL != cd->cbfunc) {
 215             cd->cbfunc(PMIX_ERR_COMM_FAILURE, cd->cbdata);
 216         }
 217         free(cd);
 218         return;
 219     }
 220 
 221     /* unpack the status */
 222     cnt = 1;
 223     PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &cnt, PMIX_STATUS);
 224     if (PMIX_SUCCESS != rc) {
 225         status = rc;
 226     }
 227     if (NULL != cd->cbfunc) {
 228         cd->cbfunc(status, cd->cbdata);
 229     }
 230     free(cd);
 231 }
 232 
 233 pmix_status_t PMIx_IOF_push(const pmix_proc_t targets[], size_t ntargets,
 234                             pmix_byte_object_t *bo,
 235                             const pmix_info_t directives[], size_t ndirs,
 236                             pmix_op_cbfunc_t cbfunc, void *cbdata)
 237 {
 238     pmix_buffer_t *msg;
 239     pmix_cmd_t cmd = PMIX_IOF_PUSH_CMD;
 240     pmix_status_t rc = PMIX_SUCCESS;
 241     pmix_ltcaddy_t *cd;
 242     size_t n;
 243     bool begincollecting, stopcollecting;
 244     int flags, fd = fileno(stdin);
 245 
 246     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 247     if (pmix_globals.init_cntr <= 0) {
 248         PMIX_RELEASE_THREAD(&pmix_global_lock);
 249         return PMIX_ERR_INIT;
 250     }
 251     PMIX_RELEASE_THREAD(&pmix_global_lock);
 252 
 253     if (NULL == bo) {
 254         /* check the directives */
 255         for (n=0; n < ndirs; n++) {
 256             if (PMIX_CHECK_KEY(&directives[n], PMIX_IOF_PUSH_STDIN)) {
 257                 /* we are to start collecting our stdin and pushing
 258                  * it to the specified targets */
 259                 begincollecting = PMIX_INFO_TRUE(&directives[n]);
 260                 if (begincollecting) {
 261                     /* add these targets to our list */
 262                     if (!pmix_globals.pushstdin) {
 263                         /* not already collecting, so start */
 264                         pmix_globals.pushstdin = true;
 265                         /* We don't want to set nonblocking on our
 266                          * stdio stream.  If we do so, we set the file descriptor to
 267                          * non-blocking for everyone that has that file descriptor, which
 268                          * includes everyone else in our shell pipeline chain.  (See
 269                          * http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
 270                          * This causes things like "prun -np 1 big_app | cat" to lose
 271                          * output, because cat's stdout is then ALSO non-blocking and cat
 272                          * isn't built to deal with that case (same with almost all other
 273                          * unix text utils).
 274                          */
 275                         if (0 != fd) {
 276                             if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
 277                                 pmix_output(pmix_client_globals.iof_output,
 278                                             "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
 279                                             __FILE__, __LINE__, errno);
 280                             } else {
 281                                 flags |= O_NONBLOCK;
 282                                 fcntl(fd, F_SETFL, flags);
 283                             }
 284                         }
 285                         if (isatty(fd)) {
 286                             /* We should avoid trying to read from stdin if we
 287                              * have a terminal, but are backgrounded.  Catch the
 288                              * signals that are commonly used when we switch
 289                              * between being backgrounded and not.  If the
 290                              * filedescriptor is not a tty, don't worry about it
 291                              * and always stay connected.
 292                              */
 293                             pmix_event_signal_set(pmix_globals.evbase, &stdinsig_ev,
 294                                                   SIGCONT, pmix_iof_stdin_cb,
 295                                                   NULL);
 296 
 297                             /* setup a read event to read stdin, but don't activate it yet. The
 298                              * dst_name indicates who should receive the stdin. If that recipient
 299                              * doesn't do a corresponding pull, however, then the stdin will
 300                              * be dropped upon receipt at the local daemon
 301                              */
 302                             PMIX_IOF_READ_EVENT(&stdinev,
 303                                                 targets, ntargets,
 304                                                 directives, ndirs, fd,
 305                                                 pmix_iof_read_local_handler, false);
 306 
 307                             /* check to see if we want the stdin read event to be
 308                              * active - we will always at least define the event,
 309                              * but may delay its activation
 310                              */
 311                             if (pmix_iof_stdin_check(fd)) {
 312                                 PMIX_IOF_READ_ACTIVATE(stdinev);
 313                             }
 314                         } else {
 315                             /* if we are not looking at a tty, just setup a read event
 316                              * and activate it
 317                              */
 318                             PMIX_IOF_READ_EVENT(&stdinev, targets, ntargets,
 319                                                 directives, ndirs, fd,
 320                                                 pmix_iof_read_local_handler, true);
 321                         }
 322                     }
 323                 } else {
 324                     if (pmix_globals.pushstdin) {
 325                         /* remove these targets from the list of
 326                          * recipients - if the list is then empty,
 327                          * stop collecting. If the targets param
 328                          * is NULL, then remove all targets and stop.
 329                          * Flush any cached input before calling
 330                          * the cbfunc */
 331                     }
 332                 }
 333             } else if (PMIX_CHECK_KEY(&directives[n], PMIX_IOF_COMPLETE)) {
 334                 /* if we are collecting our stdin for the specified
 335                  * targets, then stop - a NULL for targets indicates
 336                  * stop for everyone. Flush any remaining cached input
 337                  * before calling the cbfunc */
 338                 stopcollecting = PMIX_INFO_TRUE(&directives[n]);
 339                 if (stopcollecting) {
 340                     if (pmix_globals.pushstdin) {
 341                         /* remove these targets from the list of
 342                          * recipients - if the list is then empty,
 343                          * stop collecting */
 344                     }
 345                 }
 346             }
 347         }
 348         return PMIX_OPERATION_SUCCEEDED;
 349     }
 350 
 351     /* if we are not a server, then we send the provided
 352      * data to our server for processing */
 353     if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) ||
 354         PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 355         msg = PMIX_NEW(pmix_buffer_t);
 356         if (NULL == msg) {
 357             return PMIX_ERR_NOMEM;
 358         }
 359         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 360                          msg, &cmd, 1, PMIX_COMMAND);
 361         if (PMIX_SUCCESS != rc) {
 362             PMIX_ERROR_LOG(rc);
 363             PMIX_RELEASE(msg);
 364             return rc;
 365         }
 366         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 367                          msg, &ntargets, 1, PMIX_SIZE);
 368         if (PMIX_SUCCESS != rc) {
 369             PMIX_ERROR_LOG(rc);
 370             PMIX_RELEASE(msg);
 371             return rc;
 372         }
 373         if (0 < ntargets) {
 374             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 375                              msg, targets, ntargets, PMIX_PROC);
 376             if (PMIX_SUCCESS != rc) {
 377                 PMIX_ERROR_LOG(rc);
 378                 PMIX_RELEASE(msg);
 379                 return rc;
 380             }
 381         }
 382         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 383                          msg, &ndirs, 1, PMIX_SIZE);
 384         if (PMIX_SUCCESS != rc) {
 385             PMIX_ERROR_LOG(rc);
 386             PMIX_RELEASE(msg);
 387             return rc;
 388         }
 389         if (0 < ndirs) {
 390             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 391                              msg, directives, ndirs, PMIX_INFO);
 392             if (PMIX_SUCCESS != rc) {
 393                 PMIX_ERROR_LOG(rc);
 394                 PMIX_RELEASE(msg);
 395                 return rc;
 396             }
 397         }
 398         if (NULL != bo) {
 399             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 400                              msg, bo, 1, PMIX_BYTE_OBJECT);
 401             if (PMIX_SUCCESS != rc) {
 402                 PMIX_ERROR_LOG(rc);
 403                 PMIX_RELEASE(msg);
 404                 return rc;
 405             }
 406         }
 407 
 408         cd = (pmix_ltcaddy_t*)malloc(sizeof(pmix_ltcaddy_t));
 409         if (NULL == cd) {
 410             PMIX_RELEASE(msg);
 411             rc = PMIX_ERR_NOMEM;
 412             return rc;
 413         }
 414         cd->cbfunc = cbfunc;
 415         cd->cbdata = cbdata;
 416         PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 417                            msg, stdincbfunc, cd);
 418         if (PMIX_SUCCESS != rc) {
 419             PMIX_ERROR_LOG(rc);
 420             PMIX_RELEASE(msg);
 421             free(cd);
 422         }
 423         return rc;
 424     }
 425 
 426     /* if we are a server, just pass the data up to our host */
 427     if (NULL == pmix_host_server.push_stdin) {
 428         return PMIX_ERR_NOT_SUPPORTED;
 429     }
 430     rc = pmix_host_server.push_stdin(&pmix_globals.myid,
 431                                      targets, ntargets,
 432                                      directives, ndirs,
 433                                      bo, cbfunc, cbdata);
 434     return rc;
 435 }
 436 
 437 pmix_status_t pmix_iof_write_output(const pmix_proc_t *name,
 438                                     pmix_iof_channel_t stream,
 439                                     const pmix_byte_object_t *bo,
 440                                     pmix_iof_flags_t *flags)
 441 {
 442     char starttag[PMIX_IOF_BASE_TAG_MAX], endtag[PMIX_IOF_BASE_TAG_MAX], *suffix;
 443     pmix_iof_write_output_t *output;
 444     size_t i;
 445     int j, k, starttaglen, endtaglen, num_buffered;
 446     bool endtagged;
 447     char qprint[10];
 448     pmix_iof_write_event_t *channel;
 449     pmix_iof_flags_t myflags;
 450 
 451     if (PMIX_FWD_STDOUT_CHANNEL & stream) {
 452         channel = &pmix_client_globals.iof_stdout.wev;
 453     } else {
 454         channel = &pmix_client_globals.iof_stderr.wev;
 455     }
 456     if (NULL == flags) {
 457         myflags.xml = pmix_globals.xml_output;
 458         if (pmix_globals.timestamp_output) {
 459             time(&myflags.timestamp);
 460         } else {
 461             myflags.timestamp = 0;
 462         }
 463         myflags.tag = pmix_globals.tag_output;
 464     } else {
 465         myflags = *flags;
 466     }
 467 
 468     PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
 469                          "%s write:output setting up to write %lu bytes to %s for %s on fd %d",
 470                          PMIX_NAME_PRINT(&pmix_globals.myid),
 471                          (unsigned long)bo->size,
 472                          PMIx_IOF_channel_string(stream),
 473                          PMIX_NAME_PRINT(name),
 474                          (NULL == channel) ? -1 : channel->fd));
 475 
 476     /* setup output object */
 477     output = PMIX_NEW(pmix_iof_write_output_t);
 478     memset(starttag, 0, PMIX_IOF_BASE_TAG_MAX);
 479     memset(endtag, 0, PMIX_IOF_BASE_TAG_MAX);
 480 
 481     /* write output data to the corresponding tag */
 482     if (PMIX_FWD_STDIN_CHANNEL & stream) {
 483         /* copy over the data to be written */
 484         if (0 < bo->size) {
 485             /* don't copy 0 bytes - we just need to pass
 486              * the zero bytes so the fd can be closed
 487              * after it writes everything out
 488              */
 489             memcpy(output->data, bo->bytes, bo->size);
 490         }
 491         output->numbytes = bo->size;
 492         goto process;
 493     } else if (PMIX_FWD_STDOUT_CHANNEL & stream) {
 494         /* write the bytes to stdout */
 495         suffix = "stdout";
 496     } else if (PMIX_FWD_STDERR_CHANNEL & stream) {
 497         /* write the bytes to stderr */
 498         suffix = "stderr";
 499     } else if (PMIX_FWD_STDDIAG_CHANNEL & stream) {
 500         /* write the bytes to stderr */
 501         suffix = "stddiag";
 502     } else {
 503         /* error - this should never happen */
 504         PMIX_ERROR_LOG(PMIX_ERR_VALUE_OUT_OF_BOUNDS);
 505         PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
 506                              "%s stream %0x", PMIX_NAME_PRINT(&pmix_globals.myid), stream));
 507         return PMIX_ERR_VALUE_OUT_OF_BOUNDS;
 508     }
 509 
 510     /* if this is to be xml tagged, create a tag with the correct syntax - we do not allow
 511      * timestamping of xml output
 512      */
 513     if (myflags.xml) {
 514         snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "<%s rank=\"%s\">", suffix, PMIX_RANK_PRINT(name->rank));
 515         snprintf(endtag, PMIX_IOF_BASE_TAG_MAX, "</%s>", suffix);
 516         goto construct;
 517     }
 518 
 519     /* if we are to timestamp output, start the tag with that */
 520     if (0 < myflags.timestamp) {
 521         char *cptr;
 522         /* get the timestamp */
 523         cptr = ctime(&myflags.timestamp);
 524         cptr[strlen(cptr)-1] = '\0';  /* remove trailing newline */
 525 
 526         if (myflags.tag) {
 527             /* if we want it tagged as well, use both */
 528             snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "%s[%s]<%s>:",
 529                      cptr, PMIX_NAME_PRINT(name), suffix);
 530         } else {
 531             /* only use timestamp */
 532             snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "%s<%s>:", cptr, suffix);
 533         }
 534         /* no endtag for this option */
 535         memset(endtag, '\0', PMIX_IOF_BASE_TAG_MAX);
 536         goto construct;
 537     }
 538 
 539     if (myflags.tag) {
 540         snprintf(starttag, PMIX_IOF_BASE_TAG_MAX, "[%s]<%s>:",
 541                  PMIX_NAME_PRINT(name), suffix);
 542         /* no endtag for this option */
 543         memset(endtag, '\0', PMIX_IOF_BASE_TAG_MAX);
 544         goto construct;
 545     }
 546 
 547     /* if we get here, then the data is not to be tagged - just copy it
 548      * and move on to processing
 549      */
 550     if (0 < bo->size) {
 551         /* don't copy 0 bytes - we just need to pass
 552          * the zero bytes so the fd can be closed
 553          * after it writes everything out
 554          */
 555         memcpy(output->data, bo->bytes, bo->size);
 556     }
 557     output->numbytes = bo->size;
 558     goto process;
 559 
 560   construct:
 561     starttaglen = strlen(starttag);
 562     endtaglen = strlen(endtag);
 563     endtagged = false;
 564     /* start with the tag */
 565     for (j=0, k=0; j < starttaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 566         output->data[k++] = starttag[j];
 567     }
 568     /* cycle through the data looking for <cr>
 569      * and replace those with the tag
 570      */
 571     for (i=0; i < bo->size && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; i++) {
 572         if (myflags.xml) {
 573             if ('&' == bo->bytes[i]) {
 574                 if (k+5 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 575                     PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 576                     goto process;
 577                 }
 578                 snprintf(qprint, 10, "&amp;");
 579                 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 580                     output->data[k++] = qprint[j];
 581                 }
 582             } else if ('<' == bo->bytes[i]) {
 583                 if (k+4 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 584                     PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 585                     goto process;
 586                 }
 587                 snprintf(qprint, 10, "&lt;");
 588                 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 589                     output->data[k++] = qprint[j];
 590                 }
 591             } else if ('>' == bo->bytes[i]) {
 592                 if (k+4 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 593                     PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 594                     goto process;
 595                 }
 596                 snprintf(qprint, 10, "&gt;");
 597                 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 598                     output->data[k++] = qprint[j];
 599                 }
 600             } else if (bo->bytes[i] < 32 || bo->bytes[i] > 127) {
 601                 /* this is a non-printable character, so escape it too */
 602                 if (k+7 >= PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 603                     PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 604                     goto process;
 605                 }
 606                 snprintf(qprint, 10, "&#%03d;", (int)bo->bytes[i]);
 607                 for (j=0; j < (int)strlen(qprint) && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 608                     output->data[k++] = qprint[j];
 609                 }
 610                 /* if this was a \n, then we also need to break the line with the end tag */
 611                 if ('\n' == bo->bytes[i] && (k+endtaglen+1) < PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 612                     /* we need to break the line with the end tag */
 613                     for (j=0; j < endtaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
 614                         output->data[k++] = endtag[j];
 615                     }
 616                     /* move the <cr> over */
 617                     output->data[k++] = '\n';
 618                     /* if this isn't the end of the data buffer, add a new start tag */
 619                     if (i < bo->size-1 && (k+starttaglen) < PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 620                         for (j=0; j < starttaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 621                             output->data[k++] = starttag[j];
 622                             endtagged = false;
 623                         }
 624                     } else {
 625                         endtagged = true;
 626                     }
 627                 }
 628             } else {
 629                 output->data[k++] = bo->bytes[i];
 630             }
 631         } else {
 632             if ('\n' == bo->bytes[i]) {
 633                 /* we need to break the line with the end tag */
 634                 for (j=0; j < endtaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
 635                     output->data[k++] = endtag[j];
 636                 }
 637                 /* move the <cr> over */
 638                 output->data[k++] = '\n';
 639                 /* if this isn't the end of the data buffer, add a new start tag */
 640                 if (i < bo->size-1) {
 641                     for (j=0; j < starttaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX; j++) {
 642                         output->data[k++] = starttag[j];
 643                         endtagged = false;
 644                     }
 645                 } else {
 646                     endtagged = true;
 647                 }
 648             } else {
 649                 output->data[k++] = bo->bytes[i];
 650             }
 651         }
 652     }
 653     if (!endtagged && k < PMIX_IOF_BASE_TAGGED_OUT_MAX) {
 654         /* need to add an endtag */
 655         for (j=0; j < endtaglen && k < PMIX_IOF_BASE_TAGGED_OUT_MAX-1; j++) {
 656             output->data[k++] = endtag[j];
 657         }
 658         output->data[k] = '\n';
 659     }
 660     output->numbytes = k;
 661 
 662   process:
 663     /* add this data to the write list for this fd */
 664     pmix_list_append(&channel->outputs, &output->super);
 665 
 666     /* record how big the buffer is */
 667     num_buffered = pmix_list_get_size(&channel->outputs);
 668 
 669     /* is the write event issued? */
 670     if (!channel->pending) {
 671         /* issue it */
 672         PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
 673                              "%s write:output adding write event",
 674                              PMIX_NAME_PRINT(&pmix_globals.myid)));
 675         PMIX_IOF_SINK_ACTIVATE(channel);
 676     }
 677 
 678     return num_buffered;
 679 }
 680 
 681 void pmix_iof_static_dump_output(pmix_iof_sink_t *sink)
 682 {
 683     bool dump;
 684     int num_written;
 685     pmix_iof_write_event_t *wev = &sink->wev;
 686     pmix_iof_write_output_t *output;
 687 
 688     if (!pmix_list_is_empty(&wev->outputs)) {
 689         dump = false;
 690         /* make one last attempt to write this out */
 691         while (NULL != (output = (pmix_iof_write_output_t*)pmix_list_remove_first(&wev->outputs))) {
 692             if (!dump) {
 693                 num_written = write(wev->fd, output->data, output->numbytes);
 694                 if (num_written < output->numbytes) {
 695                     /* don't retry - just cleanout the list and dump it */
 696                     dump = true;
 697                 }
 698             }
 699             PMIX_RELEASE(output);
 700         }
 701     }
 702 }
 703 
 704 void pmix_iof_write_handler(int _fd, short event, void *cbdata)
 705 {
 706     pmix_iof_sink_t *sink = (pmix_iof_sink_t*)cbdata;
 707     pmix_iof_write_event_t *wev = &sink->wev;
 708     pmix_list_item_t *item;
 709     pmix_iof_write_output_t *output;
 710     int num_written, total_written = 0;
 711 
 712     PMIX_ACQUIRE_OBJECT(sink);
 713 
 714     PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
 715                          "%s write:handler writing data to %d",
 716                          PMIX_NAME_PRINT(&pmix_globals.myid),
 717                          wev->fd));
 718 
 719     while (NULL != (item = pmix_list_remove_first(&wev->outputs))) {
 720         output = (pmix_iof_write_output_t*)item;
 721         if (0 == output->numbytes) {
 722             /* indicates we are to close this stream */
 723             PMIX_RELEASE(sink);
 724             return;
 725         }
 726         num_written = write(wev->fd, output->data, output->numbytes);
 727         if (num_written < 0) {
 728             if (EAGAIN == errno || EINTR == errno) {
 729                 /* push this item back on the front of the list */
 730                 pmix_list_prepend(&wev->outputs, item);
 731                 /* if the list is getting too large, abort */
 732                 if (pmix_globals.output_limit < pmix_list_get_size(&wev->outputs)) {
 733                     pmix_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
 734                     goto ABORT;
 735                 }
 736                 /* leave the write event running so it will call us again
 737                  * when the fd is ready.
 738                  */
 739                 goto NEXT_CALL;
 740             }
 741             /* otherwise, something bad happened so all we can do is abort
 742              * this attempt
 743              */
 744             PMIX_RELEASE(output);
 745             goto ABORT;
 746         } else if (num_written < output->numbytes) {
 747             /* incomplete write - adjust data to avoid duplicate output */
 748             memmove(output->data, &output->data[num_written], output->numbytes - num_written);
 749             /* adjust the number of bytes remaining to be written */
 750             output->numbytes -= num_written;
 751             /* push this item back on the front of the list */
 752             pmix_list_prepend(&wev->outputs, item);
 753             /* if the list is getting too large, abort */
 754             if (pmix_globals.output_limit < pmix_list_get_size(&wev->outputs)) {
 755                 pmix_output(0, "IO Forwarding is running too far behind - something is blocking us from writing");
 756                 goto ABORT;
 757             }
 758             /* leave the write event running so it will call us again
 759              * when the fd is ready
 760              */
 761             goto NEXT_CALL;
 762         }
 763         PMIX_RELEASE(output);
 764 
 765         total_written += num_written;
 766         if(wev->always_writable && (PMIX_IOF_SINK_BLOCKSIZE <= total_written)){
 767             /* If this is a regular file it will never tell us it will block
 768              * Write no more than PMIX_IOF_REGULARF_BLOCK at a time allowing
 769              * other fds to progress
 770              */
 771             goto NEXT_CALL;
 772         }
 773     }
 774   ABORT:
 775     wev->pending = false;
 776     PMIX_POST_OBJECT(wev);
 777     return;
 778 NEXT_CALL:
 779     PMIX_IOF_SINK_ACTIVATE(wev);
 780 }
 781 
 782 /* return true if we should read stdin from fd, false otherwise */
 783 bool pmix_iof_stdin_check(int fd)
 784 {
 785 #if defined(HAVE_TCGETPGRP)
 786     if( isatty(fd) && (getpgrp() != tcgetpgrp(fd)) ) {
 787         return false;
 788     }
 789 #endif
 790     return true;
 791 }
 792 
 793 void pmix_iof_stdin_cb(int fd, short event, void *cbdata)
 794 {
 795     bool should_process;
 796     pmix_iof_read_event_t *stdinev = (pmix_iof_read_event_t*)cbdata;
 797 
 798     PMIX_ACQUIRE_OBJECT(stdinev);
 799 
 800     should_process = pmix_iof_stdin_check(0);
 801 
 802     if (should_process) {
 803         PMIX_IOF_READ_ACTIVATE(stdinev);
 804     } else {
 805         pmix_event_del(&stdinev->ev);
 806         stdinev->active = false;
 807         PMIX_POST_OBJECT(stdinev);
 808     }
 809 }
 810 
 811 static void iof_stdin_cbfunc(struct pmix_peer_t *peer,
 812                              pmix_ptl_hdr_t *hdr,
 813                              pmix_buffer_t *buf, void *cbdata)
 814 {
 815     pmix_iof_read_event_t *stdinev = (pmix_iof_read_event_t*)cbdata;
 816     int cnt;
 817     pmix_status_t rc, ret;
 818 
 819     PMIX_ACQUIRE_OBJECT(stdinev);
 820 
 821     /* check the return status */
 822     cnt = 1;
 823     PMIX_BFROPS_UNPACK(rc, peer, buf, &ret, &cnt, PMIX_STATUS);
 824     if (PMIX_SUCCESS != rc) {
 825         PMIX_ERROR_LOG(rc);
 826         pmix_event_del(&stdinev->ev);
 827         stdinev->active = false;
 828         PMIX_POST_OBJECT(stdinev);
 829         return;
 830     }
 831     /* if the status wasn't success, then terminate the forward */
 832     if (PMIX_SUCCESS != ret) {
 833         pmix_event_del(&stdinev->ev);
 834         stdinev->active = false;
 835         PMIX_POST_OBJECT(stdinev);
 836         if (PMIX_ERR_IOF_COMPLETE != ret) {
 837             /* generate an IOF-failed event so the tool knows */
 838             PMIx_Notify_event(PMIX_ERR_IOF_FAILURE,
 839                               &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
 840                               NULL, 0, NULL, NULL);
 841         }
 842         return;
 843     }
 844 
 845     pmix_iof_stdin_cb(0, 0, stdinev);
 846 }
 847 
 848 /* this is the read handler for stdin */
 849 void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata)
 850 {
 851     pmix_iof_read_event_t *rev = (pmix_iof_read_event_t*)cbdata;
 852     unsigned char data[PMIX_IOF_BASE_MSG_MAX];
 853     int32_t numbytes;
 854     int fd;
 855     pmix_status_t rc;
 856     pmix_buffer_t *msg;
 857     pmix_cmd_t cmd = PMIX_IOF_PUSH_CMD;
 858     pmix_byte_object_t bo;
 859 
 860     PMIX_ACQUIRE_OBJECT(rev);
 861 
 862     /* As we may use timer events, fd can be bogus (-1)
 863      * use the right one here
 864      */
 865     fd = fileno(stdin);
 866 
 867     /* read up to the fragment size */
 868     memset(data, 0, PMIX_IOF_BASE_MSG_MAX);
 869     numbytes = read(fd, data, sizeof(data));
 870 
 871     if (numbytes < 0) {
 872         /* either we have a connection error or it was a non-blocking read */
 873 
 874         /* non-blocking, retry */
 875         if (EAGAIN == errno || EINTR == errno) {
 876             PMIX_IOF_READ_ACTIVATE(rev);
 877             return;
 878         }
 879 
 880         PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,
 881                              "%s iof:read handler Error on stdin",
 882                              PMIX_NAME_PRINT(&pmix_globals.myid)));
 883         /* Un-recoverable error. Allow the code to flow as usual in order to
 884          * to send the zero bytes message up the stream, and then close the
 885          * file descriptor and delete the event.
 886          */
 887         numbytes = 0;
 888     }
 889 
 890     /* The event has fired, so it's no longer active until we
 891        re-add it */
 892     rev->active = false;
 893 
 894     /* pass the data to our PMIx server so it can relay it
 895      * to the host RM for distribution */
 896     msg = PMIX_NEW(pmix_buffer_t);
 897     if (NULL == msg) {
 898         /* don't restart the event - just return */
 899         return;
 900     }
 901     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 902                      msg, &cmd, 1, PMIX_COMMAND);
 903     if (PMIX_SUCCESS != rc) {
 904         PMIX_ERROR_LOG(rc);
 905         PMIX_RELEASE(msg);
 906         return;
 907     }
 908     /* pack the number of targets */
 909     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 910                      msg, &rev->ntargets, 1, PMIX_SIZE);
 911     if (PMIX_SUCCESS != rc) {
 912         PMIX_ERROR_LOG(rc);
 913         PMIX_RELEASE(msg);
 914         return;
 915     }
 916     /* and the targets */
 917     if (0 < rev->ntargets) {
 918         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 919                          msg, rev->targets, rev->ntargets, PMIX_PROC);
 920         if (PMIX_SUCCESS != rc) {
 921             PMIX_ERROR_LOG(rc);
 922             PMIX_RELEASE(msg);
 923             return;
 924         }
 925     }
 926     /* pack the number of directives */
 927     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 928                      msg, &rev->ndirs, 1, PMIX_SIZE);
 929     if (PMIX_SUCCESS != rc) {
 930         PMIX_ERROR_LOG(rc);
 931         PMIX_RELEASE(msg);
 932         return;
 933     }
 934     /* and the directives */
 935     if (0 < rev->ndirs) {
 936         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 937                          msg, rev->directives, rev->ndirs, PMIX_INFO);
 938         if (PMIX_SUCCESS != rc) {
 939             PMIX_ERROR_LOG(rc);
 940             PMIX_RELEASE(msg);
 941             return;
 942         }
 943     }
 944 
 945     /* pack the data */
 946     bo.bytes = (char*)data;
 947     bo.size = numbytes;
 948     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 949                      msg, &bo, 1, PMIX_BYTE_OBJECT);
 950     if (PMIX_SUCCESS != rc) {
 951         PMIX_ERROR_LOG(rc);
 952         PMIX_RELEASE(msg);
 953         return;
 954     }
 955 
 956     /* send it to the server */
 957     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 958                        msg, iof_stdin_cbfunc, rev);
 959     if (PMIX_SUCCESS != rc) {
 960         PMIX_ERROR_LOG(rc);
 961         PMIX_RELEASE(msg);
 962     }
 963     /* nothing more to do */
 964     return;
 965 }
 966 
 967 /* class instances */
 968 static void iof_sink_construct(pmix_iof_sink_t* ptr)
 969 {
 970     PMIX_CONSTRUCT(&ptr->wev, pmix_iof_write_event_t);
 971     ptr->xoff = false;
 972     ptr->exclusive = false;
 973     ptr->closed = false;
 974 }
 975 static void iof_sink_destruct(pmix_iof_sink_t* ptr)
 976 {
 977     if (0 <= ptr->wev.fd) {
 978         PMIX_OUTPUT_VERBOSE((20, pmix_client_globals.iof_output,
 979                              "%s iof: closing sink for process %s on fd %d",
 980                              PMIX_NAME_PRINT(&pmix_globals.myid),
 981                              PMIX_NAME_PRINT(&ptr->name), ptr->wev.fd));
 982         PMIX_DESTRUCT(&ptr->wev);
 983     }
 984 }
 985 PMIX_CLASS_INSTANCE(pmix_iof_sink_t,
 986                     pmix_list_item_t,
 987                     iof_sink_construct,
 988                     iof_sink_destruct);
 989 
 990 
 991 static void iof_read_event_construct(pmix_iof_read_event_t* rev)
 992 {
 993     rev->fd = -1;
 994     rev->active = false;
 995     rev->tv.tv_sec = 0;
 996     rev->tv.tv_usec = 0;
 997     rev->targets = NULL;
 998     rev->ntargets = 0;
 999     rev->directives = NULL;
1000     rev->ndirs = 0;
1001 }
1002 static void iof_read_event_destruct(pmix_iof_read_event_t* rev)
1003 {
1004     pmix_event_del(&rev->ev);
1005     if (0 <= rev->fd) {
1006         PMIX_OUTPUT_VERBOSE((20, pmix_client_globals.iof_output,
1007                              "%s iof: closing fd %d",
1008                              PMIX_NAME_PRINT(&pmix_globals.myid), rev->fd));
1009         close(rev->fd);
1010         rev->fd = -1;
1011     }
1012     if (NULL != rev->targets) {
1013         PMIX_PROC_FREE(rev->targets, rev->ntargets);
1014     }
1015     if (NULL != rev->directives) {
1016         PMIX_INFO_FREE(rev->directives, rev->ndirs);
1017     }
1018 }
1019 PMIX_CLASS_INSTANCE(pmix_iof_read_event_t,
1020                     pmix_object_t,
1021                     iof_read_event_construct,
1022                     iof_read_event_destruct);
1023 
1024 static void iof_write_event_construct(pmix_iof_write_event_t* wev)
1025 {
1026     wev->pending = false;
1027     wev->always_writable = false;
1028     wev->fd = -1;
1029     PMIX_CONSTRUCT(&wev->outputs, pmix_list_t);
1030     wev->tv.tv_sec = 0;
1031     wev->tv.tv_usec = 0;
1032 }
1033 static void iof_write_event_destruct(pmix_iof_write_event_t* wev)
1034 {
1035     pmix_event_del(&wev->ev);
1036     if (2 < wev->fd) {
1037         PMIX_OUTPUT_VERBOSE((20, pmix_client_globals.iof_output,
1038                              "%s iof: closing fd %d for write event",
1039                              PMIX_NAME_PRINT(&pmix_globals.myid), wev->fd));
1040         close(wev->fd);
1041     }
1042     PMIX_DESTRUCT(&wev->outputs);
1043 }
1044 PMIX_CLASS_INSTANCE(pmix_iof_write_event_t,
1045                     pmix_list_item_t,
1046                     iof_write_event_construct,
1047                     iof_write_event_destruct);
1048 
1049 PMIX_CLASS_INSTANCE(pmix_iof_write_output_t,
1050                     pmix_list_item_t,
1051                     NULL, NULL);

/* [<][>][^][v][top][bottom][index][help] */