This source file includes the following definitions.
- timer_const
- timer_dest
- wccon
- wcdes
- orte_wait_disable
- orte_wait_enable
- orte_wait_init
- orte_wait_finalize
- orte_wait_cb
- cancel_callback
- orte_wait_cb_cancel
- wait_signal_callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 #include "orte_config.h"
26
27 #include <string.h>
28 #include <assert.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_QUEUE_H
33 #include <sys/queue.h>
34 #endif
35 #include <errno.h>
36 #ifdef HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39 #ifdef HAVE_SYS_TYPES_H
40 #include <sys/types.h>
41 #endif
42 #include <fcntl.h>
43 #include <stdlib.h>
44 #include <signal.h>
45 #include <stdio.h>
46 #include <sys/stat.h>
47 #ifdef HAVE_SYS_WAIT_H
48 #include <sys/wait.h>
49 #endif
50
51 #include "opal/dss/dss_types.h"
52 #include "opal/class/opal_object.h"
53 #include "opal/util/output.h"
54 #include "opal/class/opal_list.h"
55 #include "opal/mca/event/event.h"
56 #include "opal/threads/mutex.h"
57 #include "opal/threads/condition.h"
58 #include "opal/sys/atomic.h"
59
60 #include "orte/constants.h"
61 #include "orte/mca/errmgr/errmgr.h"
62 #include "orte/util/name_fns.h"
63 #include "orte/util/threads.h"
64 #include "orte/runtime/orte_globals.h"
65
66 #include "orte/runtime/orte_wait.h"
67
68
69 static void timer_const(orte_timer_t *tm)
70 {
71 tm->ev = opal_event_alloc();
72 tm->payload = NULL;
73 }
74 static void timer_dest(orte_timer_t *tm)
75 {
76 opal_event_free(tm->ev);
77 }
/* Class instance for orte_timer_t: derives from opal_object_t and uses
 * timer_const / timer_dest as its constructor and destructor. */
OBJ_CLASS_INSTANCE(orte_timer_t, opal_object_t,
                   timer_const, timer_dest);
82
83
84 static void wccon(orte_wait_tracker_t *p)
85 {
86 p->child = NULL;
87 p->cbfunc = NULL;
88 p->cbdata = NULL;
89 }
90 static void wcdes(orte_wait_tracker_t *p)
91 {
92 if (NULL != p->child) {
93 OBJ_RELEASE(p->child);
94 }
95 }
/* Class instance for orte_wait_tracker_t: a list item (so it can live on
 * pending_cbs) constructed by wccon and destructed by wcdes. */
OBJ_CLASS_INSTANCE(orte_wait_tracker_t, opal_list_item_t,
                   wccon,
                   wcdes);
99
100
101 static opal_event_t handler;
102 static opal_list_t pending_cbs;
103
104
105 static void wait_signal_callback(int fd, short event, void *arg);
106
107
108
109 void orte_wait_disable(void)
110 {
111 opal_event_del(&handler);
112 }
113
114 void orte_wait_enable(void)
115 {
116 opal_event_add(&handler, NULL);
117 }
118
119 int orte_wait_init(void)
120 {
121 OBJ_CONSTRUCT(&pending_cbs, opal_list_t);
122
123 opal_event_set(orte_event_base,
124 &handler, SIGCHLD, OPAL_EV_SIGNAL|OPAL_EV_PERSIST,
125 wait_signal_callback,
126 &handler);
127 opal_event_set_priority(&handler, ORTE_SYS_PRI);
128
129 opal_event_add(&handler, NULL);
130 return ORTE_SUCCESS;
131 }
132
133
134 int orte_wait_finalize(void)
135 {
136 opal_event_del(&handler);
137
138
139 OPAL_LIST_DESTRUCT(&pending_cbs);
140
141 return ORTE_SUCCESS;
142 }
143
144
145
146 void orte_wait_cb(orte_proc_t *child, orte_wait_cbfunc_t callback,
147 opal_event_base_t *evb, void *data)
148 {
149 orte_wait_tracker_t *t2;
150
151 if (NULL == child || NULL == callback) {
152
153 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
154 return;
155 }
156
157
158 if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) {
159 if (NULL != callback) {
160
161 t2 = OBJ_NEW(orte_wait_tracker_t);
162 OBJ_RETAIN(child);
163 t2->child = child;
164 t2->evb = evb;
165 t2->cbfunc = callback;
166 t2->cbdata = data;
167 opal_event_set(t2->evb, &t2->ev, -1,
168 OPAL_EV_WRITE, t2->cbfunc, t2);
169 opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
170 opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
171 }
172 return;
173 }
174
175
176 OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
177 if (t2->child == child) {
178 t2->cbfunc = callback;
179 t2->cbdata = data;
180 return;
181 }
182 }
183
184 t2 = OBJ_NEW(orte_wait_tracker_t);
185 OBJ_RETAIN(child);
186 t2->child = child;
187 t2->evb = evb;
188 t2->cbfunc = callback;
189 t2->cbdata = data;
190 opal_list_append(&pending_cbs, &t2->super);
191 }
192
193 static void cancel_callback(int fd, short args, void *cbdata)
194 {
195 orte_wait_tracker_t *trk = (orte_wait_tracker_t*)cbdata;
196 orte_wait_tracker_t *t2;
197
198 ORTE_ACQUIRE_OBJECT(trk);
199
200 OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
201 if (t2->child == trk->child) {
202 opal_list_remove_item(&pending_cbs, &t2->super);
203 OBJ_RELEASE(t2);
204 OBJ_RELEASE(trk);
205 return;
206 }
207 }
208
209 OBJ_RELEASE(trk);
210 }
211
212 void orte_wait_cb_cancel(orte_proc_t *child)
213 {
214 orte_wait_tracker_t *trk;
215
216 if (NULL == child) {
217
218 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
219 return;
220 }
221
222
223 trk = OBJ_NEW(orte_wait_tracker_t);
224 OBJ_RETAIN(child);
225 trk->child = child;
226 ORTE_THREADSHIFT(trk, orte_event_base, cancel_callback, ORTE_SYS_PRI);
227 }
228
229
230
231 static void wait_signal_callback(int fd, short event, void *arg)
232 {
233 opal_event_t *signal = (opal_event_t*) arg;
234 int status;
235 pid_t pid;
236 orte_wait_tracker_t *t2;
237
238 ORTE_ACQUIRE_OBJECT(signal);
239
240 if (SIGCHLD != OPAL_EVENT_SIGNAL(signal)) {
241 return;
242 }
243
244
245
246
247 while (1) {
248 pid = waitpid(-1, &status, WNOHANG);
249 if (-1 == pid && EINTR == errno) {
250
251 continue;
252 }
253
254 if (pid <= 0) {
255 return;
256 }
257
258
259 OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
260 if (pid == t2->child->pid) {
261
262 t2->child->exit_code = status;
263 opal_list_remove_item(&pending_cbs, &t2->super);
264 if (NULL != t2->cbfunc) {
265 opal_event_set(t2->evb, &t2->ev, -1,
266 OPAL_EV_WRITE, t2->cbfunc, t2);
267 opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
268 opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
269 } else {
270 OBJ_RELEASE(t2);
271 }
272 break;
273 }
274 }
275 }
276 }