This source file includes the following definitions.
- init
- tool_push
- send_cb
- tool_pull
- tool_close
- finalize
- tool_output
- tool_ft_event
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 #include "orte_config.h"
  24 #include "orte/constants.h"
  25 
  26 #include <errno.h>
  27 #ifdef HAVE_UNISTD_H
  28 #include <unistd.h>
  29 #endif  
  30 #include <string.h>
  31 
  32 #include "orte/mca/rml/rml.h"
  33 #include "orte/mca/rml/rml_types.h"
  34 #include "orte/mca/errmgr/errmgr.h"
  35 #include "orte/util/name_fns.h"
  36 #include "orte/runtime/orte_globals.h"
  37 #include "orte/runtime/orte_wait.h"
  38 
  39 #include "orte/mca/iof/iof.h"
  40 #include "orte/mca/iof/base/base.h"
  41 
  42 #include "iof_tool.h"
  43 
  44 static int init(void);
  45 
  46 static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
  47 
  48 static int tool_pull(const orte_process_name_t* src_name,
  49                       orte_iof_tag_t src_tag,
  50                       int fd);
  51 
  52 static int tool_close(const orte_process_name_t* peer,
  53                        orte_iof_tag_t source_tag);
  54 
  55 static int tool_output(const orte_process_name_t* peer,
  56                        orte_iof_tag_t source_tag,
  57                        const char *msg);
  58 
  59 static int finalize(void);
  60 
  61 static int tool_ft_event(int state);
  62 
  63 orte_iof_base_module_t orte_iof_tool_module = {
  64     .init = init,
  65     .push = tool_push,
  66     .pull = tool_pull,
  67     .close = tool_close,
  68     .output = tool_output,
  69     .finalize = finalize,
  70     .ft_event = tool_ft_event
  71 };
  72 
  73 
  74 static int init(void)
  75 {
  76     
  77 
  78     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
  79                             ORTE_RML_TAG_IOF_PROXY,
  80                             ORTE_RML_PERSISTENT,
  81                             orte_iof_tool_recv,
  82                             NULL);
  83 
  84     mca_iof_tool_component.closed = false;
  85 
  86     return ORTE_SUCCESS;
  87 }
  88 
  89 
  90 
  91 
  92 
  93 
  94 static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd)
  95 {
  96     
  97 
  98 
  99 
 100 
 101 
 102     return ORTE_ERR_NOT_SUPPORTED;
 103 }
 104 
 105 
 106 
 107 
 108 
 109 static void send_cb(int status, orte_process_name_t *peer,
 110                     opal_buffer_t *buf, orte_rml_tag_t tag,
 111                     void *cbdata)
 112 {
 113     
 114     OBJ_RELEASE(buf);
 115 }
 116 
 117 
 118 
 119 
 120 
 121 
 122 static int tool_pull(const orte_process_name_t* src_name,
 123                      orte_iof_tag_t src_tag,
 124                      int fd)
 125 {
 126     
 127 
 128 
 129 
 130 
 131 
 132 
 133 
 134 
 135     opal_buffer_t *buf;
 136     orte_iof_tag_t tag;
 137     orte_process_name_t hnp;
 138     int rc;
 139 
 140     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 141                          "%s pulling output for proc %s",
 142                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 143                          ORTE_NAME_PRINT(src_name)));
 144 
 145     buf = OBJ_NEW(opal_buffer_t);
 146 
 147     
 148     tag = src_tag | ORTE_IOF_PULL;
 149 
 150     
 151 
 152 
 153     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
 154         ORTE_ERROR_LOG(rc);
 155         OBJ_RELEASE(buf);
 156         return rc;
 157     }
 158     
 159     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
 160         ORTE_ERROR_LOG(rc);
 161         OBJ_RELEASE(buf);
 162         return rc;
 163     }
 164     
 165     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
 166         ORTE_ERROR_LOG(rc);
 167         OBJ_RELEASE(buf);
 168         return rc;
 169     }
 170 
 171     
 172     ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
 173     orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
 174                             send_cb, NULL);
 175 
 176     return ORTE_SUCCESS;
 177 }
 178 
 179 
 180 static int tool_close(const orte_process_name_t* src_name,
 181                       orte_iof_tag_t src_tag)
 182 {
 183     
 184 
 185 
 186 
 187     opal_buffer_t *buf;
 188     orte_iof_tag_t tag;
 189     orte_process_name_t hnp;
 190     int rc;
 191 
 192     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 193                          "%s closing output for proc %s",
 194                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 195                          ORTE_NAME_PRINT(src_name)));
 196 
 197     buf = OBJ_NEW(opal_buffer_t);
 198 
 199     
 200     tag = src_tag | ORTE_IOF_CLOSE;
 201 
 202     
 203 
 204 
 205     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
 206         ORTE_ERROR_LOG(rc);
 207         OBJ_RELEASE(buf);
 208         return rc;
 209     }
 210     
 211     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, src_name, 1, ORTE_NAME))) {
 212         ORTE_ERROR_LOG(rc);
 213         OBJ_RELEASE(buf);
 214         return rc;
 215     }
 216 
 217     
 218     mca_iof_tool_component.closed = false;
 219 
 220     
 221     ORTE_HNP_NAME_FROM_JOB(&hnp, src_name->jobid);
 222     orte_rml.send_buffer_nb(&hnp, buf, ORTE_RML_TAG_IOF_HNP,
 223                             send_cb, NULL);
 224 
 225     return ORTE_SUCCESS;
 226 }
 227 
 228 static int finalize(void)
 229 {
 230     opal_list_item_t* item;
 231     orte_iof_write_output_t *output;
 232     orte_iof_write_event_t *wev;
 233     int num_written;
 234     bool dump;
 235 
 236     
 237     wev = orte_iof_base.iof_write_stdout->wev;
 238     if (!opal_list_is_empty(&wev->outputs)) {
 239         dump = false;
 240         
 241         while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 242             output = (orte_iof_write_output_t*)item;
 243             if (!dump) {
 244                 num_written = write(wev->fd, output->data, output->numbytes);
 245                 if (num_written < output->numbytes) {
 246                     
 247                     dump = true;
 248                 }
 249             }
 250             OBJ_RELEASE(output);
 251         }
 252     }
 253     OBJ_RELEASE(orte_iof_base.iof_write_stdout);
 254     if (!orte_xml_output) {
 255         
 256         wev = orte_iof_base.iof_write_stderr->wev;
 257         if (!opal_list_is_empty(&wev->outputs)) {
 258             dump = false;
 259             
 260             while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
 261                 output = (orte_iof_write_output_t*)item;
 262                 if (!dump) {
 263                     num_written = write(wev->fd, output->data, output->numbytes);
 264                     if (num_written < output->numbytes) {
 265                         
 266                         dump = true;
 267                     }
 268                 }
 269                 OBJ_RELEASE(output);
 270             }
 271         }
 272         OBJ_RELEASE(orte_iof_base.iof_write_stderr);
 273     }
 274 
 275     
 276     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
 277 
 278     return ORTE_SUCCESS;
 279 }
 280 
 281 static int tool_output(const orte_process_name_t* peer,
 282                        orte_iof_tag_t source_tag,
 283                        const char *msg)
 284 {
 285     
 286     if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
 287         orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
 288     } else {
 289         orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
 290     }
 291 
 292     return ORTE_SUCCESS;
 293 }
 294 
 295 
 296 
 297 
 298 
 299 static int tool_ft_event(int state)
 300 {
 301     return ORTE_ERR_NOT_IMPLEMENTED;
 302 }