root/opal/mca/pmix/pmix4x/pmix/test/test_server.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. sdes
  2. scon
  3. nsdes
  4. nscon
  5. release_cb
  6. fill_seq_ranks_array
  7. set_namespace
  8. server_unpack_procs
  9. server_pack_procs
  10. remove_server_item
  11. srv_wait_all
  12. server_fwd_msg
  13. server_send_msg
  14. _send_procs_cb
  15. server_send_procs
  16. server_barrier
  17. _libpmix_cb
  18. server_read_cb
  19. server_fence_contrib
  20. server_find_id
  21. server_pack_dmdx
  22. server_unpack_dmdx
  23. _dmdx_cb
  24. server_dmdx_get
  25. server_init
  26. server_finalize
  27. server_launch_clients

   1  /*
   2  * Copyright (c) 2015-2019 Intel, Inc.  All rights reserved.
   3  * Copyright (c) 2015-2018 Mellanox Technologies, Inc.
   4  *                         All rights reserved.
   5  * Copyright (c) 2016-2019 Research Organization for Information Science
   6  *                         and Technology (RIST).  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  *
  13  */
  14 
  15 #include <stdio.h>
  16 #include <stdlib.h>
  17 #include <unistd.h>
  18 #include <pthread.h>
  19 #include <sys/types.h>
  20 #include <sys/wait.h>
  21 
  22 #include "pmix_server.h"
  23 #include "src/include/pmix_globals.h"
  24 
  25 #include "test_server.h"
  26 #include "test_common.h"
  27 #include "cli_stages.h"
  28 #include "server_callbacks.h"
  29 
  30 int my_server_id = 0;
  31 
  32 server_info_t *my_server_info = NULL;
  33 pmix_list_t *server_list = NULL;
  34 pmix_list_t *server_nspace = NULL;
  35 
  36 static void sdes(server_info_t *s)
  37 {
  38     close(s->rd_fd);
  39     close(s->wr_fd);
  40     if (s->evread) {
  41         pmix_event_del(s->evread);
  42     }
  43     s->evread = NULL;
  44 }
  45 
  46 static void scon(server_info_t *s)
  47 {
  48     s->idx = 0;
  49     s->pid = 0;
  50     s->rd_fd = -1;
  51     s->wr_fd = -1;
  52     s->evread = NULL;
  53     s->modex_cbfunc = NULL;
  54     s->cbdata = NULL;
  55 }
  56 
  57 PMIX_CLASS_INSTANCE(server_info_t,
  58                     pmix_list_item_t,
  59                     scon, sdes);
  60 
  61 static void nsdes(server_nspace_t *ns)
  62 {
  63     if (ns->task_map) {
  64         free(ns->task_map);
  65     }
  66 }
  67 
  68 static void nscon(server_nspace_t *ns)
  69 {
  70     memset(ns->name, 0, PMIX_MAX_NSLEN);
  71     ns->ntasks = 0;
  72     ns->task_map = NULL;
  73 }
  74 
  75 PMIX_CLASS_INSTANCE(server_nspace_t,
  76                     pmix_list_item_t,
  77                     nscon, nsdes);
  78 
  79 static int server_send_procs(void);
  80 static void server_read_cb(int fd, short event, void *arg);
  81 static int srv_wait_all(double timeout);
  82 static int server_fwd_msg(msg_hdr_t *msg_hdr, char *buf, size_t size);
  83 static int server_send_msg(msg_hdr_t *msg_hdr, char *data, size_t size);
  84 static void remove_server_item(server_info_t *server);
  85 static void server_unpack_dmdx(char *buf, int *sender, pmix_proc_t *proc);
  86 static int server_pack_dmdx(int sender_id, const char *nspace, int rank,
  87                             char **buf);
  88 static void _dmdx_cb(int status, char *data, size_t sz, void *cbdata);
  89 
  90 static void release_cb(pmix_status_t status, void *cbdata)
  91 {
  92     int *ptr = (int*)cbdata;
  93     *ptr = 0;
  94 }
  95 
  96 static void fill_seq_ranks_array(size_t nprocs, int base_rank, char **ranks)
  97 {
  98     uint32_t i;
  99     int len = 0, max_ranks_len;
 100     if (0 >= nprocs) {
 101         return;
 102     }
 103     max_ranks_len = nprocs * (MAX_DIGIT_LEN+1);
 104     *ranks = (char*) malloc(max_ranks_len);
 105     for (i = 0; i < nprocs; i++) {
 106         len += snprintf(*ranks + len, max_ranks_len-len-1, "%d", i+base_rank);
 107         if (i != nprocs-1) {
 108             len += snprintf(*ranks + len, max_ranks_len-len-1, "%c", ',');
 109         }
 110     }
 111     if (len >= max_ranks_len-1) {
 112         free(*ranks);
 113         *ranks = NULL;
 114         TEST_ERROR(("Not enough allocated space for global ranks array."));
 115     }
 116 }
 117 
 118 static void set_namespace(int local_size, int univ_size,
 119                           int base_rank, char *name)
 120 {
 121     size_t ninfo;
 122     pmix_info_t *info;
 123     ninfo = 8;
 124     char *regex, *ppn;
 125     char *ranks = NULL;
 126 
 127     PMIX_INFO_CREATE(info, ninfo);
 128     pmix_strncpy(info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
 129     info[0].value.type = PMIX_UINT32;
 130     info[0].value.data.uint32 = univ_size;
 131 
 132     pmix_strncpy(info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
 133     info[1].value.type = PMIX_UINT32;
 134     info[1].value.data.uint32 = 0;
 135 
 136     pmix_strncpy(info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
 137     info[2].value.type = PMIX_UINT32;
 138     info[2].value.data.uint32 = local_size;
 139 
 140     /* generate the array of local peers */
 141     fill_seq_ranks_array(local_size, base_rank, &ranks);
 142     if (NULL == ranks) {
 143         return;
 144     }
 145     pmix_strncpy(info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
 146     info[3].value.type = PMIX_STRING;
 147     info[3].value.data.string = strdup(ranks);
 148     free(ranks);
 149 
 150     PMIx_generate_regex(NODE_NAME, &regex);
 151     pmix_strncpy(info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
 152     info[4].value.type = PMIX_STRING;
 153     info[4].value.data.string = regex;
 154 
 155     /* generate the global proc map */
 156     fill_seq_ranks_array(univ_size, 0, &ranks);
 157     if (NULL == ranks) {
 158         return;
 159     }
 160     PMIx_generate_ppn(ranks, &ppn);
 161     free(ranks);
 162     pmix_strncpy(info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
 163     info[5].value.type = PMIX_STRING;
 164     info[5].value.data.string = ppn;
 165 
 166     pmix_strncpy(info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
 167     info[6].value.type = PMIX_UINT32;
 168     info[6].value.data.uint32 = univ_size;
 169 
 170     pmix_strncpy(info[7].key, PMIX_APPNUM, PMIX_MAX_KEYLEN);
 171     info[7].value.type = PMIX_UINT32;
 172     info[7].value.data.uint32 = getpid ();
 173 
 174     int in_progress = 1, rc;
 175     if (PMIX_SUCCESS == (rc = PMIx_server_register_nspace(name, local_size,
 176                                     info, ninfo, release_cb, &in_progress))) {
 177         PMIX_WAIT_FOR_COMPLETION(in_progress);
 178     }
 179     PMIX_INFO_FREE(info, ninfo);
 180 }
 181 
 182 static void server_unpack_procs(char *buf, size_t size)
 183 {
 184     char *ptr = buf;
 185     size_t i;
 186     size_t ns_count;
 187     char *nspace;
 188 
 189     while ((size_t)(ptr - buf) < size) {
 190         ns_count = (size_t)*ptr;
 191         ptr += sizeof(size_t);
 192 
 193         for (i = 0; i < ns_count; i++) {
 194             server_nspace_t *tmp, *ns_item = NULL;
 195             size_t ltasks, ntasks;
 196             int server_id;
 197 
 198             server_id = *ptr;
 199             ptr += sizeof(int);
 200 
 201             nspace = ptr;
 202             ptr += PMIX_MAX_NSLEN+1;
 203 
 204             ntasks = (size_t)*ptr;
 205             ptr += sizeof(size_t);
 206 
 207             ltasks = (size_t)*ptr;
 208             ptr += sizeof(size_t);
 209 
 210             PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
 211                 if (0 == strcmp(nspace, tmp->name)) {
 212                     ns_item = tmp;
 213                     break;
 214                 }
 215             }
 216             if (NULL == ns_item) {
 217                 ns_item = PMIX_NEW(server_nspace_t);
 218                 memcpy(ns_item->name, nspace, PMIX_MAX_NSLEN);
 219                 pmix_list_append(server_nspace, &ns_item->super);
 220                 ns_item->ltasks = ltasks;
 221                 ns_item->ntasks = ntasks;
 222                 ns_item->task_map = (int*)malloc(sizeof(int) * ntasks);
 223                 memset(ns_item->task_map, -1, sizeof(int) * ntasks);
 224             } else {
 225                 assert(ns_item->ntasks == ntasks);
 226             }
 227             size_t i;
 228             for (i = 0; i < ltasks; i++) {
 229                 int rank = (int)*ptr;
 230                 ptr += sizeof(int);
 231                 if (ns_item->task_map[rank] >= 0) {
 232                     continue;
 233                 }
 234                 ns_item->task_map[rank] = server_id;
 235             }
 236         }
 237     }
 238 }
 239 
 240 static size_t server_pack_procs(int server_id, char **buf, size_t size)
 241 {
 242     size_t ns_count = pmix_list_get_size(server_nspace);
 243     size_t buf_size = sizeof(size_t) + (PMIX_MAX_NSLEN+1)*ns_count;
 244     server_nspace_t *tmp;
 245     char *ptr;
 246 
 247     if (0 == ns_count) {
 248         return 0;
 249     }
 250 
 251     buf_size += size;
 252     /* compute size: server_id + total + local procs count + ranks */
 253     PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
 254         buf_size += sizeof(int) + sizeof(size_t) + sizeof(size_t) +
 255                 sizeof(int) * tmp->ltasks;
 256     }
 257     *buf = (char*)realloc(*buf, buf_size);
 258     memset(*buf + size, 0, buf_size);
 259     ptr = *buf + size;
 260     /* pack ns count */
 261     memcpy(ptr, &ns_count, sizeof(size_t));
 262     ptr += sizeof(size_t);
 263 
 264     assert(server_nspace->pmix_list_length);
 265 
 266     PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
 267         size_t i;
 268         /* pack server_id */
 269         memcpy(ptr, &server_id, sizeof(int));
 270         ptr += sizeof(int);
 271         /* pack ns name */
 272         memcpy(ptr, tmp->name, PMIX_MAX_NSLEN+1);
 273         ptr += PMIX_MAX_NSLEN+1;
 274         /* pack ns total size */
 275         memcpy(ptr, &tmp->ntasks, sizeof(size_t));
 276         ptr += sizeof(size_t);
 277         /* pack ns local size */
 278         memcpy(ptr, &tmp->ltasks, sizeof(size_t));
 279         ptr += sizeof(size_t);
 280         /* pack ns ranks */
 281         for(i = 0; i < tmp->ntasks; i++) {
 282             if (tmp->task_map[i] == server_id) {
 283                 int rank = (int)i;
 284                 memcpy(ptr, &rank, sizeof(int));
 285                 ptr += sizeof(int);
 286             }
 287         }
 288     }
 289     assert((size_t)(ptr - *buf) == buf_size);
 290     return buf_size;
 291 }
 292 
 293 static void remove_server_item(server_info_t *server)
 294 {
 295     pmix_list_remove_item(server_list, &server->super);
 296     PMIX_DESTRUCT_LOCK(&server->lock);
 297     PMIX_RELEASE(server);
 298 }
 299 
 300 static int srv_wait_all(double timeout)
 301 {
 302     server_info_t *server, *next;
 303     pid_t pid;
 304     int status;
 305     struct timeval tv;
 306     double start_time, cur_time;
 307     int ret = 0;
 308 
 309     gettimeofday(&tv, NULL);
 310     start_time = tv.tv_sec + 1E-6*tv.tv_usec;
 311     cur_time = start_time;
 312 
 313     /* Remove this server from the list */
 314     PMIX_LIST_FOREACH_SAFE(server, next, server_list, server_info_t) {
 315         if (server->pid == getpid()) {
 316             /* remove himself */
 317             remove_server_item(server);
 318             break;
 319         }
 320     }
 321 
 322     while (!pmix_list_is_empty(server_list) &&
 323                                 (timeout >= (cur_time - start_time))) {
 324         pid = waitpid(-1, &status, 0);
 325         if (pid >= 0) {
 326             PMIX_LIST_FOREACH_SAFE(server, next, server_list, server_info_t) {
 327                 if (server->pid == pid) {
 328                     TEST_VERBOSE(("server %d finalize PID:%d with status %d", server->idx,
 329                                 server->pid, WEXITSTATUS(status)));
 330                     ret += WEXITSTATUS(status);
 331                     remove_server_item(server);
 332                 }
 333             }
 334         }
 335         // calculate current timestamp
 336         gettimeofday(&tv, NULL);
 337         cur_time = tv.tv_sec + 1E-6*tv.tv_usec;
 338     }
 339 
 340     return ret;
 341 }
 342 
 343 static int server_fwd_msg(msg_hdr_t *msg_hdr, char *buf, size_t size)
 344 {
 345     server_info_t *tmp_server, *server = NULL;
 346     int rc = PMIX_SUCCESS;
 347 
 348     PMIX_LIST_FOREACH(tmp_server, server_list, server_info_t) {
 349         if (tmp_server->idx == msg_hdr->dst_id) {
 350             server = tmp_server;
 351             break;
 352         }
 353     }
 354     if (NULL == server) {
 355         return PMIX_ERROR;
 356     }
 357     rc = write(server->wr_fd, msg_hdr, sizeof(msg_hdr_t));
 358     if (rc != sizeof(msg_hdr_t)) {
 359         return PMIX_ERROR;
 360     }
 361     rc = write(server->wr_fd, buf, size);
 362     if (rc != (ssize_t)size) {
 363         return PMIX_ERROR;
 364     }
 365     return PMIX_SUCCESS;
 366 }
 367 
 368 static int server_send_msg(msg_hdr_t *msg_hdr, char *data, size_t size)
 369 {
 370     size_t ret = 0;
 371     server_info_t *server = NULL, *server_tmp;
 372     if (0 == my_server_id) {
 373         PMIX_LIST_FOREACH(server_tmp, server_list, server_info_t) {
 374             if (server_tmp->idx == msg_hdr->dst_id) {
 375                 server = server_tmp;
 376                 break;
 377             }
 378         }
 379         if (NULL == server) {
 380             abort();
 381         }
 382     } else {
 383         server = (server_info_t *)pmix_list_get_first(server_list);
 384     }
 385 
 386     ret += write(server->wr_fd, msg_hdr, sizeof(msg_hdr_t));
 387     ret += write(server->wr_fd, data, size);
 388     if (ret != (sizeof(*msg_hdr) + size)) {
 389         return PMIX_ERROR;
 390     }
 391     return PMIX_SUCCESS;
 392 }
 393 
 394 static void _send_procs_cb(pmix_status_t status, const char *data,
 395                            size_t ndata, void *cbdata,
 396                            pmix_release_cbfunc_t relfn, void *relcbd)
 397 {
 398     server_info_t *server = (server_info_t*)cbdata;
 399 
 400     server_unpack_procs((char*)data, ndata);
 401     free((char*)data);
 402     PMIX_WAKEUP_THREAD(&server->lock);
 403 }
 404 
 405 static int server_send_procs(void)
 406 {
 407     server_info_t *server;
 408     msg_hdr_t msg_hdr;
 409     int rc = PMIX_SUCCESS;
 410     char *buf = NULL;
 411 
 412     if (0 == my_server_id) {
 413         server = my_server_info;
 414     } else {
 415         server = (server_info_t *)pmix_list_get_first(server_list);
 416     }
 417 
 418     msg_hdr.cmd = CMD_FENCE_CONTRIB;
 419     msg_hdr.dst_id = 0;
 420     msg_hdr.src_id = my_server_id;
 421     msg_hdr.size = server_pack_procs(my_server_id, &buf, 0);
 422     server->modex_cbfunc = _send_procs_cb;
 423     server->cbdata = (void*)server;
 424 
 425     server->lock.active = true;
 426 
 427     if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, buf, msg_hdr.size))) {
 428         if (buf) {
 429             free(buf);
 430         }
 431         return PMIX_ERROR;
 432     }
 433     if (buf) {
 434         free(buf);
 435     }
 436 
 437     PMIX_WAIT_THREAD(&server->lock);
 438     return PMIX_SUCCESS;
 439 }
 440 
 441 int server_barrier(void)
 442 {
 443     server_info_t *server;
 444     msg_hdr_t msg_hdr;
 445     int rc = PMIX_SUCCESS;
 446 
 447     if (0 == my_server_id) {
 448         server = my_server_info;
 449     } else {
 450         server = (server_info_t *)pmix_list_get_first(server_list);
 451     }
 452 
 453     msg_hdr.cmd = CMD_BARRIER_REQUEST;
 454     msg_hdr.dst_id = 0;
 455     msg_hdr.src_id = my_server_id;
 456     msg_hdr.size = 0;
 457 
 458     server->lock.active = true;
 459 
 460     if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, NULL, 0))) {
 461         return PMIX_ERROR;
 462     }
 463     PMIX_WAIT_THREAD(&server->lock);
 464 
 465     return PMIX_SUCCESS;
 466 }
 467 
 468 static void _libpmix_cb(void *cbdata)
 469 {
 470     char *ptr = (char*)cbdata;
 471     if (ptr) {
 472         free(ptr);
 473     }
 474 }
 475 
 476 static void server_read_cb(int fd, short event, void *arg)
 477 {
 478     server_info_t *server = (server_info_t*)arg;
 479     msg_hdr_t msg_hdr;
 480     char *msg_buf = NULL;
 481     static char *fence_buf = NULL;
 482     int rc;
 483     static size_t barrier_cnt = 0;
 484     static size_t contrib_cnt = 0;
 485     static size_t fence_buf_offset = 0;
 486 
 487     rc = read(server->rd_fd, &msg_hdr, sizeof(msg_hdr_t));
 488     if (rc <= 0) {
 489         return;
 490     }
 491     if (msg_hdr.size) {
 492         msg_buf = (char*) malloc(sizeof(char) * msg_hdr.size);
 493         rc += read(server->rd_fd, msg_buf, msg_hdr.size);
 494     }
 495     if (rc != (int)(sizeof(msg_hdr_t) + msg_hdr.size)) {
 496         TEST_ERROR(("error read from %d", server->idx));
 497     }
 498 
 499     if (my_server_id != msg_hdr.dst_id) {
 500         server_fwd_msg(&msg_hdr, msg_buf, msg_hdr.size);
 501         free(msg_buf);
 502         return;
 503     }
 504 
 505     switch(msg_hdr.cmd) {
 506         case CMD_BARRIER_REQUEST:
 507             barrier_cnt++;
 508             TEST_VERBOSE(("CMD_BARRIER_REQ req from %d cnt %d", msg_hdr.src_id,
 509                           barrier_cnt));
 510             if (pmix_list_get_size(server_list) == barrier_cnt) {
 511                 barrier_cnt = 0; /* reset barrier counter */
 512                 server_info_t *tmp_server;
 513                 PMIX_LIST_FOREACH(tmp_server, server_list, server_info_t) {
 514                     msg_hdr_t resp_hdr;
 515                     resp_hdr.dst_id = tmp_server->idx;
 516                     resp_hdr.src_id = my_server_id;
 517                     resp_hdr.cmd = CMD_BARRIER_RESPONSE;
 518                     resp_hdr.size = 0;
 519                     server_send_msg(&resp_hdr, NULL, 0);
 520                 }
 521             }
 522             break;
 523         case CMD_BARRIER_RESPONSE:
 524             TEST_VERBOSE(("%d: CMD_BARRIER_RESP", my_server_id));
 525             PMIX_WAKEUP_THREAD(&server->lock);
 526             break;
 527         case CMD_FENCE_CONTRIB:
 528             contrib_cnt++;
 529             if (msg_hdr.size > 0) {
 530                 fence_buf = (char*)realloc((void*)fence_buf,
 531                                            fence_buf_offset + msg_hdr.size);
 532                 memcpy(fence_buf + fence_buf_offset, msg_buf, msg_hdr.size);
 533                 fence_buf_offset += msg_hdr.size;
 534                 free(msg_buf);
 535                 msg_buf = NULL;
 536             }
 537 
 538             TEST_VERBOSE(("CMD_FENCE_CONTRIB req from %d cnt %d size %d",
 539                         msg_hdr.src_id, contrib_cnt, msg_hdr.size));
 540             if (pmix_list_get_size(server_list) == contrib_cnt) {
 541                 server_info_t *tmp_server;
 542                 PMIX_LIST_FOREACH(tmp_server, server_list, server_info_t) {
 543                     msg_hdr_t resp_hdr;
 544                     resp_hdr.dst_id = tmp_server->idx;
 545                     resp_hdr.src_id = my_server_id;
 546                     resp_hdr.cmd = CMD_FENCE_COMPLETE;
 547                     resp_hdr.size = fence_buf_offset;
 548                     server_send_msg(&resp_hdr, fence_buf, fence_buf_offset);
 549                 }
 550                 TEST_VERBOSE(("CMD_FENCE_CONTRIB complete, size %d",
 551                               fence_buf_offset));
 552                 if (fence_buf) {
 553                     free(fence_buf);
 554                     fence_buf = NULL;
 555                     fence_buf_offset = 0;
 556                 }
 557                 contrib_cnt = 0;
 558             }
 559             break;
 560         case CMD_FENCE_COMPLETE:
 561             TEST_VERBOSE(("%d: CMD_FENCE_COMPLETE size %d", my_server_id,
 562                         msg_hdr.size));
 563             server->modex_cbfunc(PMIX_SUCCESS, msg_buf, msg_hdr.size,
 564                                  server->cbdata, _libpmix_cb, msg_buf);
 565             msg_buf = NULL;
 566             break;
 567         case CMD_DMDX_REQUEST: {
 568             int *sender_id;
 569             pmix_proc_t proc;
 570             if (NULL == msg_buf) {
 571                 abort();
 572             }
 573             sender_id = (int*)malloc(sizeof(int));
 574             server_unpack_dmdx(msg_buf, sender_id, &proc);
 575             TEST_VERBOSE(("%d: CMD_DMDX_REQUEST from %d: %s:%d", my_server_id,
 576                         *sender_id, proc.nspace, proc.rank));
 577             rc = PMIx_server_dmodex_request(&proc, _dmdx_cb, (void*)sender_id);
 578             break;
 579         }
 580         case CMD_DMDX_RESPONSE:
 581             TEST_VERBOSE(("%d: CMD_DMDX_RESPONSE", my_server_id));
 582             server->modex_cbfunc(PMIX_SUCCESS, msg_buf, msg_hdr.size,
 583                                  server->cbdata, _libpmix_cb, msg_buf);
 584             msg_buf = NULL;
 585             break;
 586     }
 587     if (NULL != msg_buf) {
 588         free(msg_buf);
 589     }
 590 }
 591 
 592 int server_fence_contrib(char *data, size_t ndata,
 593                          pmix_modex_cbfunc_t cbfunc, void *cbdata)
 594 {
 595     server_info_t *server;
 596     msg_hdr_t msg_hdr;
 597     int rc = PMIX_SUCCESS;
 598 
 599     if (0 == my_server_id) {
 600         server = my_server_info;
 601     } else {
 602         server = (server_info_t *)pmix_list_get_first(server_list);
 603     }
 604     msg_hdr.cmd = CMD_FENCE_CONTRIB;
 605     msg_hdr.dst_id = 0;
 606     msg_hdr.src_id = my_server_id;
 607     msg_hdr.size = ndata;
 608     server->modex_cbfunc = cbfunc;
 609     server->cbdata = cbdata;
 610 
 611     if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, data, ndata))) {
 612         return PMIX_ERROR;
 613     }
 614     return rc;
 615 }
 616 
 617 static int server_find_id(const char *nspace, int rank)
 618 {
 619     server_nspace_t *tmp;
 620 
 621     PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
 622         if (0 == strcmp(tmp->name, nspace)) {
 623             return tmp->task_map[rank];
 624         }
 625     }
 626     return -1;
 627 }
 628 
 629 static int server_pack_dmdx(int sender_id, const char *nspace, int rank,
 630                             char **buf)
 631 {
 632     size_t buf_size = sizeof(int) + PMIX_MAX_NSLEN +1 + sizeof(int);
 633     char *ptr;
 634 
 635     *buf = (char*)malloc(buf_size);
 636     ptr = *buf;
 637 
 638     memcpy(ptr, &sender_id, sizeof(int));
 639     ptr += sizeof(int);
 640 
 641     memcpy(ptr, nspace, PMIX_MAX_NSLEN+1);
 642     ptr += PMIX_MAX_NSLEN +1;
 643 
 644     memcpy(ptr, &rank, sizeof(int));
 645     ptr += sizeof(int);
 646 
 647     return buf_size;
 648 }
 649 
 650 static void server_unpack_dmdx(char *buf, int *sender, pmix_proc_t *proc)
 651 {
 652     char *ptr = buf;
 653 
 654     *sender = (int)*ptr;
 655     ptr += sizeof(int);
 656 
 657     memcpy(proc->nspace, ptr, PMIX_MAX_NSLEN +1);
 658     ptr += PMIX_MAX_NSLEN +1;
 659 
 660     proc->rank = (int)*ptr;
 661     ptr += sizeof(int);
 662 }
 663 
 664 
 665 static void _dmdx_cb(int status, char *data, size_t sz, void *cbdata)
 666 {
 667     msg_hdr_t msg_hdr;
 668     int *sender_id = (int*)cbdata;
 669 
 670     msg_hdr.cmd = CMD_DMDX_RESPONSE;
 671     msg_hdr.src_id = my_server_id;
 672     msg_hdr.size = sz;
 673     msg_hdr.dst_id = *sender_id;
 674     free(sender_id);
 675 
 676     server_send_msg(&msg_hdr, data, sz);
 677 }
 678 
 679 int server_dmdx_get(const char *nspace, int rank,
 680                     pmix_modex_cbfunc_t cbfunc, void *cbdata)
 681 {
 682     server_info_t *server = NULL, *tmp;
 683     msg_hdr_t msg_hdr;
 684     pmix_status_t rc = PMIX_SUCCESS;
 685     char *buf = NULL;
 686 
 687 
 688     if (0 > (msg_hdr.dst_id = server_find_id(nspace, rank))) {
 689         TEST_ERROR(("%d: server cannot found for %s:%d", my_server_id, nspace, rank));
 690         goto error;
 691     }
 692 
 693     if (0 == my_server_id) {
 694         PMIX_LIST_FOREACH(tmp, server_list, server_info_t) {
 695             if (tmp->idx == msg_hdr.dst_id) {
 696                 server = tmp;
 697                 break;
 698             }
 699         }
 700     } else {
 701         server = (server_info_t *)pmix_list_get_first(server_list);
 702     }
 703 
 704     if (server == NULL) {
 705         goto error;
 706     }
 707 
 708     msg_hdr.cmd = CMD_DMDX_REQUEST;
 709     msg_hdr.src_id = my_server_id;
 710     msg_hdr.size = server_pack_dmdx(my_server_id, nspace, rank, &buf);
 711     server->modex_cbfunc = cbfunc;
 712     server->cbdata = cbdata;
 713 
 714     if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, buf, msg_hdr.size))) {
 715         rc = PMIX_ERROR;
 716     }
 717     free(buf);
 718     return rc;
 719 
 720 error:
 721     cbfunc(PMIX_ERROR, NULL, 0, cbdata, NULL, 0);
 722     return PMIX_ERROR;
 723 }
 724 
 725 int server_init(test_params *params)
 726 {
 727     pmix_info_t info[1];
 728     int rc = PMIX_SUCCESS;
 729 
 730     /* fork/init servers procs */
 731     if (params->nservers >= 1) {
 732         int i;
 733         server_info_t *server_info = NULL;
 734         server_list = PMIX_NEW(pmix_list_t);
 735 
 736         TEST_VERBOSE(("pmix server %d started PID:%d", my_server_id, getpid()));
 737         for (i = params->nservers - 1; i >= 0; i--) {
 738             pid_t pid;
 739             server_info = PMIX_NEW(server_info_t);
 740 
 741             int fd1[2];
 742             int fd2[2];
 743 
 744             pipe(fd1);
 745             pipe(fd2);
 746 
 747             if (0 != i) {
 748                 pid = fork();
 749                 if (pid < 0) {
 750                     TEST_ERROR(("Fork failed"));
 751                     return pid;
 752                 }
 753                 if (pid == 0) {
 754                     server_list = PMIX_NEW(pmix_list_t);
 755                     my_server_id = i;
 756                     server_info->idx = 0;
 757                     server_info->pid = getppid();
 758                     server_info->rd_fd = fd1[0];
 759                     server_info->wr_fd = fd2[1];
 760                     close(fd1[1]);
 761                     close(fd2[0]);
 762                     PMIX_CONSTRUCT_LOCK(&server_info->lock);
 763                     pmix_list_append(server_list, &server_info->super);
 764                     break;
 765                 }
 766                 server_info->idx = i;
 767                 server_info->pid = pid;
 768                 server_info->wr_fd = fd1[1];
 769                 server_info->rd_fd = fd2[0];
 770                 PMIX_CONSTRUCT_LOCK(&server_info->lock);
 771                 close(fd1[0]);
 772                 close(fd2[1]);
 773             } else {
 774                 my_server_info = server_info;
 775                 server_info->pid = getpid();
 776                 server_info->idx  = 0;
 777                 server_info->rd_fd = fd1[0];
 778                 server_info->wr_fd = fd1[1];
 779                 PMIX_CONSTRUCT_LOCK(&server_info->lock);
 780                 close(fd2[0]);
 781                 close(fd2[1]);
 782             }
 783             TEST_VERBOSE(("%d: add server %d", my_server_id, server_info->idx));
 784             pmix_list_append(server_list, &server_info->super);
 785         }
 786     }
 787     /* compute local proc size */
 788     params->lsize = (params->nprocs % params->nservers) > (uint32_t)my_server_id ?
 789                 params->nprocs / params->nservers + 1 :
 790                 params->nprocs / params->nservers;
 791     /* setup the server library */
 792     (void)strncpy(info[0].key, PMIX_SOCKET_MODE, PMIX_MAX_KEYLEN);
 793     info[0].value.type = PMIX_UINT32;
 794     info[0].value.data.uint32 = 0666;
 795 
 796     server_nspace = PMIX_NEW(pmix_list_t);
 797 
 798     if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, 1))) {
 799         TEST_ERROR(("Init failed with error %d", rc));
 800         goto error;
 801     }
 802 
 803     /* register test server read thread */
 804     if (params->nservers && pmix_list_get_size(server_list)) {
 805         server_info_t *server;
 806         PMIX_LIST_FOREACH(server, server_list, server_info_t) {
 807             server->evread = pmix_event_new(pmix_globals.evbase, server->rd_fd,
 808                                             EV_READ|EV_PERSIST, server_read_cb, server);
 809             pmix_event_add(server->evread, NULL);
 810         }
 811     }
 812 
 813     /* register the errhandler */
 814     PMIx_Register_event_handler(NULL, 0, NULL, 0,
 815                                 errhandler, errhandler_reg_callbk, NULL);
 816 
 817     if (0 != (rc = server_barrier())) {
 818         goto error;
 819     }
 820 
 821     return PMIX_SUCCESS;
 822 
 823 error:
 824     PMIX_DESTRUCT(server_nspace);
 825     return rc;
 826 }
 827 
 828 int server_finalize(test_params *params)
 829 {
 830     int rc = PMIX_SUCCESS;
 831     int total_ret = 0;
 832 
 833     if (0 != (rc = server_barrier())) {
 834         total_ret++;
 835         goto exit;
 836     }
 837 
 838     if (0 != my_server_id) {
 839         server_info_t *server = (server_info_t*)pmix_list_get_first(server_list);
 840         remove_server_item(server);
 841     }
 842 
 843     if (params->nservers && 0 == my_server_id) {
 844         int ret;
 845         /* wait for all servers are finished */
 846         ret = srv_wait_all(10.0);
 847         if (!pmix_list_is_empty(server_list)) {
 848             total_ret += ret;
 849         }
 850         PMIX_LIST_RELEASE(server_list);
 851         TEST_VERBOSE(("SERVER %d FINALIZE PID:%d with status %d",
 852                     my_server_id, getpid(), ret));
 853         if (0 == total_ret) {
 854             TEST_OUTPUT(("Test finished OK!"));
 855         } else {
 856             rc = PMIX_ERROR;
 857         }
 858     }
 859     PMIX_LIST_RELEASE(server_nspace);
 860 
 861     /* finalize the server library */
 862     if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
 863         TEST_ERROR(("Finalize failed with error %d", rc));
 864         total_ret += rc;
 865         goto exit;
 866     }
 867 
 868 exit:
 869     return total_ret;
 870 }
 871 
 872 int server_launch_clients(int local_size, int univ_size, int base_rank,
 873                    test_params *params, char *** client_env, char ***base_argv)
 874 {
 875     int n;
 876     uid_t myuid;
 877     gid_t mygid;
 878     char *ranks = NULL;
 879     char digit[MAX_DIGIT_LEN];
 880     int rc;
 881     static int cli_counter = 0;
 882     static int num_ns = 0;
 883     pmix_proc_t proc;
 884     int rank_counter = 0;
 885     server_nspace_t *nspace_item = PMIX_NEW(server_nspace_t);
 886 
 887     TEST_VERBOSE(("%d: lsize: %d, base rank %d, local_size %d, univ_size %d",
 888                   my_server_id,
 889                   params->lsize,
 890                   base_rank,
 891                   local_size,
 892                   univ_size));
 893 
 894     TEST_VERBOSE(("Setting job info"));
 895     (void)snprintf(proc.nspace, PMIX_MAX_NSLEN, "%s-%d", TEST_NAMESPACE, num_ns);
 896     set_namespace(local_size, univ_size, base_rank, proc.nspace);
 897     if (NULL != ranks) {
 898         free(ranks);
 899     }
 900     /* add namespace entry */
 901     nspace_item->ntasks = univ_size;
 902     nspace_item->ltasks = local_size;
 903     nspace_item->task_map = (int*)malloc(sizeof(int) * univ_size);
 904     memset(nspace_item->task_map, -1, sizeof(int)*univ_size);
 905     strcpy(nspace_item->name, proc.nspace);
 906     pmix_list_append(server_nspace, &nspace_item->super);
 907     for (n = 0; n < local_size; n++) {
 908         proc.rank = base_rank + n;
 909         nspace_item->task_map[proc.rank] = my_server_id;
 910     }
 911 
 912     server_send_procs();
 913 
 914     myuid = getuid();
 915     mygid = getgid();
 916 
 917     /* fork/exec the test */
 918     for (n = 0; n < local_size; n++) {
 919         proc.rank = base_rank + rank_counter;
 920         if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, client_env))) {//n
 921             TEST_ERROR(("Server fork setup failed with error %d", rc));
 922             PMIx_server_finalize();
 923             cli_kill_all();
 924             return rc;
 925         }
 926         if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid, NULL, NULL, NULL))) {//n
 927             TEST_ERROR(("Server fork setup failed with error %d", rc));
 928             PMIx_server_finalize();
 929             cli_kill_all();
 930             return 0;
 931         }
 932 
 933         cli_info[cli_counter].pid = fork();
 934         if (cli_info[cli_counter].pid < 0) {
 935             TEST_ERROR(("Fork failed"));
 936             PMIx_server_finalize();
 937             cli_kill_all();
 938             return 0;
 939         }
 940         cli_info[cli_counter].rank = proc.rank;//n
 941         cli_info[cli_counter].ns = strdup(proc.nspace);
 942 
 943         char **client_argv = pmix_argv_copy(*base_argv);
 944 
 945         /* add two last arguments: -r <rank> */
 946         sprintf(digit, "%d", proc.rank);
 947         pmix_argv_append_nosize(&client_argv, "-r");
 948         pmix_argv_append_nosize(&client_argv, digit);
 949 
 950         pmix_argv_append_nosize(&client_argv, "-s");
 951         pmix_argv_append_nosize(&client_argv, proc.nspace);
 952 
 953         sprintf(digit, "%d", univ_size);
 954         pmix_argv_append_nosize(&client_argv, "--ns-size");
 955         pmix_argv_append_nosize(&client_argv, digit);
 956 
 957         sprintf(digit, "%d", num_ns);
 958         pmix_argv_append_nosize(&client_argv, "--ns-id");
 959         pmix_argv_append_nosize(&client_argv, digit);
 960 
 961         sprintf(digit, "%d", 0);
 962         pmix_argv_append_nosize(&client_argv, "--base-rank");
 963         pmix_argv_append_nosize(&client_argv, digit);
 964 
 965         if (cli_info[cli_counter].pid == 0) {
 966             if( !TEST_VERBOSE_GET() ){
 967                 // Hide clients stdout
 968                 if (NULL == freopen("/dev/null","w", stdout)) {
 969                     return 0;
 970                 }
 971             }
 972             execve(params->binary, client_argv, *client_env);
 973             /* Does not return */
 974             TEST_ERROR(("execve() failed"));
 975             return 0;
 976         }
 977         cli_info[cli_counter].state = CLI_FORKED;
 978 
 979         pmix_argv_free(client_argv);
 980 
 981         cli_counter++;
 982         rank_counter++;
 983     }
 984     num_ns++;
 985     return rank_counter;
 986 }

/* [<][>][^][v][top][bottom][index][help] */