This source file includes following definitions.
- opcbfunc
- log_cbfunc
- PMIx_Log
- localcbfunc
- PMIx_Log_nb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include <src/include/pmix_config.h>
16
17 #include <src/include/pmix_stdint.h>
18 #include <src/include/pmix_socket_errno.h>
19
20 #include <pmix.h>
21 #include <pmix_common.h>
22 #include <pmix_server.h>
23 #include <pmix_rename.h>
24
25 #include "src/threads/threads.h"
26 #include "src/util/argv.h"
27 #include "src/util/error.h"
28 #include "src/util/name_fns.h"
29 #include "src/util/output.h"
30 #include "src/mca/bfrops/bfrops.h"
31 #include "src/mca/plog/base/base.h"
32
33 #include "src/client/pmix_client_ops.h"
34 #include "src/server/pmix_server_ops.h"
35 #include "src/include/pmix_globals.h"
36
37 static void opcbfunc(pmix_status_t status, void *cbdata)
38 {
39 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
40 cb->status = status;
41 PMIX_WAKEUP_THREAD(&cb->lock);
42 }
43
44 static void log_cbfunc(struct pmix_peer_t *peer,
45 pmix_ptl_hdr_t *hdr,
46 pmix_buffer_t *buf, void *cbdata)
47 {
48 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
49 int32_t m;
50 pmix_status_t rc, status;
51
52
53 m=1;
54 PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &m, PMIX_STATUS);
55 if (PMIX_SUCCESS != rc) {
56 status = rc;
57 }
58
59 if (NULL != cd->cbfunc.opcbfn) {
60 cd->cbfunc.opcbfn(status, cd->cbdata);
61 }
62 PMIX_RELEASE(cd);
63 }
64
65 PMIX_EXPORT pmix_status_t PMIx_Log(const pmix_info_t data[], size_t ndata,
66 const pmix_info_t directives[], size_t ndirs)
67 {
68 pmix_cb_t cb;
69 pmix_status_t rc;
70
71 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
72
73 if (pmix_globals.init_cntr <= 0) {
74 PMIX_RELEASE_THREAD(&pmix_global_lock);
75 return PMIX_ERR_INIT;
76 }
77 PMIX_RELEASE_THREAD(&pmix_global_lock);
78
79 pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
80 "%s pmix:log", PMIX_NAME_PRINT(&pmix_globals.myid));
81
82
83
84
85 PMIX_CONSTRUCT(&cb, pmix_cb_t);
86 if (PMIX_SUCCESS != (rc = PMIx_Log_nb(data, ndata, directives,
87 ndirs, opcbfunc, &cb))) {
88 PMIX_DESTRUCT(&cb);
89 return rc;
90 }
91
92
93 PMIX_WAIT_THREAD(&cb.lock);
94 rc = cb.status;
95 PMIX_DESTRUCT(&cb);
96
97 pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
98 "pmix:log completed");
99
100 return rc;
101 }
102
103 static void localcbfunc(pmix_status_t status, void *cbdata)
104 {
105 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
106
107 PMIX_INFO_FREE(cd->directives, cd->ndirs);
108 if (NULL != cd->cbfunc.opcbfn) {
109 cd->cbfunc.opcbfn(status, cd->cbdata);
110 }
111 PMIX_RELEASE(cd);
112 }
113
114 PMIX_EXPORT pmix_status_t PMIx_Log_nb(const pmix_info_t data[], size_t ndata,
115 const pmix_info_t directives[], size_t ndirs,
116 pmix_op_cbfunc_t cbfunc, void *cbdata)
117
118 {
119 pmix_shift_caddy_t *cd;
120 pmix_cmd_t cmd = PMIX_LOG_CMD;
121 pmix_buffer_t *msg;
122 pmix_status_t rc;
123 size_t n;
124 time_t timestamp = 0;
125 pmix_proc_t *source = NULL;
126
127 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
128
129 pmix_output_verbose(2, pmix_globals.debug_output,
130 "pmix:log non-blocking");
131
132 if (pmix_globals.init_cntr <= 0) {
133 PMIX_RELEASE_THREAD(&pmix_global_lock);
134 return PMIX_ERR_INIT;
135 }
136
137 if (0 == ndata || NULL == data) {
138 PMIX_RELEASE_THREAD(&pmix_global_lock);
139 return PMIX_ERR_BAD_PARAM;
140 }
141
142
143
144 if (NULL != directives) {
145 for (n=0; n < ndirs; n++) {
146 if (0 == strncmp(directives[n].key, PMIX_LOG_GENERATE_TIMESTAMP, PMIX_MAX_KEYLEN)) {
147 if (PMIX_INFO_TRUE(&directives[n])) {
148
149 timestamp = time(NULL);
150 }
151 } else if (0 == strncmp(directives[n].key, PMIX_LOG_SOURCE, PMIX_MAX_KEYLEN)) {
152 source = directives[n].value.data.proc;
153 }
154 }
155 }
156
157
158
159 if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
160 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
161
162 if (!pmix_globals.connected) {
163 PMIX_RELEASE_THREAD(&pmix_global_lock);
164 return PMIX_ERR_UNREACH;
165 }
166 PMIX_RELEASE_THREAD(&pmix_global_lock);
167
168
169 cd = PMIX_NEW(pmix_shift_caddy_t);
170 cd->cbfunc.opcbfn = cbfunc;
171 cd->cbdata = cbdata;
172 msg = PMIX_NEW(pmix_buffer_t);
173 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
174 msg, &cmd, 1, PMIX_COMMAND);
175 if (PMIX_SUCCESS != rc) {
176 PMIX_ERROR_LOG(rc);
177 PMIX_RELEASE(msg);
178 PMIX_RELEASE(cd);
179 return rc;
180 }
181
182
183 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
184 msg, ×tamp, 1, PMIX_TIME);
185 if (PMIX_SUCCESS != rc) {
186 PMIX_ERROR_LOG(rc);
187 PMIX_RELEASE(msg);
188 PMIX_RELEASE(cd);
189 return rc;
190 }
191
192 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
193 msg, &ndata, 1, PMIX_SIZE);
194 if (PMIX_SUCCESS != rc) {
195 PMIX_ERROR_LOG(rc);
196 PMIX_RELEASE(msg);
197 PMIX_RELEASE(cd);
198 return rc;
199 }
200 if (0 < ndata) {
201 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
202 msg, data, ndata, PMIX_INFO);
203 if (PMIX_SUCCESS != rc) {
204 PMIX_ERROR_LOG(rc);
205 PMIX_RELEASE(msg);
206 PMIX_RELEASE(cd);
207 return rc;
208 }
209 }
210 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
211 msg, &ndirs, 1, PMIX_SIZE);
212 if (PMIX_SUCCESS != rc) {
213 PMIX_ERROR_LOG(rc);
214 PMIX_RELEASE(msg);
215 PMIX_RELEASE(cd);
216 return rc;
217 }
218 if (0 < ndirs) {
219 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
220 msg, directives, ndirs, PMIX_INFO);
221 if (PMIX_SUCCESS != rc) {
222 PMIX_ERROR_LOG(rc);
223 PMIX_RELEASE(msg);
224 PMIX_RELEASE(cd);
225 return rc;
226 }
227 }
228
229 pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
230 "pmix:log sending to server");
231 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
232 msg, log_cbfunc, (void*)cd);
233 if (PMIX_SUCCESS != rc) {
234 PMIX_ERROR_LOG(rc);
235 PMIX_RELEASE(cd);
236 }
237 return rc;
238 }
239 PMIX_RELEASE_THREAD(&pmix_global_lock);
240
241
242 if (NULL == source) {
243 source = &pmix_globals.myid;
244 cd = PMIX_NEW(pmix_shift_caddy_t);
245 cd->cbfunc.opcbfn = cbfunc;
246 cd->cbdata = cbdata;
247 cd->ndirs = ndirs + 1;
248 PMIX_INFO_CREATE(cd->directives, cd->ndirs);
249 for (n=0; n < ndirs; n++) {
250 PMIX_INFO_XFER(&cd->directives[n], (pmix_info_t*)&directives[n]);
251 }
252 PMIX_INFO_LOAD(&cd->directives[ndirs], PMIX_LOG_SOURCE, &source, PMIX_PROC);
253
254
255 rc = pmix_plog.log(source, data, ndata, cd->directives, cd->ndirs, localcbfunc, cd);
256 if (PMIX_SUCCESS != rc) {
257 PMIX_INFO_FREE(cd->directives, cd->ndirs);
258 PMIX_RELEASE(cd);
259 }
260 } else if (0 == strncmp(source->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) &&
261 source->rank == pmix_globals.myid.rank) {
262
263
264
265
266 rc = PMIX_ERR_NOT_SUPPORTED;
267 } else {
268
269
270 rc = pmix_plog.log(source, data, ndata, directives, ndirs, cbfunc, cbdata);
271 }
272
273 return rc;
274 }