This source file includes the following definitions.
- main
- stop_handler
- progress_engine
- send_handler
   1 #include <sys/types.h>
   2 #include <sys/stat.h>
   3 #include <fcntl.h>
   4 #include <stdlib.h>
   5 #include <stdio.h>
   6 #include <string.h>
   7 #include <stdbool.h>
   8 #ifndef WIN32
   9 #include <unistd.h>
  10 #include <sys/time.h>
  11 #endif
  12 #include <errno.h>
  13 
  14 #include "opal/threads/threads.h"
  15 #include "opal/runtime/opal.h"
  16 #include "opal/mca/event/event.h"
  17 
/* Event base created in main(); events are set on it and progress_engine()
 * runs its loop. */
static orte_event_base_t *my_base=NULL;
/* Thread object; main() points its t_run at progress_engine and starts it. */
static opal_thread_t progress_thread;
/* Stop request: set to true by main(), checked by progress_engine(). */
static bool progress_thread_stop=false;
/* Pipe created in main(): [0] is registered for read events by
 * progress_engine(), [1] is written by main() after requesting the stop. */
static int progress_thread_pipe[2];
/* lock, cond and active are always passed together to OPAL_ACQUIRE_THREAD /
 * OPAL_RELEASE_THREAD around accesses to progress_thread_stop. */
static opal_mutex_t lock;
static opal_condition_t cond;
static bool active=false;
/* Object allocated by main() with OBJ_NEW to carry the write event; it is
 * handed to send_handler() as the callback argument and released there. */
typedef struct {
    opal_object_t super;
    opal_event_t write_event;
} foo_caddy_t;
/* Class definition with opal_object_t as parent; the two NULL arguments are
 * presumably the constructor/destructor slots -- confirm against the
 * OBJ_CLASS_INSTANCE definition. */
OBJ_CLASS_INSTANCE(foo_caddy_t,
                   opal_object_t,
                   NULL, NULL);
/* Set to true by send_handler(); main() polls it while waiting for the
 * write event to be delivered. */
static bool fd_written=false;

/* Forward declarations: the progress thread body and the write-event callback. */
static void* progress_engine(opal_object_t *obj);
static void send_handler(int sd, short flags, void *arg);
  36 
  37 int main(int argc, char **argv)
  38 {
  39     char byte='a';
  40     struct timespec tp={0, 100};
  41     int count=0;
  42     foo_caddy_t *foo;
  43 
  44    
  45     opal_init(&argc, &argv);
  46 
  47     
  48     opal_event_use_threads();
  49 
  50     
  51     my_base = orte_event_base_create();
  52 
  53    
  54     pipe(progress_thread_pipe);
  55     OBJ_CONSTRUCT(&lock, opal_mutex_t);
  56     OBJ_CONSTRUCT(&cond, opal_condition_t);
  57     OBJ_CONSTRUCT(&progress_thread, opal_thread_t);
  58     progress_thread.t_run = progress_engine;
  59     if (OPAL_SUCCESS != opal_thread_start(&progress_thread)) {
  60         fprintf(stderr, "Unable to start progress thread\n");
  61         orte_event_base_finalize(my_base);
  62         exit(1);
  63     }
  64 
  65     
  66     while (count < 100) {
  67         nanosleep(&tp, NULL);
  68         count++;
  69     }
  70     count=0;
  71 
  72     
  73     fprintf(stderr, "activating the write_event");
  74     foo = OBJ_NEW(foo_caddy_t);
  75     opal_event_set(my_base,
  76                    &foo->write_event,
  77                    -1,
  78                    0,
  79                    send_handler,
  80                    foo);
  81     
  82     opal_event_active(&foo->write_event, EV_WRITE, 1);
  83 
  84     
  85     while (!fd_written && count < 1000) {
  86         if (0 == (count % 100)) {
  87             fprintf(stderr, "Waiting...\n");
  88         }
  89         nanosleep(&tp, NULL);
  90         count++;
  91     }
  92 
  93     
  94     OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
  95     progress_thread_stop = true;
  96     OPAL_RELEASE_THREAD(&lock, &cond, &active);
  97     opal_fd_write(progress_thread_pipe[1], 1, &byte);
  98     opal_thread_join(&progress_thread, NULL);
  99 
 100     
 101     fprintf(stderr, "Cleaning up\n");
 102     opal_finalize();
 103     fprintf(stderr, "Cleanup completed\n");
 104     return 0;
 105 }
 106 
 107 static struct event stop_event;
 108 static void stop_handler(int sd, short flags, void* cbdata)
 109 {
 110     char byte;
 111 
 112     opal_fd_read(progress_thread_pipe[0], 1, &byte);
 113     fprintf(stderr, "Stop handler called\n");
 114     
 115     opal_event_add(&stop_event, 0);
 116     return;
 117 }
 118 
 119 static void* progress_engine(opal_object_t *obj)
 120 {
 121     
 122 
 123 
 124     
 125 
 126 
 127     opal_event_set(my_base, &stop_event,
 128                    progress_thread_pipe[0], OPAL_EV_READ, stop_handler, NULL);
 129     opal_event_add(&stop_event, 0);
 130 
 131     while (1) {
 132         OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
 133         if (progress_thread_stop) {
 134             fprintf(stderr, "Thread stopping\n");
 135             OPAL_RELEASE_THREAD(&lock, &cond, &active);
 136             opal_event_del(&stop_event);
 137             return OPAL_THREAD_CANCELLED;
 138         }
 139         OPAL_RELEASE_THREAD(&lock, &cond, &active);
 140         fprintf(stderr, "Looping...\n");
 141         opal_event_loop(my_base, OPAL_EVLOOP_ONCE);
 142     }
 143 }
 144 
 145 static void send_handler(int sd, short flags, void *arg)
 146 {
 147     foo_caddy_t *foo = (foo_caddy_t*)arg;
 148     fprintf(stderr, "Deleting event\n");
 149     opal_event_del(&foo->write_event);
 150     OBJ_RELEASE(foo);
 151     fprintf(stderr, "Write event fired\n");
 152     fd_written = true; 
 153 
 154 }