root/opal/mca/pmix/pmix4x/pmix/src/runtime/pmix_progress_threads.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. tracker_constructor
  2. tracker_destructor
  3. pmix_libev_ev_async_cb
  4. pmix_event_add
  5. pmix_event_del
  6. pmix_event_active
  7. pmix_event_base_loopbreak
  8. dummy_timeout_cb
  9. progress_engine
  10. stop_progress_engine
  11. start_progress_engine
  12. pmix_progress_thread_init
  13. pmix_progress_thread_stop
  14. pmix_progress_thread_finalize
  15. pmix_progress_thread_pause
  16. pmix_progress_tracker_get_by_base
  17. pmix_progress_thread_resume

   1 /*
   2  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
   3  * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
   4  * Copyright (c) 2017-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * $COPYRIGHT$
   7  *
   8  * Additional copyrights may follow
   9  *
  10  * $HEADER$
  11  */
  12 
  13 #include <src/include/pmix_config.h>
  14 
  15 #ifdef HAVE_UNISTD_H
  16 #include <unistd.h>
  17 #endif
  18 #include <string.h>
  19 #include <pthread.h>
  20 #include PMIX_EVENT_HEADER
  21 
  22 #include "src/class/pmix_list.h"
  23 #include "src/threads/threads.h"
  24 #include "src/util/error.h"
  25 #include "src/util/fd.h"
  26 
  27 #include "src/runtime/pmix_progress_threads.h"
  28 
  29 
  30 /* create a tracking object for progress threads */
  31 typedef struct {
  32     pmix_list_item_t super;
  33 
  34     int refcount;
  35     char *name;
  36 
  37     pmix_event_base_t *ev_base;
  38 
  39     /* This will be set to false when it is time for the progress
  40        thread to exit */
  41     volatile bool ev_active;
  42 
  43     /* This event will always be set on the ev_base (so that the
  44        ev_base is not empty!) */
  45     pmix_event_t block;
  46 
  47     bool engine_constructed;
  48     pmix_thread_t engine;
  49 #if PMIX_HAVE_LIBEV
  50     ev_async async;
  51     pthread_mutex_t mutex;
  52     pthread_cond_t cond;
  53     pmix_list_t list;
  54 #endif
  55 } pmix_progress_tracker_t;
  56 
  57 static void tracker_constructor(pmix_progress_tracker_t *p)
  58 {
  59     p->refcount = 1;  // start at one since someone created it
  60     p->name = NULL;
  61     p->ev_base = NULL;
  62     p->ev_active = false;
  63     p->engine_constructed = false;
  64 #if PMIX_HAVE_LIBEV
  65     pthread_mutex_init(&p->mutex, NULL);
  66     PMIX_CONSTRUCT(&p->list, pmix_list_t);
  67 #endif
  68 }
  69 
  70 static void tracker_destructor(pmix_progress_tracker_t *p)
  71 {
  72     pmix_event_del(&p->block);
  73 
  74     if (NULL != p->name) {
  75         free(p->name);
  76     }
  77     if (NULL != p->ev_base) {
  78         pmix_event_base_free(p->ev_base);
  79     }
  80     if (p->engine_constructed) {
  81         PMIX_DESTRUCT(&p->engine);
  82     }
  83 #if PMIX_HAVE_LIBEV
  84     pthread_mutex_destroy(&p->mutex);
  85     PMIX_LIST_DESTRUCT(&p->list);
  86 #endif
  87 }
  88 
  89 static PMIX_CLASS_INSTANCE(pmix_progress_tracker_t,
  90                           pmix_list_item_t,
  91                           tracker_constructor,
  92                           tracker_destructor);
  93 
  94 #if PMIX_HAVE_LIBEV
  95 
  96 typedef enum {
  97     PMIX_EVENT_ACTIVE,
  98     PMIX_EVENT_ADD,
  99     PMIX_EVENT_DEL
 100 } pmix_event_type_t;
 101 
 102 typedef struct {
 103     pmix_list_item_t super;
 104     struct event *ev;
 105     struct timeval *tv;
 106     int res;
 107     short ncalls;
 108     pmix_event_type_t type;
 109 } pmix_event_caddy_t;
 110 
 111 static PMIX_CLASS_INSTANCE(pmix_event_caddy_t,
 112                            pmix_list_item_t,
 113                            NULL, NULL);
 114 
 115 static pmix_progress_tracker_t* pmix_progress_tracker_get_by_base(struct event_base *);
 116 
 117 static void pmix_libev_ev_async_cb (EV_P_ ev_async *w, int revents)
 118 {
 119     pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base((struct event_base *)EV_A);
 120     assert(NULL != trk);
 121     pthread_mutex_lock (&trk->mutex);
 122     pmix_event_caddy_t *cd, *next;
 123     PMIX_LIST_FOREACH_SAFE(cd, next, &trk->list, pmix_event_caddy_t) {
 124         switch (cd->type) {
 125             case PMIX_EVENT_ADD:
 126                 (void)event_add(cd->ev, cd->tv);
 127                 break;
 128             case PMIX_EVENT_DEL:
 129                 (void)event_del(cd->ev);
 130                 break;
 131             case PMIX_EVENT_ACTIVE:
 132                 (void)event_active(cd->ev, cd->res, cd->ncalls);
 133                 break;
 134         }
 135         pmix_list_remove_item(&trk->list, &cd->super);
 136         PMIX_RELEASE(cd);
 137     }
 138     pthread_mutex_unlock (&trk->mutex);
 139 }
 140 
 141 int pmix_event_add(struct event *ev, struct timeval *tv) {
 142     int res;
 143     pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev->ev_base);
 144     if ((NULL != trk) && !pthread_equal(pthread_self(), trk->engine.t_handle)) {
 145         pmix_event_caddy_t *cd = PMIX_NEW(pmix_event_caddy_t);
 146         cd->type = PMIX_EVENT_ADD;
 147         cd->ev = ev;
 148         cd->tv = tv;
 149         pthread_mutex_lock(&trk->mutex);
 150         pmix_list_append(&trk->list, &cd->super);
 151         ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
 152         pthread_mutex_unlock(&trk->mutex);
 153         res = PMIX_SUCCESS;
 154     } else {
 155         res = event_add(ev, tv);
 156     }
 157     return res;
 158 }
 159 
 160 int pmix_event_del(struct event *ev) {
 161     int res;
 162     pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev->ev_base);
 163     if ((NULL != trk) && !pthread_equal(pthread_self(), trk->engine.t_handle)) {
 164         pmix_event_caddy_t *cd = PMIX_NEW(pmix_event_caddy_t);
 165         cd->type = PMIX_EVENT_DEL;
 166         cd->ev = ev;
 167         pthread_mutex_lock(&trk->mutex);
 168         pmix_list_append(&trk->list, &cd->super);
 169         ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
 170         pthread_mutex_unlock(&trk->mutex);
 171         res = PMIX_SUCCESS;
 172     } else {
 173         res = event_del(ev);
 174     }
 175     return res;
 176 }
 177 
 178 void pmix_event_active (struct event *ev, int res, short ncalls) {
 179     pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev->ev_base);
 180     if ((NULL != trk) && !pthread_equal(pthread_self(), trk->engine.t_handle)) {
 181         pmix_event_caddy_t *cd = PMIX_NEW(pmix_event_caddy_t);
 182         cd->type = PMIX_EVENT_ACTIVE;
 183         cd->ev = ev;
 184         cd->res = res;
 185         cd->ncalls = ncalls;
 186         pthread_mutex_lock(&trk->mutex);
 187         pmix_list_append(&trk->list, &cd->super);
 188         ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
 189         pthread_mutex_unlock(&trk->mutex);
 190     } else {
 191         event_active(ev, res, ncalls);
 192     }
 193 }
 194 
 195 void pmix_event_base_loopbreak (pmix_event_base_t *ev_base) {
 196     pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev_base);
 197     assert(NULL != trk);
 198     ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
 199 }
 200 #endif
 201 
 202 static bool inited = false;
 203 static pmix_list_t tracking;
 204 static struct timeval long_timeout = {
 205     .tv_sec = 3600,
 206     .tv_usec = 0
 207 };
 208 static const char *shared_thread_name = "PMIX-wide async progress thread";
 209 
 210 /*
 211  * If this event is fired, just restart it so that this event base
 212  * continues to have something to block on.
 213  */
 214 static void dummy_timeout_cb(int fd, short args, void *cbdata)
 215 {
 216     pmix_progress_tracker_t *trk = (pmix_progress_tracker_t*)cbdata;
 217 
 218     pmix_event_add(&trk->block, &long_timeout);
 219 }
 220 
 221 /*
 222  * Main for the progress thread
 223  */
 224 static void* progress_engine(pmix_object_t *obj)
 225 {
 226     pmix_thread_t *t = (pmix_thread_t*)obj;
 227     pmix_progress_tracker_t *trk = (pmix_progress_tracker_t*)t->t_arg;
 228 
 229     while (trk->ev_active) {
 230         pmix_event_loop(trk->ev_base, PMIX_EVLOOP_ONCE);
 231     }
 232 
 233     return PMIX_THREAD_CANCELLED;
 234 }
 235 
 236 static void stop_progress_engine(pmix_progress_tracker_t *trk)
 237 {
 238     assert(trk->ev_active);
 239     trk->ev_active = false;
 240     /* break the event loop - this will cause the loop to exit upon
 241        completion of any current event */
 242     pmix_event_base_loopbreak(trk->ev_base);
 243 
 244     pmix_thread_join(&trk->engine, NULL);
 245 }
 246 
 247 static int start_progress_engine(pmix_progress_tracker_t *trk)
 248 {
 249     assert(!trk->ev_active);
 250     trk->ev_active = true;
 251 
 252     /* fork off a thread to progress it */
 253     trk->engine.t_run = progress_engine;
 254     trk->engine.t_arg = trk;
 255 
 256     int rc = pmix_thread_start(&trk->engine);
 257     if (PMIX_SUCCESS != rc) {
 258         PMIX_ERROR_LOG(rc);
 259     }
 260 
 261     return rc;
 262 }
 263 
 264 pmix_event_base_t *pmix_progress_thread_init(const char *name)
 265 {
 266     pmix_progress_tracker_t *trk;
 267     int rc;
 268 
 269     if (!inited) {
 270         PMIX_CONSTRUCT(&tracking, pmix_list_t);
 271         inited = true;
 272     }
 273 
 274     if (NULL == name) {
 275         name = shared_thread_name;
 276     }
 277 
 278     /* check if we already have this thread */
 279     PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
 280         if (0 == strcmp(name, trk->name)) {
 281             /* we do, so up the refcount on it */
 282             ++trk->refcount;
 283             /* return the existing base */
 284             return trk->ev_base;
 285         }
 286     }
 287 
 288     trk = PMIX_NEW(pmix_progress_tracker_t);
 289     if (NULL == trk) {
 290         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 291         return NULL;
 292     }
 293 
 294     trk->name = strdup(name);
 295     if (NULL == trk->name) {
 296         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 297         PMIX_RELEASE(trk);
 298         return NULL;
 299     }
 300 
 301     if (NULL == (trk->ev_base = pmix_event_base_create())) {
 302         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 303         PMIX_RELEASE(trk);
 304         return NULL;
 305     }
 306 
 307     /* add an event to the new event base (if there are no events,
 308        pmix_event_loop() will return immediately) */
 309     pmix_event_assign(&trk->block, trk->ev_base, -1, PMIX_EV_PERSIST,
 310                    dummy_timeout_cb, trk);
 311     pmix_event_add(&trk->block, &long_timeout);
 312 
 313 #if PMIX_HAVE_LIBEV
 314     ev_async_init (&trk->async, pmix_libev_ev_async_cb);
 315     ev_async_start((struct ev_loop *)trk->ev_base, &trk->async);
 316 #endif
 317 
 318     /* construct the thread object */
 319     PMIX_CONSTRUCT(&trk->engine, pmix_thread_t);
 320     trk->engine_constructed = true;
 321     if (PMIX_SUCCESS != (rc = start_progress_engine(trk))) {
 322         PMIX_ERROR_LOG(rc);
 323         PMIX_RELEASE(trk);
 324         return NULL;
 325     }
 326     pmix_list_append(&tracking, &trk->super);
 327 
 328     return trk->ev_base;
 329 }
 330 
 331 int pmix_progress_thread_stop(const char *name)
 332 {
 333     pmix_progress_tracker_t *trk;
 334 
 335     if (!inited) {
 336         /* nothing we can do */
 337         return PMIX_ERR_NOT_FOUND;
 338     }
 339 
 340     if (NULL == name) {
 341         name = shared_thread_name;
 342     }
 343 
 344     /* find the specified engine */
 345     PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
 346         if (0 == strcmp(name, trk->name)) {
 347             /* decrement the refcount */
 348             --trk->refcount;
 349 
 350             /* If the refcount is still above 0, we're done here */
 351             if (trk->refcount > 0) {
 352                 return PMIX_SUCCESS;
 353             }
 354 
 355             /* If the progress thread is active, stop it */
 356             if (trk->ev_active) {
 357                 stop_progress_engine(trk);
 358             }
 359             pmix_list_remove_item(&tracking, &trk->super);
 360             PMIX_RELEASE(trk);
 361             return PMIX_SUCCESS;
 362         }
 363     }
 364 
 365     return PMIX_ERR_NOT_FOUND;
 366 }
 367 
 368 int pmix_progress_thread_finalize(const char *name)
 369 {
 370     pmix_progress_tracker_t *trk;
 371 
 372     if (!inited) {
 373         /* nothing we can do */
 374         return PMIX_ERR_NOT_FOUND;
 375     }
 376 
 377     if (NULL == name) {
 378         name = shared_thread_name;
 379     }
 380 
 381     /* find the specified engine */
 382     PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
 383         if (0 == strcmp(name, trk->name)) {
 384             /* If the refcount is still above 0, we're done here */
 385             if (trk->refcount > 0) {
 386                 return PMIX_SUCCESS;
 387             }
 388 
 389             pmix_list_remove_item(&tracking, &trk->super);
 390             PMIX_RELEASE(trk);
 391             return PMIX_SUCCESS;
 392         }
 393     }
 394 
 395     return PMIX_ERR_NOT_FOUND;
 396 }
 397 
 398 /*
 399  * Stop the progress thread, but don't delete the tracker (or event base)
 400  */
 401 int pmix_progress_thread_pause(const char *name)
 402 {
 403     pmix_progress_tracker_t *trk;
 404 
 405     if (!inited) {
 406         /* nothing we can do */
 407         return PMIX_ERR_NOT_FOUND;
 408     }
 409 
 410     if (NULL == name) {
 411         name = shared_thread_name;
 412     }
 413 
 414     /* find the specified engine */
 415     PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
 416         if (0 == strcmp(name, trk->name)) {
 417             if (trk->ev_active) {
 418                 stop_progress_engine(trk);
 419             }
 420 
 421             return PMIX_SUCCESS;
 422         }
 423     }
 424 
 425     return PMIX_ERR_NOT_FOUND;
 426 }
 427 
 428 #if PMIX_HAVE_LIBEV
 429 static pmix_progress_tracker_t* pmix_progress_tracker_get_by_base(pmix_event_base_t *base) {
 430     pmix_progress_tracker_t *trk;
 431 
 432     if (inited)  {
 433         PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
 434             if(trk->ev_base == base) {
 435                 return trk;
 436             }
 437         }
 438     }
 439     return NULL;
 440 }
 441 #endif
 442 
 443 int pmix_progress_thread_resume(const char *name)
 444 {
 445     pmix_progress_tracker_t *trk;
 446 
 447     if (!inited) {
 448         /* nothing we can do */
 449         return PMIX_ERR_NOT_FOUND;
 450     }
 451 
 452     if (NULL == name) {
 453         name = shared_thread_name;
 454     }
 455 
 456     /* find the specified engine */
 457     PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
 458         if (0 == strcmp(name, trk->name)) {
 459             if (trk->ev_active) {
 460                 return PMIX_ERR_RESOURCE_BUSY;
 461             }
 462 
 463             return start_progress_engine(trk);
 464         }
 465     }
 466 
 467     return PMIX_ERR_NOT_FOUND;
 468 }

/* [<][>][^][v][top][bottom][index][help] */