root/opal/mca/pmix/pmix4x/pmix/examples/asyncgroup.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. notification_fn
  2. op_callbk
  3. errhandler_reg_callbk
  4. grpcomplete
  5. invitefn
  6. main

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2018 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2015      Mellanox Technologies, Inc.  All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  *
  24  */
  25 
  26 #include <stdbool.h>
  27 #include <stdio.h>
  28 #include <stdlib.h>
  29 #include <unistd.h>
  30 #include <time.h>
  31 #include <pthread.h>
  32 
  33 #include <pmix.h>
  34 
  35 typedef struct {
  36     pthread_mutex_t mutex;
  37     pthread_cond_t cond;
  38     volatile bool active;
  39     pmix_status_t status;
  40 } mylock_t;
  41 
  42 #define DEBUG_CONSTRUCT_LOCK(l)                     \
  43     do {                                            \
  44         pthread_mutex_init(&(l)->mutex, NULL);      \
  45         pthread_cond_init(&(l)->cond, NULL);        \
  46         (l)->active = true;                         \
  47         (l)->status = PMIX_SUCCESS;                 \
  48     } while(0)
  49 
  50 #define DEBUG_DESTRUCT_LOCK(l)              \
  51     do {                                    \
  52         pthread_mutex_destroy(&(l)->mutex); \
  53         pthread_cond_destroy(&(l)->cond);   \
  54     } while(0)
  55 
  56 #define DEBUG_WAIT_THREAD(lck)                                      \
  57     do {                                                            \
  58         pthread_mutex_lock(&(lck)->mutex);                          \
  59         while ((lck)->active) {                                     \
  60             pthread_cond_wait(&(lck)->cond, &(lck)->mutex);         \
  61         }                                                           \
  62         pthread_mutex_unlock(&(lck)->mutex);                        \
  63     } while(0)
  64 
  65 #define DEBUG_WAKEUP_THREAD(lck)                        \
  66     do {                                                \
  67         pthread_mutex_lock(&(lck)->mutex);              \
  68         (lck)->active = false;                          \
  69         pthread_cond_broadcast(&(lck)->cond);           \
  70         pthread_mutex_unlock(&(lck)->mutex);            \
  71     } while(0)
  72 
  73 
  74 static pmix_proc_t myproc;
  75 static mylock_t invitedlock;
  76 
  77 static void notification_fn(size_t evhdlr_registration_id,
  78                             pmix_status_t status,
  79                             const pmix_proc_t *source,
  80                             pmix_info_t info[], size_t ninfo,
  81                             pmix_info_t results[], size_t nresults,
  82                             pmix_event_notification_cbfunc_fn_t cbfunc,
  83                             void *cbdata)
  84 {
  85     fprintf(stderr, "Client %s:%d NOTIFIED with status %d\n", myproc.nspace, myproc.rank, status);
  86     if (NULL != cbfunc) {
  87         cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
  88     }
  89 }
  90 
  91 static void op_callbk(pmix_status_t status,
  92                       void *cbdata)
  93 {
  94     mylock_t *lock = (mylock_t*)cbdata;
  95 
  96     lock->status = status;
  97     DEBUG_WAKEUP_THREAD(lock);
  98 }
  99 
 100 static void errhandler_reg_callbk(pmix_status_t status,
 101                                   size_t errhandler_ref,
 102                                   void *cbdata)
 103 {
 104     mylock_t *lock = (mylock_t*)cbdata;
 105 
 106     lock->status = status;
 107     DEBUG_WAKEUP_THREAD(lock);
 108 }
 109 
 110 static void grpcomplete(pmix_status_t status,
 111                         pmix_info_t *info, size_t ninfo,
 112                         void *cbdata,
 113                         pmix_release_cbfunc_t release_fn,
 114                         void *release_cbdata)
 115 {
 116     fprintf(stderr, "%s:%d GRPCOMPLETE\n", myproc.nspace, myproc.rank);
 117     DEBUG_WAKEUP_THREAD(&invitedlock);
 118 }
 119 
 120 static void invitefn(size_t evhdlr_registration_id,
 121                      pmix_status_t status,
 122                      const pmix_proc_t *source,
 123                      pmix_info_t info[], size_t ninfo,
 124                      pmix_info_t results[], size_t nresults,
 125                      pmix_event_notification_cbfunc_fn_t cbfunc,
 126                      void *cbdata)
 127 {
 128     size_t n;
 129     char *grp;
 130     pmix_status_t rc;
 131 
 132     /* if I am the leader, I can ignore this event */
 133     if (PMIX_CHECK_PROCID(source, &myproc)) {
 134         fprintf(stderr, "%s:%d INVITED, BUT LEADER\n", myproc.nspace, myproc.rank);
 135         /* mark the event chain as complete */
 136         if (NULL != cbfunc) {
 137             cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
 138         }
 139         return;
 140     }
 141 
 142     /* search for grp id */
 143     for (n=0; n < ninfo; n++) {
 144         if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ID)) {
 145             grp = info[n].value.data.string;
 146             break;
 147         }
 148     }
 149     fprintf(stderr, "Client %s:%d INVITED by source %s:%d\n",
 150             myproc.nspace, myproc.rank,
 151             source->nspace, source->rank);
 152     invitedlock.status = status;
 153     fprintf(stderr, "%s:%d ACCEPTING INVITE\n", myproc.nspace, myproc.rank);
 154     rc = PMIx_Group_join_nb(grp, source, PMIX_GROUP_ACCEPT, NULL, 0, grpcomplete, NULL);
 155     if (PMIX_SUCCESS != rc) {
 156         fprintf(stderr, "%s:%d Error in Group_join_nb: %sn", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 157     }
 158 
 159     /* mark the event chain as complete */
 160     if (NULL != cbfunc) {
 161         cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
 162     }
 163 }
 164 
 165 
 166 int main(int argc, char **argv)
 167 {
 168     int rc;
 169     pmix_value_t value;
 170     pmix_value_t *val = &value;
 171     pmix_proc_t proc, *procs;
 172     uint32_t nprocs;
 173     mylock_t lock;
 174     pmix_status_t code;
 175     pmix_info_t *results;
 176     size_t nresults;
 177 
 178     /* init us */
 179     if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) {
 180         fprintf(stderr, "Client ns %s rank %d: PMIx_Init failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 181         exit(0);
 182     }
 183     fprintf(stderr, "[%d] Client ns %s rank %d: Running\n", (int)getpid(), myproc.nspace, myproc.rank);
 184 
 185     DEBUG_CONSTRUCT_LOCK(&invitedlock);
 186 
 187     PMIX_PROC_CONSTRUCT(&proc);
 188     (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
 189     proc.rank = PMIX_RANK_WILDCARD;
 190 
 191     /* get our universe size */
 192     if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
 193         fprintf(stderr, "Client ns %s rank %d: PMIx_Get universe size failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 194         goto done;
 195     }
 196     nprocs = val->data.uint32;
 197     PMIX_VALUE_RELEASE(val);
 198     if (nprocs < 4) {
 199         if (0 == myproc.rank) {
 200             fprintf(stderr, "This example requires a minimum of 4 processes\n");
 201         }
 202         goto done;
 203     }
 204     fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs);
 205 
 206     /* register our default errhandler */
 207     DEBUG_CONSTRUCT_LOCK(&lock);
 208     PMIx_Register_event_handler(NULL, 0, NULL, 0,
 209                                 notification_fn, errhandler_reg_callbk, (void*)&lock);
 210     DEBUG_WAIT_THREAD(&lock);
 211     rc = lock.status;
 212     DEBUG_DESTRUCT_LOCK(&lock);
 213     if (PMIX_SUCCESS != rc) {
 214         goto done;
 215     }
 216 
 217     /* we need to register handlers for invitations */
 218     DEBUG_CONSTRUCT_LOCK(&lock);
 219     code = PMIX_GROUP_INVITED;
 220     PMIx_Register_event_handler(&code, 1, NULL, 0,
 221                                 invitefn, errhandler_reg_callbk, (void*)&lock);
 222     DEBUG_WAIT_THREAD(&lock);
 223     rc = lock.status;
 224     DEBUG_DESTRUCT_LOCK(&lock);
 225     if (PMIX_SUCCESS != rc) {
 226         goto done;
 227     }
 228 
 229     /* call fence to sync */
 230     PMIX_PROC_CONSTRUCT(&proc);
 231     (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
 232     proc.rank = PMIX_RANK_WILDCARD;
 233     if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, NULL, 0))) {
 234         fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %d\n", myproc.nspace, myproc.rank, rc);
 235         goto done;
 236     }
 237 
 238     /* rank=0 constructs a new group */
 239     if (0 == myproc.rank) {
 240         fprintf(stderr, "%d executing Group_invite\n", myproc.rank);
 241         nprocs = 3;
 242         PMIX_PROC_CREATE(procs, nprocs);
 243         PMIX_PROC_LOAD(&procs[0], myproc.nspace, 0);
 244         PMIX_PROC_LOAD(&procs[1], myproc.nspace, 2);
 245         PMIX_PROC_LOAD(&procs[2], myproc.nspace, 3);
 246         rc = PMIx_Group_invite("ourgroup", procs, nprocs, NULL, 0, &results, &nresults);
 247         if (PMIX_SUCCESS != rc) {
 248             fprintf(stderr, "Client ns %s rank %d: PMIx_Group_invite failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 249             goto done;
 250         }
 251         PMIX_PROC_FREE(procs, nprocs);
 252         fprintf(stderr, "%s:%d Execute fence across group\n", myproc.nspace, myproc.rank);
 253         PMIX_PROC_LOAD(&proc, "ourgroup", PMIX_RANK_WILDCARD);
 254         rc = PMIx_Fence(&proc, 1, NULL, 0);
 255         if (PMIX_SUCCESS != rc) {
 256             fprintf(stderr, "Client ns %s rank %d: PMIx_Fence across group failed: %d\n", myproc.nspace, myproc.rank, rc);
 257             goto done;
 258         }
 259         fprintf(stderr, "%d executing Group_destruct\n", myproc.rank);
 260         rc = PMIx_Group_destruct("ourgroup", NULL, 0);
 261         if (PMIX_SUCCESS != rc) {
 262             fprintf(stderr, "Client ns %s rank %d: PMIx_Group_destruct failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 263             goto done;
 264         }
 265     } else if (2 == myproc.rank || 3 == myproc.rank) {
 266         /* wait to be invited */
 267         fprintf(stderr, "%s:%d waiting to be invited\n", myproc.nspace, myproc.rank);
 268         DEBUG_WAIT_THREAD(&invitedlock);
 269         DEBUG_DESTRUCT_LOCK(&invitedlock);
 270         fprintf(stderr, "%s:%d Execute fence across group\n", myproc.nspace, myproc.rank);
 271         PMIX_PROC_LOAD(&proc, "ourgroup", PMIX_RANK_WILDCARD);
 272         rc = PMIx_Fence(&proc, 1, NULL, 0);
 273         if (PMIX_SUCCESS != rc) {
 274             fprintf(stderr, "Client ns %s rank %d: PMIx_Fence across group failed: %d\n", myproc.nspace, myproc.rank, rc);
 275             goto done;
 276         }
 277         fprintf(stderr, "%d executing Group_destruct\n", myproc.rank);
 278         rc = PMIx_Group_destruct("ourgroup", NULL, 0);
 279         if (PMIX_SUCCESS != rc) {
 280             fprintf(stderr, "Client ns %s rank %d: PMIx_Group_destruct failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 281             goto done;
 282         }
 283     }
 284 
 285     /* call fence to sync */
 286     PMIX_PROC_CONSTRUCT(&proc);
 287     (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
 288     proc.rank = PMIX_RANK_WILDCARD;
 289     if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, NULL, 0))) {
 290         fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 291         goto done;
 292     }
 293 
 294  done:
 295     /* finalize us */
 296     DEBUG_CONSTRUCT_LOCK(&lock);
 297     PMIx_Deregister_event_handler(1, op_callbk, &lock);
 298     DEBUG_WAIT_THREAD(&lock);
 299     DEBUG_DESTRUCT_LOCK(&lock);
 300 
 301     fprintf(stderr, "Client ns %s rank %d: Finalizing\n", myproc.nspace, myproc.rank);
 302     if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) {
 303         fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
 304     } else {
 305         fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize successfully completed\n", myproc.nspace, myproc.rank);
 306     }
 307     fprintf(stderr, "%s:%d COMPLETE\n", myproc.nspace, myproc.rank);
 308     fflush(stderr);
 309     return(0);
 310 }

/* [<][>][^][v][top][bottom][index][help] */