1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2018 Intel, Inc. All rights reserved.
4 *
5 * $COPYRIGHT$
6 *
7 * Additional copyrights may follow
8 *
9 * $HEADER$
10 */
11
12 #include <src/include/pmix_config.h>
13
14 #include <pmix_common.h>
15 #include "src/include/pmix_globals.h"
16
17 #include "src/class/pmix_list.h"
18 #include "src/util/error.h"
19 #include "src/server/pmix_server_ops.h"
20
21 #include "src/mca/plog/base/base.h"
22
23 typedef struct {
24 pmix_object_t super;
25 pmix_lock_t lock;
26 size_t nreqs;
27 pmix_status_t status;
28 pmix_op_cbfunc_t cbfunc;
29 void *cbdata;
30 } pmix_mycount_t;
31 static void mycon(pmix_mycount_t *p)
32 {
33 PMIX_CONSTRUCT_LOCK(&p->lock);
34 p->lock.active = false;
35 p->nreqs = 0;
36 p->status = PMIX_ERR_NOT_AVAILABLE;
37 p->cbfunc = NULL;
38 p->cbdata = NULL;
39 }
40 static void mydes(pmix_mycount_t *p)
41 {
42 PMIX_DESTRUCT_LOCK(&p->lock);
43 }
/* Register pmix_mycount_t as a class derived from pmix_object_t,
 * using mycon/mydes as its constructor/destructor. */
static PMIX_CLASS_INSTANCE(pmix_mycount_t, pmix_object_t,
                           mycon, mydes);
