This source file includes the following definitions.
- ft_constructor
- ft_destructor
- cd_con
- cd_des
- add_tracker
- start
- del_tracker
- stop
- opcbfunc
- file_sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <src/include/pmix_config.h>
20 #include <pmix_common.h>
21
22 #include <stdio.h>
23 #include <stddef.h>
24 #include <ctype.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif
28 #ifdef HAVE_NETDB_H
29 #include <netdb.h>
30 #endif
31 #ifdef HAVE_SYS_PARAM_H
32 #include <sys/param.h>
33 #endif
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <signal.h>
37 #ifdef HAVE_TIME_H
38 #include <time.h>
39 #endif
40 #include <sys/stat.h>
41 #include <sys/types.h>
42
43 #include "src/class/pmix_list.h"
44 #include "src/include/pmix_globals.h"
45 #include "src/util/error.h"
46 #include "src/util/output.h"
47 #include "src/util/show_help.h"
48
49 #include "src/mca/psensor/base/base.h"
50 #include "psensor_file.h"
51
52
53 static pmix_status_t start(pmix_peer_t *requestor, pmix_status_t error,
54 const pmix_info_t *monitor,
55 const pmix_info_t directives[], size_t ndirs);
56 static pmix_status_t stop(pmix_peer_t *requestor, char *id);
57
58
59 pmix_psensor_base_module_t pmix_psensor_file_module = {
60 .start = start,
61 .stop = stop
62 };
63
64
65 typedef struct {
66 pmix_list_item_t super;
67 pmix_peer_t *requestor;
68 char *id;
69 bool event_active;
70 pmix_event_t ev;
71 pmix_event_t cdev;
72 struct timeval tv;
73 int tick;
74 char *file;
75 bool file_size;
76 bool file_access;
77 bool file_mod;
78 size_t last_size;
79 time_t last_access;
80 time_t last_mod;
81 uint32_t ndrops;
82 uint32_t nmisses;
83 pmix_status_t error;
84 pmix_data_range_t range;
85 pmix_info_t *info;
86 size_t ninfo;
87 } file_tracker_t;
88 static void ft_constructor(file_tracker_t *ft)
89 {
90 ft->requestor = NULL;
91 ft->id = NULL;
92 ft->event_active = false;
93 ft->tv.tv_sec = 0;
94 ft->tv.tv_usec = 0;
95 ft->tick = 0;
96 ft->file_size = false;
97 ft->file_access = false;
98 ft->file_mod = false;
99 ft->last_size = 0;
100 ft->last_access = 0;
101 ft->last_mod = 0;
102 ft->ndrops = 0;
103 ft->nmisses = 0;
104 ft->error = PMIX_SUCCESS;
105 ft->range = PMIX_RANGE_NAMESPACE;
106 ft->info = NULL;
107 ft->ninfo = 0;
108 }
109 static void ft_destructor(file_tracker_t *ft)
110 {
111 if (NULL != ft->requestor) {
112 PMIX_RELEASE(ft->requestor);
113 }
114 if (NULL != ft->id) {
115 free(ft->id);
116 }
117 if (ft->event_active) {
118 pmix_event_del(&ft->ev);
119 }
120 if (NULL != ft->file) {
121 free(ft->file);
122 }
123 if (NULL != ft->info) {
124 PMIX_INFO_FREE(ft->info, ft->ninfo);
125 }
126 }
/* Register file_tracker_t as a pmix_list_item_t subclass with its
 * constructor/destructor pair. */
PMIX_CLASS_INSTANCE(file_tracker_t, pmix_list_item_t,
                    ft_constructor, ft_destructor);
130
131
132 typedef struct {
133 pmix_object_t super;
134 pmix_event_t ev;
135 pmix_peer_t *requestor;
136 char *id;
137 } file_caddy_t;
138 static void cd_con(file_caddy_t *p)
139 {
140 p->requestor = NULL;
141 p->id = NULL;
142 }
143 static void cd_des(file_caddy_t *p)
144 {
145 if (NULL != (p->requestor)) {
146 PMIX_RELEASE(p->requestor);
147 }
148 if (NULL != p->id) {
149 free(p->id);
150 }
151 }
/* Register file_caddy_t as a pmix_object_t subclass with its
 * constructor/destructor pair. */
