This source file includes following definitions.
- PMIx_Fence
- PMIx_Fence_nb
- unpack_return
- pack_fence
- wait_cbfunc
- op_cbfunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21
22 #include <pmix.h>
23 #include <pmix_rename.h>
24
25 #include "src/include/pmix_globals.h"
26
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #endif
30 #include <fcntl.h>
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46 #include PMIX_EVENT_HEADER
47
48 #include "src/class/pmix_list.h"
49 #include "src/mca/bfrops/bfrops.h"
50 #include "src/util/argv.h"
51 #include "src/util/error.h"
52 #include "src/util/hash.h"
53 #include "src/util/output.h"
54 #include "src/mca/ptl/ptl.h"
55
56 #include "pmix_client_ops.h"
57
58 static pmix_status_t unpack_return(pmix_buffer_t *data);
59 static pmix_status_t pack_fence(pmix_buffer_t *msg, pmix_cmd_t cmd,
60 const pmix_proc_t *procs, size_t nprocs,
61 const pmix_info_t *info, size_t ninfo);
62 static void wait_cbfunc(struct pmix_peer_t *pr,
63 pmix_ptl_hdr_t *hdr,
64 pmix_buffer_t *buf, void *cbdata);
65 static void op_cbfunc(pmix_status_t status, void *cbdata);
66
67 PMIX_EXPORT pmix_status_t PMIx_Fence(const pmix_proc_t procs[], size_t nprocs,
68 const pmix_info_t info[], size_t ninfo)
69 {
70 pmix_cb_t *cb;
71 pmix_status_t rc;
72
73 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
74
75 pmix_output_verbose(2, pmix_globals.debug_output,
76 "pmix: executing fence");
77
78 if (pmix_globals.init_cntr <= 0) {
79 PMIX_RELEASE_THREAD(&pmix_global_lock);
80 return PMIX_ERR_INIT;
81 }
82
83
84 if (!pmix_globals.connected) {
85 PMIX_RELEASE_THREAD(&pmix_global_lock);
86 return PMIX_ERR_UNREACH;
87 }
88 PMIX_RELEASE_THREAD(&pmix_global_lock);
89
90
91
92
93 cb = PMIX_NEW(pmix_cb_t);
94
95
96 if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(procs, nprocs, info, ninfo,
97 op_cbfunc, cb))) {
98 PMIX_ERROR_LOG(rc);
99 PMIX_RELEASE(cb);
100 return rc;
101 }
102
103
104 PMIX_WAIT_THREAD(&cb->lock);
105 rc = cb->status;
106 PMIX_RELEASE(cb);
107
108 pmix_output_verbose(2, pmix_globals.debug_output,
109 "pmix: fence released");
110
111 return rc;
112 }
113
114 PMIX_EXPORT pmix_status_t PMIx_Fence_nb(const pmix_proc_t procs[], size_t nprocs,
115 const pmix_info_t info[], size_t ninfo,
116 pmix_op_cbfunc_t cbfunc, void *cbdata)
117 {
118 pmix_buffer_t *msg;
119 pmix_cmd_t cmd = PMIX_FENCENB_CMD;
120 pmix_status_t rc;
121 pmix_cb_t *cb;
122 pmix_proc_t rg, *rgs;
123 size_t nrg;
124
125 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
126
127 pmix_output_verbose(2, pmix_globals.debug_output,
128 "pmix: fence_nb called");
129
130 if (pmix_globals.init_cntr <= 0) {
131 PMIX_RELEASE_THREAD(&pmix_global_lock);
132 return PMIX_ERR_INIT;
133 }
134
135
136 if (!pmix_globals.connected) {
137 PMIX_RELEASE_THREAD(&pmix_global_lock);
138 return PMIX_ERR_UNREACH;
139 }
140 PMIX_RELEASE_THREAD(&pmix_global_lock);
141
142
143 if (NULL == procs && 0 != nprocs) {
144 return PMIX_ERR_BAD_PARAM;
145 }
146
147
148 if (NULL == procs) {
149 pmix_strncpy(rg.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
150 rg.rank = PMIX_RANK_WILDCARD;
151 rgs = &rg;
152 nrg = 1;
153 } else {
154 rgs = (pmix_proc_t*)procs;
155 nrg = nprocs;
156 }
157
158 msg = PMIX_NEW(pmix_buffer_t);
159 if (PMIX_SUCCESS != (rc = pack_fence(msg, cmd, rgs, nrg, info, ninfo))) {
160 PMIX_RELEASE(msg);
161 return rc;
162 }
163
164
165
166
167 cb = PMIX_NEW(pmix_cb_t);
168 cb->cbfunc.opfn = cbfunc;
169 cb->cbdata = cbdata;
170
171
172 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
173 msg, wait_cbfunc, (void*)cb);
174 if (PMIX_SUCCESS != rc) {
175 PMIX_RELEASE(msg);
176 PMIX_RELEASE(cb);
177 }
178 return rc;
179 }
180
181 static pmix_status_t unpack_return(pmix_buffer_t *data)
182 {
183 pmix_status_t rc;
184 pmix_status_t ret;
185 int32_t cnt;
186
187 pmix_output_verbose(2, pmix_globals.debug_output,
188 "client:unpack fence called");
189
190
191 cnt = 1;
192 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
193 data, &ret, &cnt, PMIX_STATUS);
194 if (PMIX_SUCCESS != rc) {
195 PMIX_ERROR_LOG(rc);
196 return rc;
197 }
198 pmix_output_verbose(2, pmix_globals.debug_output,
199 "client:unpack fence received status %d", ret);
200 return ret;
201 }
202
203 static pmix_status_t pack_fence(pmix_buffer_t *msg, pmix_cmd_t cmd,
204 const pmix_proc_t *procs, size_t nprocs,
205 const pmix_info_t *info, size_t ninfo)
206 {
207 pmix_status_t rc;
208
209
210 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
211 msg, &cmd, 1, PMIX_COMMAND);
212 if (PMIX_SUCCESS != rc) {
213 PMIX_ERROR_LOG(rc);
214 return rc;
215 }
216
217
218 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
219 msg, &nprocs, 1, PMIX_SIZE);
220 if (PMIX_SUCCESS != rc) {
221 PMIX_ERROR_LOG(rc);
222 return rc;
223 }
224
225 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
226 msg, procs, nprocs, PMIX_PROC);
227 if (PMIX_SUCCESS != rc) {
228 PMIX_ERROR_LOG(rc);
229 return rc;
230 }
231
232 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
233 msg, &ninfo, 1, PMIX_SIZE);
234 if (PMIX_SUCCESS != rc) {
235 PMIX_ERROR_LOG(rc);
236 return rc;
237 }
238
239 if (NULL != info && 0 < ninfo) {
240 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
241 msg, info, ninfo, PMIX_INFO);
242 if (PMIX_SUCCESS != rc) {
243 PMIX_ERROR_LOG(rc);
244 return rc;
245 }
246 }
247
248 return PMIX_SUCCESS;
249 }
250
251 static void wait_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
252 pmix_buffer_t *buf, void *cbdata)
253 {
254 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
255 pmix_status_t rc;
256
257 pmix_output_verbose(2, pmix_globals.debug_output,
258 "pmix: fence_nb callback recvd");
259
260 if (NULL == cb) {
261 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
262 return;
263 }
264
265
266 if (PMIX_BUFFER_IS_EMPTY(buf)) {
267 rc = PMIX_ERR_UNREACH;
268 } else {
269 rc = unpack_return(buf);
270 }
271
272
273 if (NULL != cb->cbfunc.opfn) {
274 cb->cbfunc.opfn(rc, cb->cbdata);
275 }
276 PMIX_RELEASE(cb);
277 }
278
279 static void op_cbfunc(pmix_status_t status, void *cbdata)
280 {
281 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
282
283 cb->status = status;
284 PMIX_WAKEUP_THREAD(&cb->lock);
285 }