47
48 static void localcbfunc(pmix_status_t status, void *cbdata)
49 {
50 pmix_mycount_t *mycount = (pmix_mycount_t*)cbdata;
51
52 PMIX_ACQUIRE_THREAD(&mycount->lock);
53 mycount->nreqs--;
54 if (PMIX_SUCCESS != status && PMIX_SUCCESS == mycount->status) {
55 mycount->status = status;
56 }
57 if (0 == mycount->nreqs) {
58 /* execute their callback */
59 if (NULL != mycount->cbfunc) {
60 mycount->cbfunc(mycount->status, mycount->cbdata);
61 }
62 PMIX_RELEASE_THREAD(&mycount->lock);
63 PMIX_RELEASE(mycount);
64 return;
65 }
66 PMIX_RELEASE_THREAD(&mycount->lock);
67 }
68
69 pmix_status_t pmix_plog_base_log(const pmix_proc_t *source,
70 const pmix_info_t data[], size_t ndata,
71 const pmix_info_t directives[], size_t ndirs,
72 pmix_op_cbfunc_t cbfunc, void *cbdata)
73 {
74 pmix_plog_base_active_module_t *active;
75 pmix_status_t rc = PMIX_ERR_NOT_AVAILABLE;
76 size_t n, k;
77 int m;
78 bool logonce = false;
79 pmix_mycount_t *mycount;
80 pmix_list_t channels;
81 bool all_complete = true;
82
83 if (!pmix_plog_globals.initialized) {
84 return PMIX_ERR_INIT;
85 }
86
87 /* we have to serialize our way thru here as we are going
88 * to construct a list of the available modules, and those
89 * can only be on one list at a time */
90 PMIX_ACQUIRE_THREAD(&pmix_plog_globals.lock);
91
92 pmix_output_verbose(2, pmix_plog_base_framework.framework_output,
93 "plog:log called");
94
95 /* initialize the tracker */
96 mycount = PMIX_NEW(pmix_mycount_t);
97 if (NULL == mycount) {
98 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
99 return PMIX_ERR_NOMEM;
100 }
101 mycount->cbfunc = cbfunc;
102 mycount->cbdata = cbdata;
103 /* initialize the list of channels */
104 PMIX_CONSTRUCT(&channels, pmix_list_t);
105
106 if (NULL != directives) {
107 /* scan the directives for the PMIX_LOG_ONCE attribute
108 * which indicates we should stop with the first log
109 * channel that can successfully handle this request,
110 * and any channel directives */
111 for (n=0; n < ndirs; n++) {
112 if (0 == strncmp(directives[n].key, PMIX_LOG_ONCE, PMIX_MAX_KEYLEN)) {
113 logonce = true;
114 break;
115 }
116 }
117 }
118
119 /* scan the incoming logging requests and assemble the modules in
120 * the corresponding order - this will ensure that the one they
121 * requested first gets first shot at "log once" */
122 for (n=0; n < ndata; n++) {
123 if (PMIX_INFO_OP_IS_COMPLETE(&data[n])) {
124 continue;
125 }
126 all_complete = false;
127 for (m=0; m < pmix_plog_globals.actives.size; m++) {
128 if (NULL == (active = (pmix_plog_base_active_module_t*)pmix_pointer_array_get_item(&pmix_plog_globals.actives, m))) {
129 continue;
130 }
131 /* if this channel is included in the ones serviced by this
132 * module, then include the module */
133 if (NULL == active->module->channels) {
134 if (!active->added) {
135 /* add this channel to the list */
136 pmix_list_append(&channels, &active->super);
137 /* mark it as added */
138 active->added = true;
139 }
140 } else {
141 for (k=0; NULL != active->module->channels[k]; k++) {
142 if (NULL != strstr(data[n].key, active->module->channels[k])) {
143 if (!active->added) {
144 /* add this channel to the list */
145 pmix_list_append(&channels, &active->super);
146 /* mark it as added */
147 active->added = true;
148 break;
149 }
150 }
151 }
152 }
153 }
154 }
155 /* reset the added marker for the next time we are called */
156 PMIX_LIST_FOREACH(active, &channels, pmix_plog_base_active_module_t) {
157 active->added = false;
158 }
159 if (all_complete) {
160 /* nothing we need do */
161 while (NULL != pmix_list_remove_first(&channels));
162 PMIX_DESTRUCT(&channels);
163 PMIX_RELEASE(mycount);
164 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
165 return PMIX_SUCCESS;
166 }
167 PMIX_ACQUIRE_THREAD(&mycount->lock);
168 PMIX_LIST_FOREACH(active, &channels, pmix_plog_base_active_module_t) {
169 if (NULL != active->module->log) {
170 mycount->nreqs++;
171 rc = active->module->log(source, data, ndata, directives, ndirs,
172 localcbfunc, (void*)mycount);
173 /* The plugins are required to return:
174 *
175 * PMIX_SUCCESS - indicating that the logging operation for
176 * that component was very quick, and therefore
177 * done atomically. No callback will be issued
178 *
179 * PMIX_OPERATION_IN_PROGRESS - indicates that the plugin
180 * expects to execute the desired logging request,
181 * but must do so asynchronously. The provided
182 * callback _must_ be executed upon completion
183 * of the operation, indicating success or failure.
184 *
185 * PMIX_ERR_NOT_AVAILABLE - indicates that the plugin is unable
186 * to process the request.
187 * No callback will be issued.
188 *
189 * PMIX_ERR_TAKE_NEXT_OPTION - indicates that the plugin didn't
190 * find any directives that it supports.
191 * No callback will be issued.
192 *
193 * PMIX_ERR_NOT_SUPPORTED - indicates that the request cannot be
194 * supported. The list will cease processing at
195 * that point and return this error
196 *
197 * All other returned errors indicate that the plugin should
198 * have attempted to perform the requested operation, but determined
199 * that it could not do so. Note that this differs from the case
200 * where a plugin asynchronously attempts an operation that subsequently
201 * fails - that error would be returned in the callback function.
202 * In this case, the error indicates that the request contained
203 * an incorrect/invalid element that prevents the plugin from
204 * executing it. The first such retured error will be cached and
205 * returned to the caller upon completion of all pending operations.
206 * No callback from failed plugins shall be executed.
207 */
208 if (PMIX_SUCCESS == rc) {
209 mycount->nreqs--;
210 mycount->status = rc;
211 if (logonce) {
212 break;
213 }
214 } else if (PMIX_ERR_NOT_AVAILABLE == rc ||
215 PMIX_ERR_TAKE_NEXT_OPTION == rc) {
216 mycount->nreqs--;
217 } else if (PMIX_OPERATION_IN_PROGRESS == rc) {
218 /* even though the operation hasn't completed,
219 * we still treat this as a completed request */
220 mycount->status = PMIX_SUCCESS;
221 if (logonce) {
222 break;
223 }
224 } else {
225 /* we may have outstanding requests we need
226 * to wait for, so mark that there was an error
227 * for reporting purposes */
228 mycount->nreqs--;
229 mycount->status = rc;
230 }
231 }
232 }
233
234 /* cannot release the modules - just remove everything from the list */
235 while (NULL != pmix_list_remove_first(&channels));
236 PMIX_DESTRUCT(&channels);
237
238 rc = mycount->status; // save the status as it could change when the lock is released
239 if (0 == mycount->nreqs) {
240 /* execute their callback */
241 if (NULL != mycount->cbfunc) {
242 mycount->cbfunc(mycount->status, mycount->cbdata);
243 }
244 PMIX_RELEASE_THREAD(&mycount->lock);
245 PMIX_RELEASE(mycount);
246 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
247 return PMIX_SUCCESS;
248 }
249 PMIX_RELEASE_THREAD(&mycount->lock);
250 PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
251
252 return rc;
253 }