root/opal/mca/pmix/pmix4x/pmix/src/common/pmix_log.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. opcbfunc
  2. log_cbfunc
  3. PMIx_Log
  4. localcbfunc
  5. PMIx_Log_nb

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
   4  * Copyright (c) 2016      Mellanox Technologies, Inc.
   5  *                         All rights reserved.
   6  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
   7  * Copyright (c) 2019      Research Organization for Information Science
   8  *                         and Technology (RIST).  All rights reserved.
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 #include <src/include/pmix_config.h>
  16 
  17 #include <src/include/pmix_stdint.h>
  18 #include <src/include/pmix_socket_errno.h>
  19 
  20 #include <pmix.h>
  21 #include <pmix_common.h>
  22 #include <pmix_server.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/threads/threads.h"
  26 #include "src/util/argv.h"
  27 #include "src/util/error.h"
  28 #include "src/util/name_fns.h"
  29 #include "src/util/output.h"
  30 #include "src/mca/bfrops/bfrops.h"
  31 #include "src/mca/plog/base/base.h"
  32 
  33 #include "src/client/pmix_client_ops.h"
  34 #include "src/server/pmix_server_ops.h"
  35 #include "src/include/pmix_globals.h"
  36 
  37 static void opcbfunc(pmix_status_t status, void *cbdata)
  38 {
  39     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
  40     cb->status = status;
  41     PMIX_WAKEUP_THREAD(&cb->lock);
  42 }
  43 
  44 static void log_cbfunc(struct pmix_peer_t *peer,
  45                        pmix_ptl_hdr_t *hdr,
  46                        pmix_buffer_t *buf, void *cbdata)
  47 {
  48     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
  49     int32_t m;
  50     pmix_status_t rc, status;
  51 
  52     /* unpack the return status */
  53     m=1;
  54     PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &m, PMIX_STATUS);
  55     if (PMIX_SUCCESS != rc) {
  56         status = rc;
  57     }
  58 
  59     if (NULL != cd->cbfunc.opcbfn) {
  60         cd->cbfunc.opcbfn(status, cd->cbdata);
  61     }
  62     PMIX_RELEASE(cd);
  63 }
  64 
  65 PMIX_EXPORT pmix_status_t PMIx_Log(const pmix_info_t data[], size_t ndata,
  66                                    const pmix_info_t directives[], size_t ndirs)
  67 {
  68     pmix_cb_t cb;
  69     pmix_status_t rc;
  70 
  71     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
  72 
  73     if (pmix_globals.init_cntr <= 0) {
  74         PMIX_RELEASE_THREAD(&pmix_global_lock);
  75         return PMIX_ERR_INIT;
  76     }
  77     PMIX_RELEASE_THREAD(&pmix_global_lock);
  78 
  79     pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
  80                         "%s pmix:log", PMIX_NAME_PRINT(&pmix_globals.myid));
  81 
  82     /* create a callback object as we need to pass it to the
  83      * recv routine so we know which callback to use when
  84      * the return message is recvd */
  85     PMIX_CONSTRUCT(&cb, pmix_cb_t);
  86     if (PMIX_SUCCESS != (rc = PMIx_Log_nb(data, ndata, directives,
  87                                           ndirs, opcbfunc, &cb))) {
  88         PMIX_DESTRUCT(&cb);
  89         return rc;
  90     }
  91 
  92     /* wait for the operation to complete */
  93     PMIX_WAIT_THREAD(&cb.lock);
  94     rc = cb.status;
  95     PMIX_DESTRUCT(&cb);
  96 
  97     pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
  98                         "pmix:log completed");
  99 
 100     return rc;
 101 }
 102 
 103 static void localcbfunc(pmix_status_t status, void *cbdata)
 104 {
 105     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
 106 
 107     PMIX_INFO_FREE(cd->directives, cd->ndirs);
 108     if (NULL != cd->cbfunc.opcbfn) {
 109         cd->cbfunc.opcbfn(status, cd->cbdata);
 110     }
 111     PMIX_RELEASE(cd);
 112 }
 113 
 114 PMIX_EXPORT pmix_status_t PMIx_Log_nb(const pmix_info_t data[], size_t ndata,
 115                                       const pmix_info_t directives[], size_t ndirs,
 116                                       pmix_op_cbfunc_t cbfunc, void *cbdata)
 117 
 118 {
 119     pmix_shift_caddy_t *cd;
 120     pmix_cmd_t cmd = PMIX_LOG_CMD;
 121     pmix_buffer_t *msg;
 122     pmix_status_t rc;
 123     size_t n;
 124     time_t timestamp = 0;
 125     pmix_proc_t *source = NULL;
 126 
 127     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 128 
 129     pmix_output_verbose(2, pmix_globals.debug_output,
 130                         "pmix:log non-blocking");
 131 
 132     if (pmix_globals.init_cntr <= 0) {
 133         PMIX_RELEASE_THREAD(&pmix_global_lock);
 134         return PMIX_ERR_INIT;
 135     }
 136 
 137     if (0 == ndata || NULL == data) {
 138         PMIX_RELEASE_THREAD(&pmix_global_lock);
 139         return PMIX_ERR_BAD_PARAM;
 140     }
 141 
 142     /* check the directives - if they requested a timestamp, then
 143      * get the time, also look for a source */
 144     if (NULL != directives) {
 145         for (n=0; n < ndirs; n++) {
 146             if (0 == strncmp(directives[n].key, PMIX_LOG_GENERATE_TIMESTAMP, PMIX_MAX_KEYLEN)) {
 147                 if (PMIX_INFO_TRUE(&directives[n])) {
 148                     /* pickup the timestamp */
 149                     timestamp = time(NULL);
 150                 }
 151             } else if (0 == strncmp(directives[n].key, PMIX_LOG_SOURCE, PMIX_MAX_KEYLEN)) {
 152                 source = directives[n].value.data.proc;
 153             }
 154         }
 155     }
 156 
 157     /* if we are a client or tool, we never do this ourselves - we
 158      * always pass this request to our server for execution */
 159     if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 160         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 161         /* if we aren't connected, don't attempt to send */
 162         if (!pmix_globals.connected) {
 163             PMIX_RELEASE_THREAD(&pmix_global_lock);
 164             return PMIX_ERR_UNREACH;
 165         }
 166         PMIX_RELEASE_THREAD(&pmix_global_lock);
 167 
 168         /* if we are not a server, then relay this request to the server */
 169         cd = PMIX_NEW(pmix_shift_caddy_t);
 170         cd->cbfunc.opcbfn = cbfunc;
 171         cd->cbdata = cbdata;
 172         msg = PMIX_NEW(pmix_buffer_t);
 173         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 174                          msg, &cmd, 1, PMIX_COMMAND);
 175         if (PMIX_SUCCESS != rc) {
 176             PMIX_ERROR_LOG(rc);
 177             PMIX_RELEASE(msg);
 178             PMIX_RELEASE(cd);
 179             return rc;
 180         }
 181         /* provide the timestamp - zero will indicate
 182          * that it wasn't taken */
 183         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 184                          msg, &timestamp, 1, PMIX_TIME);
 185         if (PMIX_SUCCESS != rc) {
 186             PMIX_ERROR_LOG(rc);
 187             PMIX_RELEASE(msg);
 188             PMIX_RELEASE(cd);
 189             return rc;
 190         }
 191         /* pack the number of data entries */
 192         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 193                          msg, &ndata, 1, PMIX_SIZE);
 194         if (PMIX_SUCCESS != rc) {
 195             PMIX_ERROR_LOG(rc);
 196             PMIX_RELEASE(msg);
 197             PMIX_RELEASE(cd);
 198             return rc;
 199         }
 200         if (0 < ndata) {
 201             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 202                              msg, data, ndata, PMIX_INFO);
 203             if (PMIX_SUCCESS != rc) {
 204                 PMIX_ERROR_LOG(rc);
 205                 PMIX_RELEASE(msg);
 206                 PMIX_RELEASE(cd);
 207                 return rc;
 208             }
 209         }
 210         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 211                          msg, &ndirs, 1, PMIX_SIZE);
 212         if (PMIX_SUCCESS != rc) {
 213             PMIX_ERROR_LOG(rc);
 214             PMIX_RELEASE(msg);
 215             PMIX_RELEASE(cd);
 216             return rc;
 217         }
 218         if (0 < ndirs) {
 219             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 220                              msg, directives, ndirs, PMIX_INFO);
 221             if (PMIX_SUCCESS != rc) {
 222                 PMIX_ERROR_LOG(rc);
 223                 PMIX_RELEASE(msg);
 224                 PMIX_RELEASE(cd);
 225                 return rc;
 226             }
 227         }
 228 
 229         pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
 230                             "pmix:log sending to server");
 231         PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 232                            msg, log_cbfunc, (void*)cd);
 233         if (PMIX_SUCCESS != rc) {
 234             PMIX_ERROR_LOG(rc);
 235             PMIX_RELEASE(cd);
 236         }
 237         return rc;
 238     }
 239     PMIX_RELEASE_THREAD(&pmix_global_lock);
 240 
 241     /* if no recorded source was found, then we must be it */
 242     if (NULL == source) {
 243         source = &pmix_globals.myid;
 244         cd = PMIX_NEW(pmix_shift_caddy_t);
 245         cd->cbfunc.opcbfn = cbfunc;
 246         cd->cbdata = cbdata;
 247         cd->ndirs = ndirs + 1;
 248         PMIX_INFO_CREATE(cd->directives, cd->ndirs);
 249         for (n=0; n < ndirs; n++) {
 250             PMIX_INFO_XFER(&cd->directives[n], (pmix_info_t*)&directives[n]);
 251         }
 252         PMIX_INFO_LOAD(&cd->directives[ndirs], PMIX_LOG_SOURCE, &source, PMIX_PROC);
 253         /* call down to process the request - the various components
 254          * will thread shift as required */
 255         rc = pmix_plog.log(source, data, ndata, cd->directives, cd->ndirs, localcbfunc, cd);
 256         if (PMIX_SUCCESS != rc) {
 257             PMIX_INFO_FREE(cd->directives, cd->ndirs);
 258             PMIX_RELEASE(cd);
 259         }
 260     } else if (0 == strncmp(source->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) &&
 261                source->rank == pmix_globals.myid.rank) {
 262         /* if I am the recorded source, then this is a re-submission of
 263          * something that got "upcalled" by a prior call. In this case,
 264          * we return a "not supported" error as clearly we couldn't
 265          * handle it, and neither could our host */
 266         rc = PMIX_ERR_NOT_SUPPORTED;
 267     } else {
 268         /* call down to process the request - the various components
 269          * will thread shift as required */
 270         rc = pmix_plog.log(source, data, ndata, directives, ndirs, cbfunc, cbdata);
 271     }
 272 
 273     return rc;
 274 }

/* [<][>][^][v][top][bottom][index][help] */