PMIX_CLASS_INSTANCE(file_caddy_t, pmix_object_t,
                    cd_con, cd_des);
155
156 static void file_sample(int sd, short args, void *cbdata);
157
158 static void add_tracker(int sd, short flags, void *cbdata)
159 {
160 file_tracker_t *ft = (file_tracker_t*)cbdata;
161
162 PMIX_ACQUIRE_OBJECT(fd);
163
164
165 pmix_list_append(&mca_psensor_file_component.trackers, &ft->super);
166
167
168 pmix_event_evtimer_set(pmix_psensor_base.evbase, &ft->ev,
169 file_sample, ft);
170 pmix_event_evtimer_add(&ft->ev, &ft->tv);
171 ft->event_active = true;
172 }
173
174
175
176
177 static pmix_status_t start(pmix_peer_t *requestor, pmix_status_t error,
178 const pmix_info_t *monitor,
179 const pmix_info_t directives[], size_t ndirs)
180 {
181 file_tracker_t *ft;
182 size_t n;
183
184 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
185 "[%s:%d] checking file monitoring for requestor %s:%d",
186 pmix_globals.myid.nspace, pmix_globals.myid.rank,
187 requestor->info->pname.nspace, requestor->info->pname.rank));
188
189
190 if (0 != strcmp(monitor->key, PMIX_MONITOR_FILE)) {
191 return PMIX_ERR_TAKE_NEXT_OPTION;
192 }
193
194
195 ft = PMIX_NEW(file_tracker_t);
196 PMIX_RETAIN(requestor);
197 ft->requestor = requestor;
198 ft->file = strdup(monitor->value.data.string);
199
200
201 for (n=0; n < ndirs; n++) {
202 if (0 == strcmp(directives[n].key, PMIX_MONITOR_FILE_SIZE)) {
203 ft->file_size = PMIX_INFO_TRUE(&directives[n]);
204 } else if (0 == strcmp(directives[n].key, PMIX_MONITOR_FILE_ACCESS)) {
205 ft->file_access = PMIX_INFO_TRUE(&directives[n]);
206 } else if (0 == strcmp(directives[n].key, PMIX_MONITOR_FILE_MODIFY)) {
207 ft->file_mod = PMIX_INFO_TRUE(&directives[n]);
208 } else if (0 == strcmp(directives[n].key, PMIX_MONITOR_FILE_DROPS)) {
209 ft->ndrops = directives[n].value.data.uint32;
210 } else if (0 == strcmp(directives[n].key, PMIX_MONITOR_FILE_CHECK_TIME)) {
211 ft->tv.tv_sec = directives[n].value.data.uint32;
212 } else if (0 == strcmp(directives[n].key, PMIX_RANGE)) {
213 ft->range = directives[n].value.data.range;
214 }
215 }
216
217 if (0 == ft->tv.tv_sec ||
218 (!ft->file_size && !ft->file_access && !ft->file_mod)) {
219
220 PMIX_RELEASE(ft);
221 return PMIX_ERR_BAD_PARAM;
222 }
223
224
225 pmix_event_assign(&ft->cdev, pmix_psensor_base.evbase, -1,
226 EV_WRITE, add_tracker, ft);
227 PMIX_POST_OBJECT(ft);
228 pmix_event_active(&ft->cdev, EV_WRITE, 1);
229
230 return PMIX_SUCCESS;
231 }
232
233
234 static void del_tracker(int sd, short flags, void *cbdata)
235 {
236 file_caddy_t *cd = (file_caddy_t*)cbdata;
237 file_tracker_t *ft, *ftnext;
238
239 PMIX_ACQUIRE_OBJECT(cd);
240
241
242 PMIX_LIST_FOREACH_SAFE(ft, ftnext, &mca_psensor_file_component.trackers, file_tracker_t) {
243 if (ft->requestor != cd->requestor) {
244 continue;
245 }
246 if (NULL == cd->id ||
247 (NULL != ft->id && 0 == strcmp(ft->id, cd->id))) {
248 pmix_list_remove_item(&mca_psensor_file_component.trackers, &ft->super);
249 PMIX_RELEASE(ft);
250 }
251 }
252 PMIX_RELEASE(cd);
253 }
254
255 static pmix_status_t stop(pmix_peer_t *requestor, char *id)
256 {
257 file_caddy_t *cd;
258
259 cd = PMIX_NEW(file_caddy_t);
260 PMIX_RETAIN(requestor);
261 cd->requestor = requestor;
262 if (NULL != id) {
263 cd->id = strdup(id);
264 }
265
266
267 pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,
268 EV_WRITE, del_tracker, cd);
269 PMIX_POST_OBJECT(cd);
270 pmix_event_active(&cd->ev, EV_WRITE, 1);
271
272 return PMIX_SUCCESS;
273 }
274
275 static void opcbfunc(pmix_status_t status, void *cbdata)
276 {
277 file_tracker_t *ft = (file_tracker_t*)cbdata;
278
279 PMIX_RELEASE(ft);
280 }
281
282 static void file_sample(int sd, short args, void *cbdata)
283 {
284 file_tracker_t *ft = (file_tracker_t*)cbdata;
285 struct stat buf;
286 pmix_status_t rc;
287 pmix_proc_t source;
288
289 PMIX_ACQUIRE_OBJECT(ft);
290
291 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
292 "[%s:%d] sampling file %s",
293 pmix_globals.myid.nspace, pmix_globals.myid.rank,
294 ft->file));
295
296
297 if (0 > stat(ft->file, &buf)) {
298
299 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
300 "[%s:%d] could not stat %s",
301 pmix_globals.myid.nspace, pmix_globals.myid.rank,
302 ft->file));
303
304 pmix_event_evtimer_add(&ft->ev, &ft->tv);
305 return;
306 }
307
308 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
309 "[%s:%d] size %lu access %s\tmod %s",
310 pmix_globals.myid.nspace, pmix_globals.myid.rank,
311 (unsigned long)buf.st_size, ctime(&buf.st_atime), ctime(&buf.st_mtime)));
312
313 if (ft->file_size) {
314 if (buf.st_size == (int64_t)ft->last_size) {
315 ft->nmisses++;
316 } else {
317 ft->nmisses = 0;
318 ft->last_size = buf.st_size;
319 }
320 } else if (ft->file_access) {
321 if (buf.st_atime == ft->last_access) {
322 ft->nmisses++;
323 } else {
324 ft->nmisses = 0;
325 ft->last_access = buf.st_atime;
326 }
327 } else if (ft->file_mod) {
328 if (buf.st_mtime == ft->last_mod) {
329 ft->nmisses++;
330 } else {
331 ft->nmisses = 0;
332 ft->last_mod = buf.st_mtime;
333 }
334 }
335
336 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
337 "[%s:%d] sampled file %s misses %d",
338 pmix_globals.myid.nspace, pmix_globals.myid.rank,
339 ft->file, ft->nmisses));
340
341 if (ft->nmisses == ft->ndrops) {
342 if (4 < pmix_output_get_verbosity(pmix_psensor_base_framework.framework_output)) {
343 pmix_show_help("help-pmix-psensor-file.txt", "file-stalled", true,
344 ft->file, ft->last_size, ctime(&ft->last_access), ctime(&ft->last_mod));
345 }
346
347 pmix_list_remove_item(&mca_psensor_file_component.trackers, &ft->super);
348
349 pmix_strncpy(source.nspace, ft->requestor->info->pname.nspace, PMIX_MAX_NSLEN);
350 source.rank = ft->requestor->info->pname.rank;
351 rc = PMIx_Notify_event(PMIX_MONITOR_FILE_ALERT, &source,
352 ft->range, ft->info, ft->ninfo, opcbfunc, ft);
353 if (PMIX_SUCCESS != rc) {
354 PMIX_ERROR_LOG(rc);
355 }
356 return;
357 }
358
359
360 pmix_event_evtimer_add(&ft->ev, &ft->tv);
361 }