This source file includes following definitions.
- relcbfunc
- query_cbfunc
- acb
- PMIx_Job_control
- PMIx_Job_control_nb
- PMIx_Process_monitor
- PMIx_Process_monitor_nb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include <src/include/pmix_config.h>
16
17 #include <src/include/pmix_stdint.h>
18 #include <src/include/pmix_socket_errno.h>
19
20 #include <pmix.h>
21 #include <pmix_common.h>
22 #include <pmix_server.h>
23 #include <pmix_rename.h>
24
25 #include "src/threads/threads.h"
26 #include "src/util/argv.h"
27 #include "src/util/error.h"
28 #include "src/util/name_fns.h"
29 #include "src/util/output.h"
30 #include "src/mca/bfrops/bfrops.h"
31 #include "src/mca/ptl/ptl.h"
32
33 #include "src/client/pmix_client_ops.h"
34 #include "src/server/pmix_server_ops.h"
35 #include "src/include/pmix_globals.h"
36
37 static void relcbfunc(void *cbdata)
38 {
39 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
40
41 pmix_output_verbose(2, pmix_globals.debug_output,
42 "pmix:job_ctrl release callback");
43
44 if (NULL != cd->info) {
45 PMIX_INFO_FREE(cd->info, cd->ninfo);
46 }
47 PMIX_RELEASE(cd);
48 }
49 static void query_cbfunc(struct pmix_peer_t *peer,
50 pmix_ptl_hdr_t *hdr,
51 pmix_buffer_t *buf, void *cbdata)
52 {
53 pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
54 pmix_status_t rc;
55 pmix_shift_caddy_t *results;
56 int cnt;
57
58 pmix_output_verbose(2, pmix_globals.debug_output,
59 "pmix:job_ctrl cback from server with %d bytes",
60 (int)buf->bytes_used);
61
62
63
64 if (PMIX_BUFFER_IS_EMPTY(buf)) {
65
66 if (NULL != cd->cbfunc) {
67 cd->cbfunc(PMIX_ERR_COMM_FAILURE, NULL, 0, cd->cbdata, NULL, NULL);
68 }
69 PMIX_RELEASE(cd);
70 return;
71 }
72
73 results = PMIX_NEW(pmix_shift_caddy_t);
74
75
76 cnt = 1;
77 PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
78 if (PMIX_SUCCESS != rc) {
79 PMIX_ERROR_LOG(rc);
80 goto complete;
81 }
82 if (PMIX_SUCCESS != results->status) {
83 goto complete;
84 }
85
86
87 cnt = 1;
88 PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
89 if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
90 PMIX_ERROR_LOG(rc);
91 goto complete;
92 }
93 if (0 < results->ninfo) {
94 PMIX_INFO_CREATE(results->info, results->ninfo);
95 cnt = results->ninfo;
96 PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
97 if (PMIX_SUCCESS != rc) {
98 PMIX_ERROR_LOG(rc);
99 goto complete;
100 }
101 }
102
103 complete:
104 pmix_output_verbose(2, pmix_globals.debug_output,
105 "pmix:job_ctrl cback from server releasing");
106
107 if (NULL != cd->cbfunc) {
108 cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
109 } else {
110 PMIX_RELEASE(results);
111 }
112 PMIX_RELEASE(cd);
113 }
114
115 static void acb(pmix_status_t status,
116 pmix_info_t *info, size_t ninfo,
117 void *cbdata,
118 pmix_release_cbfunc_t release_fn,
119 void *release_cbdata)
120 {
121 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
122 cb->status = status;
123 if (NULL != release_fn) {
124 release_fn(release_cbdata);
125 }
126 PMIX_WAKEUP_THREAD(&cb->lock);
127 }
128
129 PMIX_EXPORT pmix_status_t PMIx_Job_control(const pmix_proc_t targets[], size_t ntargets,
130 const pmix_info_t directives[], size_t ndirs)
131 {
132 pmix_cb_t cb;
133 pmix_status_t rc;
134
135 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
136
137 if (pmix_globals.init_cntr <= 0) {
138 PMIX_RELEASE_THREAD(&pmix_global_lock);
139 return PMIX_ERR_INIT;
140 }
141 PMIX_RELEASE_THREAD(&pmix_global_lock);
142
143 pmix_output_verbose(2, pmix_globals.debug_output,
144 "%s pmix:job_ctrl", PMIX_NAME_PRINT(&pmix_globals.myid));
145
146
147
148
149 PMIX_CONSTRUCT(&cb, pmix_cb_t);
150 if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(targets, ntargets,
151 directives, ndirs,
152 acb, &cb))) {
153 PMIX_DESTRUCT(&cb);
154 return rc;
155 }
156
157
158 PMIX_WAIT_THREAD(&cb.lock);
159 rc = cb.status;
160 PMIX_DESTRUCT(&cb);
161
162 pmix_output_verbose(2, pmix_globals.debug_output,
163 "pmix:job_ctrl completed");
164
165 return rc;
166 }
167
168 PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_t ntargets,
169 const pmix_info_t directives[], size_t ndirs,
170 pmix_info_cbfunc_t cbfunc, void *cbdata)
171 {
172 pmix_buffer_t *msg;
173 pmix_cmd_t cmd = PMIX_JOB_CONTROL_CMD;
174 pmix_status_t rc;
175 pmix_query_caddy_t *cb;
176
177 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
178
179 pmix_output_verbose(2, pmix_globals.debug_output,
180 "pmix: job control called with %d directives", (int)ndirs);
181
182 if (pmix_globals.init_cntr <= 0) {
183 PMIX_RELEASE_THREAD(&pmix_global_lock);
184 return PMIX_ERR_INIT;
185 }
186
187
188
189 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
190 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
191 PMIX_RELEASE_THREAD(&pmix_global_lock);
192 if (NULL == pmix_host_server.job_control) {
193
194 return PMIX_ERR_NOT_SUPPORTED;
195 }
196 pmix_output_verbose(2, pmix_globals.debug_output,
197 "pmix:job_control handed to RM");
198 rc = pmix_host_server.job_control(&pmix_globals.myid,
199 targets, ntargets,
200 directives, ndirs,
201 cbfunc, cbdata);
202 return rc;
203 }
204
205
206 if (!pmix_globals.connected) {
207 PMIX_RELEASE_THREAD(&pmix_global_lock);
208 return PMIX_ERR_UNREACH;
209 }
210 PMIX_RELEASE_THREAD(&pmix_global_lock);
211
212
213 msg = PMIX_NEW(pmix_buffer_t);
214
215 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
216 msg, &cmd, 1, PMIX_COMMAND);
217 if (PMIX_SUCCESS != rc) {
218 PMIX_ERROR_LOG(rc);
219 PMIX_RELEASE(msg);
220 return rc;
221 }
222
223
224 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
225 msg, &ntargets, 1, PMIX_SIZE);
226 if (PMIX_SUCCESS != rc) {
227 PMIX_ERROR_LOG(rc);
228 PMIX_RELEASE(msg);
229 return rc;
230 }
231
232
233 if (NULL != targets && 0 < ntargets) {
234
235 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
236 msg, targets, ntargets, PMIX_PROC);
237 if (PMIX_SUCCESS != rc) {
238 PMIX_ERROR_LOG(rc);
239 PMIX_RELEASE(msg);
240 return rc;
241 }
242 }
243
244
245 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
246 msg, &ndirs, 1, PMIX_SIZE);
247 if (PMIX_SUCCESS != rc) {
248 PMIX_ERROR_LOG(rc);
249 PMIX_RELEASE(msg);
250 return rc;
251 }
252 if (NULL != directives && 0 < ndirs) {
253 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
254 msg, directives, ndirs, PMIX_INFO);
255 if (PMIX_SUCCESS != rc) {
256 PMIX_ERROR_LOG(rc);
257 PMIX_RELEASE(msg);
258 return rc;
259 }
260 }
261
262
263
264
265 cb = PMIX_NEW(pmix_query_caddy_t);
266 cb->cbfunc = cbfunc;
267 cb->cbdata = cbdata;
268
269
270 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
271 msg, query_cbfunc, (void*)cb);
272 if (PMIX_SUCCESS != rc) {
273 PMIX_RELEASE(msg);
274 PMIX_RELEASE(cb);
275 }
276
277 return rc;
278 }
279
280 PMIX_EXPORT pmix_status_t PMIx_Process_monitor(const pmix_info_t *monitor, pmix_status_t error,
281 const pmix_info_t directives[], size_t ndirs)
282 {
283 pmix_cb_t cb;
284 pmix_status_t rc;
285
286 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
287
288 if (pmix_globals.init_cntr <= 0) {
289 PMIX_RELEASE_THREAD(&pmix_global_lock);
290 return PMIX_ERR_INIT;
291 }
292 PMIX_RELEASE_THREAD(&pmix_global_lock);
293
294 pmix_output_verbose(2, pmix_globals.debug_output,
295 "%s pmix:monitor", PMIX_NAME_PRINT(&pmix_globals.myid));
296
297
298
299
300 PMIX_CONSTRUCT(&cb, pmix_cb_t);
301 if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(monitor, error,
302 directives, ndirs,
303 acb, &cb))) {
304 PMIX_DESTRUCT(&cb);
305 return rc;
306 }
307
308
309 PMIX_WAIT_THREAD(&cb.lock);
310 rc = cb.status;
311 PMIX_DESTRUCT(&cb);
312
313 pmix_output_verbose(2, pmix_globals.debug_output,
314 "pmix:monitor completed");
315
316 return rc;
317 }
318
319 PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pmix_status_t error,
320 const pmix_info_t directives[], size_t ndirs,
321 pmix_info_cbfunc_t cbfunc, void *cbdata)
322 {
323 pmix_buffer_t *msg;
324 pmix_cmd_t cmd = PMIX_MONITOR_CMD;
325 pmix_status_t rc;
326 pmix_query_caddy_t *cb;
327
328 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
329
330 pmix_output_verbose(2, pmix_globals.debug_output,
331 "pmix: monitor called");
332
333 if (pmix_globals.init_cntr <= 0) {
334 PMIX_RELEASE_THREAD(&pmix_global_lock);
335 return PMIX_ERR_INIT;
336 }
337
338
339 if (NULL == monitor) {
340 PMIX_RELEASE_THREAD(&pmix_global_lock);
341 return PMIX_ERR_BAD_PARAM;
342 }
343
344
345
346 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
347 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
348 PMIX_RELEASE_THREAD(&pmix_global_lock);
349 if (NULL == pmix_host_server.monitor) {
350
351 return PMIX_ERR_NOT_SUPPORTED;
352 }
353 pmix_output_verbose(2, pmix_globals.debug_output,
354 "pmix:monitor handed to RM");
355 rc = pmix_host_server.monitor(&pmix_globals.myid, monitor, error,
356 directives, ndirs, cbfunc, cbdata);
357 return rc;
358 }
359
360
361 if (!pmix_globals.connected) {
362 PMIX_RELEASE_THREAD(&pmix_global_lock);
363 return PMIX_ERR_UNREACH;
364 }
365 PMIX_RELEASE_THREAD(&pmix_global_lock);
366
367
368 if (0 == strncmp(monitor->key, PMIX_SEND_HEARTBEAT, PMIX_MAX_KEYLEN)) {
369 msg = PMIX_NEW(pmix_buffer_t);
370 if (NULL == msg) {
371 return PMIX_ERR_NOMEM;
372 }
373 PMIX_PTL_SEND_ONEWAY(rc, pmix_client_globals.myserver, msg, PMIX_PTL_TAG_HEARTBEAT);
374 if (PMIX_SUCCESS != rc) {
375 PMIX_RELEASE(msg);
376 }
377 return rc;
378 }
379
380
381 msg = PMIX_NEW(pmix_buffer_t);
382
383 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
384 msg, &cmd, 1, PMIX_COMMAND);
385 if (PMIX_SUCCESS != rc) {
386 PMIX_ERROR_LOG(rc);
387 PMIX_RELEASE(msg);
388 return rc;
389 }
390
391
392 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
393 msg, monitor, 1, PMIX_INFO);
394 if (PMIX_SUCCESS != rc) {
395 PMIX_ERROR_LOG(rc);
396 PMIX_RELEASE(msg);
397 return rc;
398 }
399
400
401 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
402 msg, &error, 1, PMIX_STATUS);
403 if (PMIX_SUCCESS != rc) {
404 PMIX_ERROR_LOG(rc);
405 PMIX_RELEASE(msg);
406 return rc;
407 }
408
409
410 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
411 msg, &ndirs, 1, PMIX_SIZE);
412 if (PMIX_SUCCESS != rc) {
413 PMIX_ERROR_LOG(rc);
414 PMIX_RELEASE(msg);
415 return rc;
416 }
417 if (0 < ndirs) {
418 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
419 msg, directives, ndirs, PMIX_INFO);
420 if (PMIX_SUCCESS != rc) {
421 PMIX_ERROR_LOG(rc);
422 PMIX_RELEASE(msg);
423 return rc;
424 }
425 }
426
427
428
429
430 cb = PMIX_NEW(pmix_query_caddy_t);
431 cb->cbfunc = cbfunc;
432 cb->cbdata = cbdata;
433
434
435 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
436 msg, query_cbfunc, (void*)cb);
437 if (PMIX_SUCCESS != rc) {
438 PMIX_RELEASE(msg);
439 PMIX_RELEASE(cb);
440 }
441
442 return rc;
443 }