This source file includes the following definitions:
- init_server
- execute
- pmix_server_publish_fn
- pmix_server_lookup_fn
- pmix_server_unpublish_fn
- pmix_server_keyval_client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include "orte_config.h"
30
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34
35 #include "opal/util/argv.h"
36 #include "opal/util/output.h"
37 #include "opal/dss/dss.h"
38
39 #include "orte/mca/errmgr/errmgr.h"
40 #include "orte/util/name_fns.h"
41 #include "orte/util/show_help.h"
42 #include "orte/util/threads.h"
43 #include "orte/runtime/orte_data_server.h"
44 #include "orte/runtime/orte_globals.h"
45 #include "orte/mca/rml/rml.h"
46 #include "orte/mca/rml/base/rml_contact.h"
47
48 #include "pmix_server_internal.h"
49
50 static int init_server(void)
51 {
52 char *server;
53 opal_value_t val;
54 char input[1024], *filename;
55 FILE *fp;
56 int rc;
57
58
59 orte_pmix_server_globals.pubsub_init = true;
60
61
62
63 if (NULL == orte_data_server_uri) {
64 orte_pmix_server_globals.server = *ORTE_PROC_MY_HNP;
65 } else {
66 if (0 == strncmp(orte_data_server_uri, "file", strlen("file")) ||
67 0 == strncmp(orte_data_server_uri, "FILE", strlen("FILE"))) {
68
69 filename = strchr(orte_data_server_uri, ':');
70 if (NULL == filename) {
71
72 orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true,
73 orte_basename, orte_data_server_uri);
74 return ORTE_ERR_BAD_PARAM;
75 }
76 ++filename;
77
78 if (0 >= strlen(filename)) {
79
80 orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true,
81 orte_basename, orte_data_server_uri);
82 return ORTE_ERR_BAD_PARAM;
83 }
84
85
86 fp = fopen(filename, "r");
87 if (NULL == fp) {
88 orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true,
89 orte_basename, orte_data_server_uri);
90 return ORTE_ERR_BAD_PARAM;
91 }
92 if (NULL == fgets(input, 1024, fp)) {
93
94 fclose(fp);
95 orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true,
96 orte_basename, orte_data_server_uri,
97 orte_basename);
98 return ORTE_ERR_BAD_PARAM;
99 }
100 fclose(fp);
101 input[strlen(input)-1] = '\0';
102 server = strdup(input);
103 } else {
104 server = strdup(orte_data_server_uri);
105 }
106
107 if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) {
108 ORTE_ERROR_LOG(rc);
109 free(server);
110 return rc;
111 }
112
113 OBJ_CONSTRUCT(&val, opal_value_t);
114 val.key = OPAL_PMIX_PROC_URI;
115 val.type = OPAL_STRING;
116 val.data.string = server;
117 if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&orte_pmix_server_globals.server, &val))) {
118 ORTE_ERROR_LOG(rc);
119 val.key = NULL;
120 OBJ_DESTRUCT(&val);
121 return rc;
122 }
123 val.key = NULL;
124 OBJ_DESTRUCT(&val);
125
126
127
128
129
130 if (orte_pmix_server_globals.wait_for_server) {
131
132 struct timeval timeout;
133 timeout.tv_sec = orte_pmix_server_globals.timeout;
134 timeout.tv_usec = 0;
135 if (ORTE_SUCCESS != (rc = orte_rml.ping(server, &timeout))) {
136
137 if (ORTE_SUCCESS != (rc = orte_rml.ping(server, &timeout))) {
138
139 orte_show_help("help-orterun.txt", "orterun:server-not-found", true,
140 orte_basename, server,
141 (long)orte_pmix_server_globals.timeout,
142 ORTE_ERROR_NAME(rc));
143 ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
144 return rc;
145 }
146 }
147 }
148 }
149
150 return ORTE_SUCCESS;
151 }
152
153 static void execute(int sd, short args, void *cbdata)
154 {
155 pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
156 int rc;
157 opal_buffer_t *xfer;
158 orte_process_name_t *target;
159
160 ORTE_ACQUIRE_OBJECT(req);
161
162 if (!orte_pmix_server_globals.pubsub_init) {
163
164 if (ORTE_SUCCESS != (rc = init_server())) {
165 orte_show_help("help-orted.txt", "noserver", true,
166 (NULL == orte_data_server_uri) ?
167 "NULL" : orte_data_server_uri);
168 goto callback;
169 }
170 }
171
172
173 if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
174 orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
175 goto callback;
176 }
177
178
179 xfer = OBJ_NEW(opal_buffer_t);
180
181 if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &req->room_num, 1, OPAL_INT))) {
182 ORTE_ERROR_LOG(rc);
183 OBJ_RELEASE(xfer);
184 goto callback;
185 }
186 opal_dss.copy_payload(xfer, &req->msg);
187
188
189 if (OPAL_PMIX_RANGE_SESSION == req->range) {
190 opal_output_verbose(1, orte_pmix_server_globals.output,
191 "%s orted:pmix:server range SESSION",
192 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
193 target = &orte_pmix_server_globals.server;
194 } else if (OPAL_PMIX_RANGE_LOCAL == req->range) {
195
196 opal_output_verbose(1, orte_pmix_server_globals.output,
197 "%s orted:pmix:server range LOCAL",
198 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
199 target = ORTE_PROC_MY_NAME;
200 } else {
201 opal_output_verbose(1, orte_pmix_server_globals.output,
202 "%s orted:pmix:server range GLOBAL",
203 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
204 target = ORTE_PROC_MY_HNP;
205 }
206
207
208 rc = orte_rml.send_buffer_nb(target, xfer,
209 ORTE_RML_TAG_DATA_SERVER,
210 orte_rml_send_callback, NULL);
211 if (ORTE_SUCCESS == rc) {
212 return;
213 }
214
215 callback:
216
217 if (NULL != req->opcbfunc) {
218 req->opcbfunc(rc, req->cbdata);
219 } else if (NULL != req->lkcbfunc) {
220 req->lkcbfunc(rc, NULL, req->cbdata);
221 }
222 opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
223 OBJ_RELEASE(req);
224 }
225
226 int pmix_server_publish_fn(opal_process_name_t *proc,
227 opal_list_t *info,
228 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
229 {
230 pmix_server_req_t *req;
231 int rc;
232 uint8_t cmd = ORTE_PMIX_PUBLISH_CMD;
233 opal_value_t *iptr;
234 opal_pmix_persistence_t persist = OPAL_PMIX_PERSIST_APP;
235 bool rset, pset;
236
237 opal_output_verbose(1, orte_pmix_server_globals.output,
238 "%s orted:pmix:server PUBLISH",
239 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
240
241
242 req = OBJ_NEW(pmix_server_req_t);
243 opal_asprintf(&req->operation, "PUBLISH: %s:%d", __FILE__, __LINE__);
244 req->opcbfunc = cbfunc;
245 req->cbdata = cbdata;
246
247
248 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
249 ORTE_ERROR_LOG(rc);
250 OBJ_RELEASE(req);
251 return rc;
252 }
253
254
255 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, proc, 1, OPAL_NAME))) {
256 ORTE_ERROR_LOG(rc);
257 OBJ_RELEASE(req);
258 return rc;
259 }
260
261
262 rset = false;
263 pset = false;
264 OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
265 if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
266 req->range = (opal_pmix_data_range_t)iptr->data.uint;
267 if (pset) {
268 break;
269 }
270 rset = true;
271 } else if (0 == strcmp(iptr->key, OPAL_PMIX_PERSISTENCE)) {
272 persist = (opal_pmix_persistence_t)iptr->data.integer;
273 if (rset) {
274 break;
275 }
276 pset = true;
277 }
278 }
279
280
281 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) {
282 ORTE_ERROR_LOG(rc);
283 OBJ_RELEASE(req);
284 return rc;
285 }
286
287
288 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &persist, 1, OPAL_INT))) {
289 ORTE_ERROR_LOG(rc);
290 OBJ_RELEASE(req);
291 return rc;
292 }
293
294
295
296 OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
297 if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE) ||
298 0 == strcmp(iptr->key, OPAL_PMIX_PERSISTENCE)) {
299 continue;
300 }
301 if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
302
303 req->timeout = iptr->data.integer;
304 continue;
305 }
306 opal_output_verbose(5, orte_pmix_server_globals.output,
307 "%s publishing data %s of type %d from source %s",
308 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, iptr->type,
309 ORTE_NAME_PRINT(proc));
310 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
311 ORTE_ERROR_LOG(rc);
312 OBJ_RELEASE(req);
313 return rc;
314 }
315 }
316
317
318 opal_event_set(orte_event_base, &(req->ev),
319 -1, OPAL_EV_WRITE, execute, req);
320 opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
321 ORTE_POST_OBJECT(req);
322 opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
323
324 return OPAL_SUCCESS;
325
326 }
327
328 int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
329 opal_list_t *info,
330 opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
331 {
332 pmix_server_req_t *req;
333 int rc;
334 uint8_t cmd = ORTE_PMIX_LOOKUP_CMD;
335 int32_t nkeys, i;
336 opal_value_t *iptr;
337
338
339
340
341
342
343 req = OBJ_NEW(pmix_server_req_t);
344 opal_asprintf(&req->operation, "LOOKUP: %s:%d", __FILE__, __LINE__);
345 req->lkcbfunc = cbfunc;
346 req->cbdata = cbdata;
347
348
349 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
350 ORTE_ERROR_LOG(rc);
351 OBJ_RELEASE(req);
352 return rc;
353 }
354
355
356 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &proc->jobid, 1, ORTE_JOBID))) {
357 ORTE_ERROR_LOG(rc);
358 OBJ_RELEASE(req);
359 return rc;
360 }
361
362
363 OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
364 if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
365 req->range = (opal_pmix_data_range_t)iptr->data.uint;
366 break;
367 }
368 }
369
370
371 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) {
372 ORTE_ERROR_LOG(rc);
373 OBJ_RELEASE(req);
374 return rc;
375 }
376
377
378 nkeys = opal_argv_count(keys);
379 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
380 ORTE_ERROR_LOG(rc);
381 OBJ_RELEASE(req);
382 return rc;
383 }
384
385
386 for (i=0; i < nkeys; i++) {
387 opal_output_verbose(5, orte_pmix_server_globals.output,
388 "%s lookup data %s for proc %s",
389 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), keys[i],
390 ORTE_NAME_PRINT(proc));
391 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &keys[i], 1, OPAL_STRING))) {
392 ORTE_ERROR_LOG(rc);
393 OBJ_RELEASE(req);
394 return rc;
395 }
396 }
397
398
399 OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
400 if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
401 continue;
402 }
403 if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
404
405 req->timeout = iptr->data.integer;
406 continue;
407 }
408 opal_output_verbose(2, orte_pmix_server_globals.output,
409 "%s lookup directive %s for proc %s",
410 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key,
411 ORTE_NAME_PRINT(proc));
412 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
413 ORTE_ERROR_LOG(rc);
414 OBJ_RELEASE(req);
415 return rc;
416 }
417 }
418
419
420 opal_event_set(orte_event_base, &(req->ev),
421 -1, OPAL_EV_WRITE, execute, req);
422 opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
423 ORTE_POST_OBJECT(req);
424 opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
425
426 return OPAL_SUCCESS;
427 }
428
429 int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys,
430 opal_list_t *info,
431 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
432 {
433 pmix_server_req_t *req;
434 int rc;
435 uint8_t cmd = ORTE_PMIX_UNPUBLISH_CMD;
436 uint32_t nkeys, n;
437 opal_value_t *iptr;
438
439
440 req = OBJ_NEW(pmix_server_req_t);
441 opal_asprintf(&req->operation, "UNPUBLISH: %s:%d", __FILE__, __LINE__);
442 req->opcbfunc = cbfunc;
443 req->cbdata = cbdata;
444
445
446 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
447 ORTE_ERROR_LOG(rc);
448 OBJ_RELEASE(req);
449 return rc;
450 }
451
452
453 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, proc, 1, OPAL_NAME))) {
454 ORTE_ERROR_LOG(rc);
455 OBJ_RELEASE(req);
456 return rc;
457 }
458
459
460 OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
461 if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
462 req->range = (opal_pmix_data_range_t)iptr->data.integer;
463 break;
464 }
465 }
466
467
468 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_INT))) {
469 ORTE_ERROR_LOG(rc);
470 OBJ_RELEASE(req);
471 return rc;
472 }
473
474
475 nkeys = opal_argv_count(keys);
476 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
477 ORTE_ERROR_LOG(rc);
478 OBJ_RELEASE(req);
479 return rc;
480 }
481
482
483 for (n=0; n < nkeys; n++) {
484 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &keys[n], 1, OPAL_STRING))) {
485 ORTE_ERROR_LOG(rc);
486 OBJ_RELEASE(req);
487 return rc;
488 }
489 }
490
491
492 OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
493 if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
494 continue;
495 }
496 if (0 == strcmp(iptr->key, OPAL_PMIX_TIMEOUT)) {
497
498 req->timeout = iptr->data.integer;
499 continue;
500 }
501 if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
502 ORTE_ERROR_LOG(rc);
503 OBJ_RELEASE(req);
504 return rc;
505 }
506 }
507
508
509 opal_event_set(orte_event_base, &(req->ev),
510 -1, OPAL_EV_WRITE, execute, req);
511 opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
512 ORTE_POST_OBJECT(req);
513 opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
514
515 return OPAL_SUCCESS;
516 }
517
518 void pmix_server_keyval_client(int status, orte_process_name_t* sender,
519 opal_buffer_t *buffer,
520 orte_rml_tag_t tg, void *cbdata)
521 {
522 int rc, ret, room_num = -1;
523 int32_t cnt;
524 pmix_server_req_t *req=NULL;
525 opal_list_t info;
526 opal_value_t *iptr;
527 opal_pmix_pdata_t *pdata;
528 opal_process_name_t source;
529
530 opal_output_verbose(1, orte_pmix_server_globals.output,
531 "%s recvd lookup data return",
532 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
533
534 OBJ_CONSTRUCT(&info, opal_list_t);
535
536 cnt = 1;
537 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
538 ORTE_ERROR_LOG(rc);
539 return;
540 }
541
542
543 cnt = 1;
544 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
545 ORTE_ERROR_LOG(rc);
546 ret = rc;
547 goto release;
548 }
549
550 opal_output_verbose(5, orte_pmix_server_globals.output,
551 "%s recvd lookup returned status %d",
552 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret);
553
554 if (ORTE_SUCCESS == ret) {
555
556 cnt = 1;
557 while (OPAL_SUCCESS == opal_dss.unpack(buffer, &source, &cnt, OPAL_NAME)) {
558 pdata = OBJ_NEW(opal_pmix_pdata_t);
559 pdata->proc = source;
560 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &cnt, OPAL_VALUE))) {
561 ORTE_ERROR_LOG(rc);
562 OBJ_RELEASE(pdata);
563 continue;
564 }
565 opal_output_verbose(5, orte_pmix_server_globals.output,
566 "%s recvd lookup returned data %s of type %d from source %s",
567 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), iptr->key, iptr->type,
568 ORTE_NAME_PRINT(&source));
569 if (OPAL_SUCCESS != (rc = opal_value_xfer(&pdata->value, iptr))) {
570 ORTE_ERROR_LOG(rc);
571 OBJ_RELEASE(pdata);
572 OBJ_RELEASE(iptr);
573 continue;
574 }
575 OBJ_RELEASE(iptr);
576 opal_list_append(&info, &pdata->super);
577 }
578 }
579
580 release:
581 if (0 <= room_num) {
582
583 opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
584 }
585
586 if (NULL != req) {
587
588 if (NULL != req->opcbfunc) {
589 req->opcbfunc(ret, req->cbdata);
590 } else if (NULL != req->lkcbfunc) {
591 req->lkcbfunc(ret, &info, req->cbdata);
592 } else {
593
594 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
595 }
596
597
598 OPAL_LIST_DESTRUCT(&info);
599 OBJ_RELEASE(req);
600 }
601 }