root/opal/runtime/opal_progress_threads.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. tracker_constructor
  2. tracker_destructor
  3. dummy_timeout_cb
  4. progress_engine
  5. stop_progress_engine
  6. start_progress_engine
  7. opal_progress_thread_init
  8. opal_progress_thread_finalize
  9. opal_progress_thread_pause
  10. opal_progress_thread_resume

   1 /*
   2  * Copyright (c) 2014-2015 Intel, Inc.  All rights reserved.
   3  * Copyright (c) 2015 Cisco Systems, Inc.  All rights reserved.
   4  * $COPYRIGHT$
   5  *
   6  * Additional copyrights may follow
   7  *
   8  * $HEADER$
   9  */
  10 
  11 #include "opal_config.h"
  12 #include "opal/constants.h"
  13 
  14 #ifdef HAVE_UNISTD_H
  15 #include <unistd.h>
  16 #endif
  17 
  18 #include "opal/class/opal_list.h"
  19 #include "opal/mca/event/event.h"
  20 #include "opal/threads/threads.h"
  21 #include "opal/util/error.h"
  22 #include "opal/util/fd.h"
  23 
  24 #include "opal/runtime/opal_progress_threads.h"
  25 
  26 
  27 /* create a tracking object for progress threads */
  28 typedef struct {
  29     opal_list_item_t super;
  30 
  31     int refcount;
  32     char *name;
  33 
  34     opal_event_base_t *ev_base;
  35 
  36     /* This will be set to false when it is time for the progress
  37        thread to exit */
  38     volatile bool ev_active;
  39 
  40     /* This event will always be set on the ev_base (so that the
  41        ev_base is not empty!) */
  42     opal_event_t block;
  43 
  44     bool engine_constructed;
  45     opal_thread_t engine;
  46 } opal_progress_tracker_t;
  47 
  48 static void tracker_constructor(opal_progress_tracker_t *p)
  49 {
  50     p->refcount = 1;  // start at one since someone created it
  51     p->name = NULL;
  52     p->ev_base = NULL;
  53     p->ev_active = false;
  54     p->engine_constructed = false;
  55 }
  56 
  57 static void tracker_destructor(opal_progress_tracker_t *p)
  58 {
  59     opal_event_del(&p->block);
  60 
  61     if (NULL != p->name) {
  62         free(p->name);
  63     }
  64     if (NULL != p->ev_base) {
  65         opal_event_base_free(p->ev_base);
  66     }
  67     if (p->engine_constructed) {
  68         OBJ_DESTRUCT(&p->engine);
  69     }
  70 }
  71 
  72 static OBJ_CLASS_INSTANCE(opal_progress_tracker_t,
  73                           opal_list_item_t,
  74                           tracker_constructor,
  75                           tracker_destructor);
  76 
  77 static bool inited = false;
  78 static opal_list_t tracking;
  79 static struct timeval long_timeout = {
  80     .tv_sec = 3600,
  81     .tv_usec = 0
  82 };
  83 static const char *shared_thread_name = "OPAL-wide async progress thread";
  84 
  85 /*
  86  * If this event is fired, just restart it so that this event base
  87  * continues to have something to block on.
  88  */
  89 static void dummy_timeout_cb(int fd, short args, void *cbdata)
  90 {
  91     opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata;
  92 
  93     opal_event_add(&trk->block, &long_timeout);
  94 }
  95 
  96 /*
  97  * Main for the progress thread
  98  */
  99 static void* progress_engine(opal_object_t *obj)
 100 {
 101     opal_thread_t *t = (opal_thread_t*)obj;
 102     opal_progress_tracker_t *trk = (opal_progress_tracker_t*)t->t_arg;
 103 
 104     while (trk->ev_active) {
 105         opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE);
 106     }
 107 
 108     return OPAL_THREAD_CANCELLED;
 109 }
 110 
 111 static void stop_progress_engine(opal_progress_tracker_t *trk)
 112 {
 113     assert(trk->ev_active);
 114     trk->ev_active = false;
 115 
 116     /* break the event loop - this will cause the loop to exit upon
 117        completion of any current event */
 118     opal_event_base_loopbreak(trk->ev_base);
 119 
 120     opal_thread_join(&trk->engine, NULL);
 121 }
 122 
 123 static int start_progress_engine(opal_progress_tracker_t *trk)
 124 {
 125     assert(!trk->ev_active);
 126     trk->ev_active = true;
 127 
 128     /* fork off a thread to progress it */
 129     trk->engine.t_run = progress_engine;
 130     trk->engine.t_arg = trk;
 131 
 132     int rc = opal_thread_start(&trk->engine);
 133     if (OPAL_SUCCESS != rc) {
 134         OPAL_ERROR_LOG(rc);
 135     }
 136 
 137     return rc;
 138 }
 139 
 140 opal_event_base_t *opal_progress_thread_init(const char *name)
 141 {
 142     opal_progress_tracker_t *trk;
 143     int rc;
 144 
 145     if (!inited) {
 146         OBJ_CONSTRUCT(&tracking, opal_list_t);
 147         inited = true;
 148     }
 149 
 150     if (NULL == name) {
 151         name = shared_thread_name;
 152     }
 153 
 154     /* check if we already have this thread */
 155     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 156         if (0 == strcmp(name, trk->name)) {
 157             /* we do, so up the refcount on it */
 158             ++trk->refcount;
 159             /* return the existing base */
 160             return trk->ev_base;
 161         }
 162     }
 163 
 164     trk = OBJ_NEW(opal_progress_tracker_t);
 165     if (NULL == trk) {
 166         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 167         return NULL;
 168     }
 169 
 170     trk->name = strdup(name);
 171     if (NULL == trk->name) {
 172         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 173         OBJ_RELEASE(trk);
 174         return NULL;
 175     }
 176 
 177     if (NULL == (trk->ev_base = opal_event_base_create())) {
 178         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 179         OBJ_RELEASE(trk);
 180         return NULL;
 181     }
 182 
 183     /* add an event to the new event base (if there are no events,
 184        opal_event_loop() will return immediately) */
 185     opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST,
 186                    dummy_timeout_cb, trk);
 187     opal_event_add(&trk->block, &long_timeout);
 188 
 189     /* construct the thread object */
 190     OBJ_CONSTRUCT(&trk->engine, opal_thread_t);
 191     trk->engine_constructed = true;
 192     if (OPAL_SUCCESS != (rc = start_progress_engine(trk))) {
 193         OPAL_ERROR_LOG(rc);
 194         OBJ_RELEASE(trk);
 195         return NULL;
 196     }
 197     opal_list_append(&tracking, &trk->super);
 198 
 199     return trk->ev_base;
 200 }
 201 
 202 int opal_progress_thread_finalize(const char *name)
 203 {
 204     opal_progress_tracker_t *trk;
 205 
 206     if (!inited) {
 207         /* nothing we can do */
 208         return OPAL_ERR_NOT_FOUND;
 209     }
 210 
 211     if (NULL == name) {
 212         name = shared_thread_name;
 213     }
 214 
 215     /* find the specified engine */
 216     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 217         if (0 == strcmp(name, trk->name)) {
 218             /* decrement the refcount */
 219             --trk->refcount;
 220 
 221             /* If the refcount is still above 0, we're done here */
 222             if (trk->refcount > 0) {
 223                 return OPAL_SUCCESS;
 224             }
 225 
 226             /* If the progress thread is active, stop it */
 227             if (trk->ev_active) {
 228                 stop_progress_engine(trk);
 229             }
 230 
 231             opal_list_remove_item(&tracking, &trk->super);
 232             OBJ_RELEASE(trk);
 233             return OPAL_SUCCESS;
 234         }
 235     }
 236 
 237     return OPAL_ERR_NOT_FOUND;
 238 }
 239 
 240 /*
 241  * Stop the progress thread, but don't delete the tracker (or event base)
 242  */
 243 int opal_progress_thread_pause(const char *name)
 244 {
 245     opal_progress_tracker_t *trk;
 246 
 247     if (!inited) {
 248         /* nothing we can do */
 249         return OPAL_ERR_NOT_FOUND;
 250     }
 251 
 252     if (NULL == name) {
 253         name = shared_thread_name;
 254     }
 255 
 256     /* find the specified engine */
 257     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 258         if (0 == strcmp(name, trk->name)) {
 259             if (trk->ev_active) {
 260                 stop_progress_engine(trk);
 261             }
 262 
 263             return OPAL_SUCCESS;
 264         }
 265     }
 266 
 267     return OPAL_ERR_NOT_FOUND;
 268 }
 269 
 270 int opal_progress_thread_resume(const char *name)
 271 {
 272     opal_progress_tracker_t *trk;
 273 
 274     if (!inited) {
 275         /* nothing we can do */
 276         return OPAL_ERR_NOT_FOUND;
 277     }
 278 
 279     if (NULL == name) {
 280         name = shared_thread_name;
 281     }
 282 
 283     /* find the specified engine */
 284     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 285         if (0 == strcmp(name, trk->name)) {
 286             if (trk->ev_active) {
 287                 return OPAL_ERR_RESOURCE_BUSY;
 288             }
 289 
 290             return start_progress_engine(trk);
 291         }
 292     }
 293 
 294     return OPAL_ERR_NOT_FOUND;
 295 }

/* [<][>][^][v][top][bottom][index][help] */