root/opal/mca/pmix/pmix4x/pmix/test/server_callbacks.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. tcon
  2. tdes
  3. connected
  4. finalized
  5. abort_fn
  6. fencenb_fn
  7. dmodex_fn
  8. publish_fn
  9. lookup_fn
  10. unpublish_fn
  11. _release_cb
  12. release_cb
  13. spawn_fn
  14. connect_fn
  15. disconnect_fn
  16. regevents_fn
  17. deregevents_fn

   1 /*
   2  * Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
   3  * Copyright (c) 2015      Research Organization for Information Science
   4  *                         and Technology (RIST). All rights reserved.
   5  * Copyright (c) 2015-2018 Mellanox Technologies, Inc.
   6  *                         All rights reserved.
   7  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
   8  * $COPYRIGHT$
   9  *
  10  * Additional copyrights may follow
  11  *
  12  * $HEADER$
  13  *
  14  */
  15 
  16 #include <pthread.h>
  17 #include <stdio.h>
  18 #include "server_callbacks.h"
  19 #include "src/util/argv.h"
  20 #include "test_server.h"
  21 
  22 extern bool spawn_wait;
  23 
  24 pmix_server_module_t mymodule = {
  25     .client_connected = connected,
  26     .client_finalized = finalized,
  27     .abort = abort_fn,
  28     .fence_nb = fencenb_fn,
  29     .direct_modex = dmodex_fn,
  30     .publish = publish_fn,
  31     .lookup = lookup_fn,
  32     .unpublish = unpublish_fn,
  33     .spawn = spawn_fn,
  34     .connect = connect_fn,
  35     .disconnect = disconnect_fn,
  36     .register_events = regevents_fn,
  37     .deregister_events = deregevents_fn
  38 };
  39 
  40 typedef struct {
  41     pmix_list_item_t super;
  42     pmix_info_t data;
  43     char *namespace_published;
  44     int rank_published;
  45 } pmix_test_info_t;
  46 
  47 static void tcon(pmix_test_info_t *p)
  48 {
  49     PMIX_INFO_CONSTRUCT(&p->data);
  50 }
  51 
  52 static void tdes(pmix_test_info_t *p)
  53 {
  54     PMIX_INFO_DESTRUCT(&p->data);
  55 }
  56 
  57 PMIX_CLASS_INSTANCE(pmix_test_info_t,
  58                           pmix_list_item_t,
  59                           tcon, tdes);
  60 
  61 pmix_list_t *pmix_test_published_list = NULL;
  62 
  63 static int finalized_count = 0;
  64 
  65 pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
  66                         pmix_op_cbfunc_t cbfunc, void *cbdata)
  67 {
  68     if (NULL != cbfunc) {
  69         cbfunc(PMIX_SUCCESS, cbdata);
  70     }
  71     return PMIX_SUCCESS;
  72 }
  73 
  74 pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
  75               pmix_op_cbfunc_t cbfunc, void *cbdata)
  76 {
  77     cli_info_t *cli = NULL;
  78     int i;
  79     for (i = 0; i < cli_info_cnt; i++) {
  80         if((proc->rank == cli_info[i].rank) &&
  81                 (0 == strcmp(proc->nspace, cli_info[i].ns))){
  82             cli = &cli_info[i];
  83             break;
  84         }
  85     }
  86     if (NULL == cli) {
  87         TEST_ERROR(("cannot found rank %d", proc->rank));
  88         return PMIX_SUCCESS;
  89     }
  90     if( CLI_TERM <= cli->state ){
  91         TEST_ERROR(("double termination of rank %d", proc->rank));
  92         return PMIX_SUCCESS;
  93     }
  94     TEST_VERBOSE(("Rank %s:%d terminated", proc->nspace, proc->rank));
  95     cli_finalize(cli);
  96     finalized_count++;
  97     if (finalized_count == cli_info_cnt) {
  98         if (NULL != pmix_test_published_list) {
  99             PMIX_LIST_RELEASE(pmix_test_published_list);
 100         }
 101     }
 102     if (NULL != cbfunc) {
 103         cbfunc(PMIX_SUCCESS, cbdata);
 104     }
 105     return PMIX_SUCCESS;
 106 }
 107 
 108 pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
 109              int status, const char msg[],
 110              pmix_proc_t procs[], size_t nprocs,
 111              pmix_op_cbfunc_t cbfunc, void *cbdata)
 112 {
 113     if (NULL != cbfunc) {
 114         cbfunc(PMIX_SUCCESS, cbdata);
 115     }
 116     TEST_VERBOSE(("Abort is called with status = %d, msg = %s",
 117                   status, msg));
 118     test_abort = true;
 119     return PMIX_SUCCESS;
 120 }
 121 
 122 pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
 123                const pmix_info_t info[], size_t ninfo,
 124                char *data, size_t ndata,
 125                pmix_modex_cbfunc_t cbfunc, void *cbdata)
 126 {
 127     TEST_VERBOSE(("Getting data for %s:%d",
 128                   procs[0].nspace, procs[0].rank));
 129 
 130     if ((pmix_list_get_size(server_list) == 1) && (my_server_id == 0)) {
 131         if (NULL != cbfunc) {
 132             cbfunc(PMIX_SUCCESS, data, ndata, cbdata, NULL, NULL);
 133         }
 134         return PMIX_SUCCESS;
 135     }
 136     return server_fence_contrib(data, ndata, cbfunc, cbdata);
 137 }
 138 
 139 pmix_status_t dmodex_fn(const pmix_proc_t *proc,
 140               const pmix_info_t info[], size_t ninfo,
 141               pmix_modex_cbfunc_t cbfunc, void *cbdata)
 142 {
 143     TEST_VERBOSE(("Getting data for %s:%d", proc->nspace, proc->rank));
 144 
 145     /* return not_found fot single server mode */
 146     if ((pmix_list_get_size(server_list) == 1) && (my_server_id == 0)) {
 147         return PMIX_ERR_NOT_FOUND;
 148     }
 149     // TODO: add support tracker for dmodex requests
 150     return server_dmdx_get(proc->nspace, proc->rank, cbfunc, cbdata);
 151 }
 152 
 153 pmix_status_t publish_fn(const pmix_proc_t *proc,
 154                const pmix_info_t info[], size_t ninfo,
 155                pmix_op_cbfunc_t cbfunc, void *cbdata)
 156 {
 157     size_t i;
 158     int found;
 159     pmix_test_info_t *new_info, *old_info;
 160     if (NULL == pmix_test_published_list) {
 161         pmix_test_published_list = PMIX_NEW(pmix_list_t);
 162     }
 163     for (i = 0; i < ninfo; i++) {
 164         found = 0;
 165         PMIX_LIST_FOREACH(old_info, pmix_test_published_list, pmix_test_info_t) {
 166             if (!strcmp(old_info->data.key, info[i].key)) {
 167                 found = 1;
 168                 break;
 169             }
 170         }
 171         if (!found) {
 172             new_info = PMIX_NEW(pmix_test_info_t);
 173             strncpy(new_info->data.key, info[i].key, strlen(info[i].key)+1);
 174             pmix_value_xfer(&new_info->data.value, (pmix_value_t*)&info[i].value);
 175             new_info->namespace_published = strdup(proc->nspace);
 176             new_info->rank_published = proc->rank;
 177             pmix_list_append(pmix_test_published_list, &new_info->super);
 178         }
 179     }
 180     if (NULL != cbfunc) {
 181         cbfunc(PMIX_SUCCESS, cbdata);
 182     }
 183     return PMIX_SUCCESS;
 184 }
 185 
 186 pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
 187               const pmix_info_t info[], size_t ninfo,
 188               pmix_lookup_cbfunc_t cbfunc, void *cbdata)
 189 {
 190     size_t i, ndata, ret;
 191     pmix_status_t rc = PMIX_SUCCESS;
 192     pmix_pdata_t *pdata;
 193     pmix_test_info_t *tinfo;
 194     if (NULL == pmix_test_published_list) {
 195         return PMIX_ERR_NOT_FOUND;
 196     }
 197     ndata = pmix_argv_count(keys);
 198     PMIX_PDATA_CREATE(pdata, ndata);
 199     ret = 0;
 200     for (i = 0; i < ndata; i++) {
 201         PMIX_LIST_FOREACH(tinfo, pmix_test_published_list, pmix_test_info_t) {
 202             if (0 == strcmp(tinfo->data.key, keys[i])) {
 203                 (void)strncpy(pdata[i].proc.nspace, tinfo->namespace_published, PMIX_MAX_NSLEN);
 204                 pdata[i].proc.rank = tinfo->rank_published;
 205                 memset(pdata[i].key, 0, PMIX_MAX_KEYLEN+1);
 206                 (void)strncpy(pdata[i].key, keys[i], PMIX_MAX_KEYLEN);
 207                 pmix_value_xfer(&pdata[i].value, &tinfo->data.value);
 208                 ret++;
 209                 break;
 210             }
 211         }
 212     }
 213     if (ret != ndata) {
 214         rc = PMIX_ERR_NOT_FOUND;
 215         goto error;
 216     }
 217     if (NULL != cbfunc) {
 218         cbfunc(PMIX_SUCCESS, pdata, ndata, cbdata);
 219     }
 220 error:
 221     PMIX_PDATA_FREE(pdata, ndata);
 222     return rc;
 223 }
 224 
 225 pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
 226                  const pmix_info_t info[], size_t ninfo,
 227                  pmix_op_cbfunc_t cbfunc, void *cbdata)
 228 {
 229     size_t i;
 230     pmix_test_info_t *iptr, *next;
 231     if (NULL == pmix_test_published_list) {
 232         return PMIX_ERR_NOT_FOUND;
 233     }
 234     PMIX_LIST_FOREACH_SAFE(iptr, next, pmix_test_published_list, pmix_test_info_t) {
 235         if (1) {  // if data posted by this process
 236             if (NULL == keys) {
 237                 pmix_list_remove_item(pmix_test_published_list, &iptr->super);
 238                 PMIX_RELEASE(iptr);
 239             } else {
 240                 ninfo = pmix_argv_count(keys);
 241                 for (i = 0; i < ninfo; i++) {
 242                     if (!strcmp(iptr->data.key, keys[i])) {
 243                         pmix_list_remove_item(pmix_test_published_list, &iptr->super);
 244                         PMIX_RELEASE(iptr);
 245                         break;
 246                     }
 247                 }
 248             }
 249         }
 250     }
 251     if (NULL != cbfunc) {
 252         cbfunc(PMIX_SUCCESS, cbdata);
 253     }
 254     return PMIX_SUCCESS;
 255 }
 256 
 257 typedef struct {
 258     pmix_status_t status;
 259     pmix_spawn_cbfunc_t cbfunc;
 260     void *cbdata;
 261 } release_cbdata;
 262 
 263 
 264 static void * _release_cb(void *arg)
 265 {
 266     release_cbdata *cb = (release_cbdata*)arg;
 267     if (NULL != cb->cbfunc) {
 268         cb->cbfunc(cb->status, "foobar", cb->cbdata);
 269     }
 270     free(cb);
 271     spawn_wait = false;
 272     pthread_exit(NULL);
 273 }
 274 
 275 static void release_cb(pmix_status_t status, void *cbdata)
 276 {
 277     pthread_t thread;
 278 
 279     if (0 > pthread_create(&thread, NULL, _release_cb, cbdata)) {
 280         spawn_wait = false;
 281         return;
 282     }
 283     pthread_detach(thread);
 284 }
 285 
 286 pmix_status_t spawn_fn(const pmix_proc_t *proc,
 287              const pmix_info_t job_info[], size_t ninfo,
 288              const pmix_app_t apps[], size_t napps,
 289              pmix_spawn_cbfunc_t cbfunc, void *cbdata)
 290 {
 291     release_cbdata *cb = malloc(sizeof(release_cbdata));
 292 
 293     cb->status = PMIX_SUCCESS;
 294     cb->cbfunc = cbfunc;
 295     cb->cbdata = cbdata;
 296 
 297     spawn_wait = true;
 298     PMIx_server_register_nspace("foobar", napps, NULL, 0, release_cb, (void*)cb);
 299     return PMIX_SUCCESS;
 300 }
 301 static int numconnect = 0;
 302 
 303 pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
 304                          const pmix_info_t info[], size_t ninfo,
 305                          pmix_op_cbfunc_t cbfunc, void *cbdata)
 306 {
 307     if (NULL != cbfunc) {
 308         cbfunc(PMIX_SUCCESS, cbdata);
 309     }
 310     numconnect++;
 311     return PMIX_SUCCESS;
 312 }
 313 
 314 pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
 315                             const pmix_info_t info[], size_t ninfo,
 316                             pmix_op_cbfunc_t cbfunc, void *cbdata)
 317 {
 318     if (NULL != cbfunc) {
 319         cbfunc(PMIX_SUCCESS, cbdata);
 320     }
 321     return PMIX_SUCCESS;
 322 }
 323 
 324 pmix_status_t regevents_fn (pmix_status_t *codes, size_t ncodes,
 325                            const pmix_info_t info[], size_t ninfo,
 326                            pmix_op_cbfunc_t cbfunc, void *cbdata)
 327 {
 328     TEST_VERBOSE ((" pmix host server regevents_fn called "));
 329     if (NULL != cbfunc) {
 330         cbfunc(PMIX_SUCCESS, cbdata);
 331     }
 332     return PMIX_SUCCESS;
 333 }
 334 
 335 pmix_status_t deregevents_fn (pmix_status_t *codes, size_t ncodes,
 336                              pmix_op_cbfunc_t cbfunc, void *cbdata)
 337 {
 338     TEST_VERBOSE ((" pmix host server deregevents_fn called "));
 339     if (NULL != cbfunc) {
 340         cbfunc(PMIX_SUCCESS, cbdata);
 341     }
 342     return PMIX_SUCCESS;
 343 }

/* [<][>][^][v][top][bottom][index][help] */