This source file includes the following definitions.
- cli_rank
- cli_init
- cli_connect
- cli_finalize
- cli_disconnect
- cli_terminate
- cli_cleanup
- test_terminated
- cli_wait_all
- cli_kill_all
- errhandler
- op_callbk
- errhandler_reg_callbk
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 #include "cli_stages.h"
  16 
  17 cli_info_t *cli_info = NULL;
  18 int cli_info_cnt = 0;
  19 bool test_abort = false;
  20 
  21 int cli_rank(cli_info_t *cli)
  22 {
  23     int i;
  24     for(i=0; i < cli_info_cnt; i++){
  25         if( cli == &cli_info[i] ){
  26             return cli->rank;
  27         }
  28     }
  29     return -1;
  30 }
  31 
  32 void cli_init(int nprocs)
  33 {
  34     int n, i;
  35     cli_state_t order[CLI_TERM+1];
  36 
  37     cli_info = malloc( sizeof(cli_info_t) * nprocs);
  38     cli_info_cnt = nprocs;
  39 
  40     order[CLI_UNINIT] = CLI_FORKED;
  41     order[CLI_FORKED] = CLI_FIN;
  42     order[CLI_CONNECTED] = CLI_UNDEF;
  43     order[CLI_FIN] = CLI_TERM;
  44     order[CLI_DISCONN] = CLI_UNDEF;
  45     order[CLI_TERM] = CLI_UNDEF;
  46 
  47     for (n=0; n < nprocs; n++) {
  48         cli_info[n].sd = -1;
  49         cli_info[n].ev = NULL;
  50         cli_info[n].pid = -1;
  51         cli_info[n].state = CLI_UNINIT;
  52         PMIX_CONSTRUCT(&(cli_info[n].modex), pmix_list_t);
  53         for (i = 0; i < CLI_TERM+1; i++) {
  54             cli_info[n].next_state[i] = order[i];
  55         }
  56         cli_info[n].rank = -1;
  57         cli_info[n].ns = NULL;
  58     }
  59 }
  60 
  61 void cli_connect(cli_info_t *cli, int sd, pmix_event_base_t * ebase, event_callback_fn callback)
  62 {
  63     if( CLI_CONNECTED != cli->next_state[cli->state] ){
  64         TEST_ERROR(("Rank %d has bad next state: expect %d have %d!",
  65                      cli_rank(cli), CLI_CONNECTED, cli->next_state[cli->state]));
  66         test_abort = true;
  67         return;
  68     }
  69 
  70     cli->sd = sd;
  71     cli->ev = pmix_event_new(ebase, sd,
  72                       EV_READ|EV_PERSIST, callback, cli);
  73     pmix_event_add(cli->ev,NULL);
  74     pmix_ptl_base_set_nonblocking(sd);
  75     TEST_VERBOSE(("Connection accepted from rank %d", cli_rank(cli) ));
  76     cli->state = CLI_CONNECTED;
  77 }
  78 
  79 void cli_finalize(cli_info_t *cli)
  80 {
  81     if( CLI_FIN != cli->next_state[cli->state] ){
  82         TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
  83                      cli_rank(cli), CLI_FIN, cli->next_state[cli->state]));
  84         test_abort = true;
  85     }
  86 
  87     cli->state = CLI_FIN;
  88 }
  89 
  90 void cli_disconnect(cli_info_t *cli)
  91 {
  92     if( CLI_DISCONN != cli->next_state[cli->state] ){
  93         TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
  94                      cli_rank(cli), CLI_DISCONN, cli->next_state[cli->state]));
  95         test_abort = true;
  96     }
  97 
  98     if( 0 > cli->sd ){
  99         TEST_ERROR(("Bad sd = %d of rank = %d ", cli->sd, cli_rank(cli)));
 100         test_abort = true;
 101     } else {
 102         TEST_VERBOSE(("close sd = %d for rank = %d", cli->sd, cli_rank(cli)));
 103         close(cli->sd);
 104         cli->sd = -1;
 105     }
 106 
 107     if( NULL == cli->ev ){
 108         TEST_ERROR(("Bad ev = NULL of rank = %d ", cli->sd, cli_rank(cli)));
 109         test_abort = true;
 110     } else {
 111         TEST_VERBOSE(("remove event of rank %d from event queue", cli_rank(cli)));
 112         pmix_event_del(cli->ev);
 113         pmix_event_free(cli->ev);
 114         cli->ev = NULL;
 115     }
 116 
 117     TEST_VERBOSE(("Destruct modex list for the rank %d", cli_rank(cli)));
 118     PMIX_LIST_DESTRUCT(&(cli->modex));
 119 
 120     cli->state = CLI_DISCONN;
 121 }
 122 
 123 void cli_terminate(cli_info_t *cli)
 124 {
 125     if( CLI_TERM != cli->next_state[cli->state] ){
 126         TEST_ERROR(("rank %d: bad client next state: expect %d have %d!",
 127                      cli_rank(cli), CLI_TERM, cli->next_state[cli->state]));
 128         test_abort = true;
 129     }
 130     cli->pid = -1;
 131     TEST_VERBOSE(("Client rank = %d terminated", cli_rank(cli)));
 132     cli->state = CLI_TERM;
 133     if (NULL != cli->ns) {
 134         free(cli->ns);
 135     }
 136 }
 137 
 138 void cli_cleanup(cli_info_t *cli)
 139 {
 140     if (CLI_TERM < cli->state) {
 141         TEST_ERROR(("Bad rank %d state %d", cli_rank(cli), cli->state));
 142         test_abort = true;
 143         return;
 144     }
 145     switch( cli->next_state[cli->state] ){
 146     case CLI_FORKED:
 147         break;
 148     case CLI_CONNECTED:
 149         
 150         if (!test_abort) {
 151             TEST_ERROR(("rank %d with state %d unexpectedly terminated.", cli_rank(cli), cli->state));
 152         }
 153         cli->state = CLI_TERM;
 154         test_abort = true;
 155         break;
 156     case CLI_FIN:
 157         
 158         if (!test_abort) {
 159             TEST_ERROR(("rank %d with state %d unexpectedly terminated.", cli_rank(cli), cli->state));
 160         }
 161         cli_finalize(cli);
 162         cli_cleanup(cli);
 163         test_abort = true;
 164         break;
 165     case CLI_DISCONN:
 166         cli_disconnect(cli);
 167         cli_cleanup(cli);
 168         break;
 169     case CLI_TERM:
 170         cli_terminate(cli);
 171         break;
 172     default:
 173         TEST_ERROR(("Bad rank %d next state %d", cli_rank(cli), cli->next_state[cli->state]));
 174         test_abort = true;
 175         return;
 176     }
 177 }
 178 
 179 
 180 bool test_terminated(void)
 181 {
 182     bool ret = true;
 183     int i;
 184 
 185     
 186     for(i=0; i < cli_info_cnt; i++){
 187         ret = ret && (CLI_TERM <= cli_info[i].state);
 188     }
 189     return (ret || test_abort);
 190 }
 191 
 192 void cli_wait_all(double timeout)
 193 {
 194     struct timeval tv;
 195     double start_time, cur_time;
 196 
 197     gettimeofday(&tv, NULL);
 198     start_time = tv.tv_sec + 1E-6*tv.tv_usec;
 199     cur_time = start_time;
 200 
 201     
 202 
 203     
 204     while( !test_terminated() && ( timeout >= (cur_time - start_time) ) ){
 205         struct timespec ts;
 206         int status, i;
 207         pid_t pid;
 208         while( 0 < (pid = waitpid(-1, &status, WNOHANG) ) ){
 209             TEST_VERBOSE(("waitpid = %d", pid));
 210             for(i=0; i < cli_info_cnt; i++){
 211                 if( cli_info[i].pid == pid ){
 212                     TEST_VERBOSE(("the child with pid = %d has rank = %d, ns = %s\n"
 213                                 "\t\texited = %d, signalled = %d", pid,
 214                                   cli_info[i].rank, cli_info[i].ns,
 215                                 WIFEXITED(status), WIFSIGNALED(status) ));
 216                     if( WIFEXITED(status) || WIFSIGNALED(status) ){
 217                         cli_cleanup(&cli_info[i]);
 218                     }
 219                 }
 220             }
 221         }
 222         if( pid < 0 ){
 223             if( errno == ECHILD ){
 224                 TEST_VERBOSE(("No more children to wait. Happens on the last cli_wait_all call "
 225                             "which is used to ensure that all children terminated.\n"));
 226                 if (pmix_test_verbose) {
 227                     sleep(1);
 228                 }
 229                 break;
 230             } else {
 231                 TEST_ERROR(("waitpid(): %d : %s", errno, strerror(errno)));
 232                 exit(0);
 233             }
 234         }
 235         ts.tv_sec = 0;
 236         ts.tv_nsec = 100000;
 237         nanosleep(&ts, NULL);
 238         
 239         gettimeofday(&tv, NULL);
 240         cur_time = tv.tv_sec + 1E-6*tv.tv_usec;
 241     }
 242 }
 243 
 244 void cli_kill_all(void)
 245 {
 246     int i;
 247     for(i = 0; i < cli_info_cnt; i++){
 248         if( CLI_UNINIT == cli_info[i].state ){
 249             TEST_ERROR(("Skip rank %d as it wasn't ever initialized (shouldn't happe)",
 250                           i));
 251             continue;
 252         } else if( CLI_TERM <= cli_info[i].state ){
 253             TEST_VERBOSE(("Skip rank %d as it was already terminated.", i));
 254             continue;
 255 
 256         }
 257         TEST_VERBOSE(("Kill rank %d (pid = %d).", i, cli_info[i].pid));
 258         kill(cli_info[i].pid, SIGKILL);
 259         cli_cleanup(&cli_info[i]);
 260     }
 261 }
 262 
 263 void errhandler(size_t evhdlr_registration_id,
 264                 pmix_status_t status,
 265                 const pmix_proc_t *source,
 266                 pmix_info_t info[], size_t ninfo,
 267                 pmix_info_t results[], size_t nresults,
 268                 pmix_event_notification_cbfunc_fn_t cbfunc,
 269                 void *cbdata)
 270 {
 271     TEST_ERROR((" PMIX server event handler with status = %d", status));
 272     
 273     PMIx_Notify_event(status, source,
 274                       PMIX_RANGE_NAMESPACE,
 275                       NULL, 0,
 276                       op_callbk, NULL);
 277 }
 278 
 279 void op_callbk(pmix_status_t status,
 280                       void *cbdata)
 281 {
 282     TEST_VERBOSE(( "OP CALLBACK CALLED WITH STATUS %d", status));
 283 }
 284 
 285 void errhandler_reg_callbk (pmix_status_t status,
 286                             size_t errhandler_ref,
 287                             void *cbdata)
 288 {
 289     TEST_VERBOSE(("ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
 290                 status, (unsigned long)errhandler_ref));
 291 }