root/orte/mca/iof/orted/iof_orted_receive.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. send_cb
  2. orte_iof_orted_send_xonxoff
  3. orte_iof_orted_recv

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2005 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011      Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "orte_config.h"
  24 #include "orte/constants.h"
  25 
  26 #include <errno.h>
  27 #ifdef HAVE_UNISTD_H
  28 #include <unistd.h>
  29 #endif  /* HAVE_UNISTD_H */
  30 #include <string.h>
  31 
  32 #include "opal/dss/dss.h"
  33 
  34 #include "orte/mca/rml/rml.h"
  35 #include "orte/mca/rml/rml_types.h"
  36 #include "orte/mca/errmgr/errmgr.h"
  37 #include "orte/util/name_fns.h"
  38 #include "orte/runtime/orte_globals.h"
  39 
  40 #include "orte/mca/iof/iof_types.h"
  41 #include "orte/mca/iof/base/base.h"
  42 
  43 #include "iof_orted.h"
  44 
  45 static void send_cb(int status, orte_process_name_t *peer,
  46                     opal_buffer_t *buf, orte_rml_tag_t tag,
  47                     void *cbdata)
  48 {
  49     /* nothing to do here - just release buffer and return */
  50     OBJ_RELEASE(buf);
  51 }
  52 
  53 void orte_iof_orted_send_xonxoff(orte_iof_tag_t tag)
  54 {
  55     opal_buffer_t *buf;
  56     int rc;
  57 
  58     buf = OBJ_NEW(opal_buffer_t);
  59 
  60     /* pack the tag - we do this first so that flow control messages can
  61      * consist solely of the tag
  62      */
  63     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
  64         ORTE_ERROR_LOG(rc);
  65         OBJ_RELEASE(buf);
  66         return;
  67     }
  68 
  69     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  70                          "%s sending %s",
  71                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  72                          (ORTE_IOF_XON == tag) ? "xon" : "xoff"));
  73 
  74     /* send the buffer to the HNP */
  75     if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
  76                                           send_cb, NULL))) {
  77         ORTE_ERROR_LOG(rc);
  78     }
  79 }
  80 
  81 /*
  82  * The only messages coming to an orted are either:
  83  *
  84  * (a) stdin, which is to be copied to whichever local
  85  *     procs "pull'd" a copy
  86  *
  87  * (b) flow control messages
  88  */
  89 void orte_iof_orted_recv(int status, orte_process_name_t* sender,
  90                          opal_buffer_t* buffer, orte_rml_tag_t tag,
  91                          void* cbdata)
  92 {
  93     unsigned char data[ORTE_IOF_BASE_MSG_MAX];
  94     orte_iof_tag_t stream;
  95     int32_t count, numbytes;
  96     orte_process_name_t target;
  97     orte_iof_proc_t *proct;
  98     int rc;
  99 
 100     /* see what stream generated this data */
 101     count = 1;
 102     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
 103         ORTE_ERROR_LOG(rc);
 104         return;
 105     }
 106 
 107     /* if this isn't stdin, then we have an error */
 108     if (ORTE_IOF_STDIN != stream) {
 109         ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
 110         return;
 111     }
 112 
 113     /* unpack the intended target */
 114     count = 1;
 115     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &count, ORTE_NAME))) {
 116         ORTE_ERROR_LOG(rc);
 117         return;
 118     }
 119 
 120     /* unpack the data */
 121     numbytes=ORTE_IOF_BASE_MSG_MAX;
 122     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
 123         ORTE_ERROR_LOG(rc);
 124         return;
 125     }
 126     /* numbytes will contain the actual #bytes that were sent */
 127 
 128     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 129                          "%s unpacked %d bytes for local proc %s",
 130                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
 131                          ORTE_NAME_PRINT(&target)));
 132 
 133     /* cycle through our list of procs */
 134     OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
 135         /* is this intended for this jobid? */
 136         if (target.jobid == proct->name.jobid) {
 137             /* yes - is this intended for all vpids or this vpid? */
 138             if (ORTE_VPID_WILDCARD == target.vpid ||
 139                 proct->name.vpid == target.vpid) {
 140                 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 141                                      "%s writing data to local proc %s",
 142                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 143                                      ORTE_NAME_PRINT(&proct->name)));
 144                 if (NULL == proct->stdinev) {
 145                     continue;
 146                 }
 147                 /* send the bytes down the pipe - we even send 0 byte events
 148                  * down the pipe so it forces out any preceding data before
 149                  * closing the output stream
 150                  */
 151                 if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdinev->wev)) {
 152                     /* getting too backed up - tell the HNP to hold off any more input if we
 153                      * haven't already told it
 154                      */
 155                     if (!mca_iof_orted_component.xoff) {
 156                         mca_iof_orted_component.xoff = true;
 157                         orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
 158                     }
 159                 }
 160             }
 161         }
 162     }
 163 }

/* [<][>][^][v][top][bottom][index][help] */