root/opal/mca/pmix/pmix4x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ft_constructor
  2. ft_destructor
  3. cd_con
  4. cd_des
  5. bcon
  6. bdes
  7. add_tracker
  8. heartbeat_start
  9. del_tracker
  10. heartbeat_stop
  11. opcbfunc
  12. check_heartbeat
  13. add_beat
  14. pmix_psensor_heartbeat_recv_beats

   1 /*
   2  * Copyright (c) 2010      Cisco Systems, Inc.  All rights reserved.
   3  * Copyright (c) 2011-2012 Los Alamos National Security, LLC.  All rights
   4  *                         reserved.
   5   *
   6  * Copyright (c) 2017-2018 Intel, Inc.  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include <src/include/pmix_config.h>
  15 #include <pmix_common.h>
  16 
  17 #include <errno.h>
  18 #ifdef HAVE_UNISTD_H
  19 #include <unistd.h>
  20 #endif  /* HAVE_UNISTD_H */
  21 #ifdef HAVE_STRING_H
  22 #include <string.h>
  23 #endif  /* HAVE_STRING_H */
  24 #include <stdio.h>
  25 #include <pthread.h>
  26 #include PMIX_EVENT_HEADER
  27 
  28 #include "src/util/argv.h"
  29 #include "src/util/error.h"
  30 #include "src/util/output.h"
  31 #include "src/util/show_help.h"
  32 #include "src/include/pmix_globals.h"
  33 #include "src/mca/ptl/base/base.h"
  34 
  35 #include "src/mca/psensor/base/base.h"
  36 #include "psensor_heartbeat.h"
  37 
  38 /* declare the API functions */
  39 static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error,
  40                                      const pmix_info_t *monitor,
  41                                      const pmix_info_t directives[], size_t ndirs);
  42 static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id);
  43 
  44 /* instantiate the module */
  45 pmix_psensor_base_module_t pmix_psensor_heartbeat_module = {
  46     .start = heartbeat_start,
  47     .stop = heartbeat_stop
  48 };
  49 
  50 /* tracker object */
  51 typedef struct {
  52     pmix_list_item_t super;
  53     pmix_peer_t *requestor;
  54     char *id;
  55     bool event_active;
  56     pmix_event_t ev;
  57     pmix_event_t cdev;
  58     struct timeval tv;
  59     uint32_t nbeats;
  60     uint32_t ndrops;
  61     uint32_t nmissed;
  62     pmix_status_t error;
  63     pmix_data_range_t range;
  64     pmix_info_t *info;
  65     size_t ninfo;
  66     bool stopped;
  67 } pmix_heartbeat_trkr_t;
  68 
  69 static void ft_constructor(pmix_heartbeat_trkr_t *ft)
  70 {
  71     ft->requestor = NULL;
  72     ft->id = NULL;
  73     ft->event_active = false;
  74     ft->tv.tv_sec = 0;
  75     ft->tv.tv_usec = 0;
  76     ft->nbeats = 0;
  77     ft->ndrops = 0;
  78     ft->nmissed = 0;
  79     ft->error = PMIX_SUCCESS;
  80     ft->range = PMIX_RANGE_NAMESPACE;
  81     ft->info = NULL;
  82     ft->ninfo = 0;
  83     ft->stopped = false;
  84 }
  85 static void ft_destructor(pmix_heartbeat_trkr_t *ft)
  86 {
  87     if (NULL != ft->requestor) {
  88         PMIX_RELEASE(ft->requestor);
  89     }
  90     if (NULL != ft->id) {
  91         free(ft->id);
  92     }
  93     if (ft->event_active) {
  94         pmix_event_del(&ft->ev);
  95     }
  96     if (NULL != ft->info) {
  97         PMIX_INFO_FREE(ft->info, ft->ninfo);
  98     }
  99 }
 100 PMIX_CLASS_INSTANCE(pmix_heartbeat_trkr_t,
 101                     pmix_list_item_t,
 102                     ft_constructor, ft_destructor);
 103 
 104 /* define a local caddy */
 105 typedef struct {
 106     pmix_object_t super;
 107     pmix_event_t ev;
 108     pmix_peer_t *requestor;
 109     char *id;
 110 } heartbeat_caddy_t;
 111 static void cd_con(heartbeat_caddy_t *p)
 112 {
 113     p->requestor = NULL;
 114     p->id = NULL;
 115 }
 116 static void cd_des(heartbeat_caddy_t *p)
 117 {
 118     if (NULL != (p->requestor)) {
 119         PMIX_RELEASE(p->requestor);
 120     }
 121     if (NULL != p->id) {
 122         free(p->id);
 123     }
 124 }
 125 PMIX_CLASS_INSTANCE(heartbeat_caddy_t,
 126                     pmix_object_t,
 127                     cd_con, cd_des);
 128 
 129 typedef struct {
 130     pmix_object_t super;
 131     pmix_event_t ev;
 132     pmix_peer_t *peer;
 133 } pmix_psensor_beat_t;
 134 
 135 static void bcon(pmix_psensor_beat_t *p)
 136 {
 137     p->peer = NULL;
 138 }
 139 static void bdes(pmix_psensor_beat_t *p)
 140 {
 141     if (NULL != p->peer) {
 142         PMIX_RELEASE(p->peer);
 143     }
 144 }
 145 PMIX_CLASS_INSTANCE(pmix_psensor_beat_t,
 146                     pmix_object_t,
 147                     bcon, bdes);
 148 
 149 static void check_heartbeat(int fd, short dummy, void *arg);
 150 
 151 static void add_tracker(int sd, short flags, void *cbdata)
 152 {
 153     pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
 154 
 155     PMIX_ACQUIRE_OBJECT(ft);
 156 
 157     /* add the tracker to our list */
 158     pmix_list_append(&mca_psensor_heartbeat_component.trackers, &ft->super);
 159 
 160     /* setup the timer event */
 161     pmix_event_evtimer_set(pmix_psensor_base.evbase, &ft->ev,
 162                            check_heartbeat, ft);
 163     pmix_event_evtimer_add(&ft->ev, &ft->tv);
 164     ft->event_active = true;
 165 }
 166 
 167 static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error,
 168                                      const pmix_info_t *monitor,
 169                                      const pmix_info_t directives[], size_t ndirs)
 170 {
 171     pmix_heartbeat_trkr_t *ft;
 172     size_t n;
 173     pmix_ptl_posted_recv_t *rcv;
 174 
 175     PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
 176                          "[%s:%d] checking heartbeat monitoring for requestor %s:%d",
 177                          pmix_globals.myid.nspace, pmix_globals.myid.rank,
 178                          requestor->info->pname.nspace, requestor->info->pname.rank));
 179 
 180     /* if they didn't ask for heartbeats, then nothing for us to do */
 181     if (0 != strcmp(monitor->key, PMIX_MONITOR_HEARTBEAT)) {
 182         return PMIX_ERR_TAKE_NEXT_OPTION;
 183     }
 184 
 185     /* setup to track this monitoring operation */
 186     ft = PMIX_NEW(pmix_heartbeat_trkr_t);
 187     PMIX_RETAIN(requestor);
 188     ft->requestor = requestor;
 189     ft->error = error;
 190 
 191     /* check the directives to see what they want monitored */
 192     for (n=0; n < ndirs; n++) {
 193         if (0 == strcmp(directives[n].key, PMIX_MONITOR_HEARTBEAT_TIME)) {
 194             ft->tv.tv_sec = directives[n].value.data.uint32;
 195         } else if (0 == strcmp(directives[n].key, PMIX_MONITOR_HEARTBEAT_DROPS)) {
 196             ft->ndrops = directives[n].value.data.uint32;
 197         } else if (0 == strcmp(directives[n].key, PMIX_RANGE)) {
 198             ft->range = directives[n].value.data.range;
 199         }
 200     }
 201 
 202     if (0 == ft->tv.tv_sec) {
 203         /* didn't specify a sample rate, or what should be sampled */
 204         PMIX_RELEASE(ft);
 205         return PMIX_ERR_BAD_PARAM;
 206     }
 207 
 208     /* if the recv hasn't been posted, so so now */
 209     if (!mca_psensor_heartbeat_component.recv_active) {
 210         /* setup to receive heartbeats */
 211         rcv = PMIX_NEW(pmix_ptl_posted_recv_t);
 212         rcv->tag = PMIX_PTL_TAG_HEARTBEAT;
 213         rcv->cbfunc = pmix_psensor_heartbeat_recv_beats;
 214         /* add it to the beginning of the list of recvs */
 215         pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &rcv->super);
 216         mca_psensor_heartbeat_component.recv_active = true;
 217     }
 218 
 219     /* need to push into our event base to add this to our trackers */
 220     pmix_event_assign(&ft->cdev, pmix_psensor_base.evbase, -1,
 221                       EV_WRITE, add_tracker, ft);
 222     PMIX_POST_OBJECT(ft);
 223     pmix_event_active(&ft->cdev, EV_WRITE, 1);
 224 
 225     return PMIX_SUCCESS;
 226 }
 227 
 228 static void del_tracker(int sd, short flags, void *cbdata)
 229 {
 230     heartbeat_caddy_t *cd = (heartbeat_caddy_t*)cbdata;
 231     pmix_heartbeat_trkr_t *ft, *ftnext;
 232 
 233     PMIX_ACQUIRE_OBJECT(cd);
 234 
 235     /* remove the tracker from our list */
 236     PMIX_LIST_FOREACH_SAFE(ft, ftnext, &mca_psensor_heartbeat_component.trackers, pmix_heartbeat_trkr_t) {
 237         if (ft->requestor != cd->requestor) {
 238             continue;
 239         }
 240         if (NULL == cd->id ||
 241             (NULL != ft->id && 0 == strcmp(ft->id, cd->id))) {
 242             pmix_list_remove_item(&mca_psensor_heartbeat_component.trackers, &ft->super);
 243             PMIX_RELEASE(ft);
 244         }
 245     }
 246     PMIX_RELEASE(cd);
 247 }
 248 
 249 static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id)
 250 {
 251     heartbeat_caddy_t *cd;
 252 
 253     cd = PMIX_NEW(heartbeat_caddy_t);
 254     PMIX_RETAIN(requestor);
 255     cd->requestor = requestor;
 256     if (NULL != id) {
 257         cd->id = strdup(id);
 258     }
 259 
 260     /* need to push into our event base to remove this from our trackers */
 261     pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,
 262                       EV_WRITE, del_tracker, cd);
 263     PMIX_POST_OBJECT(cd);
 264     pmix_event_active(&cd->ev, EV_WRITE, 1);
 265 
 266     return PMIX_SUCCESS;
 267 }
 268 
 269 static void opcbfunc(pmix_status_t status, void *cbdata)
 270 {
 271     pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
 272 
 273     PMIX_RELEASE(ft);  // maintain accounting
 274 }
 275 
 276 /* this function automatically gets periodically called
 277  * by the event library so we can check on the state
 278  * of the various procs we are monitoring
 279  */
 280 static void check_heartbeat(int fd, short dummy, void *cbdata)
 281 {
 282     pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
 283     pmix_status_t rc;
 284     pmix_proc_t source;
 285 
 286     PMIX_ACQUIRE_OBJECT(ft);
 287 
 288     PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
 289                          "[%s:%d] sensor:check_heartbeat for proc %s:%d",
 290                          pmix_globals.myid.nspace, pmix_globals.myid.rank,
 291                         ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
 292 
 293     if (0 == ft->nbeats && !ft->stopped) {
 294         /* no heartbeat recvd in last window */
 295         PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
 296                              "[%s:%d] sensor:check_heartbeat failed for proc %s:%d",
 297                              pmix_globals.myid.nspace, pmix_globals.myid.rank,
 298                              ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
 299         /* generate an event */
 300         pmix_strncpy(source.nspace, ft->requestor->info->pname.nspace, PMIX_MAX_NSLEN);
 301         source.rank = ft->requestor->info->pname.rank;
 302         /* ensure the tracker remains throughout the process */
 303         PMIX_RETAIN(ft);
 304         /* mark that the process appears stopped so we don't
 305          * continue to report it */
 306         ft->stopped = true;
 307         rc = PMIx_Notify_event(PMIX_MONITOR_HEARTBEAT_ALERT, &source,
 308                                ft->range, ft->info, ft->ninfo, opcbfunc, ft);
 309         if (PMIX_SUCCESS != rc) {
 310             PMIX_ERROR_LOG(rc);
 311         }
 312     } else {
 313         PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
 314                              "[%s:%d] sensor:check_heartbeat detected %d beats for proc %s:%d",
 315                              pmix_globals.myid.nspace, pmix_globals.myid.rank, ft->nbeats,
 316                              ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
 317     }
 318     /* reset for next period */
 319     ft->nbeats = 0;
 320 
 321     /* reset the timer */
 322     pmix_event_evtimer_add(&ft->ev, &ft->tv);
 323 }
 324 
 325 static void add_beat(int sd, short args, void *cbdata)
 326 {
 327     pmix_psensor_beat_t *b = (pmix_psensor_beat_t*)cbdata;
 328     pmix_heartbeat_trkr_t *ft;
 329 
 330     PMIX_ACQUIRE_OBJECT(b);
 331 
 332     /* find this peer in our trackers */
 333     PMIX_LIST_FOREACH(ft, &mca_psensor_heartbeat_component.trackers, pmix_heartbeat_trkr_t) {
 334         if (ft->requestor == b->peer) {
 335             /* increment the beat count */
 336             ++ft->nbeats;
 337             /* ensure we know that the proc is alive */
 338             ft->stopped = false;
 339             break;
 340         }
 341     }
 342 
 343     PMIX_RELEASE(b);
 344 }
 345 
 346 void pmix_psensor_heartbeat_recv_beats(struct pmix_peer_t *peer,
 347                                        pmix_ptl_hdr_t *hdr,
 348                                        pmix_buffer_t *buf, void *cbdata)
 349 {
 350     pmix_psensor_beat_t *b;
 351 
 352     b = PMIX_NEW(pmix_psensor_beat_t);
 353     PMIX_RETAIN(peer);
 354     b->peer = peer;
 355 
 356     /* shift this to our thread for processing */
 357     pmix_event_assign(&b->ev, pmix_psensor_base.evbase, -1,
 358                       EV_WRITE, add_beat, b);
 359     PMIX_POST_OBJECT(b);
 360     pmix_event_active(&b->ev, EV_WRITE, 1);
 361 }

/* [<][>][^][v][top][bottom][index][help] */