This source file includes following definitions.
- construct
- destruct
- rqcon
- rqdes
- orte_data_server_init
- orte_data_server_finalize
- orte_data_server
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 #include "orte_config.h"
  26 #include "orte/constants.h"
  27 #include "orte/types.h"
  28 
  29 #include <string.h>
  30 
  31 #ifdef HAVE_SYS_TIME_H
  32 #include <sys/time.h>
  33 #endif
  34 
  35 #include "opal/util/argv.h"
  36 #include "opal/util/output.h"
  37 #include "opal/class/opal_pointer_array.h"
  38 #include "opal/dss/dss.h"
  39 #include "opal/mca/pmix/pmix_types.h"
  40 
  41 #include "orte/mca/errmgr/errmgr.h"
  42 #include "orte/mca/rml/rml.h"
  43 #include "orte/runtime/orte_globals.h"
  44 #include "orte/util/name_fns.h"
  45 #include "orte/runtime/orte_wait.h"
  46 #include "orte/runtime/data_type_support/orte_dt_support.h"
  47 
  48 #include "orte/runtime/orte_data_server.h"
  49 
  50 
  51 typedef struct {
  52     
  53     opal_object_t super;
  54     
  55     orte_std_cntr_t index;
  56     
  57 
  58 
  59     orte_process_name_t owner;
  60     
  61 
  62     uint32_t uid;
  63     
  64     opal_pmix_data_range_t range;
  65     opal_pmix_persistence_t persistence;
  66     
  67     opal_list_t values;
  68     
  69 } orte_data_object_t;
  70 
  71 static void construct(orte_data_object_t *ptr)
  72 {
  73     ptr->index = -1;
  74     ptr->uid = UINT32_MAX;
  75     ptr->range = OPAL_PMIX_RANGE_UNDEF;
  76     ptr->persistence = OPAL_PMIX_PERSIST_SESSION;
  77     OBJ_CONSTRUCT(&ptr->values, opal_list_t);
  78 }
  79 
  80 static void destruct(orte_data_object_t *ptr)
  81 {
  82     OPAL_LIST_DESTRUCT(&ptr->values);
  83 }
  84 
  85 OBJ_CLASS_INSTANCE(orte_data_object_t,
  86                    opal_object_t,
  87                    construct, destruct);
  88 
  89 
  90 typedef struct {
  91     opal_list_item_t super;
  92     orte_process_name_t requestor;
  93     int room_number;
  94     uint32_t uid;
  95     opal_pmix_data_range_t range;
  96     char **keys;
  97 } orte_data_req_t;
  98 static void rqcon(orte_data_req_t *p)
  99 {
 100     p->keys = NULL;
 101 }
 102 static void rqdes(orte_data_req_t *p)
 103 {
 104     opal_argv_free(p->keys);
 105 }
 106 OBJ_CLASS_INSTANCE(orte_data_req_t,
 107                    opal_list_item_t,
 108                    rqcon, rqdes);
 109 
 110 
 111 static opal_pointer_array_t orte_data_server_store;
 112 static opal_list_t pending;
 113 static bool initialized = false;
 114 static int orte_data_server_output = -1;
 115 static int orte_data_server_verbosity = -1;
 116 
 117 int orte_data_server_init(void)
 118 {
 119     int rc;
 120 
 121     if (initialized) {
 122         return ORTE_SUCCESS;
 123     }
 124     initialized = true;
 125 
 126     
 127     orte_data_server_verbosity = -1;
 128     (void) mca_base_var_register ("orte", "orte", "data", "server_verbose",
 129                                   "Debug verbosity for ORTE data server",
 130                                   MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
 131                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 132                                   &orte_data_server_verbosity);
 133     if (0 <= orte_data_server_verbosity) {
 134         orte_data_server_output = opal_output_open(NULL);
 135         opal_output_set_verbosity(orte_data_server_output,
 136                                   orte_data_server_verbosity);
 137     }
 138 
 139     OBJ_CONSTRUCT(&orte_data_server_store, opal_pointer_array_t);
 140     if (ORTE_SUCCESS != (rc = opal_pointer_array_init(&orte_data_server_store,
 141                                                       1,
 142                                                       INT_MAX,
 143                                                       1))) {
 144         ORTE_ERROR_LOG(rc);
 145         return rc;
 146     }
 147 
 148     OBJ_CONSTRUCT(&pending, opal_list_t);
 149 
 150     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
 151                             ORTE_RML_TAG_DATA_SERVER,
 152                             ORTE_RML_PERSISTENT,
 153                             orte_data_server,
 154                             NULL);
 155 
 156     return ORTE_SUCCESS;
 157 }
 158 
 159 void orte_data_server_finalize(void)
 160 {
 161     orte_std_cntr_t i;
 162     orte_data_object_t *data;
 163 
 164     if (!initialized) {
 165         return;
 166     }
 167     initialized = false;
 168 
 169     for (i=0; i < orte_data_server_store.size; i++) {
 170         if (NULL != (data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, i))) {
 171             OBJ_RELEASE(data);
 172         }
 173     }
 174     OBJ_DESTRUCT(&orte_data_server_store);
 175     OPAL_LIST_DESTRUCT(&pending);
 176 }
 177 
 178 void orte_data_server(int status, orte_process_name_t* sender,
 179                       opal_buffer_t* buffer, orte_rml_tag_t tag,
 180                       void* cbdata)
 181 {
 182     uint8_t command;
 183     orte_std_cntr_t count;
 184     opal_process_name_t requestor;
 185     orte_data_object_t *data;
 186     opal_buffer_t *answer, *reply;
 187     int rc, ret, k;
 188     opal_value_t *iptr, *inext;
 189     uint32_t ninfo, i;
 190     char **keys = NULL, *str;
 191     bool ret_packed = false, wait = false, data_added;
 192     int room_number;
 193     uint32_t uid = UINT32_MAX;
 194     opal_pmix_data_range_t range;
 195     orte_data_req_t *req, *rqnext;
 196     orte_jobid_t jobid = ORTE_JOBID_INVALID;
 197 
 198     opal_output_verbose(1, orte_data_server_output,
 199                         "%s data server got message from %s",
 200                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 201                         ORTE_NAME_PRINT(sender));
 202 
 203     
 204     count = 1;
 205     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &room_number, &count, OPAL_INT))) {
 206         ORTE_ERROR_LOG(rc);
 207         return;
 208     }
 209 
 210     
 211     count = 1;
 212     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, OPAL_UINT8))) {
 213         ORTE_ERROR_LOG(rc);
 214         return;
 215     }
 216 
 217     answer = OBJ_NEW(opal_buffer_t);
 218     
 219     if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &room_number, 1, OPAL_INT))) {
 220         ORTE_ERROR_LOG(rc);
 221         OBJ_RELEASE(answer);
 222         return;
 223     }
 224 
 225     switch(command) {
 226     case ORTE_PMIX_PUBLISH_CMD:
 227         data = OBJ_NEW(orte_data_object_t);
 228         
 229         count = 1;
 230         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->owner, &count, OPAL_NAME))) {
 231             ORTE_ERROR_LOG(rc);
 232             OBJ_RELEASE(data);
 233             goto SEND_ERROR;
 234         }
 235 
 236         opal_output_verbose(1, orte_data_server_output,
 237                             "%s data server: publishing data from %s",
 238                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 239                             ORTE_NAME_PRINT(&data->owner));
 240 
 241         
 242         count = 1;
 243         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->range, &count, OPAL_PMIX_DATA_RANGE))) {
 244             ORTE_ERROR_LOG(rc);
 245             OBJ_RELEASE(data);
 246             goto SEND_ERROR;
 247         }
 248         
 249         count = 1;
 250         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->persistence, &count, OPAL_INT))) {
 251             ORTE_ERROR_LOG(rc);
 252             OBJ_RELEASE(data);
 253             goto SEND_ERROR;
 254         }
 255 
 256         count = 1;
 257         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
 258             
 259             if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
 260                 data->uid = iptr->data.uint32;
 261                 OBJ_RELEASE(iptr);
 262             } else {
 263                 opal_output_verbose(10, orte_data_server_output,
 264                                     "%s data server: adding %s to data from %s",
 265                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 266                                     ORTE_NAME_PRINT(&data->owner));
 267                 opal_list_append(&data->values, &iptr->super);
 268             }
 269         }
 270 
 271         data->index = opal_pointer_array_add(&orte_data_server_store, data);
 272 
 273         opal_output_verbose(1, orte_data_server_output,
 274                             "%s data server: checking for pending requests",
 275                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 276 
 277         
 278         reply = NULL;
 279         OPAL_LIST_FOREACH_SAFE(req, rqnext, &pending, orte_data_req_t) {
 280             if (req->uid != data->uid) {
 281                 continue;
 282             }
 283             
 284 
 285 
 286             if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
 287                 if (jobid != data->owner.jobid) {
 288                     continue;
 289                 }
 290             }
 291             for (i=0; NULL != req->keys[i]; i++) {
 292                 
 293                 OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
 294                     opal_output_verbose(10, orte_data_server_output,
 295                                         "%s\tCHECKING %s TO %s",
 296                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 297                                         iptr->key, req->keys[i]);
 298                     if (0 == strcmp(iptr->key, req->keys[i])) {
 299                         opal_output_verbose(10, orte_data_server_output,
 300                                             "%s data server: packaging return",
 301                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 302                         
 303                         if (NULL == reply) {
 304                             reply = OBJ_NEW(opal_buffer_t);
 305                             
 306                             if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &req->room_number, 1, OPAL_INT))) {
 307                                 ORTE_ERROR_LOG(rc);
 308                                 break;
 309                             }
 310                             
 311                             ret = ORTE_SUCCESS;
 312                             if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
 313                                 ORTE_ERROR_LOG(rc);
 314                                 break;
 315                             }
 316                         }
 317                         if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &data->owner, 1, OPAL_NAME))) {
 318                             ORTE_ERROR_LOG(rc);
 319                             break;
 320                         }
 321                         opal_output_verbose(10, orte_data_server_output,
 322                                             "%s data server: adding %s data from %s to response",
 323                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 324                                             ORTE_NAME_PRINT(&data->owner));
 325                         if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &iptr, 1, OPAL_VALUE))) {
 326                             ORTE_ERROR_LOG(rc);
 327                             break;
 328                         }
 329                     }
 330                 }
 331             }
 332             if (NULL != reply) {
 333                 
 334                 opal_output_verbose(1, orte_data_server_output,
 335                                      "%s data server: returning data to %s",
 336                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 337                                      ORTE_NAME_PRINT(&req->requestor));
 338 
 339                 if (0 > (rc = orte_rml.send_buffer_nb(&req->requestor, reply, ORTE_RML_TAG_DATA_CLIENT,
 340                                                       orte_rml_send_callback, NULL))) {
 341                     ORTE_ERROR_LOG(rc);
 342                     OBJ_RELEASE(reply);
 343                 }
 344                 
 345                 opal_list_remove_item(&pending, &req->super);
 346                 OBJ_RELEASE(req);
 347                 reply = NULL;
 348                 
 349                 if (OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) {
 350                     opal_output_verbose(1, orte_data_server_output,
 351                                         "%s NOT STORING DATA FROM %s AT INDEX %d",
 352                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 353                                         ORTE_NAME_PRINT(&data->owner), data->index);
 354                     opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL);
 355                     OBJ_RELEASE(data);
 356                     goto release;
 357                 }
 358             }
 359         }
 360 
 361       release:
 362         
 363         ret = ORTE_SUCCESS;
 364         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
 365             ORTE_ERROR_LOG(rc);
 366             
 367 
 368         }
 369         goto SEND_ANSWER;
 370         break;
 371 
 372     case ORTE_PMIX_LOOKUP_CMD:
 373         opal_output_verbose(1, orte_data_server_output,
 374                             "%s data server: lookup data from %s",
 375                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 376                             ORTE_NAME_PRINT(sender));
 377 
 378         
 379         count = 1;
 380         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
 381             ORTE_ERROR_LOG(rc);
 382             goto SEND_ERROR;
 383         }
 384 
 385         
 386         count = 1;
 387         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_PMIX_DATA_RANGE))) {
 388             ORTE_ERROR_LOG(rc);
 389             goto SEND_ERROR;
 390         }
 391 
 392         
 393         count = 1;
 394         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
 395             ORTE_ERROR_LOG(rc);
 396             goto SEND_ERROR;
 397         }
 398         if (0 == ninfo) {
 399             
 400             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 401             rc = ORTE_ERR_BAD_PARAM;
 402             goto SEND_ERROR;
 403         }
 404 
 405         
 406         for (i=0; i < ninfo; i++) {
 407             count = 1;
 408             if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) {
 409                 ORTE_ERROR_LOG(rc);
 410                 opal_argv_free(keys);
 411                 goto SEND_ERROR;
 412             }
 413             opal_argv_append_nosize(&keys, str);
 414             free(str);
 415         }
 416 
 417         
 418         count = 1;
 419         uid = UINT32_MAX;
 420         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
 421             
 422             if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
 423                 uid = iptr->data.uint32;
 424             } else if (0 == strcmp(iptr->key, OPAL_PMIX_WAIT)) {
 425                 
 426                 wait = true;
 427             }
 428             
 429             OBJ_RELEASE(iptr);
 430         }
 431         if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 432             ORTE_ERROR_LOG(rc);
 433             opal_argv_free(keys);
 434             goto SEND_ERROR;
 435         }
 436         if (UINT32_MAX == uid) {
 437             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 438             rc = ORTE_ERR_NOT_FOUND;
 439             opal_argv_free(keys);
 440             goto SEND_ERROR;
 441         }
 442 
 443         
 444         ret_packed = false;
 445         for (i=0; NULL != keys[i]; i++) {
 446             opal_output_verbose(10, orte_data_server_output,
 447                                 "%s data server: looking for %s",
 448                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i]);
 449             
 450             for (k=0; k < orte_data_server_store.size; k++) {
 451                 data_added = false;
 452                 data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
 453                 if (NULL == data) {
 454                     continue;
 455                 }
 456                 
 457                 if (uid != data->uid) {
 458                     opal_output_verbose(10, orte_data_server_output,
 459                                         "%s\tMISMATCH UID %u %u",
 460                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 461                                         (unsigned)uid, (unsigned)data->uid);
 462                     continue;
 463                 }
 464                 
 465 
 466 
 467                 if (OPAL_PMIX_RANGE_NAMESPACE == data->range) {
 468                     if (jobid != data->owner.jobid) {
 469                         opal_output_verbose(10, orte_data_server_output,
 470                                             "%s\tMISMATCH JOBID %s %s",
 471                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 472                                             ORTE_JOBID_PRINT(jobid),
 473                                             ORTE_JOBID_PRINT(data->owner.jobid));
 474                         continue;
 475                     }
 476                 }
 477                 
 478                 OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
 479                     opal_output_verbose(10, orte_data_server_output,
 480                                         "%s COMPARING %s %s",
 481                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 482                                         keys[i], iptr->key);
 483                     if (0 == strcmp(iptr->key, keys[i])) {
 484                         
 485                         if (!ret_packed) {
 486                             ret = ORTE_SUCCESS;
 487                             if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
 488                                 ORTE_ERROR_LOG(rc);
 489                                 opal_argv_free(keys);
 490                                 goto SEND_ERROR;
 491                             }
 492                             ret_packed = true;
 493                         }
 494                         data_added = true;
 495                         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->owner, 1, OPAL_NAME))) {
 496                             ORTE_ERROR_LOG(rc);
 497                             opal_argv_free(keys);
 498                             goto SEND_ERROR;
 499                         }
 500                         opal_output_verbose(1, orte_data_server_output,
 501                                             "%s data server: adding %s to data from %s",
 502                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
 503                                             ORTE_NAME_PRINT(&data->owner));
 504                         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &iptr, 1, OPAL_VALUE))) {
 505                             ORTE_ERROR_LOG(rc);
 506                             opal_argv_free(keys);
 507                             goto SEND_ERROR;
 508                         }
 509                     }
 510                 }
 511                 if (data_added && OPAL_PMIX_PERSIST_FIRST_READ == data->persistence) {
 512                     opal_output_verbose(1, orte_data_server_output,
 513                                         "%s REMOVING DATA FROM %s AT INDEX %d",
 514                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 515                                         ORTE_NAME_PRINT(&data->owner), data->index);
 516                     opal_pointer_array_set_item(&orte_data_server_store, data->index, NULL);
 517                     OBJ_RELEASE(data);
 518                 }
 519             }
 520         }
 521         if (!ret_packed) {
 522             opal_output_verbose(1, orte_data_server_output,
 523                                 "%s data server:lookup: data not found",
 524                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 525 
 526             
 527 
 528             if (wait) {
 529                 opal_output_verbose(1, orte_data_server_output,
 530                                     "%s data server:lookup: pushing request to wait",
 531                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 532                 OBJ_RELEASE(answer);
 533                 req = OBJ_NEW(orte_data_req_t);
 534                 req->room_number = room_number;
 535                 req->requestor = *sender;
 536                 req->uid = uid;
 537                 req->range = range;
 538                 req->keys = keys;
 539                 opal_list_append(&pending, &req->super);
 540                 return;
 541             }
 542             
 543             rc = ORTE_ERR_NOT_FOUND;
 544             opal_argv_free(keys);
 545             goto SEND_ERROR;
 546         }
 547 
 548         opal_argv_free(keys);
 549         opal_output_verbose(1, orte_data_server_output,
 550                             "%s data server:lookup: data found",
 551                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 552         goto SEND_ANSWER;
 553         break;
 554 
 555     case ORTE_PMIX_UNPUBLISH_CMD:
 556         
 557         count = 1;
 558         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
 559             ORTE_ERROR_LOG(rc);
 560             goto SEND_ERROR;
 561         }
 562 
 563         opal_output_verbose(1, orte_data_server_output,
 564                             "%s data server: unpublish data from %s",
 565                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 566                             ORTE_NAME_PRINT(&requestor));
 567 
 568         
 569         count = 1;
 570         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) {
 571             ORTE_ERROR_LOG(rc);
 572             goto SEND_ERROR;
 573         }
 574 
 575         
 576         count = 1;
 577         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
 578             ORTE_ERROR_LOG(rc);
 579             goto SEND_ERROR;
 580         }
 581         if (0 == ninfo) {
 582             
 583             rc = ORTE_ERR_BAD_PARAM;
 584             goto SEND_ERROR;
 585         }
 586 
 587         
 588         for (i=0; i < ninfo; i++) {
 589             count = 1;
 590             if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) {
 591                 ORTE_ERROR_LOG(rc);
 592                 opal_argv_free(keys);
 593                 goto SEND_ERROR;
 594             }
 595             opal_argv_append_nosize(&keys, str);
 596             free(str);
 597         }
 598 
 599         
 600         count = 1;
 601         uid = UINT32_MAX;
 602         while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
 603             
 604             if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
 605                 uid = iptr->data.uint32;
 606             }
 607             
 608             OBJ_RELEASE(iptr);
 609         }
 610         if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc || UINT32_MAX == uid) {
 611             ORTE_ERROR_LOG(rc);
 612             opal_argv_free(keys);
 613             goto SEND_ERROR;
 614         }
 615 
 616         
 617         for (i=0; NULL != keys[i]; i++) {
 618             
 619             for (k=0; k < orte_data_server_store.size; k++) {
 620                 data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
 621                 if (NULL == data) {
 622                     continue;
 623                 }
 624                 
 625                 if (uid != data->uid) {
 626                     continue;
 627                 }
 628                 
 629                 if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
 630                     continue;
 631                 }
 632                 
 633                 if (range != data->range) {
 634                     continue;
 635                 }
 636                 
 637                 OPAL_LIST_FOREACH_SAFE(iptr, inext, &data->values, opal_value_t) {
 638                     if (0 == strcmp(iptr->key, keys[i])) {
 639                         
 640                         opal_list_remove_item(&data->values, &iptr->super);
 641                         OBJ_RELEASE(iptr);
 642                     }
 643                 }
 644                 
 645                 if (0 == opal_list_get_size(&data->values)) {
 646                     opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
 647                     OBJ_RELEASE(data);
 648                 }
 649             }
 650         }
 651         opal_argv_free(keys);
 652 
 653         
 654         ret = ORTE_SUCCESS;
 655         if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
 656             ORTE_ERROR_LOG(rc);
 657         }
 658         goto SEND_ANSWER;
 659         break;
 660 
 661     case ORTE_PMIX_PURGE_PROC_CMD:
 662         
 663 
 664 
 665         count = 1;
 666         if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
 667             ORTE_ERROR_LOG(rc);
 668             goto SEND_ERROR;
 669         }
 670 
 671         opal_output_verbose(1, orte_data_server_output,
 672                             "%s data server: purge data from %s",
 673                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 674                             ORTE_NAME_PRINT(&requestor));
 675 
 676         
 677         for (k=0; k < orte_data_server_store.size; k++) {
 678             data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
 679             if (NULL == data) {
 680                 continue;
 681             }
 682             
 683             if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
 684                 continue;
 685             }
 686             
 687 
 688             if ((data->persistence == OPAL_PMIX_PERSIST_APP ||
 689                  data->persistence == OPAL_PMIX_PERSIST_SESSION) &&
 690                 ORTE_VPID_WILDCARD != requestor.vpid) {
 691                 continue;
 692             }
 693             
 694             opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
 695             OBJ_RELEASE(data);
 696         }
 697         
 698         OBJ_RELEASE(answer);
 699         return;
 700 
 701     default:
 702         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 703         rc = ORTE_ERR_BAD_PARAM;
 704         break;
 705     }
 706 
 707   SEND_ERROR:
 708     opal_output_verbose(1, orte_data_server_output,
 709                         "%s data server: sending error %s",
 710                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 711                         ORTE_ERROR_NAME(rc));
 712     
 713     if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) {
 714         ORTE_ERROR_LOG(ret);
 715     }
 716 
 717  SEND_ANSWER:
 718     if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT,
 719                                           orte_rml_send_callback, NULL))) {
 720         ORTE_ERROR_LOG(rc);
 721         OBJ_RELEASE(answer);
 722     }
 723 }