This source file includes following definitions.
- PMIx_Spawn
- PMIx_Spawn_nb
- wait_cbfunc
- spawn_cbfunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21
22 #include <pmix.h>
23 #include <pmix_rename.h>
24
25 #include "src/include/pmix_globals.h"
26
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #endif
30 #include <fcntl.h>
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46 #include PMIX_EVENT_HEADER
47
48 #include "src/class/pmix_list.h"
49 #include "src/threads/threads.h"
50 #include "src/mca/bfrops/bfrops.h"
51 #include "src/mca/pnet/base/base.h"
52 #include "src/util/argv.h"
53 #include "src/util/error.h"
54 #include "src/util/output.h"
55 #include "src/mca/gds/gds.h"
56 #include "src/mca/ptl/ptl.h"
57
58 #include "pmix_client_ops.h"
59
60 static void wait_cbfunc(struct pmix_peer_t *pr,
61 pmix_ptl_hdr_t *hdr,
62 pmix_buffer_t *buf, void *cbdata);
63 static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata);
64
65 PMIX_EXPORT pmix_status_t PMIx_Spawn(const pmix_info_t job_info[], size_t ninfo,
66 const pmix_app_t apps[], size_t napps,
67 pmix_nspace_t nspace)
68 {
69 pmix_status_t rc;
70 pmix_cb_t *cb;
71
72 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
73
74 pmix_output_verbose(2, pmix_globals.debug_output,
75 "pmix: spawn called");
76
77 if (pmix_globals.init_cntr <= 0) {
78 PMIX_RELEASE_THREAD(&pmix_global_lock);
79 return PMIX_ERR_INIT;
80 }
81
82
83 if (!pmix_globals.connected) {
84 PMIX_RELEASE_THREAD(&pmix_global_lock);
85 return PMIX_ERR_UNREACH;
86 }
87 PMIX_RELEASE_THREAD(&pmix_global_lock);
88
89
90
91 if (NULL != nspace) {
92 memset(nspace, 0, PMIX_MAX_NSLEN+1);
93 }
94
95
96 cb = PMIX_NEW(pmix_cb_t);
97
98 if (PMIX_SUCCESS != (rc = PMIx_Spawn_nb(job_info, ninfo, apps, napps, spawn_cbfunc, cb))) {
99 PMIX_RELEASE(cb);
100 return rc;
101 }
102
103
104 PMIX_WAIT_THREAD(&cb->lock);
105 rc = cb->status;
106 if (NULL != nspace) {
107 pmix_strncpy(nspace, cb->pname.nspace, PMIX_MAX_NSLEN);
108 }
109 PMIX_RELEASE(cb);
110
111 return rc;
112 }
113
114 PMIX_EXPORT pmix_status_t PMIx_Spawn_nb(const pmix_info_t job_info[], size_t ninfo,
115 const pmix_app_t apps[], size_t napps,
116 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
117 {
118 pmix_buffer_t *msg;
119 pmix_cmd_t cmd = PMIX_SPAWNNB_CMD;
120 pmix_status_t rc;
121 pmix_cb_t *cb;
122 size_t n, m;
123 pmix_app_t *aptr;
124 bool jobenvars = false;
125 char *harvest[2] = {"PMIX_MCA_", NULL};
126 pmix_kval_t *kv;
127 pmix_list_t ilist;
128
129 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
130
131 pmix_output_verbose(2, pmix_globals.debug_output,
132 "pmix: spawn called");
133
134 if (pmix_globals.init_cntr <= 0) {
135 PMIX_RELEASE_THREAD(&pmix_global_lock);
136 return PMIX_ERR_INIT;
137 }
138
139
140 if (!pmix_globals.connected) {
141 PMIX_RELEASE_THREAD(&pmix_global_lock);
142 return PMIX_ERR_UNREACH;
143 }
144 PMIX_RELEASE_THREAD(&pmix_global_lock);
145
146
147 if (NULL != job_info) {
148 for (n=0; n < ninfo; n++) {
149 if (PMIX_CHECK_KEY(&job_info[n], PMIX_SETUP_APP_ENVARS)) {
150 PMIX_CONSTRUCT(&ilist, pmix_list_t);
151 rc = pmix_pnet_base_harvest_envars(harvest, NULL, &ilist);
152 if (PMIX_SUCCESS != rc) {
153 PMIX_LIST_DESTRUCT(&ilist);
154 return rc;
155 }
156 PMIX_LIST_FOREACH(kv, &ilist, pmix_kval_t) {
157
158 for (m=0; m < napps; m++) {
159 aptr = (pmix_app_t*)&apps[m];
160 pmix_setenv(kv->value->data.envar.envar,
161 kv->value->data.envar.value,
162 true, &aptr->env);
163 }
164 }
165 jobenvars = true;
166 PMIX_LIST_DESTRUCT(&ilist);
167 break;
168 }
169 }
170 }
171
172 for (n=0; n < napps; n++) {
173
174
175 aptr = (pmix_app_t*)&apps[n];
176 if (NULL != aptr->info && 0 == aptr->ninfo) {
177
178 m = 0;
179 while (!(PMIX_INFO_IS_END(&aptr->info[m])) && m < SIZE_MAX) {
180 ++m;
181 }
182 if (SIZE_MAX == m) {
183
184 return PMIX_ERR_BAD_PARAM;
185 }
186 aptr->ninfo = m;
187 }
188 if (!jobenvars) {
189 for (m=0; m < aptr->ninfo; m++) {
190 if (PMIX_CHECK_KEY(&aptr->info[m], PMIX_SETUP_APP_ENVARS)) {
191 PMIX_CONSTRUCT(&ilist, pmix_list_t);
192 rc = pmix_pnet_base_harvest_envars(harvest, NULL, &ilist);
193 if (PMIX_SUCCESS != rc) {
194 PMIX_LIST_DESTRUCT(&ilist);
195 return rc;
196 }
197 PMIX_LIST_FOREACH(kv, &ilist, pmix_kval_t) {
198 pmix_setenv(kv->value->data.envar.envar,
199 kv->value->data.envar.value,
200 true, &aptr->env);
201 }
202 jobenvars = true;
203 PMIX_LIST_DESTRUCT(&ilist);
204 break;
205 }
206 }
207 }
208 }
209
210 msg = PMIX_NEW(pmix_buffer_t);
211
212 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
213 msg, &cmd, 1, PMIX_COMMAND);
214 if (PMIX_SUCCESS != rc) {
215 PMIX_ERROR_LOG(rc);
216 PMIX_RELEASE(msg);
217 return rc;
218 }
219
220
221 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
222 msg, &ninfo, 1, PMIX_SIZE);
223 if (PMIX_SUCCESS != rc) {
224 PMIX_ERROR_LOG(rc);
225 PMIX_RELEASE(msg);
226 return rc;
227 }
228 if (0 < ninfo) {
229 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
230 msg, job_info, ninfo, PMIX_INFO);
231 if (PMIX_SUCCESS != rc) {
232 PMIX_ERROR_LOG(rc);
233 PMIX_RELEASE(msg);
234 return rc;
235 }
236 }
237
238
239 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
240 msg, &napps, 1, PMIX_SIZE);
241 if (PMIX_SUCCESS != rc) {
242 PMIX_ERROR_LOG(rc);
243 PMIX_RELEASE(msg);
244 return rc;
245 }
246 if (0 < napps) {
247 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
248 msg, apps, napps, PMIX_APP);
249 if (PMIX_SUCCESS != rc) {
250 PMIX_ERROR_LOG(rc);
251 PMIX_RELEASE(msg);
252 return rc;
253 }
254 }
255
256
257
258
259 cb = PMIX_NEW(pmix_cb_t);
260 cb->cbfunc.spawnfn = cbfunc;
261 cb->cbdata = cbdata;
262
263
264 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
265 msg, wait_cbfunc, (void*)cb);
266 if (PMIX_SUCCESS != rc) {
267 PMIX_RELEASE(msg);
268 PMIX_RELEASE(cb);
269 }
270
271 return rc;
272 }
273
274
275 static void wait_cbfunc(struct pmix_peer_t *pr,
276 pmix_ptl_hdr_t *hdr,
277 pmix_buffer_t *buf, void *cbdata)
278 {
279 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
280 char nspace[PMIX_MAX_NSLEN+1];
281 char *n2 = NULL;
282 pmix_status_t rc, ret;
283 int32_t cnt;
284
285 PMIX_ACQUIRE_OBJECT(cb);
286
287 pmix_output_verbose(2, pmix_globals.debug_output,
288 "pmix:client recv callback activated with %d bytes",
289 (NULL == buf) ? -1 : (int)buf->bytes_used);
290
291
292 memset(nspace, 0, PMIX_MAX_NSLEN+1);
293
294 if (NULL == buf) {
295 ret = PMIX_ERR_BAD_PARAM;
296 goto report;
297 }
298
299
300 if (PMIX_BUFFER_IS_EMPTY(buf)) {
301 ret = PMIX_ERR_UNREACH;
302 goto report;
303 }
304
305
306 cnt = 1;
307 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
308 buf, &ret, &cnt, PMIX_STATUS);
309 if (PMIX_SUCCESS != rc) {
310 PMIX_ERROR_LOG(rc);
311 ret = rc;
312 }
313
314 cnt = 1;
315 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
316 buf, &n2, &cnt, PMIX_STRING);
317 if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
318 PMIX_ERROR_LOG(rc);
319 ret = rc;
320 }
321 pmix_output_verbose(1, pmix_globals.debug_output,
322 "pmix:client recv '%s'", n2);
323
324 if (NULL != n2) {
325
326 pmix_strncpy(nspace, n2, PMIX_MAX_NSLEN);
327 free(n2);
328 PMIX_GDS_STORE_JOB_INFO(rc, pmix_globals.mypeer, nspace, buf);
329
330 if (PMIX_SUCCESS != rc) {
331 PMIX_ERROR_LOG(rc);
332 }
333 }
334
335 report:
336 if (NULL != cb->cbfunc.spawnfn) {
337 cb->cbfunc.spawnfn(ret, nspace, cb->cbdata);
338 }
339 PMIX_RELEASE(cb);
340 }
341
342 static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata)
343 {
344 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
345
346 PMIX_ACQUIRE_OBJECT(cb);
347 cb->status = status;
348 if (NULL != nspace) {
349 cb->pname.nspace = strdup(nspace);
350 }
351 PMIX_POST_OBJECT(cb);
352 PMIX_WAKEUP_THREAD(&cb->lock);
353 }