This source file includes the following definitions.
- tracker_constructor
- tracker_destructor
- dummy_timeout_cb
- progress_engine
- stop_progress_engine
- start_progress_engine
- opal_progress_thread_init
- opal_progress_thread_finalize
- opal_progress_thread_pause
- opal_progress_thread_resume
1
2
3
4
5
6
7
8
9
10
11 #include "opal_config.h"
12 #include "opal/constants.h"
13
14 #ifdef HAVE_UNISTD_H
15 #include <unistd.h>
16 #endif
17
18 #include "opal/class/opal_list.h"
19 #include "opal/mca/event/event.h"
20 #include "opal/threads/threads.h"
21 #include "opal/util/error.h"
22 #include "opal/util/fd.h"
23
24 #include "opal/runtime/opal_progress_threads.h"
25
26
27
28 typedef struct {
29 opal_list_item_t super;
30
31 int refcount;
32 char *name;
33
34 opal_event_base_t *ev_base;
35
36
37
38 volatile bool ev_active;
39
40
41
42 opal_event_t block;
43
44 bool engine_constructed;
45 opal_thread_t engine;
46 } opal_progress_tracker_t;
47
48 static void tracker_constructor(opal_progress_tracker_t *p)
49 {
50 p->refcount = 1;
51 p->name = NULL;
52 p->ev_base = NULL;
53 p->ev_active = false;
54 p->engine_constructed = false;
55 }
56
57 static void tracker_destructor(opal_progress_tracker_t *p)
58 {
59 opal_event_del(&p->block);
60
61 if (NULL != p->name) {
62 free(p->name);
63 }
64 if (NULL != p->ev_base) {
65 opal_event_base_free(p->ev_base);
66 }
67 if (p->engine_constructed) {
68 OBJ_DESTRUCT(&p->engine);
69 }
70 }
71
/* File-local class record for the tracker: parent class is
   opal_list_item_t, with the constructor/destructor defined above. */
static OBJ_CLASS_INSTANCE(opal_progress_tracker_t, opal_list_item_t,
                          tracker_constructor, tracker_destructor);
76
77 static bool inited = false;
78 static opal_list_t tracking;
79 static struct timeval long_timeout = {
80 .tv_sec = 3600,
81 .tv_usec = 0
82 };
83 static const char *shared_thread_name = "OPAL-wide async progress thread";
84
85
86
87
88
89 static void dummy_timeout_cb(int fd, short args, void *cbdata)
90 {
91 opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata;
92
93 opal_event_add(&trk->block, &long_timeout);
94 }
95
96
97
98
99 static void* progress_engine(opal_object_t *obj)
100 {
101 opal_thread_t *t = (opal_thread_t*)obj;
102 opal_progress_tracker_t *trk = (opal_progress_tracker_t*)t->t_arg;
103
104 while (trk->ev_active) {
105 opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE);
106 }
107
108 return OPAL_THREAD_CANCELLED;
109 }
110
111 static void stop_progress_engine(opal_progress_tracker_t *trk)
112 {
113 assert(trk->ev_active);
114 trk->ev_active = false;
115
116
117
118 opal_event_base_loopbreak(trk->ev_base);
119
120 opal_thread_join(&trk->engine, NULL);
121 }
122
123 static int start_progress_engine(opal_progress_tracker_t *trk)
124 {
125 assert(!trk->ev_active);
126 trk->ev_active = true;
127
128
129 trk->engine.t_run = progress_engine;
130 trk->engine.t_arg = trk;
131
132 int rc = opal_thread_start(&trk->engine);
133 if (OPAL_SUCCESS != rc) {
134 OPAL_ERROR_LOG(rc);
135 }
136
137 return rc;
138 }
139
140 opal_event_base_t *opal_progress_thread_init(const char *name)
141 {
142 opal_progress_tracker_t *trk;
143 int rc;
144
145 if (!inited) {
146 OBJ_CONSTRUCT(&tracking, opal_list_t);
147 inited = true;
148 }
149
150 if (NULL == name) {
151 name = shared_thread_name;
152 }
153
154
155 OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
156 if (0 == strcmp(name, trk->name)) {
157
158 ++trk->refcount;
159
160 return trk->ev_base;
161 }
162 }
163
164 trk = OBJ_NEW(opal_progress_tracker_t);
165 if (NULL == trk) {
166 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
167 return NULL;
168 }
169
170 trk->name = strdup(name);
171 if (NULL == trk->name) {
172 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
173 OBJ_RELEASE(trk);
174 return NULL;
175 }
176
177 if (NULL == (trk->ev_base = opal_event_base_create())) {
178 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
179 OBJ_RELEASE(trk);
180 return NULL;
181 }
182
183
184
185 opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST,
186 dummy_timeout_cb, trk);
187 opal_event_add(&trk->block, &long_timeout);
188
189
190 OBJ_CONSTRUCT(&trk->engine, opal_thread_t);
191 trk->engine_constructed = true;
192 if (OPAL_SUCCESS != (rc = start_progress_engine(trk))) {
193 OPAL_ERROR_LOG(rc);
194 OBJ_RELEASE(trk);
195 return NULL;
196 }
197 opal_list_append(&tracking, &trk->super);
198
199 return trk->ev_base;
200 }
201
202 int opal_progress_thread_finalize(const char *name)
203 {
204 opal_progress_tracker_t *trk;
205
206 if (!inited) {
207
208 return OPAL_ERR_NOT_FOUND;
209 }
210
211 if (NULL == name) {
212 name = shared_thread_name;
213 }
214
215
216 OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
217 if (0 == strcmp(name, trk->name)) {
218
219 --trk->refcount;
220
221
222 if (trk->refcount > 0) {
223 return OPAL_SUCCESS;
224 }
225
226
227 if (trk->ev_active) {
228 stop_progress_engine(trk);
229 }
230
231 opal_list_remove_item(&tracking, &trk->super);
232 OBJ_RELEASE(trk);
233 return OPAL_SUCCESS;
234 }
235 }
236
237 return OPAL_ERR_NOT_FOUND;
238 }
239
240
241
242
243 int opal_progress_thread_pause(const char *name)
244 {
245 opal_progress_tracker_t *trk;
246
247 if (!inited) {
248
249 return OPAL_ERR_NOT_FOUND;
250 }
251
252 if (NULL == name) {
253 name = shared_thread_name;
254 }
255
256
257 OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
258 if (0 == strcmp(name, trk->name)) {
259 if (trk->ev_active) {
260 stop_progress_engine(trk);
261 }
262
263 return OPAL_SUCCESS;
264 }
265 }
266
267 return OPAL_ERR_NOT_FOUND;
268 }
269
270 int opal_progress_thread_resume(const char *name)
271 {
272 opal_progress_tracker_t *trk;
273
274 if (!inited) {
275
276 return OPAL_ERR_NOT_FOUND;
277 }
278
279 if (NULL == name) {
280 name = shared_thread_name;
281 }
282
283
284 OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
285 if (0 == strcmp(name, trk->name)) {
286 if (trk->ev_active) {
287 return OPAL_ERR_RESOURCE_BUSY;
288 }
289
290 return start_progress_engine(trk);
291 }
292 }
293
294 return OPAL_ERR_NOT_FOUND;
295 }