This source file includes the following definitions:
- PMIx_Connect
- PMIx_Connect_nb
- PMIx_Disconnect
- PMIx_Disconnect_nb
- wait_cbfunc
- op_cbfunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21
22 #include <pmix.h>
23 #include <pmix_rename.h>
24
25 #include "src/include/pmix_globals.h"
26 #include "src/mca/gds/base/base.h"
27
28 #ifdef HAVE_STRING_H
29 #include <string.h>
30 #endif
31 #include <fcntl.h>
32 #ifdef HAVE_UNISTD_H
33 #include <unistd.h>
34 #endif
35 #ifdef HAVE_SYS_SOCKET_H
36 #include <sys/socket.h>
37 #endif
38 #ifdef HAVE_SYS_UN_H
39 #include <sys/un.h>
40 #endif
41 #ifdef HAVE_SYS_UIO_H
42 #include <sys/uio.h>
43 #endif
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #include PMIX_EVENT_HEADER
48
49 #include "src/class/pmix_list.h"
50 #include "src/mca/bfrops/bfrops.h"
51 #include "src/util/argv.h"
52 #include "src/util/error.h"
53 #include "src/util/output.h"
54 #include "src/threads/threads.h"
55 #include "src/mca/gds/gds.h"
56 #include "src/mca/ptl/ptl.h"
57
58 #include "pmix_client_ops.h"
59
60
61 static void wait_cbfunc(struct pmix_peer_t *pr,
62 pmix_ptl_hdr_t *hdr,
63 pmix_buffer_t *buf, void *cbdata);
64 static void op_cbfunc(pmix_status_t status, void *cbdata);
65
66 PMIX_EXPORT pmix_status_t PMIx_Connect(const pmix_proc_t procs[], size_t nprocs,
67 const pmix_info_t info[], size_t ninfo)
68 {
69 pmix_status_t rc;
70 pmix_cb_t *cb;
71
72 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
73
74 pmix_output_verbose(2, pmix_client_globals.connect_output,
75 "pmix: connect called");
76
77 if (pmix_globals.init_cntr <= 0) {
78 PMIX_RELEASE_THREAD(&pmix_global_lock);
79 return PMIX_ERR_INIT;
80 }
81
82
83 if (!pmix_globals.connected) {
84 PMIX_RELEASE_THREAD(&pmix_global_lock);
85 return PMIX_ERR_UNREACH;
86 }
87 PMIX_RELEASE_THREAD(&pmix_global_lock);
88
89
90
91
92 cb = PMIX_NEW(pmix_cb_t);
93
94
95 if (PMIX_SUCCESS != (rc = PMIx_Connect_nb(procs, nprocs, info, ninfo, op_cbfunc, cb))) {
96 PMIX_RELEASE(cb);
97 return rc;
98 }
99
100
101 PMIX_WAIT_THREAD(&cb->lock);
102 rc = cb->status;
103 PMIX_RELEASE(cb);
104
105 pmix_output_verbose(2, pmix_globals.debug_output,
106 "pmix: connect completed");
107
108 return rc;
109 }
110
111 PMIX_EXPORT pmix_status_t PMIx_Connect_nb(const pmix_proc_t procs[], size_t nprocs,
112 const pmix_info_t info[], size_t ninfo,
113 pmix_op_cbfunc_t cbfunc, void *cbdata)
114 {
115 pmix_buffer_t *msg;
116 pmix_cmd_t cmd = PMIX_CONNECTNB_CMD;
117 pmix_status_t rc;
118 pmix_cb_t *cb;
119
120 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
121
122 pmix_output_verbose(2, pmix_client_globals.connect_output,
123 "pmix:connect_nb called");
124
125 if (pmix_globals.init_cntr <= 0) {
126 PMIX_RELEASE_THREAD(&pmix_global_lock);
127 return PMIX_ERR_INIT;
128 }
129
130
131 if (!pmix_globals.connected) {
132 PMIX_RELEASE_THREAD(&pmix_global_lock);
133 return PMIX_ERR_UNREACH;
134 }
135 PMIX_RELEASE_THREAD(&pmix_global_lock);
136
137
138 if (NULL == procs || 0 >= nprocs) {
139 return PMIX_ERR_BAD_PARAM;
140 }
141
142 msg = PMIX_NEW(pmix_buffer_t);
143
144 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
145 msg, &cmd, 1, PMIX_COMMAND);
146 if (PMIX_SUCCESS != rc) {
147 PMIX_ERROR_LOG(rc);
148 return rc;
149 }
150
151
152 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
153 msg, &nprocs, 1, PMIX_SIZE);
154 if (PMIX_SUCCESS != rc) {
155 PMIX_ERROR_LOG(rc);
156 return rc;
157 }
158 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
159 msg, procs, nprocs, PMIX_PROC);
160 if (PMIX_SUCCESS != rc) {
161 PMIX_ERROR_LOG(rc);
162 return rc;
163 }
164
165
166 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
167 msg, &ninfo, 1, PMIX_SIZE);
168 if (PMIX_SUCCESS != rc) {
169 PMIX_ERROR_LOG(rc);
170 PMIX_RELEASE(msg);
171 return rc;
172 }
173 if (0 < ninfo) {
174 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
175 msg, info, ninfo, PMIX_INFO);
176 if (PMIX_SUCCESS != rc) {
177 PMIX_ERROR_LOG(rc);
178 PMIX_RELEASE(msg);
179 return rc;
180 }
181 }
182
183
184
185
186 cb = PMIX_NEW(pmix_cb_t);
187 cb->cbfunc.opfn = cbfunc;
188 cb->cbdata = cbdata;
189
190
191 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
192 msg, wait_cbfunc, (void*)cb);
193 if (PMIX_SUCCESS != rc) {
194 PMIX_RELEASE(msg);
195 PMIX_RELEASE(cb);
196 }
197
198 return rc;
199 }
200
201 PMIX_EXPORT pmix_status_t PMIx_Disconnect(const pmix_proc_t procs[], size_t nprocs,
202 const pmix_info_t info[], size_t ninfo)
203 {
204 pmix_status_t rc;
205 pmix_cb_t *cb;
206
207 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
208 if (pmix_globals.init_cntr <= 0) {
209 PMIX_RELEASE_THREAD(&pmix_global_lock);
210 return PMIX_ERR_INIT;
211 }
212
213
214 if (!pmix_globals.connected) {
215 PMIX_RELEASE_THREAD(&pmix_global_lock);
216 return PMIX_ERR_UNREACH;
217 }
218 PMIX_RELEASE_THREAD(&pmix_global_lock);
219
220
221
222
223 cb = PMIX_NEW(pmix_cb_t);
224
225 if (PMIX_SUCCESS != (rc = PMIx_Disconnect_nb(procs, nprocs, info, ninfo, op_cbfunc, cb))) {
226 PMIX_RELEASE(cb);
227 return rc;
228 }
229
230
231 PMIX_WAIT_THREAD(&cb->lock);
232 rc = cb->status;
233 PMIX_RELEASE(cb);
234
235 pmix_output_verbose(2, pmix_globals.debug_output,
236 "pmix: disconnect completed");
237
238 return rc;
239 }
240
241 PMIX_EXPORT pmix_status_t PMIx_Disconnect_nb(const pmix_proc_t procs[], size_t nprocs,
242 const pmix_info_t info[], size_t ninfo,
243 pmix_op_cbfunc_t cbfunc, void *cbdata)
244 {
245 pmix_buffer_t *msg;
246 pmix_cmd_t cmd = PMIX_DISCONNECTNB_CMD;
247 pmix_status_t rc;
248 pmix_cb_t *cb;
249
250 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
251
252 pmix_output_verbose(2, pmix_globals.debug_output,
253 "pmix: disconnect called");
254
255 size_t cnt;
256 for (cnt = 0; cnt < nprocs; cnt++) {
257 if (0 != strcmp(pmix_globals.myid.nspace, procs[cnt].nspace)) {
258 PMIX_GDS_DEL_NSPACE(rc, procs[cnt].nspace);
259 }
260 }
261
262 if (pmix_globals.init_cntr <= 0) {
263 PMIX_RELEASE_THREAD(&pmix_global_lock);
264 return PMIX_ERR_INIT;
265 }
266
267
268 if (!pmix_globals.connected) {
269 PMIX_RELEASE_THREAD(&pmix_global_lock);
270 return PMIX_ERR_UNREACH;
271 }
272 PMIX_RELEASE_THREAD(&pmix_global_lock);
273
274
275 if (NULL == procs || 0 >= nprocs) {
276 return PMIX_ERR_BAD_PARAM;
277 }
278
279 msg = PMIX_NEW(pmix_buffer_t);
280
281 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
282 msg, &cmd, 1, PMIX_COMMAND);
283 if (PMIX_SUCCESS != rc) {
284 PMIX_ERROR_LOG(rc);
285 return rc;
286 }
287
288
289 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
290 msg, &nprocs, 1, PMIX_SIZE);
291 if (PMIX_SUCCESS != rc) {
292 PMIX_ERROR_LOG(rc);
293 return rc;
294 }
295 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
296 msg, procs, nprocs, PMIX_PROC);
297 if (PMIX_SUCCESS != rc) {
298 PMIX_ERROR_LOG(rc);
299 return rc;
300 }
301
302
303 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
304 msg, &ninfo, 1, PMIX_SIZE);
305 if (PMIX_SUCCESS != rc) {
306 PMIX_ERROR_LOG(rc);
307 PMIX_RELEASE(msg);
308 return rc;
309 }
310 if (0 < ninfo) {
311 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
312 msg, info, ninfo, PMIX_INFO);
313 if (PMIX_SUCCESS != rc) {
314 PMIX_ERROR_LOG(rc);
315 PMIX_RELEASE(msg);
316 return rc;
317 }
318 }
319
320
321
322
323 cb = PMIX_NEW(pmix_cb_t);
324 cb->cbfunc.opfn = cbfunc;
325 cb->cbdata = cbdata;
326
327
328 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
329 msg, wait_cbfunc, (void*)cb);
330 if (PMIX_SUCCESS != rc) {
331 PMIX_RELEASE(msg);
332 PMIX_RELEASE(cb);
333 }
334
335 pmix_output_verbose(2, pmix_globals.debug_output,
336 "pmix: disconnect completed");
337
338 return rc;
339 }
340
341 static void wait_cbfunc(struct pmix_peer_t *pr,
342 pmix_ptl_hdr_t *hdr,
343 pmix_buffer_t *buf, void *cbdata)
344 {
345 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
346 pmix_status_t rc;
347 pmix_status_t ret;
348 int32_t cnt;
349 char *nspace;
350 pmix_buffer_t bkt;
351 pmix_byte_object_t bo;
352
353 pmix_output_verbose(2, pmix_globals.debug_output,
354 "pmix:client recv callback activated with %d bytes",
355 (NULL == buf) ? -1 : (int)buf->bytes_used);
356
357 if (NULL == buf) {
358 ret = PMIX_ERR_BAD_PARAM;
359 goto report;
360 }
361
362
363
364 if (PMIX_BUFFER_IS_EMPTY(buf)) {
365 ret = PMIX_ERR_UNREACH;
366 goto report;
367 }
368
369
370 cnt = 1;
371 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
372 buf, &ret, &cnt, PMIX_STATUS);
373 if (PMIX_SUCCESS != rc) {
374 PMIX_ERROR_LOG(rc);
375 ret = rc;
376 }
377
378
379 cnt = 1;
380 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
381 buf, &bo, &cnt, PMIX_BYTE_OBJECT);
382 while (PMIX_SUCCESS == rc) {
383
384 PMIX_CONSTRUCT(&bkt, pmix_buffer_t);
385 PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &bkt, bo.bytes, bo.size);
386
387
388 cnt = 1;
389 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
390 &bkt, &nspace, &cnt, PMIX_STRING);
391 if (PMIX_SUCCESS != rc) {
392 PMIX_ERROR_LOG(rc);
393 PMIX_DESTRUCT(&bkt);
394 continue;
395 }
396
397 PMIX_GDS_STORE_JOB_INFO(rc, pmix_globals.mypeer, nspace, &bkt);
398 if (PMIX_SUCCESS != rc) {
399 PMIX_ERROR_LOG(rc);
400 }
401 free(nspace);
402 PMIX_DESTRUCT(&bkt);
403
404 cnt = 1;
405 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
406 buf, &bo, &cnt, PMIX_BYTE_OBJECT);
407 }
408 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
409 PMIX_ERROR_LOG(rc);
410 ret = rc;
411 }
412
413 report:
414 if (NULL != cb->cbfunc.opfn) {
415 cb->cbfunc.opfn(ret, cb->cbdata);
416 }
417 PMIX_RELEASE(cb);
418 }
419
420 static void op_cbfunc(pmix_status_t status, void *cbdata)
421 {
422 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
423
424 cb->status = status;
425 PMIX_POST_OBJECT(cb);
426 PMIX_WAKEUP_THREAD(&cb->lock);
427 }