root/orte/orted/pmix/pmix_server_pub.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. init_server
  2. execute
  3. pmix_server_publish_fn
  4. pmix_server_lookup_fn
  5. pmix_server_unpublish_fn
  6. pmix_server_keyval_client

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2014      Mellanox Technologies, Inc.
  18  *                         All rights reserved.
  19  * Copyright (c) 2014-2016 Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * $COPYRIGHT$
  22  *
  23  * Additional copyrights may follow
  24  *
  25  * $HEADER$
  26  *
  27  */
  28 
  29 #include "orte_config.h"
  30 
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 
  35 #include "opal/util/argv.h"
  36 #include "opal/util/output.h"
  37 #include "opal/dss/dss.h"
  38 
  39 #include "orte/mca/errmgr/errmgr.h"
  40 #include "orte/util/name_fns.h"
  41 #include "orte/util/show_help.h"
  42 #include "orte/util/threads.h"
  43 #include "orte/runtime/orte_data_server.h"
  44 #include "orte/runtime/orte_globals.h"
  45 #include "orte/mca/rml/rml.h"
  46 #include "orte/mca/rml/base/rml_contact.h"
  47 
  48 #include "pmix_server_internal.h"
  49 
  50 static int init_server(void)
  51 {
  52     char *server;
  53     opal_value_t val;
  54     char input[1024], *filename;
  55     FILE *fp;
  56     int rc;
  57 
  58     /* only do this once */
  59     orte_pmix_server_globals.pubsub_init = true;
  60 
  61     /* if the universal server wasn't specified, then we use
  62      * our own HNP for that purpose */
  63     if (NULL == orte_data_server_uri) {
  64         orte_pmix_server_globals.server = *ORTE_PROC_MY_HNP;
  65     } else {
  66         if (0 == strncmp(orte_data_server_uri, "file", strlen("file")) ||
  67             0 == strncmp(orte_data_server_uri, "FILE", strlen("FILE"))) {
  68             /* it is a file - get the filename */
  69             filename = strchr(orte_data_server_uri, ':');
  70             if (NULL == filename) {
  71                 /* filename is not correctly formatted */
  72                 orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true,
  73                                orte_basename, orte_data_server_uri);
  74                 return ORTE_ERR_BAD_PARAM;
  75             }
  76             ++filename; /* space past the : */
  77 
  78             if (0 >= strlen(filename)) {
  79                 /* they forgot to give us the name! */
  80                 orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true,
  81                                orte_basename, orte_data_server_uri);
  82                 return ORTE_ERR_BAD_PARAM;
  83             }
  84 
  85             /* open the file and extract the uri */
  86             fp = fopen(filename, "r");
  87             if (NULL == fp) { /* can't find or read file! */
  88                 orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true,
  89                                orte_basename, orte_data_server_uri);
  90                 return ORTE_ERR_BAD_PARAM;
  91             }
  92             if (NULL == fgets(input, 1024, fp)) {
  93                 /* something malformed about file */
  94                 fclose(fp);
  95                 orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true,
  96                                orte_basename, orte_data_server_uri,
  97                                orte_basename);
  98                 return ORTE_ERR_BAD_PARAM;
  99             }
 100             fclose(fp);
 101             input[strlen(input)-1] = '\0';  /* remove newline */
 102             server = strdup(input);
 103         } else {
 104             server = strdup(orte_data_server_uri);
 105         }
 106         /* parse the URI to get the server's name */
 107         if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) {
 108             ORTE_ERROR_LOG(rc);
 109             free(server);
 110             return rc;
 111         }
 112         /* setup our route to the server */
 113         OBJ_CONSTRUCT(&val, opal_value_t);
 114         val.key = OPAL_PMIX_PROC_URI;
 115         val.type = OPAL_STRING;
 116         val.data.string = server;
 117         if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&orte_pmix_server_globals.server, &val))) {
 118             ORTE_ERROR_LOG(rc);
 119             val.key = NULL;
 120             OBJ_DESTRUCT(&val);
 121             return rc;
 122         }
 123         val.key = NULL;
 124         OBJ_DESTRUCT(&val);
 125 
 126         /* check if we are to wait for the server to start - resolves
 127          * a race condition that can occur when the server is run
 128          * as a background job - e.g., in scripts
 129          */
 130         if (orte_pmix_server_globals.wait_for_server) {
 131             /* ping the server */
 132             struct timeval timeout;
 133             timeout.tv_sec = orte_pmix_server_globals.timeout;
 134             timeout.tv_usec = 0;
 135             if (ORTE_SUCCESS != (rc = orte_rml.ping(server, &timeout))) {
 136                 /* try it one more time */
 137                 if (ORTE_SUCCESS != (rc = orte_rml.ping(server, &timeout))) {
 138                     /* okay give up */
 139                     orte_show_help("help-orterun.txt", "orterun:server-not-found", true,
 140                                    orte_basename, server,
 141                                    (long)orte_pmix_server_globals.timeout,
 142                                    ORTE_ERROR_NAME(rc));
 143                     ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
 144                     return rc;
 145                 }
 146             }
 147         }
 148     }
 149 
 150     return ORTE_SUCCESS;
 151 }
 152 
 153 static void execute(int sd, short args, void *cbdata)
 154 {
 155     pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
 156     int rc;
 157     opal_buffer_t *xfer;
 158     orte_process_name_t *target;
 159 
 160     ORTE_ACQUIRE_OBJECT(req);
 161 
 162     if (!orte_pmix_server_globals.pubsub_init) {
 163         /* we need to initialize our connection to the server */
 164         if (ORTE_SUCCESS != (rc = init_server())) {
 165             orte_show_help("help-orted.txt", "noserver", true,
 166                            (NULL == orte_data_server_uri) ?
 167                            "NULL" : orte_data_server_uri);
 168             goto callback;
 169         }
 170     }
 171 
 172     /* add this request to our tracker hotel */
 173     if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 174         orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 175         goto callback;
 176     }
 177 
 178     /* setup the xfer */
 179     xfer = OBJ_NEW(opal_buffer_t);
 180     /* pack the room number */
 181     if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &req->room_num, 1, OPAL_INT))) {
 182         ORTE_ERROR_LOG(rc);
 183         OBJ_RELEASE(xfer);
 184         goto callback;
 185     }
 186     opal_dss.copy_payload(xfer, &req->msg);
 187 
 188     /* if the range is SESSION, then set the target to the global server */
 189     if (OPAL_PMIX_RANGE_SESSION == req->range) {
 190         opal_output_verbose(1, orte_pmix_server_globals.output,
 191                             "%s orted:pmix:server range SESSION",
 192                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 193         target = &orte_pmix_server_globals.server;
 194     } else if (OPAL_PMIX_RANGE_LOCAL == req->range) {
 195         /* if the range is local, send it to myself */
 196         opal_output_verbose(1, orte_pmix_server_globals.output,
 197                             "%s orted:pmix:server range LOCAL",
 198                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 199         target = ORTE_PROC_MY_NAME;
 200     } else {
 201         opal_output_verbose(1, orte_pmix_server_globals.output,
 202                             "%s orted:pmix:server range GLOBAL",
 203                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 204         target = ORTE_PROC_MY_HNP;
 205     }
 206 
 207     /* send the request to the target */
 208     rc = orte_rml.send_buffer_nb(target, xfer,
 209                                  ORTE_RML_TAG_DATA_SERVER,
 210                                  orte_rml_send_callback, NULL);
 211     if (ORTE_SUCCESS == rc) {
 212         return;
 213     }
 214 
 215   callback:
 216     /* execute the callback to avoid having the client hang */
 217     if (NULL != req->opcbfunc) {
 218         req->opcbfunc(rc, req->cbdata);
 219     } else if (NULL != req->lkcbfunc) {
 220         req->lkcbfunc(rc, NULL, req->cbdata);
 221     }
 222     opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 223     OBJ_RELEASE(req);
 224 }
 225 
 226 int pmix_server_publish_fn(opal_process_name_t *proc,
 227                            opal_list_t *info,
 228                            opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 229 {
 230     pmix_server_req_t *req;
 231     int rc;
 232     uint8_t cmd = ORTE_PMIX_PUBLISH_CMD;
 233     opal_value_t *iptr;
 234     opal_pmix_persistence_t persist = OPAL_PMIX_PERSIST_APP;
 235     bool rset, pset;
 236 
 237     opal_output_verbose(1, orte_pmix_server_globals.output,
 238                         "%s orted:pmix:server PUBLISH",
 239                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 240 
 241     /* create the caddy */
 242     req = OBJ_NEW(pmix_server_req_t);
 243     opal_asprintf(&req->operation, "PUBLISH: %s:%d", __FILE__, __LINE__);
 244     req->opcbfunc = cbfunc;
 245     req->cbdata = cbdata;
 246 
 247     /* load the command */
 248     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
 249         ORTE_ERROR_LOG(rc);
 250         OBJ_RELEASE(req);
 251         return rc;
 252     }
 253 
 254     /* pack the name of the publisher */
 255     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, proc, 1, OPAL_NAME))) {
 256         ORTE_ERROR_LOG(rc);
 257         OBJ_RELEASE(req);
 258         return rc;
 259     }
 260 
 261     /* no help for it - need to search for range/persistence */
 262     rset = false;
 263     pset = false;
 264     OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
 265         if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
 266             req->range = (opal_pmix_data_range_t)iptr->data.uint;
 267             if (pset) {
 268                 break;
 269             }
 270             rset = true;
 271         } else if (0 == strcmp(iptr->key, OPAL_PMIX_PERSISTENCE)) {
 272             persist = (opal_pmix_persistence_t)iptr->data.integer;
 273             if (rset) {
 274                 break;
 275             }
 276             pset = true;
 277         }
 278     }
 279 
 280     /* pack the range */
 281     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) {
 282         ORTE_ERROR_LOG(rc);
 283         OBJ_RELEASE(req);
 284         return rc;
 285     }
 286 
 287     /* pack the persistence */
 288     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &persist, 1, OPAL_INT))) {
 289         ORTE_ERROR_LOG(rc);
 290         OBJ_RELEASE(req);
 291         return rc;
 292     }
 293 
 294     /* if we have items, pack those too - ignore persistence, timeout
 295      * and range values */
 296     OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
 297         if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE) ||
 298             0 == strcmp(iptr->key, OPAL_PMIX_PERSISTENCE)) {
 299             continue;
 300         }
 301         if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
 302             /* record the timeout value, but don't pack it */
 303             req->timeout = iptr->data.integer;
 304             continue;
 305         }
 306         opal_output_verbose(5, orte_pmix_server_globals.output,
 307                             "%s publishing data %s of type %d from source %s",
 308                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, iptr->type,
 309                             ORTE_NAME_PRINT(proc));
 310         if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
 311             ORTE_ERROR_LOG(rc);
 312             OBJ_RELEASE(req);
 313             return rc;
 314         }
 315     }
 316 
 317     /* thread-shift so we can store the tracker */
 318     opal_event_set(orte_event_base, &(req->ev),
 319                    -1, OPAL_EV_WRITE, execute, req);
 320     opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
 321     ORTE_POST_OBJECT(req);
 322     opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
 323 
 324     return OPAL_SUCCESS;
 325 
 326 }
 327 
 328 int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
 329                           opal_list_t *info,
 330                           opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
 331 {
 332     pmix_server_req_t *req;
 333     int rc;
 334     uint8_t cmd = ORTE_PMIX_LOOKUP_CMD;
 335     int32_t nkeys, i;
 336     opal_value_t *iptr;
 337 
 338     /* the list of info objects are directives for us - they include
 339      * things like timeout constraints, so there is no reason to
 340      * forward them to the server */
 341 
 342     /* create the caddy */
 343     req = OBJ_NEW(pmix_server_req_t);
 344     opal_asprintf(&req->operation, "LOOKUP: %s:%d", __FILE__, __LINE__);
 345     req->lkcbfunc = cbfunc;
 346     req->cbdata = cbdata;
 347 
 348     /* load the command */
 349     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
 350         ORTE_ERROR_LOG(rc);
 351         OBJ_RELEASE(req);
 352         return rc;
 353     }
 354 
 355     /* pack the requesting process jobid */
 356     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &proc->jobid, 1, ORTE_JOBID))) {
 357         ORTE_ERROR_LOG(rc);
 358         OBJ_RELEASE(req);
 359         return rc;
 360     }
 361 
 362     /* no help for it - need to search for range */
 363     OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
 364         if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
 365             req->range = (opal_pmix_data_range_t)iptr->data.uint;
 366             break;
 367         }
 368     }
 369 
 370     /* pack the range */
 371     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) {
 372         ORTE_ERROR_LOG(rc);
 373         OBJ_RELEASE(req);
 374         return rc;
 375     }
 376 
 377     /* pack the number of keys */
 378     nkeys = opal_argv_count(keys);
 379     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
 380         ORTE_ERROR_LOG(rc);
 381         OBJ_RELEASE(req);
 382         return rc;
 383     }
 384 
 385     /* pack the keys too */
 386     for (i=0; i < nkeys; i++) {
 387         opal_output_verbose(5, orte_pmix_server_globals.output,
 388                             "%s lookup data %s for proc %s",
 389                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i],
 390                             ORTE_NAME_PRINT(proc));
 391         if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &keys[i], 1, OPAL_STRING))) {
 392             ORTE_ERROR_LOG(rc);
 393             OBJ_RELEASE(req);
 394             return rc;
 395         }
 396     }
 397 
 398     /* if we have items, pack those too - ignore range and timeout value */
 399     OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
 400         if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
 401             continue;
 402         }
 403         if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
 404             /* record the timeout value, but don't pack it */
 405             req->timeout = iptr->data.integer;
 406             continue;
 407         }
 408         opal_output_verbose(2, orte_pmix_server_globals.output,
 409                             "%s lookup directive %s for proc %s",
 410                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 411                             ORTE_NAME_PRINT(proc));
 412         if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
 413             ORTE_ERROR_LOG(rc);
 414             OBJ_RELEASE(req);
 415             return rc;
 416         }
 417     }
 418 
 419     /* thread-shift so we can store the tracker */
 420     opal_event_set(orte_event_base, &(req->ev),
 421                    -1, OPAL_EV_WRITE, execute, req);
 422     opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
 423     ORTE_POST_OBJECT(req);
 424     opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
 425 
 426     return OPAL_SUCCESS;
 427 }
 428 
 429 int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys,
 430                              opal_list_t *info,
 431                              opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 432 {
 433     pmix_server_req_t *req;
 434     int rc;
 435     uint8_t cmd = ORTE_PMIX_UNPUBLISH_CMD;
 436     uint32_t nkeys, n;
 437     opal_value_t *iptr;
 438 
 439     /* create the caddy */
 440     req = OBJ_NEW(pmix_server_req_t);
 441     opal_asprintf(&req->operation, "UNPUBLISH: %s:%d", __FILE__, __LINE__);
 442     req->opcbfunc = cbfunc;
 443     req->cbdata = cbdata;
 444 
 445     /* load the command */
 446     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
 447         ORTE_ERROR_LOG(rc);
 448         OBJ_RELEASE(req);
 449         return rc;
 450     }
 451 
 452     /* pack the name of the publisher */
 453     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, proc, 1, OPAL_NAME))) {
 454         ORTE_ERROR_LOG(rc);
 455         OBJ_RELEASE(req);
 456         return rc;
 457     }
 458 
 459     /* no help for it - need to search for range */
 460     OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
 461         if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
 462             req->range = (opal_pmix_data_range_t)iptr->data.integer;
 463             break;
 464         }
 465     }
 466 
 467     /* pack the range */
 468     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_INT))) {
 469         ORTE_ERROR_LOG(rc);
 470         OBJ_RELEASE(req);
 471         return rc;
 472     }
 473 
 474     /* pack the number of keys */
 475     nkeys = opal_argv_count(keys);
 476     if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
 477         ORTE_ERROR_LOG(rc);
 478         OBJ_RELEASE(req);
 479         return rc;
 480     }
 481 
 482     /* pack the keys too */
 483     for (n=0; n < nkeys; n++) {
 484         if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &keys[n], 1, OPAL_STRING))) {
 485             ORTE_ERROR_LOG(rc);
 486             OBJ_RELEASE(req);
 487             return rc;
 488         }
 489     }
 490 
 491     /* if we have items, pack those too - ignore range and timeout value */
 492     OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
 493         if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
 494             continue;
 495         }
 496         if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
 497             /* record the timeout value, but don't pack it */
 498             req->timeout = iptr->data.integer;
 499             continue;
 500         }
 501         if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
 502             ORTE_ERROR_LOG(rc);
 503             OBJ_RELEASE(req);
 504             return rc;
 505         }
 506     }
 507 
 508     /* thread-shift so we can store the tracker */
 509     opal_event_set(orte_event_base, &(req->ev),
 510                    -1, OPAL_EV_WRITE, execute, req);
 511     opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
 512     ORTE_POST_OBJECT(req);
 513     opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
 514 
 515     return OPAL_SUCCESS;
 516 }
 517 
 518 void pmix_server_keyval_client(int status, orte_process_name_t* sender,
 519                                opal_buffer_t *buffer,
 520                                orte_rml_tag_t tg, void *cbdata)
 521 {
 522     int rc, ret, room_num = -1;
 523     int32_t cnt;
 524     pmix_server_req_t *req=NULL;
 525     opal_list_t info;
 526     opal_value_t *iptr;
 527     opal_pmix_pdata_t *pdata;
 528     opal_process_name_t source;
 529 
 530     opal_output_verbose(1, orte_pmix_server_globals.output,
 531                         "%s recvd lookup data return",
 532                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 533 
 534     OBJ_CONSTRUCT(&info, opal_list_t);
 535     /* unpack the room number of the request tracker */
 536     cnt = 1;
 537     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
 538         ORTE_ERROR_LOG(rc);
 539         return;
 540     }
 541 
 542     /* unpack the status */
 543     cnt = 1;
 544     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
 545         ORTE_ERROR_LOG(rc);
 546         ret = rc;
 547         goto release;
 548     }
 549 
 550     opal_output_verbose(5, orte_pmix_server_globals.output,
 551                         "%s recvd lookup returned status %d",
 552                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret);
 553 
 554     if (ORTE_SUCCESS == ret) {
 555         /* see if any data was included - not an error if the answer is no */
 556         cnt = 1;
 557         while (OPAL_SUCCESS == opal_dss.unpack(buffer, &source, &cnt, OPAL_NAME)) {
 558             pdata = OBJ_NEW(opal_pmix_pdata_t);
 559             pdata->proc = source;
 560             if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &cnt, OPAL_VALUE))) {
 561                 ORTE_ERROR_LOG(rc);
 562                 OBJ_RELEASE(pdata);
 563                 continue;
 564             }
 565             opal_output_verbose(5, orte_pmix_server_globals.output,
 566                                 "%s recvd lookup returned data %s of type %d from source %s",
 567                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, iptr->type,
 568                                 ORTE_NAME_PRINT(&source));
 569             if (OPAL_SUCCESS != (rc = opal_value_xfer(&pdata->value, iptr))) {
 570                 ORTE_ERROR_LOG(rc);
 571                 OBJ_RELEASE(pdata);
 572                 OBJ_RELEASE(iptr);
 573                 continue;
 574             }
 575             OBJ_RELEASE(iptr);
 576             opal_list_append(&info, &pdata->super);
 577         }
 578     }
 579 
 580   release:
 581     if (0 <= room_num) {
 582         /* retrieve the tracker */
 583         opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
 584     }
 585 
 586     if (NULL != req) {
 587         /* pass down the response */
 588         if (NULL != req->opcbfunc) {
 589             req->opcbfunc(ret, req->cbdata);
 590         } else if (NULL != req->lkcbfunc) {
 591             req->lkcbfunc(ret, &info, req->cbdata);
 592         } else {
 593             /* should not happen */
 594             ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
 595         }
 596 
 597         /* cleanup */
 598         OPAL_LIST_DESTRUCT(&info);
 599         OBJ_RELEASE(req);
 600     }
 601 }

/* [<][>][^][v][top][bottom][index][help] */