root/orte/mca/rml/base/rml_base_msg_handlers.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_rml_base_post_recv
  2. msg_match_recv
  3. orte_rml_base_process_msg

   1 /* -*- C -*-
   2  *
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2005 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2007-2013 Los Alamos National Security, LLC.  All rights
  14  *                         reserved.
  15  * Copyright (c) 2015-2019 Intel, Inc.  All rights reserved.
  16  * Copyright (c) 2017      Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 /** @file:
  25  *
  26  */
  27 
  28 /*
  29  * includes
  30  */
  31 #include "orte_config.h"
  32 
  33 #include <string.h>
  34 
  35 #include "orte/constants.h"
  36 #include "orte/types.h"
  37 
  38 #include "opal/dss/dss.h"
  39 #include "opal/util/output.h"
  40 #include "opal/util/timings.h"
  41 #include "opal/class/opal_list.h"
  42 
  43 #include "orte/mca/errmgr/errmgr.h"
  44 #include "orte/runtime/orte_globals.h"
  45 #include "orte/runtime/orte_wait.h"
  46 #include "orte/util/name_fns.h"
  47 #include "orte/util/nidmap.h"
  48 #include "orte/util/threads.h"
  49 
  50 #include "orte/mca/rml/rml.h"
  51 #include "orte/mca/rml/base/base.h"
  52 #include "orte/mca/rml/base/rml_contact.h"
  53 
  54 
  55 static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all);
  56 
  57 
  58 void orte_rml_base_post_recv(int sd, short args, void *cbdata)
  59 {
  60     orte_rml_recv_request_t *req = (orte_rml_recv_request_t*)cbdata;
  61     orte_rml_posted_recv_t *post, *recv;
  62     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
  63 
  64     ORTE_ACQUIRE_OBJECT(req);
  65 
  66     opal_output_verbose(5, orte_rml_base_framework.framework_output,
  67                         "%s posting recv",
  68                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
  69 
  70     if (NULL == req) {
  71         /* this can only happen if something is really wrong, but
  72          * someone managed to get here in a bizarre test */
  73         opal_output(0, "%s CANNOT POST NULL RML RECV REQUEST",
  74                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
  75         return;
  76     }
  77     post = req->post;
  78 
  79     /* if the request is to cancel a recv, then find the recv
  80      * and remove it from our list
  81      */
  82     if (req->cancel) {
  83         OPAL_LIST_FOREACH(recv, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
  84             if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &post->peer, &recv->peer) &&
  85                 post->tag == recv->tag) {
  86                 opal_output_verbose(5, orte_rml_base_framework.framework_output,
  87                                     "%s canceling recv %d for peer %s",
  88                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
  89                                     post->tag, ORTE_NAME_PRINT(&recv->peer));
  90                 /* got a match - remove it */
  91                 opal_list_remove_item(&orte_rml_base.posted_recvs, &recv->super);
  92                 OBJ_RELEASE(recv);
  93                 break;
  94             }
  95         }
  96         OBJ_RELEASE(req);
  97         return;
  98     }
  99 
 100     /* bozo check - cannot have two receives for the same peer/tag combination */
 101     OPAL_LIST_FOREACH(recv, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
 102         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &post->peer, &recv->peer) &&
 103             post->tag == recv->tag) {
 104             opal_output(0, "%s TWO RECEIVES WITH SAME PEER %s AND TAG %d - ABORTING",
 105                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 106                         ORTE_NAME_PRINT(&post->peer), post->tag);
 107             abort();
 108         }
 109     }
 110 
 111     opal_output_verbose(5, orte_rml_base_framework.framework_output,
 112                         "%s posting %s recv on tag %d for peer %s",
 113                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 114                         (post->persistent) ? "persistent" : "non-persistent",
 115                         post->tag, ORTE_NAME_PRINT(&post->peer));
 116     /* add it to the list of recvs */
 117     opal_list_append(&orte_rml_base.posted_recvs, &post->super);
 118     req->post = NULL;
 119     /* handle any messages that may have already arrived for this recv */
 120     msg_match_recv(post, post->persistent);
 121 
 122     /* cleanup */
 123     OBJ_RELEASE(req);
 124 }
 125 
 126 static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all)
 127 {
 128     opal_list_item_t *item, *next;
 129     orte_rml_recv_t *msg;
 130     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
 131 
 132     /* scan thru the list of unmatched recvd messages and
 133      * see if any matches this spec - if so, push the first
 134      * into the recvd msg queue and look no further
 135      */
 136     item = opal_list_get_first(&orte_rml_base.unmatched_msgs);
 137     while (item != opal_list_get_end(&orte_rml_base.unmatched_msgs)) {
 138         next = opal_list_get_next(item);
 139         msg = (orte_rml_recv_t*)item;
 140         opal_output_verbose(5, orte_rml_base_framework.framework_output,
 141                             "%s checking recv for %s against unmatched msg from %s",
 142                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 143                             ORTE_NAME_PRINT(&rcv->peer),
 144                             ORTE_NAME_PRINT(&msg->sender));
 145 
 146         /* since names could include wildcards, must use
 147          * the more generalized comparison function
 148          */
 149         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &rcv->peer) &&
 150             msg->tag == rcv->tag) {
 151             ORTE_RML_ACTIVATE_MESSAGE(msg);
 152             opal_list_remove_item(&orte_rml_base.unmatched_msgs, item);
 153             if (!get_all) {
 154                 break;
 155             }
 156         }
 157         item = next;
 158     }
 159 }
 160 
 161 void orte_rml_base_process_msg(int fd, short flags, void *cbdata)
 162 {
 163     orte_rml_recv_t *msg = (orte_rml_recv_t*)cbdata;
 164     orte_rml_posted_recv_t *post;
 165     orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
 166     opal_buffer_t buf;
 167 
 168     ORTE_ACQUIRE_OBJECT(msg);
 169 
 170     OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
 171                          "%s message received from %s for tag %d",
 172                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 173                          ORTE_NAME_PRINT(&msg->sender),
 174                          msg->tag));
 175 
 176     /* if this message is just to warmup the connection, then drop it */
 177     if (ORTE_RML_TAG_WARMUP_CONNECTION == msg->tag) {
 178         if (!orte_nidmap_communicated) {
 179             opal_buffer_t * buffer = OBJ_NEW(opal_buffer_t);
 180             int rc;
 181             if (NULL == buffer) {
 182                 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 183                 return;
 184             }
 185 
 186             if (ORTE_SUCCESS != (rc = orte_util_nidmap_create(orte_node_pool, buffer))) {
 187                 ORTE_ERROR_LOG(rc);
 188                 OBJ_RELEASE(buffer);
 189                 return;
 190             }
 191 
 192             if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&msg->sender, buffer,
 193                                                               ORTE_RML_TAG_NODE_REGEX_REPORT,
 194                                                               orte_rml_send_callback, NULL))) {
 195                 ORTE_ERROR_LOG(rc);
 196                 OBJ_RELEASE(buffer);
 197                 return;
 198             }
 199             OBJ_RELEASE(msg);
 200             return;
 201         }
 202     }
 203 
 204     /* see if we have a waiting recv for this message */
 205     OPAL_LIST_FOREACH(post, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
 206         /* since names could include wildcards, must use
 207          * the more generalized comparison function
 208          */
 209         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &post->peer) &&
 210             msg->tag == post->tag) {
 211             /* deliver the data to this location */
 212             if (post->buffer_data) {
 213                 /* deliver it in a buffer */
 214                 OBJ_CONSTRUCT(&buf, opal_buffer_t);
 215                 opal_dss.load(&buf, msg->iov.iov_base, msg->iov.iov_len);
 216                 /* xfer ownership of the malloc'd data to the buffer */
 217                 msg->iov.iov_base = NULL;
 218                 post->cbfunc.buffer(ORTE_SUCCESS, &msg->sender, &buf, msg->tag, post->cbdata);
 219                 /* the user must have unloaded the buffer if they wanted
 220                  * to retain ownership of it, so release whatever remains
 221                  */
 222                 OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
 223                                      "%s message received  bytes from %s for tag %d called callback",
 224                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 225                                      ORTE_NAME_PRINT(&msg->sender),
 226                                      msg->tag));
 227                 OBJ_DESTRUCT(&buf);
 228             } else {
 229                 /* deliver as an iovec */
 230                 post->cbfunc.iov(ORTE_SUCCESS, &msg->sender, &msg->iov, 1, msg->tag, post->cbdata);
 231                 /* the user should have shifted the data to
 232                  * a local variable and NULL'd the iov_base
 233                  * if they wanted ownership of the data
 234                  */
 235             }
 236             /* release the message */
 237             OBJ_RELEASE(msg);
 238             OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
 239                                  "%s message tag %d on released",
 240                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 241                                  post->tag));
 242             /* if the recv is non-persistent, remove it */
 243             if (!post->persistent) {
 244                 opal_list_remove_item(&orte_rml_base.posted_recvs, &post->super);
 245                 /*OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
 246                                      "%s non persistent recv %p remove success releasing now",
 247                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 248                                      post));*/
 249                 OBJ_RELEASE(post);
 250 
 251             }
 252             return;
 253         }
 254     }
 255     /* we get here if no matching recv was found - we then hold
 256      * the message until such a recv is issued
 257      */
 258      OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
 259                             "%s message received bytes from %s for tag %d Not Matched adding to unmatched msgs",
 260                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 261                             ORTE_NAME_PRINT(&msg->sender),
 262                             msg->tag));
 263      opal_list_append(&orte_rml_base.unmatched_msgs, &msg->super);
 264 }

/* [<][>][^][v][top][bottom][index][help] */