root/orte/runtime/orte_wait.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. timer_const
  2. timer_dest
  3. wccon
  4. wcdes
  5. orte_wait_disable
  6. orte_wait_enable
  7. orte_wait_init
  8. orte_wait_finalize
  9. orte_wait_cb
  10. cancel_callback
  11. orte_wait_cb_cancel
  12. wait_signal_callback

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2008 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007-2013 Los Alamos National Security, LLC.  All rights
  13  *                         reserved.
  14  * Copyright (c) 2008      Institut National de Recherche en Informatique
  15  *                         et Automatique. All rights reserved.
  16  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
  17  * $COPYRIGHT$
  18  *
  19  * Additional copyrights may follow
  20  *
  21  * $HEADER$
  22  */
  23 
  24 
  25 #include "orte_config.h"
  26 
  27 #include <string.h>
  28 #include <assert.h>
  29 #ifdef HAVE_UNISTD_H
  30 #include <unistd.h>
  31 #endif
  32 #ifdef HAVE_SYS_QUEUE_H
  33 #include <sys/queue.h>
  34 #endif
  35 #include <errno.h>
  36 #ifdef HAVE_SYS_TIME_H
  37 #include <sys/time.h>
  38 #endif
  39 #ifdef HAVE_SYS_TYPES_H
  40 #include <sys/types.h>
  41 #endif
  42 #include <fcntl.h>
  43 #include <stdlib.h>
  44 #include <signal.h>
  45 #include <stdio.h>
  46 #include <sys/stat.h>
  47 #ifdef HAVE_SYS_WAIT_H
  48 #include <sys/wait.h>
  49 #endif
  50 
  51 #include "opal/dss/dss_types.h"
  52 #include "opal/class/opal_object.h"
  53 #include "opal/util/output.h"
  54 #include "opal/class/opal_list.h"
  55 #include "opal/mca/event/event.h"
  56 #include "opal/threads/mutex.h"
  57 #include "opal/threads/condition.h"
  58 #include "opal/sys/atomic.h"
  59 
  60 #include "orte/constants.h"
  61 #include "orte/mca/errmgr/errmgr.h"
  62 #include "orte/util/name_fns.h"
  63 #include "orte/util/threads.h"
  64 #include "orte/runtime/orte_globals.h"
  65 
  66 #include "orte/runtime/orte_wait.h"
  67 
  68 /* Timer Object Declaration */
  69 static void timer_const(orte_timer_t *tm)
  70 {
  71     tm->ev = opal_event_alloc();
  72     tm->payload = NULL;
  73 }
  74 static void timer_dest(orte_timer_t *tm)
  75 {
  76     opal_event_free(tm->ev);
  77 }
  78 OBJ_CLASS_INSTANCE(orte_timer_t,
  79                    opal_object_t,
  80                    timer_const,
  81                    timer_dest);
  82 
  83 
  84 static void wccon(orte_wait_tracker_t *p)
  85 {
  86     p->child = NULL;
  87     p->cbfunc = NULL;
  88     p->cbdata = NULL;
  89 }
  90 static void wcdes(orte_wait_tracker_t *p)
  91 {
  92     if (NULL != p->child) {
  93         OBJ_RELEASE(p->child);
  94     }
  95 }
  96 OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
  97                    opal_list_item_t,
  98                    wccon, wcdes);
  99 
 100 /* Local Variables */
 101 static opal_event_t handler;
 102 static opal_list_t pending_cbs;
 103 
 104 /* Local Function Prototypes */
 105 static void wait_signal_callback(int fd, short event, void *arg);
 106 
 107 /* Interface Functions */
 108 
 109 void orte_wait_disable(void)
 110 {
 111     opal_event_del(&handler);
 112 }
 113 
 114 void orte_wait_enable(void)
 115 {
 116     opal_event_add(&handler, NULL);
 117 }
 118 
 119 int orte_wait_init(void)
 120 {
 121     OBJ_CONSTRUCT(&pending_cbs, opal_list_t);
 122 
 123     opal_event_set(orte_event_base,
 124                    &handler, SIGCHLD, OPAL_EV_SIGNAL|OPAL_EV_PERSIST,
 125                    wait_signal_callback,
 126                    &handler);
 127     opal_event_set_priority(&handler, ORTE_SYS_PRI);
 128 
 129     opal_event_add(&handler, NULL);
 130     return ORTE_SUCCESS;
 131 }
 132 
 133 
 134 int orte_wait_finalize(void)
 135 {
 136     opal_event_del(&handler);
 137 
 138     /* clear out the pending cbs */
 139     OPAL_LIST_DESTRUCT(&pending_cbs);
 140 
 141     return ORTE_SUCCESS;
 142 }
 143 
 144 /* this function *must* always be called from
 145  * within an event in the orte_event_base */
 146 void orte_wait_cb(orte_proc_t *child, orte_wait_cbfunc_t callback,
 147                   opal_event_base_t *evb, void *data)
 148 {
 149     orte_wait_tracker_t *t2;
 150 
 151     if (NULL == child || NULL == callback) {
 152         /* bozo protection */
 153         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 154         return;
 155     }
 156 
 157     /* see if this proc is still alive */
 158     if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) {
 159         if (NULL != callback) {
 160             /* already heard this proc is dead, so just do the callback */
 161             t2 = OBJ_NEW(orte_wait_tracker_t);
 162             OBJ_RETAIN(child);  // protect against race conditions
 163             t2->child = child;
 164             t2->evb = evb;
 165             t2->cbfunc = callback;
 166             t2->cbdata = data;
 167             opal_event_set(t2->evb, &t2->ev, -1,
 168                            OPAL_EV_WRITE, t2->cbfunc, t2);
 169             opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
 170             opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
 171         }
 172         return;
 173     }
 174 
 175    /* we just override any existing registration */
 176     OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
 177         if (t2->child == child) {
 178             t2->cbfunc = callback;
 179             t2->cbdata = data;
 180             return;
 181         }
 182     }
 183     /* get here if this is a new registration */
 184     t2 = OBJ_NEW(orte_wait_tracker_t);
 185     OBJ_RETAIN(child);  // protect against race conditions
 186     t2->child = child;
 187     t2->evb = evb;
 188     t2->cbfunc = callback;
 189     t2->cbdata = data;
 190     opal_list_append(&pending_cbs, &t2->super);
 191 }
 192 
 193 static void cancel_callback(int fd, short args, void *cbdata)
 194 {
 195     orte_wait_tracker_t *trk = (orte_wait_tracker_t*)cbdata;
 196     orte_wait_tracker_t *t2;
 197 
 198     ORTE_ACQUIRE_OBJECT(trk);
 199 
 200     OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
 201         if (t2->child == trk->child) {
 202             opal_list_remove_item(&pending_cbs, &t2->super);
 203             OBJ_RELEASE(t2);
 204             OBJ_RELEASE(trk);
 205             return;
 206         }
 207     }
 208 
 209     OBJ_RELEASE(trk);
 210 }
 211 
 212 void orte_wait_cb_cancel(orte_proc_t *child)
 213 {
 214     orte_wait_tracker_t *trk;
 215 
 216     if (NULL == child) {
 217         /* bozo protection */
 218         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 219         return;
 220     }
 221 
 222     /* push this into the event library for handling */
 223     trk = OBJ_NEW(orte_wait_tracker_t);
 224     OBJ_RETAIN(child);  // protect against race conditions
 225     trk->child = child;
 226     ORTE_THREADSHIFT(trk, orte_event_base, cancel_callback, ORTE_SYS_PRI);
 227 }
 228 
 229 
 230 /* callback from the event library whenever a SIGCHLD is received */
 231 static void wait_signal_callback(int fd, short event, void *arg)
 232 {
 233     opal_event_t *signal = (opal_event_t*) arg;
 234     int status;
 235     pid_t pid;
 236     orte_wait_tracker_t *t2;
 237 
 238     ORTE_ACQUIRE_OBJECT(signal);
 239 
 240     if (SIGCHLD != OPAL_EVENT_SIGNAL(signal)) {
 241         return;
 242     }
 243 
 244     /* we can have multiple children leave but only get one
 245      * sigchild callback, so reap all the waitpids until we
 246      * don't get anything valid back */
 247     while (1) {
 248         pid = waitpid(-1, &status, WNOHANG);
 249         if (-1 == pid && EINTR == errno) {
 250             /* try it again */
 251             continue;
 252         }
 253         /* if we got garbage, then nothing we can do */
 254         if (pid <= 0) {
 255             return;
 256         }
 257 
 258         /* we are already in an event, so it is safe to access the list */
 259         OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) {
 260             if (pid == t2->child->pid) {
 261                 /* found it! */
 262                 t2->child->exit_code = status;
 263                 opal_list_remove_item(&pending_cbs, &t2->super);
 264                 if (NULL != t2->cbfunc) {
 265                     opal_event_set(t2->evb, &t2->ev, -1,
 266                                    OPAL_EV_WRITE, t2->cbfunc, t2);
 267                     opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
 268                     opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
 269                 } else {
 270                     OBJ_RELEASE(t2);
 271                 }
 272                 break;
 273             }
 274         }
 275     }
 276 }

/* [<][>][^][v][top][bottom][index][help] */