This source file includes the following definitions.
- mycon
- mydes
- localcbfunc
- pmix_plog_base_log
1
2
3
4
5
6
7
8
9
10
11
12 #include <src/include/pmix_config.h>
13
14 #include <pmix_common.h>
15 #include "src/include/pmix_globals.h"
16
17 #include "src/class/pmix_list.h"
18 #include "src/util/error.h"
19 #include "src/server/pmix_server_ops.h"
20
21 #include "src/mca/plog/base/base.h"
22
23 typedef struct {
24 pmix_object_t super;
25 pmix_lock_t lock;
26 size_t nreqs;
27 pmix_status_t status;
28 pmix_op_cbfunc_t cbfunc;
29 void *cbdata;
30 } pmix_mycount_t;
31 static void mycon(pmix_mycount_t *p)
32 {
33 PMIX_CONSTRUCT_LOCK(&p->lock);
34 p->lock.active = false;
35 p->nreqs = 0;
36 p->status = PMIX_ERR_NOT_AVAILABLE;
37 p->cbfunc = NULL;
38 p->cbdata = NULL;
39 }
40 static void mydes(pmix_mycount_t *p)
41 {
42 PMIX_DESTRUCT_LOCK(&p->lock);
43 }
/* Declare pmix_mycount_t as a file-local class whose parent is
 * pmix_object_t, with mycon/mydes as constructor/destructor. */
static PMIX_CLASS_INSTANCE(pmix_mycount_t, pmix_object_t, mycon, mydes);
47
48 static void localcbfunc(pmix_status_t status, void *cbdata)
49 {
50 pmix_mycount_t *mycount = (pmix_mycount_t*)cbdata;
51
52 PMIX_ACQUIRE_THREAD(&mycount->lock);
53 mycount->nreqs--;
54 if (PMIX_SUCCESS != status && PMIX_SUCCESS == mycount->status) {
55 mycount->status = status;
56 }
57 if (0 == mycount->nreqs) {
58
59 if (NULL != mycount->cbfunc) {
60 mycount->cbfunc(mycount->status, mycount->cbdata);
61 }
62 PMIX_RELEASE_THREAD(&mycount->lock);
63 PMIX_RELEASE(mycount);
64 return;
65 }
66 PMIX_RELEASE_THREAD(&mycount->lock);
67 }
68
69 pmix_status_t pmix_plog_base_log(const pmix_proc_t *source,
70 const pmix_info_t data[], size_t ndata,
71 const pmix_info_t directives[], size_t ndirs,
72 pmix_op_cbfunc_t cbfunc, void *cbdata)
73 {
74 pmix_plog_base_active_module_t *active;
75 pmix_status_t rc = PMIX_ERR_NOT_AVAILABLE;
76 size_t n, k;
77 int m;
78 bool logonce = false;
79 pmix_mycount_t *mycount;
80 pmix_list_t channels;
81 bool all_complete = true;
82
83 if (!pmix_plog_globals.initialized) {
84 return PMIX_ERR_INIT;
85 }
86
87
88
89
90 PMIX_ACQUIRE_THREAD(&pmix_plog_globals.lock);
91
92 pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
93 "plog:log called");
94
95
96 mycount = PMIX_NEW(pmix_mycount_t);
97 if (NULL == mycount) {
98 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
99 return PMIX_ERR_NOMEM;
100 }
101 mycount->cbfunc = cbfunc;
102 mycount->cbdata = cbdata;
103
104 PMIX_CONSTRUCT(&channels, pmix_list_t);
105
106 if (NULL != directives) {
107
108
109
110
111 for (n=0; n < ndirs; n++) {
112 if (0 == strncmp(directives[n].key, PMIX_LOG_ONCE, PMIX_MAX_KEYLEN)) {
113 logonce = true;
114 break;
115 }
116 }
117 }
118
119
120
121
122 for (n=0; n < ndata; n++) {
123 if (PMIX_INFO_OP_IS_COMPLETE(&data[n])) {
124 continue;
125 }
126 all_complete = false;
127 for (m=0; m < pmix_plog_globals.actives.size; m++) {
128 if (NULL == (active = (pmix_plog_base_active_module_t*)pmix_pointer_array_get_item(&pmix_plog_globals.actives, m))) {
129 continue;
130 }
131
132
133 if (NULL == active->module->channels) {
134 if (!active->added) {
135
136 pmix_list_append(&channels, &active->super);
137
138 active->added = true;
139 }
140 } else {
141 for (k=0; NULL != active->module->channels[k]; k++) {
142 if (NULL != strstr(data[n].key, active->module->channels[k])) {
143 if (!active->added) {
144
145 pmix_list_append(&channels, &active->super);
146
147 active->added = true;
148 break;
149 }
150 }
151 }
152 }
153 }
154 }
155
156 PMIX_LIST_FOREACH(active, &channels, pmix_plog_base_active_module_t) {
157 active->added = false;
158 }
159 if (all_complete) {
160
161 while (NULL != pmix_list_remove_first(&channels));
162 PMIX_DESTRUCT(&channels);
163 PMIX_RELEASE(mycount);
164 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
165 return PMIX_SUCCESS;
166 }
167 PMIX_ACQUIRE_THREAD(&mycount->lock);
168 PMIX_LIST_FOREACH(active, &channels, pmix_plog_base_active_module_t) {
169 if (NULL != active->module->log) {
170 mycount->nreqs++;
171 rc = active->module->log(source, data, ndata, directives, ndirs,
172 localcbfunc, (void*)mycount);
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208 if (PMIX_SUCCESS == rc) {
209 mycount->nreqs--;
210 mycount->status = rc;
211 if (logonce) {
212 break;
213 }
214 } else if (PMIX_ERR_NOT_AVAILABLE == rc ||
215 PMIX_ERR_TAKE_NEXT_OPTION == rc) {
216 mycount->nreqs--;
217 } else if (PMIX_OPERATION_IN_PROGRESS == rc) {
218
219
220 mycount->status = PMIX_SUCCESS;
221 if (logonce) {
222 break;
223 }
224 } else {
225
226
227
228 mycount->nreqs--;
229 mycount->status = rc;
230 }
231 }
232 }
233
234
235 while (NULL != pmix_list_remove_first(&channels));
236 PMIX_DESTRUCT(&channels);
237
238 rc = mycount->status;
239 if (0 == mycount->nreqs) {
240
241 if (NULL != mycount->cbfunc) {
242 mycount->cbfunc(mycount->status, mycount->cbdata);
243 }
244 PMIX_RELEASE_THREAD(&mycount->lock);
245 PMIX_RELEASE(mycount);
246 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
247 return PMIX_SUCCESS;
248 }
249 PMIX_RELEASE_THREAD(&mycount->lock);
250 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
251
252 return rc;
253 }