root/ompi/mca/topo/treematch/treematch/tm_thread_pool.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. tm_set_max_nb_threads
  2. execute_work
  3. bind_myself_to_core
  4. thread_loop
  5. add_work
  6. wait_work_completion
  7. submit_work
  8. create_threads
  9. get_thread_pool
  10. terminate_thread_pool
  11. get_nb_threads
  12. create_work
  13. destroy_work
  14. f1
  15. f2
  16. test_main

   1 #include <pthread.h>
   2 #include "tm_thread_pool.h"
   3 #include "tm_verbose.h"
   4 #include <hwloc.h>
   5 #include "tm_verbose.h"
   6 #include "tm_tree.h"
   7 #include <errno.h>
   8 #include <limits.h>
   9 
  10 typedef enum _mapping_policy {COMPACT, SCATTER} mapping_policy_t;
  11 
  12 static mapping_policy_t mapping_policy = COMPACT;
  13 static int verbose_level = ERROR;
  14 static thread_pool_t *pool = NULL;
  15 static unsigned int max_nb_threads = INT_MAX;
  16 
  17 static thread_pool_t *get_thread_pool(void);
  18 static void execute_work(work_t *work);
  19 static int bind_myself_to_core(hwloc_topology_t topology, int id);
  20 static void *thread_loop(void *arg);
  21 static void add_work(pthread_mutex_t *list_lock, pthread_cond_t *cond_var, work_t *working_list, work_t *work);
  22 static thread_pool_t *create_threads(void);
  23 
  24 static void f1 (int nb_args, void **args, int thread_id);
  25 static void f2 (int nb_args, void **args, int thread_id);
  26 
  27 #define MIN(a, b) ((a)<(b)?(a):(b))
  28 #define MAX(a, b) ((a)>(b)?(a):(b))
  29 
  30 
  31 
  32 void tm_set_max_nb_threads(unsigned int val){
  33   max_nb_threads = val;
  34 }
  35 
  36 void execute_work(work_t *work){
  37   work->task(work->nb_args, work->args, work->thread_id);
  38 }
  39 
  40 int bind_myself_to_core(hwloc_topology_t topology, int id){
  41   hwloc_cpuset_t cpuset;
  42   hwloc_obj_t obj;
  43   char *str;
  44   int binding_res;
  45   int depth = hwloc_topology_get_depth(topology);
  46   int nb_cores = hwloc_get_nbobjs_by_depth(topology, depth-1);
  47   int my_core;
  48   int nb_threads = get_nb_threads();
  49   /* printf("depth=%d\n",depth); */
  50 
  51   switch (mapping_policy){
  52   case SCATTER:
  53     my_core = id*(nb_cores/nb_threads);
  54     break;
  55   default:
  56     if(verbose_level>=WARNING){
  57       printf("Wrong scheduling policy. Using COMPACT\n");
  58     }
  59   case COMPACT:
  60     my_core = id%nb_cores;
  61   }
  62 
  63     if(verbose_level>=INFO){
  64        printf("Mapping thread %d on core %d\n",id,my_core);
  65    }
  66 
  67     /* Get my core. */
  68     obj = hwloc_get_obj_by_depth(topology, depth-1, my_core);
  69     if (obj) {
  70       /* Get a copy of its cpuset that we may modify. */
  71       cpuset = hwloc_bitmap_dup(obj->cpuset);
  72 
  73       /* Get only one logical processor (in case the core is
  74          SMT/hyperthreaded). */
  75       hwloc_bitmap_singlify(cpuset);
  76 
  77 
  78       /*hwloc_bitmap_asprintf(&str, cpuset);
  79       printf("Binding thread %d to cpuset %s\n", my_core,str);
  80       FREE(str);
  81       */
  82 
  83       /* And try  to bind ourself there. */
  84       binding_res = hwloc_set_cpubind(topology, cpuset, HWLOC_CPUBIND_THREAD);
  85       if (binding_res == -1){
  86         int error = errno;
  87         hwloc_bitmap_asprintf(&str, obj->cpuset);
  88         if(verbose_level>=WARNING)
  89           printf("Thread %d couldn't bind to cpuset %s: %s.\n This thread is not bound to any core...\n", my_core, str, strerror(error));
  90         free(str); /* str is allocated by hlwoc, free it normally*/
  91         return 0;
  92       }
  93       /* FREE our cpuset copy */
  94       hwloc_bitmap_free(cpuset);
  95       return 1;
  96     }else{
  97       if(verbose_level>=WARNING)
  98         printf("No valid object for core id %d!\n",my_core);
  99       return 0;
 100     }
 101 }
 102 
 103 
 104 
 105 
 106 void *thread_loop(void *arg){
 107   local_thread_t *local=(local_thread_t*)arg;
 108   int id = local->id;
 109   hwloc_topology_t topology= local->topology;
 110   work_t *start_working_list = local ->working_list;
 111   pthread_cond_t *cond_var = local->cond_var;
 112   pthread_mutex_t *list_lock = local->list_lock;
 113   work_t *work;
 114   int *ret = (int *)MALLOC(sizeof(int));
 115 
 116   bind_myself_to_core(topology,id);
 117 
 118 
 119 
 120   while(1){
 121     pthread_mutex_lock(list_lock);
 122     while(start_working_list->next == NULL) {
 123       pthread_cond_wait(cond_var, list_lock);
 124     }
 125 
 126     work = start_working_list->next;
 127     start_working_list->next = work-> next;
 128     pthread_mutex_unlock(list_lock);
 129 
 130     if(!work->task){
 131       *ret = 0;
 132       pthread_exit(ret);
 133     }
 134 
 135     execute_work(work);
 136     pthread_mutex_lock(&work->mutex);
 137     work->done=1;
 138     pthread_mutex_unlock(&work->mutex);
 139     pthread_cond_signal(&work->work_done);
 140   }
 141 
 142 }
 143 
 144 void add_work(pthread_mutex_t *list_lock, pthread_cond_t *cond_var, work_t *working_list, work_t *work){
 145 
 146   work_t *elem = working_list;
 147   pthread_mutex_lock(list_lock);
 148   while(elem->next!=NULL){
 149     elem=elem->next;
 150   }
 151   elem->next=work;
 152   work -> next = NULL;
 153   work -> done = 0;
 154   pthread_cond_signal(cond_var);
 155   pthread_mutex_unlock(list_lock);
 156 }
 157 
 158 
 159 void wait_work_completion(work_t *work){
 160   pthread_mutex_lock(&work->mutex);
 161   while(!work->done)
 162     pthread_cond_wait(&work->work_done, &work->mutex);
 163 
 164 }
 165 
 166 
 167 int submit_work(work_t *work, int thread_id){
 168   if( (thread_id>=0) && (thread_id< pool->nb_threads)){
 169     work->thread_id = thread_id;
 170     add_work(&pool->list_lock[thread_id], &pool->cond_var[thread_id], &pool->working_list[thread_id], work);
 171     return 1;
 172   }
 173   return 0;
 174 }
 175 
 176 thread_pool_t *create_threads(){
 177   hwloc_topology_t topology;
 178   int i;
 179   local_thread_t *local;
 180   int nb_threads;
 181   unsigned int nb_cores;
 182   int depth;
 183 
 184   verbose_level = tm_get_verbose_level();
 185 
 186     /*Get number of cores: set 1 thread per core*/
 187   /* Allocate and initialize topology object. */
 188   hwloc_topology_init(&topology);
 189   /* Only keep relevant levels
 190      hwloc_topology_ignore_all_keep_structure(topology);*/
 191   /* Perform the topology detection. */
 192   hwloc_topology_load(topology);
 193   depth = hwloc_topology_get_depth(topology);
 194   if (depth == -1 ) {
 195     if(verbose_level>=CRITICAL)
 196       fprintf(stderr,"Error: HWLOC unable to find the depth of the topology of this node!\n");
 197     exit(-1);
 198   }
 199 
 200 
 201 
 202   /* at depth 'depth' it is necessary a PU/core where we can execute things*/
 203   nb_cores = hwloc_get_nbobjs_by_depth(topology, depth-1);
 204   nb_threads = MIN(nb_cores,  max_nb_threads);
 205 
 206   if(verbose_level>=INFO)
 207     printf("nb_threads = %d\n",nb_threads);
 208 
 209   pool = (thread_pool_t*) MALLOC(sizeof(thread_pool_t));
 210   pool -> topology = topology;
 211   pool -> nb_threads = nb_threads;
 212   pool -> thread_list = (pthread_t*)MALLOC(sizeof(pthread_t)*nb_threads);
 213   pool -> working_list = (work_t*)CALLOC(nb_threads,sizeof(work_t));
 214   pool -> cond_var = (pthread_cond_t*)MALLOC(sizeof(pthread_cond_t)*nb_threads);
 215   pool -> list_lock = (pthread_mutex_t*)MALLOC(sizeof(pthread_mutex_t)*nb_threads);
 216 
 217   local=(local_thread_t*)MALLOC(sizeof(local_thread_t)*nb_threads);
 218   pool->local = local;
 219 
 220   for (i=0;i<nb_threads;i++){
 221     local[i].topology = topology;
 222     local[i].id = i;
 223     local[i].working_list = &pool->working_list[i];
 224     pthread_cond_init(pool->cond_var +i, NULL);
 225     local[i].cond_var = pool->cond_var +i;
 226     pthread_mutex_init(pool->list_lock +i, NULL);
 227     local[i].list_lock = pool->list_lock+i;
 228     if (pthread_create (pool->thread_list+i, NULL, thread_loop, local+i) < 0) {
 229       if(verbose_level>=CRITICAL)
 230         fprintf(stderr, "pthread_create error for exec thread %d\n",i);
 231       return NULL;
 232     }
 233   }
 234   return pool;
 235 }
 236 
 237 thread_pool_t *get_thread_pool(){;
 238   if (pool == NULL)
 239     return create_threads();
 240 
 241   return pool;
 242 }
 243 
 244 void terminate_thread_pool(){
 245   int id;
 246   int *ret=NULL;
 247   work_t work;
 248 
 249   if(pool){
 250     work.task=NULL;
 251     for (id=0;id<pool->nb_threads;id++){
 252       submit_work(&work,id);
 253     }
 254 
 255 
 256     for (id=0;id<pool->nb_threads;id++){
 257       pthread_join(pool->thread_list[id],(void **) &ret);
 258       FREE(ret);
 259       pthread_cond_destroy(pool->cond_var +id);
 260       pthread_mutex_destroy(pool->list_lock +id);
 261       if (pool->working_list[id].next != NULL)
 262         if(verbose_level >= WARNING)
 263           printf("Working list of thread %d not empty!\n",id);
 264     }
 265 
 266     hwloc_topology_destroy(pool->topology);
 267     FREE(pool -> thread_list);
 268     FREE(pool -> working_list);
 269     FREE(pool -> cond_var);
 270     FREE(pool -> list_lock);
 271     FREE(pool -> local);
 272     FREE(pool);
 273     pool = NULL;
 274   }
 275 }
 276 
 277 
 278 
 279 
 280 int get_nb_threads(){
 281   pool = get_thread_pool();
 282   return pool -> nb_threads;
 283 }
 284 
 285 
 286 work_t *create_work(int nb_args, void **args, void (*task) (int, void **, int)){
 287   work_t *work;
 288   work = MALLOC(sizeof(work_t));
 289   work -> nb_args = nb_args;
 290   work -> args = args;
 291   work -> task = task;
 292   work -> done = 0;
 293   pthread_cond_init (&work->work_done, NULL);
 294   pthread_mutex_init(&work->mutex,     NULL);
 295   if( verbose_level >= DEBUG)
 296     printf("work %p created\n",(void *)work);
 297   return work;
 298 }
 299 
 300 
 301 void destroy_work(work_t *work){
 302   pthread_cond_destroy(&work->work_done);
 303   pthread_mutex_destroy(&work->mutex);
 304   FREE(work);
 305 }
 306 
 307 /* CODE example 2 functions  and  test driver*/
 308 
 309 void f1 (int nb_args, void **args, int thread_id){
 310   int a, b;
 311   a = *(int*)args[0];
 312   b = *(int*)args[1];
 313   printf("id: %d, nb_args=%d, a=%d, b=%d\n",thread_id, nb_args,a,b);
 314 }
 315 
 316 
 317 void f2 (int nb_args, void **args, int thread_id){
 318   int n, *tab;
 319   int *res;
 320   int i,j;
 321   n = *(int*)args[0];
 322   tab = (int*)args[1];
 323   res=(int*)args[2];
 324 
 325   for(j=0;j<1000000;j++){
 326     *res=0;
 327     for (i=0;i<n;i++)
 328       *res+=tab[i];
 329   }
 330 
 331   printf("id: %d, done: %d!\n",thread_id, nb_args);
 332 }
 333 
 334 
 335 
 336 int test_main(void){
 337 
 338   int a=3, c;
 339   int b=-5;
 340   void *args1[3];
 341   void *args2[3];
 342   int tab[100];
 343   int i,res;
 344   work_t *work1,*work2,*work3,*work4;
 345   int nb_threads = get_nb_threads();
 346 
 347 
 348   printf("nb_threads= %d\n", nb_threads);
 349 
 350 
 351   args1[0] = &a;
 352   args1[1] = &b;
 353   work1 = create_work(2,args1,f1);
 354 
 355 
 356   for (i=0;i<100;i++)
 357     tab[i]=i;
 358 
 359   c=100;
 360   args2[0] = &c;
 361   args2[1] = tab;
 362   args2[2] = &res;
 363 
 364   work2 = create_work(3, args2, f2);
 365   work3 = create_work(4, args2, f2);
 366   work4 = create_work(5, args2, f2);
 367 
 368   submit_work(work1,0);
 369   submit_work(work2,1);
 370   submit_work(work3,1);
 371   submit_work(work4,1);
 372 
 373 
 374 
 375   terminate_thread_pool();
 376   wait_work_completion(work1);
 377   wait_work_completion(work2);
 378   wait_work_completion(work3);
 379   wait_work_completion(work4);
 380 
 381   printf("res=%d\n",res);
 382 
 383   destroy_work(work1);
 384   destroy_work(work2);
 385   destroy_work(work3);
 386   destroy_work(work4);
 387   return 0;
 388 }

/* [<][>][^][v][top][bottom][index][help] */