This source file includes the following definitions:
- sdes
- scon
- nsdes
- nscon
- release_cb
- fill_seq_ranks_array
- set_namespace
- server_unpack_procs
- server_pack_procs
- remove_server_item
- srv_wait_all
- server_fwd_msg
- server_send_msg
- _send_procs_cb
- server_send_procs
- server_barrier
- _libpmix_cb
- server_read_cb
- server_fence_contrib
- server_find_id
- server_pack_dmdx
- server_unpack_dmdx
- _dmdx_cb
- server_dmdx_get
- server_init
- server_finalize
- server_launch_clients
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>

#include "pmix_server.h"
#include "src/include/pmix_globals.h"

#include "test_server.h"
#include "test_common.h"
#include "cli_stages.h"
#include "server_callbacks.h"
29
30 int my_server_id = 0;
31
32 server_info_t *my_server_info = NULL;
33 pmix_list_t *server_list = NULL;
34 pmix_list_t *server_nspace = NULL;
35
36 static void sdes(server_info_t *s)
37 {
38 close(s->rd_fd);
39 close(s->wr_fd);
40 if (s->evread) {
41 pmix_event_del(s->evread);
42 }
43 s->evread = NULL;
44 }
45
46 static void scon(server_info_t *s)
47 {
48 s->idx = 0;
49 s->pid = 0;
50 s->rd_fd = -1;
51 s->wr_fd = -1;
52 s->evread = NULL;
53 s->modex_cbfunc = NULL;
54 s->cbdata = NULL;
55 }
56
57 PMIX_CLASS_INSTANCE(server_info_t,
58 pmix_list_item_t,
59 scon, sdes);
60
61 static void nsdes(server_nspace_t *ns)
62 {
63 if (ns->task_map) {
64 free(ns->task_map);
65 }
66 }
67
68 static void nscon(server_nspace_t *ns)
69 {
70 memset(ns->name, 0, PMIX_MAX_NSLEN);
71 ns->ntasks = 0;
72 ns->task_map = NULL;
73 }
74
75 PMIX_CLASS_INSTANCE(server_nspace_t,
76 pmix_list_item_t,
77 nscon, nsdes);
78
79 static int server_send_procs(void);
80 static void server_read_cb(int fd, short event, void *arg);
81 static int srv_wait_all(double timeout);
82 static int server_fwd_msg(msg_hdr_t *msg_hdr, char *buf, size_t size);
83 static int server_send_msg(msg_hdr_t *msg_hdr, char *data, size_t size);
84 static void remove_server_item(server_info_t *server);
85 static void server_unpack_dmdx(char *buf, int *sender, pmix_proc_t *proc);
86 static int server_pack_dmdx(int sender_id, const char *nspace, int rank,
87 char **buf);
88 static void _dmdx_cb(int status, char *data, size_t sz, void *cbdata);
89
90 static void release_cb(pmix_status_t status, void *cbdata)
91 {
92 int *ptr = (int*)cbdata;
93 *ptr = 0;
94 }
95
96 static void fill_seq_ranks_array(size_t nprocs, int base_rank, char **ranks)
97 {
98 uint32_t i;
99 int len = 0, max_ranks_len;
100 if (0 >= nprocs) {
101 return;
102 }
103 max_ranks_len = nprocs * (MAX_DIGIT_LEN+1);
104 *ranks = (char*) malloc(max_ranks_len);
105 for (i = 0; i < nprocs; i++) {
106 len += snprintf(*ranks + len, max_ranks_len-len-1, "%d", i+base_rank);
107 if (i != nprocs-1) {
108 len += snprintf(*ranks + len, max_ranks_len-len-1, "%c", ',');
109 }
110 }
111 if (len >= max_ranks_len-1) {
112 free(*ranks);
113 *ranks = NULL;
114 TEST_ERROR(("Not enough allocated space for global ranks array."));
115 }
116 }
117
118 static void set_namespace(int local_size, int univ_size,
119 int base_rank, char *name)
120 {
121 size_t ninfo;
122 pmix_info_t *info;
123 ninfo = 8;
124 char *regex, *ppn;
125 char *ranks = NULL;
126
127 PMIX_INFO_CREATE(info, ninfo);
128 pmix_strncpy(info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
129 info[0].value.type = PMIX_UINT32;
130 info[0].value.data.uint32 = univ_size;
131
132 pmix_strncpy(info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
133 info[1].value.type = PMIX_UINT32;
134 info[1].value.data.uint32 = 0;
135
136 pmix_strncpy(info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
137 info[2].value.type = PMIX_UINT32;
138 info[2].value.data.uint32 = local_size;
139
140
141 fill_seq_ranks_array(local_size, base_rank, &ranks);
142 if (NULL == ranks) {
143 return;
144 }
145 pmix_strncpy(info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
146 info[3].value.type = PMIX_STRING;
147 info[3].value.data.string = strdup(ranks);
148 free(ranks);
149
150 PMIx_generate_regex(NODE_NAME, ®ex);
151 pmix_strncpy(info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
152 info[4].value.type = PMIX_STRING;
153 info[4].value.data.string = regex;
154
155
156 fill_seq_ranks_array(univ_size, 0, &ranks);
157 if (NULL == ranks) {
158 return;
159 }
160 PMIx_generate_ppn(ranks, &ppn);
161 free(ranks);
162 pmix_strncpy(info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
163 info[5].value.type = PMIX_STRING;
164 info[5].value.data.string = ppn;
165
166 pmix_strncpy(info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
167 info[6].value.type = PMIX_UINT32;
168 info[6].value.data.uint32 = univ_size;
169
170 pmix_strncpy(info[7].key, PMIX_APPNUM, PMIX_MAX_KEYLEN);
171 info[7].value.type = PMIX_UINT32;
172 info[7].value.data.uint32 = getpid ();
173
174 int in_progress = 1, rc;
175 if (PMIX_SUCCESS == (rc = PMIx_server_register_nspace(name, local_size,
176 info, ninfo, release_cb, &in_progress))) {
177 PMIX_WAIT_FOR_COMPLETION(in_progress);
178 }
179 PMIX_INFO_FREE(info, ninfo);
180 }
181
182 static void server_unpack_procs(char *buf, size_t size)
183 {
184 char *ptr = buf;
185 size_t i;
186 size_t ns_count;
187 char *nspace;
188
189 while ((size_t)(ptr - buf) < size) {
190 ns_count = (size_t)*ptr;
191 ptr += sizeof(size_t);
192
193 for (i = 0; i < ns_count; i++) {
194 server_nspace_t *tmp, *ns_item = NULL;
195 size_t ltasks, ntasks;
196 int server_id;
197
198 server_id = *ptr;
199 ptr += sizeof(int);
200
201 nspace = ptr;
202 ptr += PMIX_MAX_NSLEN+1;
203
204 ntasks = (size_t)*ptr;
205 ptr += sizeof(size_t);
206
207 ltasks = (size_t)*ptr;
208 ptr += sizeof(size_t);
209
210 PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
211 if (0 == strcmp(nspace, tmp->name)) {
212 ns_item = tmp;
213 break;
214 }
215 }
216 if (NULL == ns_item) {
217 ns_item = PMIX_NEW(server_nspace_t);
218 memcpy(ns_item->name, nspace, PMIX_MAX_NSLEN);
219 pmix_list_append(server_nspace, &ns_item->super);
220 ns_item->ltasks = ltasks;
221 ns_item->ntasks = ntasks;
222 ns_item->task_map = (int*)malloc(sizeof(int) * ntasks);
223 memset(ns_item->task_map, -1, sizeof(int) * ntasks);
224 } else {
225 assert(ns_item->ntasks == ntasks);
226 }
227 size_t i;
228 for (i = 0; i < ltasks; i++) {
229 int rank = (int)*ptr;
230 ptr += sizeof(int);
231 if (ns_item->task_map[rank] >= 0) {
232 continue;
233 }
234 ns_item->task_map[rank] = server_id;
235 }
236 }
237 }
238 }
239
240 static size_t server_pack_procs(int server_id, char **buf, size_t size)
241 {
242 size_t ns_count = pmix_list_get_size(server_nspace);
243 size_t buf_size = sizeof(size_t) + (PMIX_MAX_NSLEN+1)*ns_count;
244 server_nspace_t *tmp;
245 char *ptr;
246
247 if (0 == ns_count) {
248 return 0;
249 }
250
251 buf_size += size;
252
253 PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
254 buf_size += sizeof(int) + sizeof(size_t) + sizeof(size_t) +
255 sizeof(int) * tmp->ltasks;
256 }
257 *buf = (char*)realloc(*buf, buf_size);
258 memset(*buf + size, 0, buf_size);
259 ptr = *buf + size;
260
261 memcpy(ptr, &ns_count, sizeof(size_t));
262 ptr += sizeof(size_t);
263
264 assert(server_nspace->pmix_list_length);
265
266 PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
267 size_t i;
268
269 memcpy(ptr, &server_id, sizeof(int));
270 ptr += sizeof(int);
271
272 memcpy(ptr, tmp->name, PMIX_MAX_NSLEN+1);
273 ptr += PMIX_MAX_NSLEN+1;
274
275 memcpy(ptr, &tmp->ntasks, sizeof(size_t));
276 ptr += sizeof(size_t);
277
278 memcpy(ptr, &tmp->ltasks, sizeof(size_t));
279 ptr += sizeof(size_t);
280
281 for(i = 0; i < tmp->ntasks; i++) {
282 if (tmp->task_map[i] == server_id) {
283 int rank = (int)i;
284 memcpy(ptr, &rank, sizeof(int));
285 ptr += sizeof(int);
286 }
287 }
288 }
289 assert((size_t)(ptr - *buf) == buf_size);
290 return buf_size;
291 }
292
293 static void remove_server_item(server_info_t *server)
294 {
295 pmix_list_remove_item(server_list, &server->super);
296 PMIX_DESTRUCT_LOCK(&server->lock);
297 PMIX_RELEASE(server);
298 }
299
300 static int srv_wait_all(double timeout)
301 {
302 server_info_t *server, *next;
303 pid_t pid;
304 int status;
305 struct timeval tv;
306 double start_time, cur_time;
307 int ret = 0;
308
309 gettimeofday(&tv, NULL);
310 start_time = tv.tv_sec + 1E-6*tv.tv_usec;
311 cur_time = start_time;
312
313
314 PMIX_LIST_FOREACH_SAFE(server, next, server_list, server_info_t) {
315 if (server->pid == getpid()) {
316
317 remove_server_item(server);
318 break;
319 }
320 }
321
322 while (!pmix_list_is_empty(server_list) &&
323 (timeout >= (cur_time - start_time))) {
324 pid = waitpid(-1, &status, 0);
325 if (pid >= 0) {
326 PMIX_LIST_FOREACH_SAFE(server, next, server_list, server_info_t) {
327 if (server->pid == pid) {
328 TEST_VERBOSE(("server %d finalize PID:%d with status %d", server->idx,
329 server->pid, WEXITSTATUS(status)));
330 ret += WEXITSTATUS(status);
331 remove_server_item(server);
332 }
333 }
334 }
335
336 gettimeofday(&tv, NULL);
337 cur_time = tv.tv_sec + 1E-6*tv.tv_usec;
338 }
339
340 return ret;
341 }
342
343 static int server_fwd_msg(msg_hdr_t *msg_hdr, char *buf, size_t size)
344 {
345 server_info_t *tmp_server, *server = NULL;
346 int rc = PMIX_SUCCESS;
347
348 PMIX_LIST_FOREACH(tmp_server, server_list, server_info_t) {
349 if (tmp_server->idx == msg_hdr->dst_id) {
350 server = tmp_server;
351 break;
352 }
353 }
354 if (NULL == server) {
355 return PMIX_ERROR;
356 }
357 rc = write(server->wr_fd, msg_hdr, sizeof(msg_hdr_t));
358 if (rc != sizeof(msg_hdr_t)) {
359 return PMIX_ERROR;
360 }
361 rc = write(server->wr_fd, buf, size);
362 if (rc != (ssize_t)size) {
363 return PMIX_ERROR;
364 }
365 return PMIX_SUCCESS;
366 }
367
368 static int server_send_msg(msg_hdr_t *msg_hdr, char *data, size_t size)
369 {
370 size_t ret = 0;
371 server_info_t *server = NULL, *server_tmp;
372 if (0 == my_server_id) {
373 PMIX_LIST_FOREACH(server_tmp, server_list, server_info_t) {
374 if (server_tmp->idx == msg_hdr->dst_id) {
375 server = server_tmp;
376 break;
377 }
378 }
379 if (NULL == server) {
380 abort();
381 }
382 } else {
383 server = (server_info_t *)pmix_list_get_first(server_list);
384 }
385
386 ret += write(server->wr_fd, msg_hdr, sizeof(msg_hdr_t));
387 ret += write(server->wr_fd, data, size);
388 if (ret != (sizeof(*msg_hdr) + size)) {
389 return PMIX_ERROR;
390 }
391 return PMIX_SUCCESS;
392 }
393
394 static void _send_procs_cb(pmix_status_t status, const char *data,
395 size_t ndata, void *cbdata,
396 pmix_release_cbfunc_t relfn, void *relcbd)
397 {
398 server_info_t *server = (server_info_t*)cbdata;
399
400 server_unpack_procs((char*)data, ndata);
401 free((char*)data);
402 PMIX_WAKEUP_THREAD(&server->lock);
403 }
404
405 static int server_send_procs(void)
406 {
407 server_info_t *server;
408 msg_hdr_t msg_hdr;
409 int rc = PMIX_SUCCESS;
410 char *buf = NULL;
411
412 if (0 == my_server_id) {
413 server = my_server_info;
414 } else {
415 server = (server_info_t *)pmix_list_get_first(server_list);
416 }
417
418 msg_hdr.cmd = CMD_FENCE_CONTRIB;
419 msg_hdr.dst_id = 0;
420 msg_hdr.src_id = my_server_id;
421 msg_hdr.size = server_pack_procs(my_server_id, &buf, 0);
422 server->modex_cbfunc = _send_procs_cb;
423 server->cbdata = (void*)server;
424
425 server->lock.active = true;
426
427 if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, buf, msg_hdr.size))) {
428 if (buf) {
429 free(buf);
430 }
431 return PMIX_ERROR;
432 }
433 if (buf) {
434 free(buf);
435 }
436
437 PMIX_WAIT_THREAD(&server->lock);
438 return PMIX_SUCCESS;
439 }
440
441 int server_barrier(void)
442 {
443 server_info_t *server;
444 msg_hdr_t msg_hdr;
445 int rc = PMIX_SUCCESS;
446
447 if (0 == my_server_id) {
448 server = my_server_info;
449 } else {
450 server = (server_info_t *)pmix_list_get_first(server_list);
451 }
452
453 msg_hdr.cmd = CMD_BARRIER_REQUEST;
454 msg_hdr.dst_id = 0;
455 msg_hdr.src_id = my_server_id;
456 msg_hdr.size = 0;
457
458 server->lock.active = true;
459
460 if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, NULL, 0))) {
461 return PMIX_ERROR;
462 }
463 PMIX_WAIT_THREAD(&server->lock);
464
465 return PMIX_SUCCESS;
466 }
467
/* Release callback handed to modex callbacks: cbdata is the malloc'd
 * message buffer (possibly NULL) and is simply freed. */
