root/orte/mca/iof/hnp/iof_hnp_receive.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_iof_hnp_recv

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
  16  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
  17  * $COPYRIGHT$
  18  *
  19  * Additional copyrights may follow
  20  *
  21  * $HEADER$
  22  */
  23 
  24 #include "orte_config.h"
  25 #include "orte/constants.h"
  26 
  27 #include <errno.h>
  28 #ifdef HAVE_UNISTD_H
  29 #include <unistd.h>
  30 #endif  /* HAVE_UNISTD_H */
  31 #include <string.h>
  32 #ifdef HAVE_FCNTL_H
  33 #include <fcntl.h>
  34 #else
  35 #ifdef HAVE_SYS_FCNTL_H
  36 #include <sys/fcntl.h>
  37 #endif
  38 #endif
  39 
  40 #include "opal/dss/dss.h"
  41 #include "opal/mca/pmix/pmix.h"
  42 
  43 #include "orte/mca/rml/rml.h"
  44 #include "orte/mca/errmgr/errmgr.h"
  45 #include "orte/util/name_fns.h"
  46 #include "orte/util/threads.h"
  47 #include "orte/runtime/orte_globals.h"
  48 
  49 #include "orte/mca/iof/iof.h"
  50 #include "orte/mca/iof/base/base.h"
  51 
  52 #include "iof_hnp.h"
  53 
  54 
  55 void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
  56                        opal_buffer_t* buffer, orte_rml_tag_t tag,
  57                        void* cbdata)
  58 {
  59     orte_process_name_t origin, requestor;
  60     unsigned char data[ORTE_IOF_BASE_MSG_MAX];
  61     orte_iof_tag_t stream;
  62     int32_t count, numbytes;
  63     orte_iof_sink_t *sink, *next;
  64     int rc;
  65     bool exclusive;
  66     orte_iof_proc_t *proct;
  67     orte_ns_cmp_bitmask_t mask=ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
  68 
  69     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
  70                          "%s received IOF from proc %s",
  71                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  72                          ORTE_NAME_PRINT(sender)));
  73 
  74     /* unpack the stream first as this may be flow control info */
  75     count = 1;
  76     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
  77         ORTE_ERROR_LOG(rc);
  78         goto CLEAN_RETURN;
  79     }
  80 
  81     if (ORTE_IOF_XON & stream) {
  82         /* re-start the stdin read event */
  83         if (NULL != mca_iof_hnp_component.stdinev &&
  84             !orte_job_term_ordered &&
  85             !mca_iof_hnp_component.stdinev->active) {
  86             ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
  87         }
  88         goto CLEAN_RETURN;
  89     } else if (ORTE_IOF_XOFF & stream) {
  90         /* stop the stdin read event */
  91         if (NULL != mca_iof_hnp_component.stdinev &&
  92             !mca_iof_hnp_component.stdinev->active) {
  93             opal_event_del(mca_iof_hnp_component.stdinev->ev);
  94             mca_iof_hnp_component.stdinev->active = false;
  95         }
  96         goto CLEAN_RETURN;
  97     }
  98 
  99     /* get name of the process whose io we are discussing */
 100     count = 1;
 101     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &origin, &count, ORTE_NAME))) {
 102         ORTE_ERROR_LOG(rc);
 103         goto CLEAN_RETURN;
 104     }
 105 
 106     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 107                          "%s received IOF cmd for source %s",
 108                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 109                          ORTE_NAME_PRINT(&origin)));
 110 
 111     /* check to see if a tool has requested something */
 112     if (ORTE_IOF_PULL & stream) {
 113         /* get name of the process wishing to be the sink */
 114         count = 1;
 115         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, ORTE_NAME))) {
 116             ORTE_ERROR_LOG(rc);
 117             goto CLEAN_RETURN;
 118         }
 119 
 120         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 121                              "%s received pull cmd from remote tool %s for proc %s",
 122                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 123                              ORTE_NAME_PRINT(&requestor),
 124                              ORTE_NAME_PRINT(&origin)));
 125 
 126         if (ORTE_IOF_EXCLUSIVE & stream) {
 127             exclusive = true;
 128         } else {
 129             exclusive = false;
 130         }
 131         /* do we already have this process in our list? */
 132         OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 133             if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &origin)) {
 134                 /* found it */
 135                 goto PROCESS;
 136             }
 137         }
 138         /* if we get here, then we don't yet have this proc in our list */
 139         proct = OBJ_NEW(orte_iof_proc_t);
 140         proct->name.jobid = origin.jobid;
 141         proct->name.vpid = origin.vpid;
 142         opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
 143 
 144       PROCESS:
 145         /* a tool is requesting that we send it a copy of the specified stream(s)
 146          * from the specified process(es), so create a sink for it
 147          */
 148         if (NULL == proct->subscribers) {
 149             proct->subscribers = OBJ_NEW(opal_list_t);
 150         }
 151         if (ORTE_IOF_STDOUT & stream) {
 152             ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDOUT, NULL);
 153             sink->daemon.jobid = requestor.jobid;
 154             sink->daemon.vpid = requestor.vpid;
 155             sink->exclusive = exclusive;
 156             opal_list_append(proct->subscribers, &sink->super);
 157         }
 158         if (ORTE_IOF_STDERR & stream) {
 159             ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDERR, NULL);
 160             sink->daemon.jobid = requestor.jobid;
 161             sink->daemon.vpid = requestor.vpid;
 162             sink->exclusive = exclusive;
 163             opal_list_append(proct->subscribers, &sink->super);
 164         }
 165         if (ORTE_IOF_STDDIAG & stream) {
 166             ORTE_IOF_SINK_DEFINE(&sink, &origin, -1, ORTE_IOF_STDDIAG, NULL);
 167             sink->daemon.jobid = requestor.jobid;
 168             sink->daemon.vpid = requestor.vpid;
 169             sink->exclusive = exclusive;
 170             opal_list_append(proct->subscribers, &sink->super);
 171         }
 172         goto CLEAN_RETURN;
 173     }
 174 
 175     if (ORTE_IOF_CLOSE & stream) {
 176         OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 177                              "%s received close cmd from remote tool %s for proc %s",
 178                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 179                              ORTE_NAME_PRINT(sender),
 180                              ORTE_NAME_PRINT(&origin)));
 181         /* a tool is requesting that we no longer forward a copy of the
 182          * specified stream(s) from the specified process(es) - remove the sink
 183          */
 184         OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 185             if (OPAL_EQUAL != orte_util_compare_name_fields(mask, &proct->name, &origin)) {
 186                 continue;
 187             }
 188             OPAL_LIST_FOREACH_SAFE(sink, next, proct->subscribers, orte_iof_sink_t) {
 189                  /* if the target isn't set, then this sink is for another purpose - ignore it */
 190                 if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
 191                     continue;
 192                 }
 193                 /* if this sink is the designated one, then remove it from list */
 194                 if ((stream & sink->tag) &&
 195                     sink->name.jobid == origin.jobid &&
 196                     (ORTE_VPID_WILDCARD == sink->name.vpid ||
 197                      ORTE_VPID_WILDCARD == origin.vpid ||
 198                      sink->name.vpid == origin.vpid)) {
 199                     /* send an ack message to the requestor - this ensures that the RML has
 200                      * completed sending anything to that requestor before it exits
 201                      */
 202                     orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, ORTE_IOF_CLOSE, NULL, 0);
 203                     opal_list_remove_item(proct->subscribers, &sink->super);
 204                     OBJ_RELEASE(sink);
 205                 }
 206             }
 207         }
 208         goto CLEAN_RETURN;
 209     }
 210 
 211     /* this must have come from a daemon forwarding output - unpack the data */
 212     numbytes=ORTE_IOF_BASE_MSG_MAX;
 213     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
 214         ORTE_ERROR_LOG(rc);
 215         goto CLEAN_RETURN;
 216     }
 217     /* numbytes will contain the actual #bytes that were sent */
 218 
 219     OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
 220                          "%s unpacked %d bytes from remote proc %s",
 221                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
 222                          ORTE_NAME_PRINT(&origin)));
 223 
 224     /* do we already have this process in our list? */
 225     OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
 226         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, &origin)) {
 227             /* found it */
 228             goto NSTEP;
 229         }
 230     }
 231     /* if we get here, then we don't yet have this proc in our list */
 232     proct = OBJ_NEW(orte_iof_proc_t);
 233     proct->name.jobid = origin.jobid;
 234     proct->name.vpid = origin.vpid;
 235     opal_list_append(&mca_iof_hnp_component.procs, &proct->super);
 236 
 237   NSTEP:
 238     /* cycle through the endpoints to see if someone else wants a copy */
 239     exclusive = false;
 240     if (NULL != proct->subscribers) {
 241         OPAL_LIST_FOREACH(sink, proct->subscribers, orte_iof_sink_t) {
 242             /* if the target isn't set, then this sink is for another purpose - ignore it */
 243             if (ORTE_JOBID_INVALID == sink->daemon.jobid) {
 244                 continue;
 245             }
 246             if ((stream & sink->tag) &&
 247                 sink->name.jobid == origin.jobid &&
 248                 (ORTE_VPID_WILDCARD == sink->name.vpid ||
 249                  ORTE_VPID_WILDCARD == origin.vpid ||
 250                  sink->name.vpid == origin.vpid)) {
 251                 /* send the data to the tool */
 252                 if (NULL != opal_pmix.server_iof_push) {
 253                     /* don't pass along zero byte blobs */
 254                     if (0 < numbytes) {
 255                         rc = opal_pmix.server_iof_push(&proct->name, stream, data, numbytes);
 256                         if (ORTE_SUCCESS != rc) {
 257                             ORTE_ERROR_LOG(rc);
 258                         }
 259                     }
 260                 } else {
 261                     orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &origin, stream, data, numbytes);
 262                 }
 263                 if (sink->exclusive) {
 264                     exclusive = true;
 265                 }
 266             }
 267         }
 268     }
 269     /* if the user doesn't want a copy written to the screen, then we are done */
 270     if (!proct->copy) {
 271         return;
 272     }
 273 
 274     /* output this to our local output unless one of the sinks was exclusive */
 275     if (!exclusive) {
 276         if (ORTE_IOF_STDOUT & stream || orte_xml_output) {
 277             orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stdout->wev);
 278         } else {
 279             orte_iof_base_write_output(&origin, stream, data, numbytes, orte_iof_base.iof_write_stderr->wev);
 280         }
 281     }
 282 
 283  CLEAN_RETURN:
 284     return;
 285 }

/* [<][>][^][v][top][bottom][index][help] */