This source file includes following definitions.
- tracker_constructor
- tracker_destructor
- dummy_timeout_cb
- progress_engine
- stop_progress_engine
- start_progress_engine
- opal_progress_thread_init
- opal_progress_thread_finalize
- opal_progress_thread_pause
- opal_progress_thread_resume
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 #include "opal_config.h"
  12 #include "opal/constants.h"
  13 
  14 #ifdef HAVE_UNISTD_H
  15 #include <unistd.h>
  16 #endif
  17 
  18 #include "opal/class/opal_list.h"
  19 #include "opal/mca/event/event.h"
  20 #include "opal/threads/threads.h"
  21 #include "opal/util/error.h"
  22 #include "opal/util/fd.h"
  23 
  24 #include "opal/runtime/opal_progress_threads.h"
  25 
  26 
  27 
  28 typedef struct {
  29     opal_list_item_t super;
  30 
  31     int refcount;
  32     char *name;
  33 
  34     opal_event_base_t *ev_base;
  35 
  36     
  37 
  38     volatile bool ev_active;
  39 
  40     
  41 
  42     opal_event_t block;
  43 
  44     bool engine_constructed;
  45     opal_thread_t engine;
  46 } opal_progress_tracker_t;
  47 
  48 static void tracker_constructor(opal_progress_tracker_t *p)
  49 {
  50     p->refcount = 1;  
  51     p->name = NULL;
  52     p->ev_base = NULL;
  53     p->ev_active = false;
  54     p->engine_constructed = false;
  55 }
  56 
  57 static void tracker_destructor(opal_progress_tracker_t *p)
  58 {
  59     opal_event_del(&p->block);
  60 
  61     if (NULL != p->name) {
  62         free(p->name);
  63     }
  64     if (NULL != p->ev_base) {
  65         opal_event_base_free(p->ev_base);
  66     }
  67     if (p->engine_constructed) {
  68         OBJ_DESTRUCT(&p->engine);
  69     }
  70 }
  71 
  72 static OBJ_CLASS_INSTANCE(opal_progress_tracker_t,
  73                           opal_list_item_t,
  74                           tracker_constructor,
  75                           tracker_destructor);
  76 
  77 static bool inited = false;
  78 static opal_list_t tracking;
  79 static struct timeval long_timeout = {
  80     .tv_sec = 3600,
  81     .tv_usec = 0
  82 };
  83 static const char *shared_thread_name = "OPAL-wide async progress thread";
  84 
  85 
  86 
  87 
  88 
  89 static void dummy_timeout_cb(int fd, short args, void *cbdata)
  90 {
  91     opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata;
  92 
  93     opal_event_add(&trk->block, &long_timeout);
  94 }
  95 
  96 
  97 
  98 
  99 static void* progress_engine(opal_object_t *obj)
 100 {
 101     opal_thread_t *t = (opal_thread_t*)obj;
 102     opal_progress_tracker_t *trk = (opal_progress_tracker_t*)t->t_arg;
 103 
 104     while (trk->ev_active) {
 105         opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE);
 106     }
 107 
 108     return OPAL_THREAD_CANCELLED;
 109 }
 110 
 111 static void stop_progress_engine(opal_progress_tracker_t *trk)
 112 {
 113     assert(trk->ev_active);
 114     trk->ev_active = false;
 115 
 116     
 117 
 118     opal_event_base_loopbreak(trk->ev_base);
 119 
 120     opal_thread_join(&trk->engine, NULL);
 121 }
 122 
 123 static int start_progress_engine(opal_progress_tracker_t *trk)
 124 {
 125     assert(!trk->ev_active);
 126     trk->ev_active = true;
 127 
 128     
 129     trk->engine.t_run = progress_engine;
 130     trk->engine.t_arg = trk;
 131 
 132     int rc = opal_thread_start(&trk->engine);
 133     if (OPAL_SUCCESS != rc) {
 134         OPAL_ERROR_LOG(rc);
 135     }
 136 
 137     return rc;
 138 }
 139 
 140 opal_event_base_t *opal_progress_thread_init(const char *name)
 141 {
 142     opal_progress_tracker_t *trk;
 143     int rc;
 144 
 145     if (!inited) {
 146         OBJ_CONSTRUCT(&tracking, opal_list_t);
 147         inited = true;
 148     }
 149 
 150     if (NULL == name) {
 151         name = shared_thread_name;
 152     }
 153 
 154     
 155     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 156         if (0 == strcmp(name, trk->name)) {
 157             
 158             ++trk->refcount;
 159             
 160             return trk->ev_base;
 161         }
 162     }
 163 
 164     trk = OBJ_NEW(opal_progress_tracker_t);
 165     if (NULL == trk) {
 166         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 167         return NULL;
 168     }
 169 
 170     trk->name = strdup(name);
 171     if (NULL == trk->name) {
 172         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 173         OBJ_RELEASE(trk);
 174         return NULL;
 175     }
 176 
 177     if (NULL == (trk->ev_base = opal_event_base_create())) {
 178         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 179         OBJ_RELEASE(trk);
 180         return NULL;
 181     }
 182 
 183     
 184 
 185     opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST,
 186                    dummy_timeout_cb, trk);
 187     opal_event_add(&trk->block, &long_timeout);
 188 
 189     
 190     OBJ_CONSTRUCT(&trk->engine, opal_thread_t);
 191     trk->engine_constructed = true;
 192     if (OPAL_SUCCESS != (rc = start_progress_engine(trk))) {
 193         OPAL_ERROR_LOG(rc);
 194         OBJ_RELEASE(trk);
 195         return NULL;
 196     }
 197     opal_list_append(&tracking, &trk->super);
 198 
 199     return trk->ev_base;
 200 }
 201 
 202 int opal_progress_thread_finalize(const char *name)
 203 {
 204     opal_progress_tracker_t *trk;
 205 
 206     if (!inited) {
 207         
 208         return OPAL_ERR_NOT_FOUND;
 209     }
 210 
 211     if (NULL == name) {
 212         name = shared_thread_name;
 213     }
 214 
 215     
 216     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 217         if (0 == strcmp(name, trk->name)) {
 218             
 219             --trk->refcount;
 220 
 221             
 222             if (trk->refcount > 0) {
 223                 return OPAL_SUCCESS;
 224             }
 225 
 226             
 227             if (trk->ev_active) {
 228                 stop_progress_engine(trk);
 229             }
 230 
 231             opal_list_remove_item(&tracking, &trk->super);
 232             OBJ_RELEASE(trk);
 233             return OPAL_SUCCESS;
 234         }
 235     }
 236 
 237     return OPAL_ERR_NOT_FOUND;
 238 }
 239 
 240 
 241 
 242 
 243 int opal_progress_thread_pause(const char *name)
 244 {
 245     opal_progress_tracker_t *trk;
 246 
 247     if (!inited) {
 248         
 249         return OPAL_ERR_NOT_FOUND;
 250     }
 251 
 252     if (NULL == name) {
 253         name = shared_thread_name;
 254     }
 255 
 256     
 257     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 258         if (0 == strcmp(name, trk->name)) {
 259             if (trk->ev_active) {
 260                 stop_progress_engine(trk);
 261             }
 262 
 263             return OPAL_SUCCESS;
 264         }
 265     }
 266 
 267     return OPAL_ERR_NOT_FOUND;
 268 }
 269 
 270 int opal_progress_thread_resume(const char *name)
 271 {
 272     opal_progress_tracker_t *trk;
 273 
 274     if (!inited) {
 275         
 276         return OPAL_ERR_NOT_FOUND;
 277     }
 278 
 279     if (NULL == name) {
 280         name = shared_thread_name;
 281     }
 282 
 283     
 284     OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
 285         if (0 == strcmp(name, trk->name)) {
 286             if (trk->ev_active) {
 287                 return OPAL_ERR_RESOURCE_BUSY;
 288             }
 289 
 290             return start_progress_engine(trk);
 291         }
 292     }
 293 
 294     return OPAL_ERR_NOT_FOUND;
 295 }