root/orte/test/system/event-threads.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. main
  2. stop_handler
  3. progress_engine
  4. send_handler

   1 #include <sys/types.h>
   2 #include <sys/stat.h>
   3 #include <fcntl.h>
   4 #include <stdlib.h>
   5 #include <stdio.h>
   6 #include <string.h>
   7 #ifndef WIN32
   8 #include <unistd.h>
   9 #include <sys/time.h>
  10 #endif
  11 #include <errno.h>
  12 
  13 #include "opal/util/fd.h"
  14 #include "opal/threads/threads.h"
  15 #include "opal/runtime/opal.h"
  16 #include "opal/mca/event/event.h"
  17 
  18 static opal_event_base_t *my_base=NULL;
  19 static opal_thread_t progress_thread;
  20 static bool progress_thread_stop=false;
  21 static int progress_thread_pipe[2];
  22 static opal_mutex_t lock;
  23 static opal_condition_t cond;
  24 static bool active=false;
  25 static opal_event_t write_event;
  26 static int my_fd;
  27 static bool fd_written=false;
  28 
  29 static void* progress_engine(opal_object_t *obj);
  30 static void send_handler(int sd, short flags, void *arg);
  31 
  32 int main(int argc, char **argv)
  33 {
  34     char byte='a';
  35     struct timespec tp={0, 100};
  36     int count=0;
  37 
  38     /* Initialize the event library */
  39     opal_init(&argc, &argv);
  40 
  41     /* setup for threads */
  42     opal_event_use_threads();
  43 
  44     /* create a new base */
  45     my_base = opal_event_base_create();
  46 
  47     /* launch a progress thread on that base*/
  48     pipe(progress_thread_pipe);
  49     OBJ_CONSTRUCT(&lock, opal_mutex_t);
  50     OBJ_CONSTRUCT(&cond, opal_condition_t);
  51     OBJ_CONSTRUCT(&progress_thread, opal_thread_t);
  52     progress_thread.t_run = progress_engine;
  53     if (OPAL_SUCCESS != opal_thread_start(&progress_thread)) {
  54         fprintf(stderr, "Unable to start progress thread\n");
  55         exit(1);
  56     }
  57 
  58     /* wait a little while - reflects reality in an async system */
  59     while (count < 100) {
  60         nanosleep(&tp, NULL);
  61         count++;
  62     }
  63     count=0;
  64 
  65     /* define a file descriptor event - looks like an incoming socket
  66      * connection being created
  67      */
  68     if (0 > (my_fd = open("foo", O_CREAT | O_TRUNC | O_RDWR, 0644))) {
  69         fprintf(stderr, "Couldnt open file\n");
  70         exit(1);
  71     }
  72     opal_event_set(my_base,
  73                    &write_event,
  74                    my_fd,
  75                    OPAL_EV_WRITE|OPAL_EV_PERSIST,
  76                    send_handler,
  77                    NULL);
  78     opal_event_add(&write_event, 0);
  79     /*    opal_fd_write(progress_thread_pipe[1], 1, &byte); */
  80 
  81     /* wait for it to trigger */
  82     while (count < 1000) {
  83         OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
  84         if (fd_written) {
  85             OPAL_RELEASE_THREAD(&lock, &cond, &active);
  86             break;
  87         }
  88         OPAL_RELEASE_THREAD(&lock, &cond, &active);
  89         if (0 == (count % 100)) {
  90             fprintf(stderr, "Waiting...\n");
  91         }
  92         nanosleep(&tp, NULL);
  93         count++;
  94     }
  95     fprintf(stderr, "Done waiting\n");
  96 
  97     /* stop the thread */
  98     OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
  99     progress_thread_stop = true;
 100     OPAL_RELEASE_THREAD(&lock, &cond, &active);
 101     opal_fd_write(progress_thread_pipe[1], 1, &byte);
 102     opal_thread_join(&progress_thread, NULL);
 103 
 104     opal_finalize();
 105     return 0;
 106 }
 107 
 108 static opal_event_t stop_event;
 109 static void stop_handler(int sd, short flags, void* cbdata)
 110 {
 111     char byte;
 112     opal_fd_read(progress_thread_pipe[0], 1, &byte);
 113     fprintf(stderr, "Stop handler called\n");
 114     /* reset the event */
 115     opal_event_add(&stop_event, 0);
 116     return;
 117 }
 118 
 119 static void* progress_engine(opal_object_t *obj)
 120 {
 121     /* define an event that will be used to kick us out of a blocking
 122      * situation when we want to exit
 123      */
 124     opal_event_set(my_base, &stop_event,
 125                    progress_thread_pipe[0], OPAL_EV_READ, stop_handler, NULL);
 126     opal_event_add(&stop_event, 0);
 127 
 128     while (1) {
 129         OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
 130         if (progress_thread_stop) {
 131             fprintf(stderr, "Thread stopping\n");
 132             OPAL_RELEASE_THREAD(&lock, &cond, &active);
 133             opal_event_del(&stop_event);
 134             return OPAL_THREAD_CANCELLED;
 135         }
 136         OPAL_RELEASE_THREAD(&lock, &cond, &active);
 137         fprintf(stderr, "Looping...\n");
 138         opal_event_loop(my_base, OPAL_EVLOOP_ONCE);
 139     }
 140 }
 141 
 142 static void send_handler(int sd, short flags, void *arg)
 143 {
 144     char *bytes="This is an output string\n";
 145 
 146     fprintf(stderr, "Write event fired\n");
 147     opal_fd_write(my_fd, strlen(bytes), bytes);
 148     opal_event_del(&write_event);
 149     OPAL_ACQUIRE_THREAD(&lock, &cond, &active);
 150     fd_written = true;
 151     OPAL_RELEASE_THREAD(&lock, &cond, &active);
 152 }

/* [<][>][^][v][top][bottom][index][help] */