root/orte/test/system/evthread-test.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. main
  2. stop_handler
  3. progress_engine
  4. send_handler

   1 #include <sys/types.h>
   2 #include <sys/stat.h>
   3 #include <fcntl.h>
   4 #include <stdlib.h>
   5 #include <stdio.h>
   6 #include <string.h>
   7 #include <stdbool.h>
   8 #ifndef WIN32
   9 #include <unistd.h>
  10 #include <sys/time.h>
  11 #endif
  12 #include <errno.h>
  13 
  14 #include "opal/threads/threads.h"
  15 #include "opal/runtime/opal.h"
  16 #include "opal/mca/event/event.h"
  17 
  18 static orte_event_base_t *my_base=NULL;
  19 static opal_thread_t progress_thread;
  20 static bool progress_thread_stop=false;
  21 static int progress_thread_pipe[2];
  22 static opal_mutex_t lock;
  23 static opal_condition_t cond;
  24 static bool active=false;
  25 typedef struct {
  26     opal_object_t super;
  27     opal_event_t write_event;
  28 } foo_caddy_t;
  29 OBJ_CLASS_INSTANCE(foo_caddy_t,
  30                    opal_object_t,
  31                    NULL, NULL);
  32 static bool fd_written=false;
  33 
  34 static void* progress_engine(opal_object_t *obj);
  35 static void send_handler(int sd, short flags, void *arg);
  36 
  37 int main(int argc, char **argv)
  38 {
  39     char byte='a';
  40     struct timespec tp={0, 100};
  41     int count=0;
  42     foo_caddy_t *foo;
  43 
  44    /* Initialize the event library */
  45     opal_init(&argc, &argv);
  46 
  47     /* setup for threads */
  48     opal_event_use_threads();
  49 
  50     /* create a new base */
  51     my_base = orte_event_base_create();
  52 
  53    /* launch a progress thread on that base*/
  54     pipe(progress_thread_pipe);
  55     OBJ_CONSTRUCT(&lock, opal_mutex_t);
  56     OBJ_CONSTRUCT(&cond, opal_condition_t);
  57     OBJ_CONSTRUCT(&progress_thread, opal_thread_t);
  58     progress_thread.t_run = progress_engine;
  59     if (OPAL_SUCCESS != opal_thread_start(&progress_thread)) {
  60         fprintf(stderr, "Unable to start progress thread\n");
  61         orte_event_base_finalize(my_base);
  62         exit(1);
  63     }
  64 
  65     /* wait a little while - reflects reality in an async system */
  66     while (count < 100) {
  67         nanosleep(&tp, NULL);
  68         count++;
  69     }
  70     count=0;
  71 
  72     /* make a dummy event */
  73     fprintf(stderr, "activating the write_event");
  74     foo = OBJ_NEW(foo_caddy_t);
  75     opal_event_set(my_base,
  76                    &foo->write_event,
  77                    -1,
  78                    0,
  79                    send_handler,
  80                    foo);
  81     /* activate it. */
  82     opal_event_active(&foo->write_event, EV_WRITE, 1);
  83 
  84     /* wait for it to trigger */
  85     while (!fd_written && count < 1000) {
  86         if (0 == (count % 100)) {
  87             fprintf(stderr, "Waiting...\n");
  88         }
  89         nanosleep(&tp, NULL);
  90         count++;
  91     }
  92 
  93     /* stop the thread */
  94     OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
  95     progress_thread_stop = true;
  96     OPAL_RELEASE_THREAD(&lock, &cond, &active);
  97     opal_fd_write(progress_thread_pipe[1], 1, &byte);
  98     opal_thread_join(&progress_thread, NULL);
  99 
 100     /* release the base */
 101     fprintf(stderr, "Cleaning up\n");
 102     opal_finalize();
 103     fprintf(stderr, "Cleanup completed\n");
 104     return 0;
 105 }
 106 
 107 static struct event stop_event;
 108 static void stop_handler(int sd, short flags, void* cbdata)
 109 {
 110     char byte;
 111 
 112     opal_fd_read(progress_thread_pipe[0], 1, &byte);
 113     fprintf(stderr, "Stop handler called\n");
 114     /* reset the event */
 115     opal_event_add(&stop_event, 0);
 116     return;
 117 }
 118 
 119 static void* progress_engine(opal_object_t *obj)
 120 {
 121     /* define an event that will be used to kick us out of a blocking
 122      * situation when we want to exit
 123      */
 124     /* define an event that will be used to kick us out of a blocking
 125      * situation when we want to exit
 126      */
 127     opal_event_set(my_base, &stop_event,
 128                    progress_thread_pipe[0], OPAL_EV_READ, stop_handler, NULL);
 129     opal_event_add(&stop_event, 0);
 130 
 131     while (1) {
 132         OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
 133         if (progress_thread_stop) {
 134             fprintf(stderr, "Thread stopping\n");
 135             OPAL_RELEASE_THREAD(&lock, &cond, &active);
 136             opal_event_del(&stop_event);
 137             return OPAL_THREAD_CANCELLED;
 138         }
 139         OPAL_RELEASE_THREAD(&lock, &cond, &active);
 140         fprintf(stderr, "Looping...\n");
 141         opal_event_loop(my_base, OPAL_EVLOOP_ONCE);
 142     }
 143 }
 144 
 145 static void send_handler(int sd, short flags, void *arg)
 146 {
 147     foo_caddy_t *foo = (foo_caddy_t*)arg;
 148     fprintf(stderr, "Deleting event\n");
 149     opal_event_del(&foo->write_event);
 150     OBJ_RELEASE(foo);
 151     fprintf(stderr, "Write event fired\n");
 152     fd_written = true; /* This needs a lock around it if you are reading it
 153                         * in the main thread and changing it here XXX */
 154 }

/* [<][>][^][v][top][bottom][index][help] */