root/orte/orted/pmix/pmix_server_fence.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. relcb
  2. pmix_server_release
  3. pmix_server_fencenb_fn
  4. dmodex_req
  5. pmix_server_dmodex_req_fn

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2014      Mellanox Technologies, Inc.
  18  *                         All rights reserved.
  19  * Copyright (c) 2014-2017 Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * $COPYRIGHT$
  22  *
  23  * Additional copyrights may follow
  24  *
  25  * $HEADER$
  26  *
  27  */
  28 
  29 #include "orte_config.h"
  30 
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 
  35 #include "opal/util/output.h"
  36 #include "opal/dss/dss.h"
  37 
  38 #include "orte/mca/errmgr/errmgr.h"
  39 #include "orte/util/name_fns.h"
  40 #include "orte/util/show_help.h"
  41 #include "orte/util/threads.h"
  42 #include "orte/runtime/orte_globals.h"
  43 #include "orte/mca/grpcomm/grpcomm.h"
  44 #include "orte/mca/rml/rml.h"
  45 
  46 #include "pmix_server_internal.h"
  47 #include "pmix_server.h"
  48 
  49 static void relcb(void *cbdata)
  50 {
  51     uint8_t *data = (uint8_t*)cbdata;
  52 
  53     if (NULL != data) {
  54         free(data);
  55     }
  56 }
  57 static void pmix_server_release(int status, opal_buffer_t *buf, void *cbdata)
  58 {
  59     orte_pmix_mdx_caddy_t *cd=(orte_pmix_mdx_caddy_t*)cbdata;
  60     char *data = NULL;
  61     int32_t ndata = 0;
  62     int rc = OPAL_SUCCESS;
  63 
  64     ORTE_ACQUIRE_OBJECT(cd);
  65 
  66     /* unload the buffer */
  67     if (NULL != buf) {
  68         rc = opal_dss.unload(buf, (void**)&data, &ndata);
  69     }
  70     if (OPAL_SUCCESS == rc) {
  71         rc = status;
  72     }
  73     cd->cbfunc(rc, data, ndata, cd->cbdata, relcb, data);
  74     OBJ_RELEASE(cd);
  75 }
  76 
  77 /* this function is called when all the local participants have
  78  * called fence - thus, the collective is already locally
  79  * complete at this point. We therefore just need to create the
  80  * signature and pass the collective into grpcomm */
  81 int pmix_server_fencenb_fn(opal_list_t *procs, opal_list_t *info,
  82                            char *data, size_t ndata,
  83                            opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
  84 {
  85     orte_pmix_mdx_caddy_t *cd=NULL;
  86     int rc;
  87     opal_namelist_t *nm;
  88     size_t i;
  89     opal_buffer_t *buf=NULL;
  90 
  91     cd = OBJ_NEW(orte_pmix_mdx_caddy_t);
  92     cd->cbfunc = cbfunc;
  93     cd->cbdata = cbdata;
  94 
  95    /* compute the signature of this collective */
  96     if (NULL != procs) {
  97         cd->sig = OBJ_NEW(orte_grpcomm_signature_t);
  98         cd->sig->sz = opal_list_get_size(procs);
  99         cd->sig->signature = (orte_process_name_t*)malloc(cd->sig->sz * sizeof(orte_process_name_t));
 100         memset(cd->sig->signature, 0, cd->sig->sz * sizeof(orte_process_name_t));
 101         i=0;
 102         OPAL_LIST_FOREACH(nm, procs, opal_namelist_t) {
 103             cd->sig->signature[i].jobid = nm->name.jobid;
 104             cd->sig->signature[i].vpid = nm->name.vpid;
 105             ++i;
 106         }
 107     }
 108     buf = OBJ_NEW(opal_buffer_t);
 109 
 110     if (NULL != data) {
 111         opal_dss.load(buf, data, ndata);
 112     }
 113 
 114     if (4 < opal_output_get_verbosity(orte_pmix_server_globals.output)) {
 115         char *tmp=NULL;
 116         (void)opal_dss.print(&tmp, NULL, cd->sig, ORTE_SIGNATURE);
 117         free(tmp);
 118     }
 119 
 120     /* pass it to the global collective algorithm */
 121     /* pass along any data that was collected locally */
 122     if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(cd->sig, buf, pmix_server_release, cd))) {
 123         ORTE_ERROR_LOG(rc);
 124         OBJ_RELEASE(buf);
 125         return rc;
 126     }
 127     OBJ_RELEASE(buf);
 128     return ORTE_SUCCESS;
 129 }
 130 
 131 static void dmodex_req(int sd, short args, void *cbdata)
 132 {
 133     pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
 134     pmix_server_req_t *r;
 135     orte_job_t *jdata;
 136     orte_proc_t *proct, *dmn;
 137     int rc, rnum;
 138     opal_buffer_t *buf;
 139     uint8_t *data=NULL;
 140     int32_t sz=0;
 141 
 142     ORTE_ACQUIRE_OBJECT(rq);
 143 
 144     /* a race condition exists here because of the thread-shift - it is
 145      * possible that data for the specified proc arrived while we were
 146      * waiting to be serviced. In that case, the tracker that would have
 147      * indicated the data was already requested will have been removed,
 148      * and we would therefore think that we had to request it again.
 149      * So do a quick check to ensure we don't already have the desired
 150      * data */
 151     OPAL_MODEX_RECV_STRING(rc, "modex", &req->target, &data, &sz);
 152     if (OPAL_SUCCESS == rc) {
 153         req->mdxcbfunc(rc, (char*)data, sz, req->cbdata, relcb, data);
 154         OBJ_RELEASE(req);
 155         return;
 156     }
 157 
 158     /* adjust the timeout to reflect the size of the job as it can take some
 159      * amount of time to start the job */
 160     ORTE_ADJUST_TIMEOUT(req);
 161 
 162     /* has anyone already requested data for this target? If so,
 163      * then the data is already on its way */
 164     for (rnum=0; rnum < orte_pmix_server_globals.reqs.num_rooms; rnum++) {
 165         opal_hotel_knock(&orte_pmix_server_globals.reqs, rnum, (void**)&r);
 166         if (NULL == r) {
 167             continue;
 168         }
 169         if (r->target.jobid == req->target.jobid &&
 170             r->target.vpid == req->target.vpid) {
 171             /* save the request in the hotel until the
 172              * data is returned */
 173             if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 174                 orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 175                 /* can't just return as that would cause the requestor
 176                  * to hang, so instead execute the callback */
 177                 goto callback;
 178             }
 179             return;
 180         }
 181     }
 182 
 183     /* lookup who is hosting this proc */
 184     if (NULL == (jdata = orte_get_job_data_object(req->target.jobid))) {
 185         /* if we don't know the job, then it could be a race
 186          * condition where we are being asked about a process
 187          * that we don't know about yet. In this case, just
 188          * record the request and we will process it later */
 189         if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 190             orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 191             /* can't just return as that would cause the requestor
 192              * to hang, so instead execute the callback */
 193             goto callback;
 194         }
 195         return;
 196     }
 197     /* if this is a request for rank=WILDCARD, then they want the job-level data
 198      * for this job. It was probably not stored locally because we aren't hosting
 199      * any local procs. There is no need to request the data as we already have
 200      * it - so just register the nspace so the local PMIx server gets it */
 201     if (ORTE_VPID_WILDCARD == req->target.vpid) {
 202         rc = orte_pmix_server_register_nspace(jdata, true);
 203         if (ORTE_SUCCESS != rc) {
 204             goto callback;
 205         }
 206         /* let the server know that the data is now available */
 207         if (NULL != req->mdxcbfunc) {
 208             req->mdxcbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
 209         }
 210         OBJ_RELEASE(req);
 211         return;
 212     }
 213 
 214     /* if they are asking about a specific proc, then fetch it */
 215     if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, req->target.vpid))) {
 216         /* if we find the job, but not the process, then that is an error */
 217         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 218         rc = ORTE_ERR_NOT_FOUND;
 219         goto callback;
 220     }
 221 
 222     if (NULL == (dmn = proct->node->daemon)) {
 223         /* we don't know where this proc is located - since we already
 224          * found the job, and therefore know about its locations, this
 225          * must be an error */
 226         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 227         rc = ORTE_ERR_NOT_FOUND;
 228         goto callback;
 229     }
 230 
 231     /* point the request to the daemon that is hosting the
 232      * target process */
 233     req->proxy.vpid = dmn->name.vpid;
 234 
 235     /* track the request so we know the function and cbdata
 236      * to callback upon completion */
 237     if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 238         orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 239         goto callback;
 240     }
 241 
 242     /* if we are the host daemon, then this is a local request, so
 243      * just wait for the data to come in */
 244     if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
 245         ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
 246         return;
 247     }
 248 
 249     /* construct a request message */
 250     buf = OBJ_NEW(opal_buffer_t);
 251     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {
 252         ORTE_ERROR_LOG(rc);
 253         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 254         OBJ_RELEASE(buf);
 255         goto callback;
 256     }
 257     /* include the request room number for quick retrieval */
 258     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->room_num, 1, OPAL_INT))) {
 259         ORTE_ERROR_LOG(rc);
 260         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 261         OBJ_RELEASE(buf);
 262         goto callback;
 263     }
 264 
 265     /* send it to the host daemon */
 266     if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&dmn->name, buf, ORTE_RML_TAG_DIRECT_MODEX,
 267                                                       orte_rml_send_callback, NULL))) {
 268         ORTE_ERROR_LOG(rc);
 269         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 270         OBJ_RELEASE(buf);
 271         goto callback;
 272     }
 273     return;
 274 
 275   callback:
 276     /* this section gets executed solely upon an error */
 277     if (NULL != req->mdxcbfunc) {
 278         req->mdxcbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
 279     }
 280     OBJ_RELEASE(req);
 281 }
 282 
 283 /* the local PMIx embedded server will use this function to call
 284  * us and request that we obtain data from a remote daemon */
 285 int pmix_server_dmodex_req_fn(opal_process_name_t *proc, opal_list_t *info,
 286                               opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
 287 {
 288     /*  we have to shift threads to the ORTE thread, so
 289      * create a request and push it into that thread */
 290     ORTE_DMX_REQ(*proc, dmodex_req, cbfunc, cbdata);
 291     return OPAL_ERR_IN_PROCESS;
 292 }

/* [<][>][^][v][top][bottom][index][help] */