This source file includes the following definitions.
- ft_constructor
- ft_destructor
- cd_con
- cd_des
- bcon
- bdes
- add_tracker
- heartbeat_start
- del_tracker
- heartbeat_stop
- opcbfunc
- check_heartbeat
- add_beat
- pmix_psensor_heartbeat_recv_beats
1
2
3
4
5
6
7
8
9
10
11
12
13
14 #include <src/include/pmix_config.h>
15 #include <pmix_common.h>
16
17 #include <errno.h>
18 #ifdef HAVE_UNISTD_H
19 #include <unistd.h>
20 #endif
21 #ifdef HAVE_STRING_H
22 #include <string.h>
23 #endif
24 #include <stdio.h>
25 #include <pthread.h>
26 #include PMIX_EVENT_HEADER
27
28 #include "src/util/argv.h"
29 #include "src/util/error.h"
30 #include "src/util/output.h"
31 #include "src/util/show_help.h"
32 #include "src/include/pmix_globals.h"
33 #include "src/mca/ptl/base/base.h"
34
35 #include "src/mca/psensor/base/base.h"
36 #include "psensor_heartbeat.h"
37
38
39 static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error,
40 const pmix_info_t *monitor,
41 const pmix_info_t directives[], size_t ndirs);
42 static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id);
43
44
45 pmix_psensor_base_module_t pmix_psensor_heartbeat_module = {
46 .start = heartbeat_start,
47 .stop = heartbeat_stop
48 };
49
50
51 typedef struct {
52 pmix_list_item_t super;
53 pmix_peer_t *requestor;
54 char *id;
55 bool event_active;
56 pmix_event_t ev;
57 pmix_event_t cdev;
58 struct timeval tv;
59 uint32_t nbeats;
60 uint32_t ndrops;
61 uint32_t nmissed;
62 pmix_status_t error;
63 pmix_data_range_t range;
64 pmix_info_t *info;
65 size_t ninfo;
66 bool stopped;
67 } pmix_heartbeat_trkr_t;
68
69 static void ft_constructor(pmix_heartbeat_trkr_t *ft)
70 {
71 ft->requestor = NULL;
72 ft->id = NULL;
73 ft->event_active = false;
74 ft->tv.tv_sec = 0;
75 ft->tv.tv_usec = 0;
76 ft->nbeats = 0;
77 ft->ndrops = 0;
78 ft->nmissed = 0;
79 ft->error = PMIX_SUCCESS;
80 ft->range = PMIX_RANGE_NAMESPACE;
81 ft->info = NULL;
82 ft->ninfo = 0;
83 ft->stopped = false;
84 }
85 static void ft_destructor(pmix_heartbeat_trkr_t *ft)
86 {
87 if (NULL != ft->requestor) {
88 PMIX_RELEASE(ft->requestor);
89 }
90 if (NULL != ft->id) {
91 free(ft->id);
92 }
93 if (ft->event_active) {
94 pmix_event_del(&ft->ev);
95 }
96 if (NULL != ft->info) {
97 PMIX_INFO_FREE(ft->info, ft->ninfo);
98 }
99 }
100 PMIX_CLASS_INSTANCE(pmix_heartbeat_trkr_t,
101 pmix_list_item_t,
102 ft_constructor, ft_destructor);
103
104
105 typedef struct {
106 pmix_object_t super;
107 pmix_event_t ev;
108 pmix_peer_t *requestor;
109 char *id;
110 } heartbeat_caddy_t;
111 static void cd_con(heartbeat_caddy_t *p)
112 {
113 p->requestor = NULL;
114 p->id = NULL;
115 }
116 static void cd_des(heartbeat_caddy_t *p)
117 {
118 if (NULL != (p->requestor)) {
119 PMIX_RELEASE(p->requestor);
120 }
121 if (NULL != p->id) {
122 free(p->id);
123 }
124 }
125 PMIX_CLASS_INSTANCE(heartbeat_caddy_t,
126 pmix_object_t,
127 cd_con, cd_des);
128
129 typedef struct {
130 pmix_object_t super;
131 pmix_event_t ev;
132 pmix_peer_t *peer;
133 } pmix_psensor_beat_t;
134
135 static void bcon(pmix_psensor_beat_t *p)
136 {
137 p->peer = NULL;
138 }
139 static void bdes(pmix_psensor_beat_t *p)
140 {
141 if (NULL != p->peer) {
142 PMIX_RELEASE(p->peer);
143 }
144 }
145 PMIX_CLASS_INSTANCE(pmix_psensor_beat_t,
146 pmix_object_t,
147 bcon, bdes);
148
149 static void check_heartbeat(int fd, short dummy, void *arg);
150
151 static void add_tracker(int sd, short flags, void *cbdata)
152 {
153 pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
154
155 PMIX_ACQUIRE_OBJECT(ft);
156
157
158 pmix_list_append(&mca_psensor_heartbeat_component.trackers, &ft->super);
159
160
161 pmix_event_evtimer_set(pmix_psensor_base.evbase, &ft->ev,
162 check_heartbeat, ft);
163 pmix_event_evtimer_add(&ft->ev, &ft->tv);
164 ft->event_active = true;
165 }
166
167 static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error,
168 const pmix_info_t *monitor,
169 const pmix_info_t directives[], size_t ndirs)
170 {
171 pmix_heartbeat_trkr_t *ft;
172 size_t n;
173 pmix_ptl_posted_recv_t *rcv;
174
175 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
176 "[%s:%d] checking heartbeat monitoring for requestor %s:%d",
177 pmix_globals.myid.nspace, pmix_globals.myid.rank,
178 requestor->info->pname.nspace, requestor->info->pname.rank));
179
180
181 if (0 != strcmp(monitor->key, PMIX_MONITOR_HEARTBEAT)) {
182 return PMIX_ERR_TAKE_NEXT_OPTION;
183 }
184
185
186 ft = PMIX_NEW(pmix_heartbeat_trkr_t);
187 PMIX_RETAIN(requestor);
188 ft->requestor = requestor;
189 ft->error = error;
190
191
192 for (n=0; n < ndirs; n++) {
193 if (0 == strcmp(directives[n].key, PMIX_MONITOR_HEARTBEAT_TIME)) {
194 ft->tv.tv_sec = directives[n].value.data.uint32;
195 } else if (0 == strcmp(directives[n].key, PMIX_MONITOR_HEARTBEAT_DROPS)) {
196 ft->ndrops = directives[n].value.data.uint32;
197 } else if (0 == strcmp(directives[n].key, PMIX_RANGE)) {
198 ft->range = directives[n].value.data.range;
199 }
200 }
201
202 if (0 == ft->tv.tv_sec) {
203
204 PMIX_RELEASE(ft);
205 return PMIX_ERR_BAD_PARAM;
206 }
207
208
209 if (!mca_psensor_heartbeat_component.recv_active) {
210
211 rcv = PMIX_NEW(pmix_ptl_posted_recv_t);
212 rcv->tag = PMIX_PTL_TAG_HEARTBEAT;
213 rcv->cbfunc = pmix_psensor_heartbeat_recv_beats;
214
215 pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &rcv->super);
216 mca_psensor_heartbeat_component.recv_active = true;
217 }
218
219
220 pmix_event_assign(&ft->cdev, pmix_psensor_base.evbase, -1,
221 EV_WRITE, add_tracker, ft);
222 PMIX_POST_OBJECT(ft);
223 pmix_event_active(&ft->cdev, EV_WRITE, 1);
224
225 return PMIX_SUCCESS;
226 }
227
228 static void del_tracker(int sd, short flags, void *cbdata)
229 {
230 heartbeat_caddy_t *cd = (heartbeat_caddy_t*)cbdata;
231 pmix_heartbeat_trkr_t *ft, *ftnext;
232
233 PMIX_ACQUIRE_OBJECT(cd);
234
235
236 PMIX_LIST_FOREACH_SAFE(ft, ftnext, &mca_psensor_heartbeat_component.trackers, pmix_heartbeat_trkr_t) {
237 if (ft->requestor != cd->requestor) {
238 continue;
239 }
240 if (NULL == cd->id ||
241 (NULL != ft->id && 0 == strcmp(ft->id, cd->id))) {
242 pmix_list_remove_item(&mca_psensor_heartbeat_component.trackers, &ft->super);
243 PMIX_RELEASE(ft);
244 }
245 }
246 PMIX_RELEASE(cd);
247 }
248
249 static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id)
250 {
251 heartbeat_caddy_t *cd;
252
253 cd = PMIX_NEW(heartbeat_caddy_t);
254 PMIX_RETAIN(requestor);
255 cd->requestor = requestor;
256 if (NULL != id) {
257 cd->id = strdup(id);
258 }
259
260
261 pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,
262 EV_WRITE, del_tracker, cd);
263 PMIX_POST_OBJECT(cd);
264 pmix_event_active(&cd->ev, EV_WRITE, 1);
265
266 return PMIX_SUCCESS;
267 }
268
269 static void opcbfunc(pmix_status_t status, void *cbdata)
270 {
271 pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
272
273 PMIX_RELEASE(ft);
274 }
275
276
277
278
279
280 static void check_heartbeat(int fd, short dummy, void *cbdata)
281 {
282 pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
283 pmix_status_t rc;
284 pmix_proc_t source;
285
286 PMIX_ACQUIRE_OBJECT(ft);
287
288 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
289 "[%s:%d] sensor:check_heartbeat for proc %s:%d",
290 pmix_globals.myid.nspace, pmix_globals.myid.rank,
291 ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
292
293 if (0 == ft->nbeats && !ft->stopped) {
294
295 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
296 "[%s:%d] sensor:check_heartbeat failed for proc %s:%d",
297 pmix_globals.myid.nspace, pmix_globals.myid.rank,
298 ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
299
300 pmix_strncpy(source.nspace, ft->requestor->info->pname.nspace, PMIX_MAX_NSLEN);
301 source.rank = ft->requestor->info->pname.rank;
302
303 PMIX_RETAIN(ft);
304
305
306 ft->stopped = true;
307 rc = PMIx_Notify_event(PMIX_MONITOR_HEARTBEAT_ALERT, &source,
308 ft->range, ft->info, ft->ninfo, opcbfunc, ft);
309 if (PMIX_SUCCESS != rc) {
310 PMIX_ERROR_LOG(rc);
311 }
312 } else {
313 PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
314 "[%s:%d] sensor:check_heartbeat detected %d beats for proc %s:%d",
315 pmix_globals.myid.nspace, pmix_globals.myid.rank, ft->nbeats,
316 ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
317 }
318
319 ft->nbeats = 0;
320
321
322 pmix_event_evtimer_add(&ft->ev, &ft->tv);
323 }
324
325 static void add_beat(int sd, short args, void *cbdata)
326 {
327 pmix_psensor_beat_t *b = (pmix_psensor_beat_t*)cbdata;
328 pmix_heartbeat_trkr_t *ft;
329
330 PMIX_ACQUIRE_OBJECT(b);
331
332
333 PMIX_LIST_FOREACH(ft, &mca_psensor_heartbeat_component.trackers, pmix_heartbeat_trkr_t) {
334 if (ft->requestor == b->peer) {
335
336 ++ft->nbeats;
337
338 ft->stopped = false;
339 break;
340 }
341 }
342
343 PMIX_RELEASE(b);
344 }
345
346 void pmix_psensor_heartbeat_recv_beats(struct pmix_peer_t *peer,
347 pmix_ptl_hdr_t *hdr,
348 pmix_buffer_t *buf, void *cbdata)
349 {
350 pmix_psensor_beat_t *b;
351
352 b = PMIX_NEW(pmix_psensor_beat_t);
353 PMIX_RETAIN(peer);
354 b->peer = peer;
355
356
357 pmix_event_assign(&b->ev, pmix_psensor_base.evbase, -1,
358 EV_WRITE, add_beat, b);
359 PMIX_POST_OBJECT(b);
360 pmix_event_active(&b->ev, EV_WRITE, 1);
361 }