This source file includes the following definitions.
- timer_const
- timer_dest
- wccon
- wcdes
- orte_wait_disable
- orte_wait_enable
- orte_wait_init
- orte_wait_finalize
- orte_wait_cb
- cancel_callback
- orte_wait_cb_cancel
- wait_signal_callback
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 #include "orte_config.h"
  26 
  27 #include <string.h>
  28 #include <assert.h>
  29 #ifdef HAVE_UNISTD_H
  30 #include <unistd.h>
  31 #endif
  32 #ifdef HAVE_SYS_QUEUE_H
  33 #include <sys/queue.h>
  34 #endif
  35 #include <errno.h>
  36 #ifdef HAVE_SYS_TIME_H
  37 #include <sys/time.h>
  38 #endif
  39 #ifdef HAVE_SYS_TYPES_H
  40 #include <sys/types.h>
  41 #endif
  42 #include <fcntl.h>
  43 #include <stdlib.h>
  44 #include <signal.h>
  45 #include <stdio.h>
  46 #include <sys/stat.h>
  47 #ifdef HAVE_SYS_WAIT_H
  48 #include <sys/wait.h>
  49 #endif
  50 
  51 #include "opal/dss/dss_types.h"
  52 #include "opal/class/opal_object.h"
  53 #include "opal/util/output.h"
  54 #include "opal/class/opal_list.h"
  55 #include "opal/mca/event/event.h"
  56 #include "opal/threads/mutex.h"
  57 #include "opal/threads/condition.h"
  58 #include "opal/sys/atomic.h"
  59 
  60 #include "orte/constants.h"
  61 #include "orte/mca/errmgr/errmgr.h"
  62 #include "orte/util/name_fns.h"
  63 #include "orte/util/threads.h"
  64 #include "orte/runtime/orte_globals.h"
  65 
  66 #include "orte/runtime/orte_wait.h"
  67 
  68 
  69 static void timer_const(orte_timer_t *tm)
  70 {
  71     tm->ev = opal_event_alloc();
  72     tm->payload = NULL;
  73 }
  74 static void timer_dest(orte_timer_t *tm)
  75 {
  76     opal_event_free(tm->ev);
  77 }
  78 OBJ_CLASS_INSTANCE(orte_timer_t,
  79                    opal_object_t,
  80                    timer_const,
  81                    timer_dest);
  82 
  83 
  84 static void wccon(orte_wait_tracker_t *p)
  85 {
  86     p->child = NULL;
  87     p->cbfunc = NULL;
  88     p->cbdata = NULL;
  89 }
  90 static void wcdes(orte_wait_tracker_t *p)
  91 {
  92     if (NULL != p->child) {
  93         OBJ_RELEASE(p->child);
  94     }
  95 }
  96 OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
  97                    opal_list_item_t,
  98                    wccon, wcdes);
  99 
 100 
 101 static opal_event_t handler;
 102 static opal_list_t pending_cbs;
 103 
 104 
 105 static void wait_signal_callback(int fd, short event, void *arg);
 106 
 107 
 108 
 109 void orte_wait_disable(void)
 110 {
 111     opal_event_del(&handler);
 112 }
 113 
 114 void orte_wait_enable(void)
 115 {
 116     opal_event_add(&handler, NULL);
 117 }
 118 
 119 int orte_wait_init(void)
 120 {
 121     OBJ_CONSTRUCT(&pending_cbs, opal_list_t);
 122 
 123     opal_event_set(orte_event_base,
 124                    &handler, SIGCHLD, OPAL_EV_SIGNAL|OPAL_EV_PERSIST,
 125                    wait_signal_callback,
 126                    &handler);
 127     opal_event_set_priority(&handler, ORTE_SYS_PRI);
 128 
 129     opal_event_add(&handler, NULL);
 130     return ORTE_SUCCESS;
 131 }
 132 
 133 
 134 int orte_wait_finalize(void)
 135 {
 136     opal_event_del(&handler);
 137 
 138     
 139     OPAL_LIST_DESTRUCT(&pending_cbs);
 140 
 141     return ORTE_SUCCESS;
 142 }
 143 
 144 
 145 
 146 void orte_wait_cb(orte_proc_t *child, orte_wait_cbfunc_t callback,
 147                   opal_event_base_t *evb, void *data)
 148 {
 149     orte_wait_tracker_t *t2;
 150 
 151     if (NULL == child || NULL == callback) {
 152         
 153         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 154         return;
 155     }
 156 
 157     
 158     if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) {
 159         if (NULL != callback) {
 160             
 161             t2 = OBJ_NEW(orte_wait_tracker_t);
 162             OBJ_RETAIN(child);  
 163             t2->child = child;
 164             t2->evb = evb;
 165             t2->cbfunc = callback;
 166             t2->cbdata = data;
 167             opal_event_set(t2->evb, &t2->ev, -1,
 168                            OPAL_EV_WRITE, t2->cbfunc, t2);
 169             opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
 170             opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
 171         }
 172         return;
 173     }
 174 
 175    
 176     OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
 177         if (t2->child == child) {
 178             t2->cbfunc = callback;
 179             t2->cbdata = data;
 180             return;
 181         }
 182     }
 183     
 184     t2 = OBJ_NEW(orte_wait_tracker_t);
 185     OBJ_RETAIN(child);  
 186     t2->child = child;
 187     t2->evb = evb;
 188     t2->cbfunc = callback;
 189     t2->cbdata = data;
 190     opal_list_append(&pending_cbs, &t2->super);
 191 }
 192 
 193 static void cancel_callback(int fd, short args, void *cbdata)
 194 {
 195     orte_wait_tracker_t *trk = (orte_wait_tracker_t*)cbdata;
 196     orte_wait_tracker_t *t2;
 197 
 198     ORTE_ACQUIRE_OBJECT(trk);
 199 
 200     OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
 201         if (t2->child == trk->child) {
 202             opal_list_remove_item(&pending_cbs, &t2->super);
 203             OBJ_RELEASE(t2);
 204             OBJ_RELEASE(trk);
 205             return;
 206         }
 207     }
 208 
 209     OBJ_RELEASE(trk);
 210 }
 211 
 212 void orte_wait_cb_cancel(orte_proc_t *child)
 213 {
 214     orte_wait_tracker_t *trk;
 215 
 216     if (NULL == child) {
 217         
 218         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 219         return;
 220     }
 221 
 222     
 223     trk = OBJ_NEW(orte_wait_tracker_t);
 224     OBJ_RETAIN(child);  
 225     trk->child = child;
 226     ORTE_THREADSHIFT(trk, orte_event_base, cancel_callback, ORTE_SYS_PRI);
 227 }
 228 
 229 
 230 
 231 static void wait_signal_callback(int fd, short event, void *arg)
 232 {
 233     opal_event_t *signal = (opal_event_t*) arg;
 234     int status;
 235     pid_t pid;
 236     orte_wait_tracker_t *t2;
 237 
 238     ORTE_ACQUIRE_OBJECT(signal);
 239 
 240     if (SIGCHLD != OPAL_EVENT_SIGNAL(signal)) {
 241         return;
 242     }
 243 
 244     
 245 
 246 
 247     while (1) {
 248         pid = waitpid(-1, &status, WNOHANG);
 249         if (-1 == pid && EINTR == errno) {
 250             
 251             continue;
 252         }
 253         
 254         if (pid <= 0) {
 255             return;
 256         }
 257 
 258         
 259         OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
 260             if (pid == t2->child->pid) {
 261                 
 262                 t2->child->exit_code = status;
 263                 opal_list_remove_item(&pending_cbs, &t2->super);
 264                 if (NULL != t2->cbfunc) {
 265                     opal_event_set(t2->evb, &t2->ev, -1,
 266                                    OPAL_EV_WRITE, t2->cbfunc, t2);
 267                     opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
 268                     opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
 269                 } else {
 270                     OBJ_RELEASE(t2);
 271                 }
 272                 break;
 273             }
 274         }
 275     }
 276 }