root/orte/runtime/orte_data_server.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. construct
  2. destruct
  3. rqcon
  4. rqdes
  5. orte_data_server_init
  6. orte_data_server_finalize
  7. orte_data_server

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2012-2016 Los Alamos National Security, LLC.
  14  *                         All rights reserved
  15  * Copyright (c) 2015-2019 Intel, Inc.  All rights reserved.
  16  * Copyright (c) 2017      Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 
  25 #include "orte_config.h"
  26 #include "orte/constants.h"
  27 #include "orte/types.h"
  28 
  29 #include <string.h>
  30 
  31 #ifdef HAVE_SYS_TIME_H
  32 #include <sys/time.h>
  33 #endif
  34 
  35 #include "opal/util/argv.h"
  36 #include "opal/util/output.h"
  37 #include "opal/class/opal_pointer_array.h"
  38 #include "opal/dss/dss.h"
  39 #include "opal/mca/pmix/pmix_types.h"
  40 
  41 #include "orte/mca/errmgr/errmgr.h"
  42 #include "orte/mca/rml/rml.h"
  43 #include "orte/runtime/orte_globals.h"
  44 #include "orte/util/name_fns.h"
  45 #include "orte/runtime/orte_wait.h"
  46 #include "orte/runtime/data_type_support/orte_dt_support.h"
  47 
  48 #include "orte/runtime/orte_data_server.h"
  49 
  50 /* define an object to hold data */
  51 typedef struct {
  52     /* base object */
  53     opal_object_t super;
  54     /* index of this object in the storage array */
  55     orte_std_cntr_t index;
  56     /* process that owns this data - only the
  57     * owner can remove it
  58     */
  59     orte_process_name_t owner;
  60     /* uid of the owner - helps control
  61      * access rights */
  62     uint32_t uid;
  63     /* characteristics */
  64     opal_pmix_data_range_t range;
  65     opal_pmix_persistence_t persistence;
  66     /* and the values themselves */
  67     opal_list_t values;
  68     /* the value itself */
  69 } orte_data_object_t;
  70 
  71 static void construct(orte_data_object_t *ptr)
  72 {
  73     ptr->index = -1;
  74     ptr->uid = UINT32_MAX;
  75     ptr->range = OPAL_PMIX_RANGE_UNDEF;
  76     ptr->persistence = OPAL_PMIX_PERSIST_SESSION;
  77     OBJ_CONSTRUCT(&ptr->values, opal_list_t);
  78 }
  79 
  80 static void destruct(orte_data_object_t *ptr)
  81 {
  82     OPAL_LIST_DESTRUCT(&ptr->values);
  83 }
  84 
  85 OBJ_CLASS_INSTANCE(orte_data_object_t,
  86                    opal_object_t,
  87                    construct, destruct);
  88 
  89 /* define a request object for delayed answers */
  90 typedef struct {
  91     opal_list_item_t super;
  92     orte_process_name_t requestor;
  93     int room_number;
  94     uint32_t uid;
  95     opal_pmix_data_range_t range;
  96     char **keys;
  97 } orte_data_req_t;
  98 static void rqcon(orte_data_req_t *p)
  99 {
 100     p->keys = NULL;
 101 }
 102 static void rqdes(orte_data_req_t *p)
 103 {
 104     opal_argv_free(p->keys);
 105 }
 106 OBJ_CLASS_INSTANCE(orte_data_req_t,
 107                    opal_list_item_t,
 108                    rqcon, rqdes);
 109 
 110 /* local globals */
 111 static opal_pointer_array_t orte_data_server_store;
 112 static opal_list_t pending;
 113 static bool initialized = false;
 114 static int orte_data_server_output = -1;
 115 static int orte_data_server_verbosity = -1;
 116 
 117 int orte_data_server_init(void)
 118 {
 119     int rc;
 120 
 121     if (initialized) {
 122         return ORTE_SUCCESS;
 123     }
 124     initialized = true;
 125 
 126     /* register a verbosity */
 127     orte_data_server_verbosity = -1;
 128     (void) mca_base_var_register ("orte", "orte", "data", "server_verbose",
 129                                   "Debug verbosity for ORTE data server",
 130                                   MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
 131                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 132                                   &orte_data_server_verbosity);
 133     if (0 <= orte_data_server_verbosity) {
 134         orte_data_server_output = opal_output_open(NULL);
 135         opal_output_set_verbosity(orte_data_server_output,
 136                                   orte_data_server_verbosity);
 137     }
 138 
 139     OBJ_CONSTRUCT(&orte_data_server_store, opal_pointer_array_t);
 140     if (ORTE_SUCCESS != (rc = opal_pointer_array_init(&orte_data_server_store,
 141                                                       1,
 142                                                       INT_MAX,
 143                                                       1))) {
 144         ORTE_ERROR_LOG(rc);
 145         return rc;
 146     }
 147 
 148     OBJ_CONSTRUCT(&pending, opal_list_t);
 149 
 150     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 151                             ORTE_RML_TAG_DATA_SERVER,
 152                             ORTE_RML_PERSISTENT,
 153                             orte_data_server,
 154                             NULL);
 155 
 156     return ORTE_SUCCESS;
 157 }
 158 
 159 void orte_data_server_finalize(void)
 160 {
 161     orte_std_cntr_t i;
 162     orte_data_object_t *data;
 163 
 164     if (!initialized) {
 165         return;
 166     }
 167     initialized = false;
 168 
 169     for (i=0; i < orte_data_server_store.size; i++) {
 170         if (NULL != (data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, i))) {
 171             OBJ_RELEASE(data);
 172         }
 173     }
 174     OBJ_DESTRUCT(&orte_data_server_store);
 175     OPAL_LIST_DESTRUCT(&pending);
 176 }
 177 
 178 void orte_data_server(int status, orte_process_name_t* sender,
 179                       opal_buffer_t* buffer, orte_rml_tag_t tag,
 180                       void* cbdata)
 181 {
 182     uint8_t command;
 183     orte_std_cntr_t count;
 184     opal_process_name_t requestor;
 185     orte_data_object_t *data;
 186     opal_buffer_t *answer, *reply;
 187     int rc, ret, k;
 188     opal_value_t *iptr, *inext;
 189     uint32_t ninfo, i;
 190     char **keys = NULL, *str;
 191     bool ret_packed = false, wait = false, data_added;
 192     int room_number;
 193     uint32_t uid = UINT32_MAX;
 194     opal_pmix_data_range_t range;
 195     orte_data_req_t *req, *rqnext;
 196     orte_jobid_t jobid = ORTE_JOBID_INVALID;
 197 
 198     opal_output_verbose(1, orte_data_server_output,
 199                         "%s data server got message from %s",
 200                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 201                         ORTE_NAME_PRINT(sender));
 202 
 203     /* unpack the room number of the caller's request */
 204     count = 1;
 205     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &room_number, &count, OPAL_INT))) {
 206         ORTE_ERROR_LOG(rc);
 207         return;
 208     }
 209 
 210     /* unpack the command */
 211     count = 1;
 212     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, OPAL_UINT8))) {
 213         ORTE_ERROR_LOG(rc);
 214         return;
 215     }
 216 
 217     answer = OBJ_NEW(opal_buffer_t);
 218     /* pack the room number as this must lead any response */
 219     if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &room_number, 1, OPAL_INT))) {
 220         ORTE_ERROR_LOG(rc);
 221         OBJ_RELEASE(answer);
 222         return;
 223     }
 224 
 225     switch(command) {
 226     case ORTE_PMIX_PUBLISH_CMD:
 227         data = OBJ_NEW(orte_data_object_t);
 228         /* unpack the publisher */
 229         count = 1;
 230         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->owner, &count, OPAL_NAME))) {
 231             ORTE_ERROR_LOG(rc);
 232             OBJ_RELEASE(data);
 233             goto SEND_ERROR;
 234         }
 235 
 236         opal_output_verbose(1, orte_data_server_output,
 237                             "%s data server: publishing data from %s",
 238                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 239                             ORTE_NAME_PRINT(&data->owner));
 240 
 241         /* unpack the range */
 242         count = 1;
 243         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->range, &count, OPAL_PMIX_DATA_RANGE))) {
 244             ORTE_ERROR_LOG(rc);
 245             OBJ_RELEASE(data);
 246             goto SEND_ERROR;
 247         }
 248         /* unpack the persistence */
 249         count = 1;
 250         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->persistence, &count, OPAL_INT))) {
 251             ORTE_ERROR_LOG(rc);
 252             OBJ_RELEASE(data);
 253             goto SEND_ERROR;
 254         }
 255 
 256         count = 1;
 257         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
 258             /* if this is the userid, separate it out */
 259             if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
 260                 data->uid = iptr->data.uint32;
 261                 OBJ_RELEASE(iptr);
 262             } else {
 263                 opal_output_verbose(10, orte_data_server_output,
 264                                     "%s data server: adding %s to data from %s",
 265                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 266                                     ORTE_NAME_PRINT(&data->owner));
 267                 opal_list_append(&data->values, &iptr->super);
 268             }
 269         }
 270 
 271         data->index = opal_pointer_array_add(&orte_data_server_store, data);
 272 
 273         opal_output_verbose(1, orte_data_server_output,
 274                             "%s data server: checking for pending requests",
 275                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 276 
 277         /* check for pending requests that match this data */
 278         reply = NULL;
 279         OPAL_LIST_FOREACH_SAFE(req, rqnext, &pending, orte_data_req_t) {
 280             if (req->uid != data->uid) {
 281                 continue;
 282             }
 283             /* if the published range is constrained to namespace, then only
 284              * consider this data if the publisher is
 285              * in the same namespace as the requestor */
 286             if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
 287                 if (jobid != data->owner.jobid) {
 288                     continue;
 289                 }
 290             }
 291             for (i=0; NULL != req->keys[i]; i++) {
 292                 /* cycle thru the data keys for matches */
 293                 OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
 294                     opal_output_verbose(10, orte_data_server_output,
 295                                         "%s\tCHECKING %s TO %s",
 296                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 297                                         iptr->key, req->keys[i]);
 298                     if (0 == strcmp(iptr->key, req->keys[i])) {
 299                         opal_output_verbose(10, orte_data_server_output,
 300                                             "%s data server: packaging return",
 301                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 302                         /* found it - package it for return */
 303                         if (NULL == reply) {
 304                             reply = OBJ_NEW(opal_buffer_t);
 305                             /* start with their room number */
 306                             if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &req->room_number, 1, OPAL_INT))) {
 307                                 ORTE_ERROR_LOG(rc);
 308                                 break;
 309                             }
 310                             /* then the status */
 311                             ret = ORTE_SUCCESS;
 312                             if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
 313                                 ORTE_ERROR_LOG(rc);
 314                                 break;
 315                             }
 316                         }
 317                         if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &data->owner, 1, OPAL_NAME))) {
 318                             ORTE_ERROR_LOG(rc);
 319                             break;
 320                         }
 321                         opal_output_verbose(10, orte_data_server_output,
 322                                             "%s data server: adding %s data from %s to response",
 323                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 324                                             ORTE_NAME_PRINT(&data->owner));
 325                         if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &iptr, 1, OPAL_VALUE))) {
 326                             ORTE_ERROR_LOG(rc);
 327                             break;
 328                         }
 329                     }
 330                 }
 331             }
 332             if (NULL != reply) {
 333                 /* send it back to the requestor */
 334                 opal_output_verbose(1, orte_data_server_output,
 335                                      "%s data server: returning data to %s",
 336                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 337                                      ORTE_NAME_PRINT(&req->requestor));
 338 
 339                 if (0 > (rc = orte_rml.send_buffer_nb(&req->requestor, reply, ORTE_RML_TAG_DATA_CLIENT,
 340                                                       orte_rml_send_callback, NULL))) {
 341                     ORTE_ERROR_LOG(rc);
 342                     OBJ_RELEASE(reply);
 343                 }
 344                 /* remove this request */
 345                 opal_list_remove_item(&pending, &req->super);
 346                 OBJ_RELEASE(req);
 347                 reply = NULL;
 348                 /* if the persistence is "first_read", then delete this data */
 349                 if (OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) {
 350                     opal_output_verbose(1, orte_data_server_output,
 351                                         "%s NOT STORING DATA FROM %s AT INDEX %d",
 352                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 353                                         ORTE_NAME_PRINT(&data->owner), data->index);
 354                     opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL);
 355                     OBJ_RELEASE(data);
 356                     goto release;
 357                 }
 358             }
 359         }
 360 
 361       release:
 362         /* tell the user it was wonderful... */
 363         ret = ORTE_SUCCESS;
 364         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
 365             ORTE_ERROR_LOG(rc);
 366             /* if we can't pack it, we probably can't pack the
 367              * rc value either, so just send whatever is there */
 368         }
 369         goto SEND_ANSWER;
 370         break;
 371 
 372     case ORTE_PMIX_LOOKUP_CMD:
 373         opal_output_verbose(1, orte_data_server_output,
 374                             "%s data server: lookup data from %s",
 375                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 376                             ORTE_NAME_PRINT(sender));
 377 
 378         /* unpack the requestor's jobid */
 379         count = 1;
 380         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
 381             ORTE_ERROR_LOG(rc);
 382             goto SEND_ERROR;
 383         }
 384 
 385         /* unpack the range - this sets some constraints on the range of data to be considered */
 386         count = 1;
 387         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_PMIX_DATA_RANGE))) {
 388             ORTE_ERROR_LOG(rc);
 389             goto SEND_ERROR;
 390         }
 391 
 392         /* unpack the number of keys */
 393         count = 1;
 394         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
 395             ORTE_ERROR_LOG(rc);
 396             goto SEND_ERROR;
 397         }
 398         if (0 == ninfo) {
 399             /* they forgot to send us the keys?? */
 400             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 401             rc = ORTE_ERR_BAD_PARAM;
 402             goto SEND_ERROR;
 403         }
 404 
 405         /* unpack the keys */
 406         for (i=0; i < ninfo; i++) {
 407             count = 1;
 408             if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) {
 409                 ORTE_ERROR_LOG(rc);
 410                 opal_argv_free(keys);
 411                 goto SEND_ERROR;
 412             }
 413             opal_argv_append_nosize(&keys, str);
 414             free(str);
 415         }
 416 
 417         /* unpack any info elements */
 418         count = 1;
 419         uid = UINT32_MAX;
 420         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
 421             /* if this is the userid, separate it out */
 422             if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
 423                 uid = iptr->data.uint32;
 424             } else if (0 == strcmp(iptr->key, OPAL_PMIX_WAIT)) {
 425                 /* flag that we wait until the data is present */
 426                 wait = true;
 427             }
 428             /* ignore anything else for now */
 429             OBJ_RELEASE(iptr);
 430         }
 431         if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 432             ORTE_ERROR_LOG(rc);
 433             opal_argv_free(keys);
 434             goto SEND_ERROR;
 435         }
 436         if (UINT32_MAX == uid) {
 437             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 438             rc = ORTE_ERR_NOT_FOUND;
 439             opal_argv_free(keys);
 440             goto SEND_ERROR;
 441         }
 442 
 443         /* cycle across the provided keys */
 444         ret_packed = false;
 445         for (i=0; NULL != keys[i]; i++) {
 446             opal_output_verbose(10, orte_data_server_output,
 447                                 "%s data server: looking for %s",
 448                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i]);
 449             /* cycle across the stored data, looking for a match */
 450             for (k=0; k < orte_data_server_store.size; k++) {
 451                 data_added = false;
 452                 data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
 453                 if (NULL == data) {
 454                     continue;
 455                 }
 456                 /* for security reasons, can only access data posted by the same user id */
 457                 if (uid != data->uid) {
 458                     opal_output_verbose(10, orte_data_server_output,
 459                                         "%s\tMISMATCH UID %u %u",
 460                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 461                                         (unsigned)uid, (unsigned)data->uid);
 462                     continue;
 463                 }
 464                 /* if the published range is constrained to namespace, then only
 465                  * consider this data if the publisher is
 466                  * in the same namespace as the requestor */
 467                 if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
 468                     if (jobid != data->owner.jobid) {
 469                         opal_output_verbose(10, orte_data_server_output,
 470                                             "%s\tMISMATCH JOBID %s %s",
 471                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 472                                             ORTE_JOBID_PRINT(jobid),
 473                                             ORTE_JOBID_PRINT(data->owner.jobid));
 474                         continue;
 475                     }
 476                 }
 477                 /* see if we have this key */
 478                 OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
 479                     opal_output_verbose(10, orte_data_server_output,
 480                                         "%s COMPARING %s %s",
 481                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 482                                         keys[i], iptr->key);
 483                     if (0 == strcmp(iptr->key, keys[i])) {
 484                         /* found it - package it for return */
 485                         if (!ret_packed) {
 486                             ret = ORTE_SUCCESS;
 487                             if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
 488                                 ORTE_ERROR_LOG(rc);
 489                                 opal_argv_free(keys);
 490                                 goto SEND_ERROR;
 491                             }
 492                             ret_packed = true;
 493                         }
 494                         data_added = true;
 495                         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->owner, 1, OPAL_NAME))) {
 496                             ORTE_ERROR_LOG(rc);
 497                             opal_argv_free(keys);
 498                             goto SEND_ERROR;
 499                         }
 500                         opal_output_verbose(1, orte_data_server_output,
 501                                             "%s data server: adding %s to data from %s",
 502                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 503                                             ORTE_NAME_PRINT(&data->owner));
 504                         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &iptr, 1, OPAL_VALUE))) {
 505                             ORTE_ERROR_LOG(rc);
 506                             opal_argv_free(keys);
 507                             goto SEND_ERROR;
 508                         }
 509                     }
 510                 }
 511                 if (data_added && OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) {
 512                     opal_output_verbose(1, orte_data_server_output,
 513                                         "%s REMOVING DATA FROM %s AT INDEX %d",
 514                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 515                                         ORTE_NAME_PRINT(&data->owner), data->index);
 516                     opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL);
 517                     OBJ_RELEASE(data);
 518                 }
 519             }
 520         }
 521         if (!ret_packed) {
 522             opal_output_verbose(1, orte_data_server_output,
 523                                 "%s data server:lookup: data not found",
 524                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 525 
 526             /* if we were told to wait for the data, then queue this up
 527              * for later processing */
 528             if (wait) {
 529                 opal_output_verbose(1, orte_data_server_output,
 530                                     "%s data server:lookup: pushing request to wait",
 531                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 532                 OBJ_RELEASE(answer);
 533                 req = OBJ_NEW(orte_data_req_t);
 534                 req->room_number = room_number;
 535                 req->requestor = *sender;
 536                 req->uid = uid;
 537                 req->range = range;
 538                 req->keys = keys;
 539                 opal_list_append(&pending, &req->super);
 540                 return;
 541             }
 542             /* nothing was found - indicate that situation */
 543             rc = ORTE_ERR_NOT_FOUND;
 544             opal_argv_free(keys);
 545             goto SEND_ERROR;
 546         }
 547 
 548         opal_argv_free(keys);
 549         opal_output_verbose(1, orte_data_server_output,
 550                             "%s data server:lookup: data found",
 551                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 552         goto SEND_ANSWER;
 553         break;
 554 
 555     case ORTE_PMIX_UNPUBLISH_CMD:
 556         /* unpack the requestor */
 557         count = 1;
 558         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
 559             ORTE_ERROR_LOG(rc);
 560             goto SEND_ERROR;
 561         }
 562 
 563         opal_output_verbose(1, orte_data_server_output,
 564                             "%s data server: unpublish data from %s",
 565                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 566                             ORTE_NAME_PRINT(&requestor));
 567 
 568         /* unpack the range - this sets some constraints on the range of data to be considered */
 569         count = 1;
 570         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) {
 571             ORTE_ERROR_LOG(rc);
 572             goto SEND_ERROR;
 573         }
 574 
 575         /* unpack the number of keys */
 576         count = 1;
 577         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
 578             ORTE_ERROR_LOG(rc);
 579             goto SEND_ERROR;
 580         }
 581         if (0 == ninfo) {
 582             /* they forgot to send us the keys?? */
 583             rc = ORTE_ERR_BAD_PARAM;
 584             goto SEND_ERROR;
 585         }
 586 
 587         /* unpack the keys */
 588         for (i=0; i < ninfo; i++) {
 589             count = 1;
 590             if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) {
 591                 ORTE_ERROR_LOG(rc);
 592                 opal_argv_free(keys);
 593                 goto SEND_ERROR;
 594             }
 595             opal_argv_append_nosize(&keys, str);
 596             free(str);
 597         }
 598 
 599         /* unpack any info elements */
 600         count = 1;
 601         uid = UINT32_MAX;
 602         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
 603             /* if this is the userid, separate it out */
 604             if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
 605                 uid = iptr->data.uint32;
 606             }
 607             /* ignore anything else for now */
 608             OBJ_RELEASE(iptr);
 609         }
 610         if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc || UINT32_MAX == uid) {
 611             ORTE_ERROR_LOG(rc);
 612             opal_argv_free(keys);
 613             goto SEND_ERROR;
 614         }
 615 
 616         /* cycle across the provided keys */
 617         for (i=0; NULL != keys[i]; i++) {
 618             /* cycle across the stored data, looking for a match */
 619             for (k=0; k < orte_data_server_store.size; k++) {
 620                 data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
 621                 if (NULL == data) {
 622                     continue;
 623                 }
 624                 /* can only access data posted by the same user id */
 625                 if (uid != data->uid) {
 626                     continue;
 627                 }
 628                 /* can only access data posted by the same process */
 629                 if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
 630                     continue;
 631                 }
 632                 /* can only access data posted for the same range */
 633                 if (range != data->range) {
 634                     continue;
 635                 }
 636                 /* see if we have this key */
 637                 OPAL_LIST_FOREACH_SAFE(iptr, inext, &data->values, opal_value_t) {
 638                     if (0 == strcmp(iptr->key, keys[i])) {
 639                         /* found it -  delete the object from the data store */
 640                         opal_list_remove_item(&data->values, &iptr->super);
 641                         OBJ_RELEASE(iptr);
 642                     }
 643                 }
 644                 /* if all the data has been removed, then remove the object */
 645                 if (0 == opal_list_get_size(&data->values)) {
 646                     opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
 647                     OBJ_RELEASE(data);
 648                 }
 649             }
 650         }
 651         opal_argv_free(keys);
 652 
 653         /* tell the sender this succeeded */
 654         ret = ORTE_SUCCESS;
 655         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
 656             ORTE_ERROR_LOG(rc);
 657         }
 658         goto SEND_ANSWER;
 659         break;
 660 
 661     case ORTE_PMIX_PURGE_PROC_CMD:
 662         /* unpack the proc whose data is to be purged - session
 663          * data is purged by providing a requestor whose rank
 664          * is wildcard */
 665         count = 1;
 666         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
 667             ORTE_ERROR_LOG(rc);
 668             goto SEND_ERROR;
 669         }
 670 
 671         opal_output_verbose(1, orte_data_server_output,
 672                             "%s data server: purge data from %s",
 673                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 674                             ORTE_NAME_PRINT(&requestor));
 675 
 676         /* cycle across the stored data, looking for a match */
 677         for (k=0; k < orte_data_server_store.size; k++) {
 678             data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
 679             if (NULL == data) {
 680                 continue;
 681             }
 682             /* check if data posted by the same process */
 683             if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
 684                 continue;
 685             }
 686             /* check persistence - if it is intended to persist beyond the
 687              * proc itself, then we only delete it if rank=wildcard*/
 688             if ((data->persistence == OPAL_PMIX_PERSIST_APP ||
 689                  data->persistence == OPAL_PMIX_PERSIST_SESSION) &&
 690                 ORTE_VPID_WILDCARD != requestor.vpid) {
 691                 continue;
 692             }
 693             /* remove the object */
 694             opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
 695             OBJ_RELEASE(data);
 696         }
 697         /* no response is required */
 698         OBJ_RELEASE(answer);
 699         return;
 700 
 701     default:
 702         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 703         rc = ORTE_ERR_BAD_PARAM;
 704         break;
 705     }
 706 
 707   SEND_ERROR:
 708     opal_output_verbose(1, orte_data_server_output,
 709                         "%s data server: sending error %s",
 710                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 711                         ORTE_ERROR_NAME(rc));
 712     /* pack the error code */
 713     if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) {
 714         ORTE_ERROR_LOG(ret);
 715     }
 716 
 717  SEND_ANSWER:
 718     if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT,
 719                                           orte_rml_send_callback, NULL))) {
 720         ORTE_ERROR_LOG(rc);
 721         OBJ_RELEASE(answer);
 722     }
 723 }

/* [<][>][^][v][top][bottom][index][help] */