This source file includes the following definitions.
- send_cb
- orte_iof_orted_send_xonxoff
- orte_iof_orted_recv
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 #include "orte_config.h"
  24 #include "orte/constants.h"
  25 
  26 #include <errno.h>
  27 #ifdef HAVE_UNISTD_H
  28 #include <unistd.h>
  29 #endif  
  30 #include <string.h>
  31 
  32 #include "opal/dss/dss.h"
  33 
  34 #include "orte/mca/rml/rml.h"
  35 #include "orte/mca/rml/rml_types.h"
  36 #include "orte/mca/errmgr/errmgr.h"
  37 #include "orte/util/name_fns.h"
  38 #include "orte/runtime/orte_globals.h"
  39 
  40 #include "orte/mca/iof/iof_types.h"
  41 #include "orte/mca/iof/base/base.h"
  42 
  43 #include "iof_orted.h"
  44 
  45 static void send_cb(int status, orte_process_name_t *peer,
  46                     opal_buffer_t *buf, orte_rml_tag_t tag,
  47                     void *cbdata)
  48 {
  49     
  50     OBJ_RELEASE(buf);
  51 }
  52 
  53 void orte_iof_orted_send_xonxoff(orte_iof_tag_t tag)
  54 {
  55     opal_buffer_t *buf;
  56     int rc;
  57 
  58     buf = OBJ_NEW(opal_buffer_t);
  59 
  60     
  61 
  62 
  63     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
  64         ORTE_ERROR_LOG(rc);
  65         OBJ_RELEASE(buf);
  66         return;
  67     }
  68 
  69     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  70                          "%s sending %s",
  71                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  72                          (ORTE_IOF_XON == tag) ? "xon" : "xoff"));
  73 
  74     
  75     if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
  76                                           send_cb, NULL))) {
  77         ORTE_ERROR_LOG(rc);
  78     }
  79 }
  80 
  81 
  82 
  83 
  84 
  85 
  86 
  87 
  88 
  89 void orte_iof_orted_recv(int status, orte_process_name_t* sender,
  90                          opal_buffer_t* buffer, orte_rml_tag_t tag,
  91                          void* cbdata)
  92 {
  93     unsigned char data[ORTE_IOF_BASE_MSG_MAX];
  94     orte_iof_tag_t stream;
  95     int32_t count, numbytes;
  96     orte_process_name_t target;
  97     orte_iof_proc_t *proct;
  98     int rc;
  99 
 100     
 101     count = 1;
 102     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
 103         ORTE_ERROR_LOG(rc);
 104         return;
 105     }
 106 
 107     
 108     if (ORTE_IOF_STDIN != stream) {
 109         ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
 110         return;
 111     }
 112 
 113     
 114     count = 1;
 115     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &count, ORTE_NAME))) {
 116         ORTE_ERROR_LOG(rc);
 117         return;
 118     }
 119 
 120     
 121     numbytes=ORTE_IOF_BASE_MSG_MAX;
 122     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
 123         ORTE_ERROR_LOG(rc);
 124         return;
 125     }
 126     
 127 
 128     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 129                          "%s unpacked %d bytes for local proc %s",
 130                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
 131                          ORTE_NAME_PRINT(&target)));
 132 
 133     
 134     OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
 135         
 136         if (target.jobid == proct->name.jobid) {
 137             
 138             if (ORTE_VPID_WILDCARD == target.vpid ||
 139                 proct->name.vpid == target.vpid) {
 140                 OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 141                                      "%s writing data to local proc %s",
 142                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 143                                      ORTE_NAME_PRINT(&proct->name)));
 144                 if (NULL == proct->stdinev) {
 145                     continue;
 146                 }
 147                 
 148 
 149 
 150 
 151                 if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&target, stream, data, numbytes, proct->stdinev->wev)) {
 152                     
 153 
 154 
 155                     if (!mca_iof_orted_component.xoff) {
 156                         mca_iof_orted_component.xoff = true;
 157                         orte_iof_orted_send_xonxoff(ORTE_IOF_XOFF);
 158                     }
 159                 }
 160             }
 161         }
 162     }
 163 }