static void _libpmix_cb(void *cbdata)
{
    free(cbdata);   /* free(NULL) is a no-op, so no guard is needed */
}
475
476 static void server_read_cb(int fd, short event, void *arg)
477 {
478 server_info_t *server = (server_info_t*)arg;
479 msg_hdr_t msg_hdr;
480 char *msg_buf = NULL;
481 static char *fence_buf = NULL;
482 int rc;
483 static size_t barrier_cnt = 0;
484 static size_t contrib_cnt = 0;
485 static size_t fence_buf_offset = 0;
486
487 rc = read(server->rd_fd, &msg_hdr, sizeof(msg_hdr_t));
488 if (rc <= 0) {
489 return;
490 }
491 if (msg_hdr.size) {
492 msg_buf = (char*) malloc(sizeof(char) * msg_hdr.size);
493 rc += read(server->rd_fd, msg_buf, msg_hdr.size);
494 }
495 if (rc != (int)(sizeof(msg_hdr_t) + msg_hdr.size)) {
496 TEST_ERROR(("error read from %d", server->idx));
497 }
498
499 if (my_server_id != msg_hdr.dst_id) {
500 server_fwd_msg(&msg_hdr, msg_buf, msg_hdr.size);
501 free(msg_buf);
502 return;
503 }
504
505 switch(msg_hdr.cmd) {
506 case CMD_BARRIER_REQUEST:
507 barrier_cnt++;
508 TEST_VERBOSE(("CMD_BARRIER_REQ req from %d cnt %d", msg_hdr.src_id,
509 barrier_cnt));
510 if (pmix_list_get_size(server_list) == barrier_cnt) {
511 barrier_cnt = 0;
512 server_info_t *tmp_server;
513 PMIX_LIST_FOREACH(tmp_server, server_list, server_info_t) {
514 msg_hdr_t resp_hdr;
515 resp_hdr.dst_id = tmp_server->idx;
516 resp_hdr.src_id = my_server_id;
517 resp_hdr.cmd = CMD_BARRIER_RESPONSE;
518 resp_hdr.size = 0;
519 server_send_msg(&resp_hdr, NULL, 0);
520 }
521 }
522 break;
523 case CMD_BARRIER_RESPONSE:
524 TEST_VERBOSE(("%d: CMD_BARRIER_RESP", my_server_id));
525 PMIX_WAKEUP_THREAD(&server->lock);
526 break;
527 case CMD_FENCE_CONTRIB:
528 contrib_cnt++;
529 if (msg_hdr.size > 0) {
530 fence_buf = (char*)realloc((void*)fence_buf,
531 fence_buf_offset + msg_hdr.size);
532 memcpy(fence_buf + fence_buf_offset, msg_buf, msg_hdr.size);
533 fence_buf_offset += msg_hdr.size;
534 free(msg_buf);
535 msg_buf = NULL;
536 }
537
538 TEST_VERBOSE(("CMD_FENCE_CONTRIB req from %d cnt %d size %d",
539 msg_hdr.src_id, contrib_cnt, msg_hdr.size));
540 if (pmix_list_get_size(server_list) == contrib_cnt) {
541 server_info_t *tmp_server;
542 PMIX_LIST_FOREACH(tmp_server, server_list, server_info_t) {
543 msg_hdr_t resp_hdr;
544 resp_hdr.dst_id = tmp_server->idx;
545 resp_hdr.src_id = my_server_id;
546 resp_hdr.cmd = CMD_FENCE_COMPLETE;
547 resp_hdr.size = fence_buf_offset;
548 server_send_msg(&resp_hdr, fence_buf, fence_buf_offset);
549 }
550 TEST_VERBOSE(("CMD_FENCE_CONTRIB complete, size %d",
551 fence_buf_offset));
552 if (fence_buf) {
553 free(fence_buf);
554 fence_buf = NULL;
555 fence_buf_offset = 0;
556 }
557 contrib_cnt = 0;
558 }
559 break;
560 case CMD_FENCE_COMPLETE:
561 TEST_VERBOSE(("%d: CMD_FENCE_COMPLETE size %d", my_server_id,
562 msg_hdr.size));
563 server->modex_cbfunc(PMIX_SUCCESS, msg_buf, msg_hdr.size,
564 server->cbdata, _libpmix_cb, msg_buf);
565 msg_buf = NULL;
566 break;
567 case CMD_DMDX_REQUEST: {
568 int *sender_id;
569 pmix_proc_t proc;
570 if (NULL == msg_buf) {
571 abort();
572 }
573 sender_id = (int*)malloc(sizeof(int));
574 server_unpack_dmdx(msg_buf, sender_id, &proc);
575 TEST_VERBOSE(("%d: CMD_DMDX_REQUEST from %d: %s:%d", my_server_id,
576 *sender_id, proc.nspace, proc.rank));
577 rc = PMIx_server_dmodex_request(&proc, _dmdx_cb, (void*)sender_id);
578 break;
579 }
580 case CMD_DMDX_RESPONSE:
581 TEST_VERBOSE(("%d: CMD_DMDX_RESPONSE", my_server_id));
582 server->modex_cbfunc(PMIX_SUCCESS, msg_buf, msg_hdr.size,
583 server->cbdata, _libpmix_cb, msg_buf);
584 msg_buf = NULL;
585 break;
586 }
587 if (NULL != msg_buf) {
588 free(msg_buf);
589 }
590 }
591
592 int server_fence_contrib(char *data, size_t ndata,
593 pmix_modex_cbfunc_t cbfunc, void *cbdata)
594 {
595 server_info_t *server;
596 msg_hdr_t msg_hdr;
597 int rc = PMIX_SUCCESS;
598
599 if (0 == my_server_id) {
600 server = my_server_info;
601 } else {
602 server = (server_info_t *)pmix_list_get_first(server_list);
603 }
604 msg_hdr.cmd = CMD_FENCE_CONTRIB;
605 msg_hdr.dst_id = 0;
606 msg_hdr.src_id = my_server_id;
607 msg_hdr.size = ndata;
608 server->modex_cbfunc = cbfunc;
609 server->cbdata = cbdata;
610
611 if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, data, ndata))) {
612 return PMIX_ERROR;
613 }
614 return rc;
615 }
616
617 static int server_find_id(const char *nspace, int rank)
618 {
619 server_nspace_t *tmp;
620
621 PMIX_LIST_FOREACH(tmp, server_nspace, server_nspace_t) {
622 if (0 == strcmp(tmp->name, nspace)) {
623 return tmp->task_map[rank];
624 }
625 }
626 return -1;
627 }
628
629 static int server_pack_dmdx(int sender_id, const char *nspace, int rank,
630 char **buf)
631 {
632 size_t buf_size = sizeof(int) + PMIX_MAX_NSLEN +1 + sizeof(int);
633 char *ptr;
634
635 *buf = (char*)malloc(buf_size);
636 ptr = *buf;
637
638 memcpy(ptr, &sender_id, sizeof(int));
639 ptr += sizeof(int);
640
641 memcpy(ptr, nspace, PMIX_MAX_NSLEN+1);
642 ptr += PMIX_MAX_NSLEN +1;
643
644 memcpy(ptr, &rank, sizeof(int));
645 ptr += sizeof(int);
646
647 return buf_size;
648 }
649
650 static void server_unpack_dmdx(char *buf, int *sender, pmix_proc_t *proc)
651 {
652 char *ptr = buf;
653
654 *sender = (int)*ptr;
655 ptr += sizeof(int);
656
657 memcpy(proc->nspace, ptr, PMIX_MAX_NSLEN +1);
658 ptr += PMIX_MAX_NSLEN +1;
659
660 proc->rank = (int)*ptr;
661 ptr += sizeof(int);
662 }
663
664
665 static void _dmdx_cb(int status, char *data, size_t sz, void *cbdata)
666 {
667 msg_hdr_t msg_hdr;
668 int *sender_id = (int*)cbdata;
669
670 msg_hdr.cmd = CMD_DMDX_RESPONSE;
671 msg_hdr.src_id = my_server_id;
672 msg_hdr.size = sz;
673 msg_hdr.dst_id = *sender_id;
674 free(sender_id);
675
676 server_send_msg(&msg_hdr, data, sz);
677 }
678
679 int server_dmdx_get(const char *nspace, int rank,
680 pmix_modex_cbfunc_t cbfunc, void *cbdata)
681 {
682 server_info_t *server = NULL, *tmp;
683 msg_hdr_t msg_hdr;
684 pmix_status_t rc = PMIX_SUCCESS;
685 char *buf = NULL;
686
687
688 if (0 > (msg_hdr.dst_id = server_find_id(nspace, rank))) {
689 TEST_ERROR(("%d: server cannot found for %s:%d", my_server_id, nspace, rank));
690 goto error;
691 }
692
693 if (0 == my_server_id) {
694 PMIX_LIST_FOREACH(tmp, server_list, server_info_t) {
695 if (tmp->idx == msg_hdr.dst_id) {
696 server = tmp;
697 break;
698 }
699 }
700 } else {
701 server = (server_info_t *)pmix_list_get_first(server_list);
702 }
703
704 if (server == NULL) {
705 goto error;
706 }
707
708 msg_hdr.cmd = CMD_DMDX_REQUEST;
709 msg_hdr.src_id = my_server_id;
710 msg_hdr.size = server_pack_dmdx(my_server_id, nspace, rank, &buf);
711 server->modex_cbfunc = cbfunc;
712 server->cbdata = cbdata;
713
714 if (PMIX_SUCCESS != (rc = server_send_msg(&msg_hdr, buf, msg_hdr.size))) {
715 rc = PMIX_ERROR;
716 }
717 free(buf);
718 return rc;
719
720 error:
721 cbfunc(PMIX_ERROR, NULL, 0, cbdata, NULL, 0);
722 return PMIX_ERROR;
723 }
724
725 int server_init(test_params *params)
726 {
727 pmix_info_t info[1];
728 int rc = PMIX_SUCCESS;
729
730
731 if (params->nservers >= 1) {
732 int i;
733 server_info_t *server_info = NULL;
734 server_list = PMIX_NEW(pmix_list_t);
735
736 TEST_VERBOSE(("pmix server %d started PID:%d", my_server_id, getpid()));
737 for (i = params->nservers - 1; i >= 0; i--) {
738 pid_t pid;
739 server_info = PMIX_NEW(server_info_t);
740
741 int fd1[2];
742 int fd2[2];
743
744 pipe(fd1);
745 pipe(fd2);
746
747 if (0 != i) {
748 pid = fork();
749 if (pid < 0) {
750 TEST_ERROR(("Fork failed"));
751 return pid;
752 }
753 if (pid == 0) {
754 server_list = PMIX_NEW(pmix_list_t);
755 my_server_id = i;
756 server_info->idx = 0;
757 server_info->pid = getppid();
758 server_info->rd_fd = fd1[0];
759 server_info->wr_fd = fd2[1];
760 close(fd1[1]);
761 close(fd2[0]);
762 PMIX_CONSTRUCT_LOCK(&server_info->lock);
763 pmix_list_append(server_list, &server_info->super);
764 break;
765 }
766 server_info->idx = i;
767 server_info->pid = pid;
768 server_info->wr_fd = fd1[1];
769 server_info->rd_fd = fd2[0];
770 PMIX_CONSTRUCT_LOCK(&server_info->lock);
771 close(fd1[0]);
772 close(fd2[1]);
773 } else {
774 my_server_info = server_info;
775 server_info->pid = getpid();
776 server_info->idx = 0;
777 server_info->rd_fd = fd1[0];
778 server_info->wr_fd = fd1[1];
779 PMIX_CONSTRUCT_LOCK(&server_info->lock);
780 close(fd2[0]);
781 close(fd2[1]);
782 }
783 TEST_VERBOSE(("%d: add server %d", my_server_id, server_info->idx));
784 pmix_list_append(server_list, &server_info->super);
785 }
786 }
787
788 params->lsize = (params->nprocs % params->nservers) > (uint32_t)my_server_id ?
789 params->nprocs / params->nservers + 1 :
790 params->nprocs / params->nservers;
791
792 (void)strncpy(info[0].key, PMIX_SOCKET_MODE, PMIX_MAX_KEYLEN);
793 info[0].value.type = PMIX_UINT32;
794 info[0].value.data.uint32 = 0666;
795
796 server_nspace = PMIX_NEW(pmix_list_t);
797
798 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, 1))) {
799 TEST_ERROR(("Init failed with error %d", rc));
800 goto error;
801 }
802
803
804 if (params->nservers && pmix_list_get_size(server_list)) {
805 server_info_t *server;
806 PMIX_LIST_FOREACH(server, server_list, server_info_t) {
807 server->evread = pmix_event_new(pmix_globals.evbase, server->rd_fd,
808 EV_READ|EV_PERSIST, server_read_cb, server);
809 pmix_event_add(server->evread, NULL);
810 }
811 }
812
813
814 PMIx_Register_event_handler(NULL, 0, NULL, 0,
815 errhandler, errhandler_reg_callbk, NULL);
816
817 if (0 != (rc = server_barrier())) {
818 goto error;
819 }
820
821 return PMIX_SUCCESS;
822
823 error:
824 PMIX_DESTRUCT(server_nspace);
825 return rc;
826 }
827
828 int server_finalize(test_params *params)
829 {
830 int rc = PMIX_SUCCESS;
831 int total_ret = 0;
832
833 if (0 != (rc = server_barrier())) {
834 total_ret++;
835 goto exit;
836 }
837
838 if (0 != my_server_id) {
839 server_info_t *server = (server_info_t*)pmix_list_get_first(server_list);
840 remove_server_item(server);
841 }
842
843 if (params->nservers && 0 == my_server_id) {
844 int ret;
845
846 ret = srv_wait_all(10.0);
847 if (!pmix_list_is_empty(server_list)) {
848 total_ret += ret;
849 }
850 PMIX_LIST_RELEASE(server_list);
851 TEST_VERBOSE(("SERVER %d FINALIZE PID:%d with status %d",
852 my_server_id, getpid(), ret));
853 if (0 == total_ret) {
854 TEST_OUTPUT(("Test finished OK!"));
855 } else {
856 rc = PMIX_ERROR;
857 }
858 }
859 PMIX_LIST_RELEASE(server_nspace);
860
861
862 if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
863 TEST_ERROR(("Finalize failed with error %d", rc));
864 total_ret += rc;
865 goto exit;
866 }
867
868 exit:
869 return total_ret;
870 }
871
872 int server_launch_clients(int local_size, int univ_size, int base_rank,
873 test_params *params, char *** client_env, char ***base_argv)
874 {
875 int n;
876 uid_t myuid;
877 gid_t mygid;
878 char *ranks = NULL;
879 char digit[MAX_DIGIT_LEN];
880 int rc;
881 static int cli_counter = 0;
882 static int num_ns = 0;
883 pmix_proc_t proc;
884 int rank_counter = 0;
885 server_nspace_t *nspace_item = PMIX_NEW(server_nspace_t);
886
887 TEST_VERBOSE(("%d: lsize: %d, base rank %d, local_size %d, univ_size %d",
888 my_server_id,
889 params->lsize,
890 base_rank,
891 local_size,
892 univ_size));
893
894 TEST_VERBOSE(("Setting job info"));
895 (void)snprintf(proc.nspace, PMIX_MAX_NSLEN, "%s-%d", TEST_NAMESPACE, num_ns);
896 set_namespace(local_size, univ_size, base_rank, proc.nspace);
897 if (NULL != ranks) {
898 free(ranks);
899 }
900
901 nspace_item->ntasks = univ_size;
902 nspace_item->ltasks = local_size;
903 nspace_item->task_map = (int*)malloc(sizeof(int) * univ_size);
904 memset(nspace_item->task_map, -1, sizeof(int)*univ_size);
905 strcpy(nspace_item->name, proc.nspace);
906 pmix_list_append(server_nspace, &nspace_item->super);
907 for (n = 0; n < local_size; n++) {
908 proc.rank = base_rank + n;
909 nspace_item->task_map[proc.rank] = my_server_id;
910 }
911
912 server_send_procs();
913
914 myuid = getuid();
915 mygid = getgid();
916
917
918 for (n = 0; n < local_size; n++) {
919 proc.rank = base_rank + rank_counter;
920 if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, client_env))) {
921 TEST_ERROR(("Server fork setup failed with error %d", rc));
922 PMIx_server_finalize();
923 cli_kill_all();
924 return rc;
925 }
926 if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid, NULL, NULL, NULL))) {
927 TEST_ERROR(("Server fork setup failed with error %d", rc));
928 PMIx_server_finalize();
929 cli_kill_all();
930 return 0;
931 }
932
933 cli_info[cli_counter].pid = fork();
934 if (cli_info[cli_counter].pid < 0) {
935 TEST_ERROR(("Fork failed"));
936 PMIx_server_finalize();
937 cli_kill_all();
938 return 0;
939 }
940 cli_info[cli_counter].rank = proc.rank;
941 cli_info[cli_counter].ns = strdup(proc.nspace);
942
943 char **client_argv = pmix_argv_copy(*base_argv);
944
945
946 sprintf(digit, "%d", proc.rank);
947 pmix_argv_append_nosize(&client_argv, "-r");
948 pmix_argv_append_nosize(&client_argv, digit);
949
950 pmix_argv_append_nosize(&client_argv, "-s");
951 pmix_argv_append_nosize(&client_argv, proc.nspace);
952
953 sprintf(digit, "%d", univ_size);
954 pmix_argv_append_nosize(&client_argv, "--ns-size");
955 pmix_argv_append_nosize(&client_argv, digit);
956
957 sprintf(digit, "%d", num_ns);
958 pmix_argv_append_nosize(&client_argv, "--ns-id");
959 pmix_argv_append_nosize(&client_argv, digit);
960
961 sprintf(digit, "%d", 0);
962 pmix_argv_append_nosize(&client_argv, "--base-rank");
963 pmix_argv_append_nosize(&client_argv, digit);
964
965 if (cli_info[cli_counter].pid == 0) {
966 if( !TEST_VERBOSE_GET() ){
967
968 if (NULL == freopen("/dev/null","w", stdout)) {
969 return 0;
970 }
971 }
972 execve(params->binary, client_argv, *client_env);
973
974 TEST_ERROR(("execve() failed"));
975 return 0;
976 }
977 cli_info[cli_counter].state = CLI_FORKED;
978
979 pmix_argv_free(client_argv);
980
981 cli_counter++;
982 rank_counter++;
983 }
984 num_ns++;
985 return rank_counter;
986 }