This source file includes following definitions.
- tracker_constructor
- tracker_destructor
- pmix_libev_ev_async_cb
- pmix_event_add
- pmix_event_del
- pmix_event_active
- pmix_event_base_loopbreak
- dummy_timeout_cb
- progress_engine
- stop_progress_engine
- start_progress_engine
- pmix_progress_thread_init
- pmix_progress_thread_stop
- pmix_progress_thread_finalize
- pmix_progress_thread_pause
- pmix_progress_tracker_get_by_base
- pmix_progress_thread_resume
1
2
3
4
5
6
7
8
9
10
11
12
13 #include <src/include/pmix_config.h>
14
15 #ifdef HAVE_UNISTD_H
16 #include <unistd.h>
17 #endif
18 #include <string.h>
19 #include <pthread.h>
20 #include PMIX_EVENT_HEADER
21
22 #include "src/class/pmix_list.h"
23 #include "src/threads/threads.h"
24 #include "src/util/error.h"
25 #include "src/util/fd.h"
26
27 #include "src/runtime/pmix_progress_threads.h"
28
29
30
31 typedef struct {
32 pmix_list_item_t super;
33
34 int refcount;
35 char *name;
36
37 pmix_event_base_t *ev_base;
38
39
40
41 volatile bool ev_active;
42
43
44
45 pmix_event_t block;
46
47 bool engine_constructed;
48 pmix_thread_t engine;
49 #if PMIX_HAVE_LIBEV
50 ev_async async;
51 pthread_mutex_t mutex;
52 pthread_cond_t cond;
53 pmix_list_t list;
54 #endif
55 } pmix_progress_tracker_t;
56
57 static void tracker_constructor(pmix_progress_tracker_t *p)
58 {
59 p->refcount = 1;
60 p->name = NULL;
61 p->ev_base = NULL;
62 p->ev_active = false;
63 p->engine_constructed = false;
64 #if PMIX_HAVE_LIBEV
65 pthread_mutex_init(&p->mutex, NULL);
66 PMIX_CONSTRUCT(&p->list, pmix_list_t);
67 #endif
68 }
69
70 static void tracker_destructor(pmix_progress_tracker_t *p)
71 {
72 pmix_event_del(&p->block);
73
74 if (NULL != p->name) {
75 free(p->name);
76 }
77 if (NULL != p->ev_base) {
78 pmix_event_base_free(p->ev_base);
79 }
80 if (p->engine_constructed) {
81 PMIX_DESTRUCT(&p->engine);
82 }
83 #if PMIX_HAVE_LIBEV
84 pthread_mutex_destroy(&p->mutex);
85 PMIX_LIST_DESTRUCT(&p->list);
86 #endif
87 }
88
89 static PMIX_CLASS_INSTANCE(pmix_progress_tracker_t,
90 pmix_list_item_t,
91 tracker_constructor,
92 tracker_destructor);
93
94 #if PMIX_HAVE_LIBEV
95
96 typedef enum {
97 PMIX_EVENT_ACTIVE,
98 PMIX_EVENT_ADD,
99 PMIX_EVENT_DEL
100 } pmix_event_type_t;
101
102 typedef struct {
103 pmix_list_item_t super;
104 struct event *ev;
105 struct timeval *tv;
106 int res;
107 short ncalls;
108 pmix_event_type_t type;
109 } pmix_event_caddy_t;
110
111 static PMIX_CLASS_INSTANCE(pmix_event_caddy_t,
112 pmix_list_item_t,
113 NULL, NULL);
114
115 static pmix_progress_tracker_t* pmix_progress_tracker_get_by_base(struct event_base *);
116
117 static void pmix_libev_ev_async_cb (EV_P_ ev_async *w, int revents)
118 {
119 pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base((struct event_base *)EV_A);
120 assert(NULL != trk);
121 pthread_mutex_lock (&trk->mutex);
122 pmix_event_caddy_t *cd, *next;
123 PMIX_LIST_FOREACH_SAFE(cd, next, &trk->list, pmix_event_caddy_t) {
124 switch (cd->type) {
125 case PMIX_EVENT_ADD:
126 (void)event_add(cd->ev, cd->tv);
127 break;
128 case PMIX_EVENT_DEL:
129 (void)event_del(cd->ev);
130 break;
131 case PMIX_EVENT_ACTIVE:
132 (void)event_active(cd->ev, cd->res, cd->ncalls);
133 break;
134 }
135 pmix_list_remove_item(&trk->list, &cd->super);
136 PMIX_RELEASE(cd);
137 }
138 pthread_mutex_unlock (&trk->mutex);
139 }
140
141 int pmix_event_add(struct event *ev, struct timeval *tv) {
142 int res;
143 pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev->ev_base);
144 if ((NULL != trk) && !pthread_equal(pthread_self(), trk->engine.t_handle)) {
145 pmix_event_caddy_t *cd = PMIX_NEW(pmix_event_caddy_t);
146 cd->type = PMIX_EVENT_ADD;
147 cd->ev = ev;
148 cd->tv = tv;
149 pthread_mutex_lock(&trk->mutex);
150 pmix_list_append(&trk->list, &cd->super);
151 ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
152 pthread_mutex_unlock(&trk->mutex);
153 res = PMIX_SUCCESS;
154 } else {
155 res = event_add(ev, tv);
156 }
157 return res;
158 }
159
160 int pmix_event_del(struct event *ev) {
161 int res;
162 pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev->ev_base);
163 if ((NULL != trk) && !pthread_equal(pthread_self(), trk->engine.t_handle)) {
164 pmix_event_caddy_t *cd = PMIX_NEW(pmix_event_caddy_t);
165 cd->type = PMIX_EVENT_DEL;
166 cd->ev = ev;
167 pthread_mutex_lock(&trk->mutex);
168 pmix_list_append(&trk->list, &cd->super);
169 ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
170 pthread_mutex_unlock(&trk->mutex);
171 res = PMIX_SUCCESS;
172 } else {
173 res = event_del(ev);
174 }
175 return res;
176 }
177
178 void pmix_event_active (struct event *ev, int res, short ncalls) {
179 pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev->ev_base);
180 if ((NULL != trk) && !pthread_equal(pthread_self(), trk->engine.t_handle)) {
181 pmix_event_caddy_t *cd = PMIX_NEW(pmix_event_caddy_t);
182 cd->type = PMIX_EVENT_ACTIVE;
183 cd->ev = ev;
184 cd->res = res;
185 cd->ncalls = ncalls;
186 pthread_mutex_lock(&trk->mutex);
187 pmix_list_append(&trk->list, &cd->super);
188 ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
189 pthread_mutex_unlock(&trk->mutex);
190 } else {
191 event_active(ev, res, ncalls);
192 }
193 }
194
195 void pmix_event_base_loopbreak (pmix_event_base_t *ev_base) {
196 pmix_progress_tracker_t *trk = pmix_progress_tracker_get_by_base(ev_base);
197 assert(NULL != trk);
198 ev_async_send ((struct ev_loop *)trk->ev_base, &trk->async);
199 }
200 #endif
201
202 static bool inited = false;
203 static pmix_list_t tracking;
204 static struct timeval long_timeout = {
205 .tv_sec = 3600,
206 .tv_usec = 0
207 };
208 static const char *shared_thread_name = "PMIX-wide async progress thread";
209
210
211
212
213
214 static void dummy_timeout_cb(int fd, short args, void *cbdata)
215 {
216 pmix_progress_tracker_t *trk = (pmix_progress_tracker_t*)cbdata;
217
218 pmix_event_add(&trk->block, &long_timeout);
219 }
220
221
222
223
224 static void* progress_engine(pmix_object_t *obj)
225 {
226 pmix_thread_t *t = (pmix_thread_t*)obj;
227 pmix_progress_tracker_t *trk = (pmix_progress_tracker_t*)t->t_arg;
228
229 while (trk->ev_active) {
230 pmix_event_loop(trk->ev_base, PMIX_EVLOOP_ONCE);
231 }
232
233 return PMIX_THREAD_CANCELLED;
234 }
235
236 static void stop_progress_engine(pmix_progress_tracker_t *trk)
237 {
238 assert(trk->ev_active);
239 trk->ev_active = false;
240
241
242 pmix_event_base_loopbreak(trk->ev_base);
243
244 pmix_thread_join(&trk->engine, NULL);
245 }
246
247 static int start_progress_engine(pmix_progress_tracker_t *trk)
248 {
249 assert(!trk->ev_active);
250 trk->ev_active = true;
251
252
253 trk->engine.t_run = progress_engine;
254 trk->engine.t_arg = trk;
255
256 int rc = pmix_thread_start(&trk->engine);
257 if (PMIX_SUCCESS != rc) {
258 PMIX_ERROR_LOG(rc);
259 }
260
261 return rc;
262 }
263
264 pmix_event_base_t *pmix_progress_thread_init(const char *name)
265 {
266 pmix_progress_tracker_t *trk;
267 int rc;
268
269 if (!inited) {
270 PMIX_CONSTRUCT(&tracking, pmix_list_t);
271 inited = true;
272 }
273
274 if (NULL == name) {
275 name = shared_thread_name;
276 }
277
278
279 PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
280 if (0 == strcmp(name, trk->name)) {
281
282 ++trk->refcount;
283
284 return trk->ev_base;
285 }
286 }
287
288 trk = PMIX_NEW(pmix_progress_tracker_t);
289 if (NULL == trk) {
290 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
291 return NULL;
292 }
293
294 trk->name = strdup(name);
295 if (NULL == trk->name) {
296 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
297 PMIX_RELEASE(trk);
298 return NULL;
299 }
300
301 if (NULL == (trk->ev_base = pmix_event_base_create())) {
302 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
303 PMIX_RELEASE(trk);
304 return NULL;
305 }
306
307
308
309 pmix_event_assign(&trk->block, trk->ev_base, -1, PMIX_EV_PERSIST,
310 dummy_timeout_cb, trk);
311 pmix_event_add(&trk->block, &long_timeout);
312
313 #if PMIX_HAVE_LIBEV
314 ev_async_init (&trk->async, pmix_libev_ev_async_cb);
315 ev_async_start((struct ev_loop *)trk->ev_base, &trk->async);
316 #endif
317
318
319 PMIX_CONSTRUCT(&trk->engine, pmix_thread_t);
320 trk->engine_constructed = true;
321 if (PMIX_SUCCESS != (rc = start_progress_engine(trk))) {
322 PMIX_ERROR_LOG(rc);
323 PMIX_RELEASE(trk);
324 return NULL;
325 }
326 pmix_list_append(&tracking, &trk->super);
327
328 return trk->ev_base;
329 }
330
331 int pmix_progress_thread_stop(const char *name)
332 {
333 pmix_progress_tracker_t *trk;
334
335 if (!inited) {
336
337 return PMIX_ERR_NOT_FOUND;
338 }
339
340 if (NULL == name) {
341 name = shared_thread_name;
342 }
343
344
345 PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
346 if (0 == strcmp(name, trk->name)) {
347
348 --trk->refcount;
349
350
351 if (trk->refcount > 0) {
352 return PMIX_SUCCESS;
353 }
354
355
356 if (trk->ev_active) {
357 stop_progress_engine(trk);
358 }
359 pmix_list_remove_item(&tracking, &trk->super);
360 PMIX_RELEASE(trk);
361 return PMIX_SUCCESS;
362 }
363 }
364
365 return PMIX_ERR_NOT_FOUND;
366 }
367
368 int pmix_progress_thread_finalize(const char *name)
369 {
370 pmix_progress_tracker_t *trk;
371
372 if (!inited) {
373
374 return PMIX_ERR_NOT_FOUND;
375 }
376
377 if (NULL == name) {
378 name = shared_thread_name;
379 }
380
381
382 PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
383 if (0 == strcmp(name, trk->name)) {
384
385 if (trk->refcount > 0) {
386 return PMIX_SUCCESS;
387 }
388
389 pmix_list_remove_item(&tracking, &trk->super);
390 PMIX_RELEASE(trk);
391 return PMIX_SUCCESS;
392 }
393 }
394
395 return PMIX_ERR_NOT_FOUND;
396 }
397
398
399
400
401 int pmix_progress_thread_pause(const char *name)
402 {
403 pmix_progress_tracker_t *trk;
404
405 if (!inited) {
406
407 return PMIX_ERR_NOT_FOUND;
408 }
409
410 if (NULL == name) {
411 name = shared_thread_name;
412 }
413
414
415 PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
416 if (0 == strcmp(name, trk->name)) {
417 if (trk->ev_active) {
418 stop_progress_engine(trk);
419 }
420
421 return PMIX_SUCCESS;
422 }
423 }
424
425 return PMIX_ERR_NOT_FOUND;
426 }
427
428 #if PMIX_HAVE_LIBEV
429 static pmix_progress_tracker_t* pmix_progress_tracker_get_by_base(pmix_event_base_t *base) {
430 pmix_progress_tracker_t *trk;
431
432 if (inited) {
433 PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
434 if(trk->ev_base == base) {
435 return trk;
436 }
437 }
438 }
439 return NULL;
440 }
441 #endif
442
443 int pmix_progress_thread_resume(const char *name)
444 {
445 pmix_progress_tracker_t *trk;
446
447 if (!inited) {
448
449 return PMIX_ERR_NOT_FOUND;
450 }
451
452 if (NULL == name) {
453 name = shared_thread_name;
454 }
455
456
457 PMIX_LIST_FOREACH(trk, &tracking, pmix_progress_tracker_t) {
458 if (0 == strcmp(name, trk->name)) {
459 if (trk->ev_active) {
460 return PMIX_ERR_RESOURCE_BUSY;
461 }
462
463 return start_progress_engine(trk);
464 }
465 }
466
467 return PMIX_ERR_NOT_FOUND;
468 }