This source file includes the following definitions:
- force_quit
- init
- finalize
- _send_notification
- hnp_notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14 #include "orte_config.h"
15
16 #include <sys/types.h>
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif
20 #include <string.h>
21
22 #include "opal/util/output.h"
23 #include "opal/mca/pmix/pmix.h"
24
25 #include "orte/mca/errmgr/errmgr.h"
26 #include "orte/mca/grpcomm/grpcomm.h"
27 #include "orte/mca/rml/rml.h"
28 #include "orte/mca/iof/iof.h"
29 #include "orte/mca/plm/base/base.h"
30 #include "orte/mca/ras/base/base.h"
31 #include "orte/mca/rmaps/base/base.h"
32 #include "orte/mca/routed/routed.h"
33 #include "orte/util/session_dir.h"
34 #include "orte/runtime/orte_quit.h"
35
36 #include "orte/mca/state/state.h"
37 #include "orte/mca/state/base/base.h"
38 #include "orte/mca/state/base/state_private.h"
39 #include "state_hnp.h"
40
41
42
43
44 static int init(void);
45 static int finalize(void);
46
47
48
49
50
51
52
53 orte_state_base_module_t orte_state_hnp_module = {
54 init,
55 finalize,
56 orte_state_base_activate_job_state,
57 orte_state_base_add_job_state,
58 orte_state_base_set_job_state_callback,
59 orte_state_base_set_job_state_priority,
60 orte_state_base_remove_job_state,
61 orte_state_base_activate_proc_state,
62 orte_state_base_add_proc_state,
63 orte_state_base_set_proc_state_callback,
64 orte_state_base_set_proc_state_priority,
65 orte_state_base_remove_proc_state
66 };
67
68 static void hnp_notify(int sd, short args, void *cbdata);
69
70
71
72
73 static orte_job_state_t launch_states[] = {
74 ORTE_JOB_STATE_INIT,
75 ORTE_JOB_STATE_INIT_COMPLETE,
76 ORTE_JOB_STATE_ALLOCATE,
77 ORTE_JOB_STATE_ALLOCATION_COMPLETE,
78 ORTE_JOB_STATE_DAEMONS_LAUNCHED,
79 ORTE_JOB_STATE_DAEMONS_REPORTED,
80 ORTE_JOB_STATE_VM_READY,
81 ORTE_JOB_STATE_MAP,
82 ORTE_JOB_STATE_MAP_COMPLETE,
83 ORTE_JOB_STATE_SYSTEM_PREP,
84 ORTE_JOB_STATE_LAUNCH_APPS,
85 ORTE_JOB_STATE_SEND_LAUNCH_MSG,
86 ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
87 ORTE_JOB_STATE_RUNNING,
88 ORTE_JOB_STATE_REGISTERED,
89
90 ORTE_JOB_STATE_TERMINATED,
91 ORTE_JOB_STATE_NOTIFY_COMPLETED,
92 ORTE_JOB_STATE_NOTIFIED,
93 ORTE_JOB_STATE_ALL_JOBS_COMPLETE
94 };
95 static orte_state_cbfunc_t launch_callbacks[] = {
96 orte_plm_base_setup_job,
97 orte_plm_base_setup_job_complete,
98 orte_ras_base_allocate,
99 orte_plm_base_allocation_complete,
100 orte_plm_base_daemons_launched,
101 orte_plm_base_daemons_reported,
102 orte_plm_base_vm_ready,
103 orte_rmaps_base_map_job,
104 orte_plm_base_mapping_complete,
105 orte_plm_base_complete_setup,
106 orte_plm_base_launch_apps,
107 orte_plm_base_send_launch_msg,
108 orte_state_base_local_launch_complete,
109 orte_plm_base_post_launch,
110 orte_plm_base_registered,
111 orte_state_base_check_all_complete,
112 hnp_notify,
113 orte_state_base_cleanup_job,
114 orte_quit
115 };
116
117 static orte_proc_state_t proc_states[] = {
118 ORTE_PROC_STATE_RUNNING,
119 ORTE_PROC_STATE_REGISTERED,
120 ORTE_PROC_STATE_IOF_COMPLETE,
121 ORTE_PROC_STATE_WAITPID_FIRED,
122 ORTE_PROC_STATE_TERMINATED
123 };
124 static orte_state_cbfunc_t proc_callbacks[] = {
125 orte_state_base_track_procs,
126 orte_state_base_track_procs,
127 orte_state_base_track_procs,
128 orte_state_base_track_procs,
129 orte_state_base_track_procs
130 };
131
132 static void force_quit(int fd, short args, void *cbdata)
133 {
134 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
135
136
137 orte_plm.terminate_orteds();
138 OBJ_RELEASE(caddy);
139 }
140
141
142
143
144 static int init(void)
145 {
146 int i, rc;
147 int num_states;
148
149
150 OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
151 OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
152
153
154 num_states = sizeof(launch_states) / sizeof(orte_job_state_t);
155 for (i=0; i < num_states; i++) {
156 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i],
157 launch_callbacks[i],
158 ORTE_SYS_PRI))) {
159 ORTE_ERROR_LOG(rc);
160 }
161 }
162
163 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED,
164 orte_quit, ORTE_SYS_PRI))) {
165 ORTE_ERROR_LOG(rc);
166 }
167
168 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
169 force_quit, ORTE_ERROR_PRI))) {
170 ORTE_ERROR_LOG(rc);
171 }
172
173 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS,
174 orte_state_base_report_progress, ORTE_ERROR_PRI))) {
175 ORTE_ERROR_LOG(rc);
176 }
177 if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
178 orte_state_base_print_job_state_machine();
179 }
180
181
182
183
184 num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
185 for (i=0; i < num_states; i++) {
186 if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
187 proc_callbacks[i],
188 ORTE_SYS_PRI))) {
189 ORTE_ERROR_LOG(rc);
190 }
191 }
192 if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
193 orte_state_base_print_proc_state_machine();
194 }
195
196 return ORTE_SUCCESS;
197 }
198
199 static int finalize(void)
200 {
201
202 OPAL_LIST_DESTRUCT(&orte_proc_states);
203
204 OPAL_LIST_DESTRUCT(&orte_job_states);
205
206 return ORTE_SUCCESS;
207 }
208
209 static void _send_notification(int status,
210 orte_proc_state_t state,
211 orte_process_name_t *proc,
212 orte_process_name_t *target)
213 {
214 opal_buffer_t *buf;
215 orte_grpcomm_signature_t sig;
216 int rc;
217 opal_value_t kv, *kvptr;
218 orte_process_name_t daemon;
219
220 buf = OBJ_NEW(opal_buffer_t);
221
222 opal_output_verbose(5, orte_state_base_framework.framework_output,
223 "%s state:hnp:sending notification %s proc %s target %s",
224 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
225 ORTE_ERROR_NAME(status),
226 ORTE_NAME_PRINT(proc),
227 ORTE_NAME_PRINT(target));
228
229
230 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
231 ORTE_ERROR_LOG(rc);
232 OBJ_RELEASE(buf);
233 return;
234 }
235
236
237 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, proc, 1, ORTE_NAME))) {
238 ORTE_ERROR_LOG(rc);
239 OBJ_RELEASE(buf);
240 return;
241 }
242
243 if (OPAL_ERR_PROC_ABORTED == status) {
244
245 rc = 3;
246 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
247 ORTE_ERROR_LOG(rc);
248 OBJ_RELEASE(buf);
249 return;
250 }
251
252 OBJ_CONSTRUCT(&kv, opal_value_t);
253 kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
254 kv.type = OPAL_NAME;
255 kv.data.name.jobid = proc->jobid;
256 kv.data.name.vpid = proc->vpid;
257 kvptr = &kv;
258 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
259 ORTE_ERROR_LOG(rc);
260 OBJ_DESTRUCT(&kv);
261 OBJ_RELEASE(buf);
262 return;
263 }
264 OBJ_DESTRUCT(&kv);
265 } else {
266
267 rc = 2;
268 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
269 ORTE_ERROR_LOG(rc);
270 OBJ_RELEASE(buf);
271 return;
272 }
273 }
274
275
276 OBJ_CONSTRUCT(&kv, opal_value_t);
277 kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
278 kv.type = OPAL_NAME;
279 kv.data.name.jobid = proc->jobid;
280 kv.data.name.vpid = proc->vpid;
281 kvptr = &kv;
282 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
283 ORTE_ERROR_LOG(rc);
284 OBJ_DESTRUCT(&kv);
285 OBJ_RELEASE(buf);
286 return;
287 }
288 OBJ_DESTRUCT(&kv);
289
290
291 OBJ_CONSTRUCT(&kv, opal_value_t);
292 kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE);
293 kv.type = OPAL_NAME;
294 kv.data.name.jobid = target->jobid;
295 kv.data.name.vpid = target->vpid;
296 kvptr = &kv;
297 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
298 ORTE_ERROR_LOG(rc);
299 OBJ_DESTRUCT(&kv);
300 OBJ_RELEASE(buf);
301 return;
302 }
303 OBJ_DESTRUCT(&kv);
304
305
306 if (ORTE_VPID_WILDCARD == target->vpid) {
307 OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t);
308 sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
309 sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
310 sig.signature[0].vpid = ORTE_VPID_WILDCARD;
311 sig.sz = 1;
312
313 if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
314 ORTE_ERROR_LOG(rc);
315 }
316 OBJ_DESTRUCT(&sig);
317 OBJ_RELEASE(buf);
318 } else {
319
320 daemon.jobid = ORTE_PROC_MY_NAME->jobid;
321 daemon.vpid = orte_get_proc_daemon_vpid(target);
322
323 opal_output_verbose(5, orte_state_base_framework.framework_output,
324 "%s state:base:sending notification %s to proc %s at daemon %s",
325 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
326 ORTE_ERROR_NAME(status),
327 ORTE_NAME_PRINT(target),
328 ORTE_NAME_PRINT(&daemon));
329 if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&daemon, buf,
330 ORTE_RML_TAG_NOTIFICATION,
331 orte_rml_send_callback, NULL))) {
332 ORTE_ERROR_LOG(rc);
333 OBJ_RELEASE(buf);
334 }
335 }
336 }
337
338 static void hnp_notify(int sd, short args, void *cbdata)
339 {
340 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
341 orte_job_t *jdata = caddy->jdata;
342 orte_process_name_t parent, target, *npptr;
343
344
345 if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
346
347
348 npptr = &parent;
349 if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) {
350
351 target.jobid = jdata->jobid;
352 target.vpid = ORTE_VPID_WILDCARD;
353 _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, ORTE_NAME_WILDCARD);
354 } else {
355 target.jobid = jdata->jobid;
356 target.vpid = ORTE_VPID_WILDCARD;
357 _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, &parent);
358 }
359 }
360 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFIED);
361
362 OBJ_RELEASE(caddy);
363 }