This source file includes the following definitions.
- tm_set_max_nb_threads
- execute_work
- bind_myself_to_core
- thread_loop
- add_work
- wait_work_completion
- submit_work
- create_threads
- get_thread_pool
- terminate_thread_pool
- get_nb_threads
- create_work
- destroy_work
- f1
- f2
- test_main
1 #include <pthread.h>
2 #include "tm_thread_pool.h"
3 #include "tm_verbose.h"
4 #include <hwloc.h>
5 #include "tm_verbose.h"
6 #include "tm_tree.h"
7 #include <errno.h>
8 #include <limits.h>
9
10 typedef enum _mapping_policy {COMPACT, SCATTER} mapping_policy_t;
11
12 static mapping_policy_t mapping_policy = COMPACT;
13 static int verbose_level = ERROR;
14 static thread_pool_t *pool = NULL;
15 static unsigned int max_nb_threads = INT_MAX;
16
17 static thread_pool_t *get_thread_pool(void);
18 static void execute_work(work_t *work);
19 static int bind_myself_to_core(hwloc_topology_t topology, int id);
20 static void *thread_loop(void *arg);
21 static void add_work(pthread_mutex_t *list_lock, pthread_cond_t *cond_var, work_t *working_list, work_t *work);
22 static thread_pool_t *create_threads(void);
23
24 static void f1 (int nb_args, void **args, int thread_id);
25 static void f2 (int nb_args, void **args, int thread_id);
26
27 #define MIN(a, b) ((a)<(b)?(a):(b))
28 #define MAX(a, b) ((a)>(b)?(a):(b))
29
30
31
32 void tm_set_max_nb_threads(unsigned int val){
33 max_nb_threads = val;
34 }
35
36 void execute_work(work_t *work){
37 work->task(work->nb_args, work->args, work->thread_id);
38 }
39
40 int bind_myself_to_core(hwloc_topology_t topology, int id){
41 hwloc_cpuset_t cpuset;
42 hwloc_obj_t obj;
43 char *str;
44 int binding_res;
45 int depth = hwloc_topology_get_depth(topology);
46 int nb_cores = hwloc_get_nbobjs_by_depth(topology, depth-1);
47 int my_core;
48 int nb_threads = get_nb_threads();
49
50
51 switch (mapping_policy){
52 case SCATTER:
53 my_core = id*(nb_cores/nb_threads);
54 break;
55 default:
56 if(verbose_level>=WARNING){
57 printf("Wrong scheduling policy. Using COMPACT\n");
58 }
59 case COMPACT:
60 my_core = id%nb_cores;
61 }
62
63 if(verbose_level>=INFO){
64 printf("Mapping thread %d on core %d\n",id,my_core);
65 }
66
67
68 obj = hwloc_get_obj_by_depth(topology, depth-1, my_core);
69 if (obj) {
70
71 cpuset = hwloc_bitmap_dup(obj->cpuset);
72
73
74
75 hwloc_bitmap_singlify(cpuset);
76
77
78
79
80
81
82
83
84 binding_res = hwloc_set_cpubind(topology, cpuset, HWLOC_CPUBIND_THREAD);
85 if (binding_res == -1){
86 int error = errno;
87 hwloc_bitmap_asprintf(&str, obj->cpuset);
88 if(verbose_level>=WARNING)
89 printf("Thread %d couldn't bind to cpuset %s: %s.\n This thread is not bound to any core...\n", my_core, str, strerror(error));
90 free(str);
91 return 0;
92 }
93
94 hwloc_bitmap_free(cpuset);
95 return 1;
96 }else{
97 if(verbose_level>=WARNING)
98 printf("No valid object for core id %d!\n",my_core);
99 return 0;
100 }
101 }
102
103
104
105
106 void *thread_loop(void *arg){
107 local_thread_t *local=(local_thread_t*)arg;
108 int id = local->id;
109 hwloc_topology_t topology= local->topology;
110 work_t *start_working_list = local ->working_list;
111 pthread_cond_t *cond_var = local->cond_var;
112 pthread_mutex_t *list_lock = local->list_lock;
113 work_t *work;
114 int *ret = (int *)MALLOC(sizeof(int));
115
116 bind_myself_to_core(topology,id);
117
118
119
120 while(1){
121 pthread_mutex_lock(list_lock);
122 while(start_working_list->next == NULL) {
123 pthread_cond_wait(cond_var, list_lock);
124 }
125
126 work = start_working_list->next;
127 start_working_list->next = work-> next;
128 pthread_mutex_unlock(list_lock);
129
130 if(!work->task){
131 *ret = 0;
132 pthread_exit(ret);
133 }
134
135 execute_work(work);
136 pthread_mutex_lock(&work->mutex);
137 work->done=1;
138 pthread_mutex_unlock(&work->mutex);
139 pthread_cond_signal(&work->work_done);
140 }
141
142 }
143
144 void add_work(pthread_mutex_t *list_lock, pthread_cond_t *cond_var, work_t *working_list, work_t *work){
145
146 work_t *elem = working_list;
147 pthread_mutex_lock(list_lock);
148 while(elem->next!=NULL){
149 elem=elem->next;
150 }
151 elem->next=work;
152 work -> next = NULL;
153 work -> done = 0;
154 pthread_cond_signal(cond_var);
155 pthread_mutex_unlock(list_lock);
156 }
157
158
159 void wait_work_completion(work_t *work){
160 pthread_mutex_lock(&work->mutex);
161 while(!work->done)
162 pthread_cond_wait(&work->work_done, &work->mutex);
163
164 }
165
166
167 int submit_work(work_t *work, int thread_id){
168 if( (thread_id>=0) && (thread_id< pool->nb_threads)){
169 work->thread_id = thread_id;
170 add_work(&pool->list_lock[thread_id], &pool->cond_var[thread_id], &pool->working_list[thread_id], work);
171 return 1;
172 }
173 return 0;
174 }
175
176 thread_pool_t *create_threads(){
177 hwloc_topology_t topology;
178 int i;
179 local_thread_t *local;
180 int nb_threads;
181 unsigned int nb_cores;
182 int depth;
183
184 verbose_level = tm_get_verbose_level();
185
186
187
188 hwloc_topology_init(&topology);
189
190
191
192 hwloc_topology_load(topology);
193 depth = hwloc_topology_get_depth(topology);
194 if (depth == -1 ) {
195 if(verbose_level>=CRITICAL)
196 fprintf(stderr,"Error: HWLOC unable to find the depth of the topology of this node!\n");
197 exit(-1);
198 }
199
200
201
202
203 nb_cores = hwloc_get_nbobjs_by_depth(topology, depth-1);
204 nb_threads = MIN(nb_cores, max_nb_threads);
205
206 if(verbose_level>=INFO)
207 printf("nb_threads = %d\n",nb_threads);
208
209 pool = (thread_pool_t*) MALLOC(sizeof(thread_pool_t));
210 pool -> topology = topology;
211 pool -> nb_threads = nb_threads;
212 pool -> thread_list = (pthread_t*)MALLOC(sizeof(pthread_t)*nb_threads);
213 pool -> working_list = (work_t*)CALLOC(nb_threads,sizeof(work_t));
214 pool -> cond_var = (pthread_cond_t*)MALLOC(sizeof(pthread_cond_t)*nb_threads);
215 pool -> list_lock = (pthread_mutex_t*)MALLOC(sizeof(pthread_mutex_t)*nb_threads);
216
217 local=(local_thread_t*)MALLOC(sizeof(local_thread_t)*nb_threads);
218 pool->local = local;
219
220 for (i=0;i<nb_threads;i++){
221 local[i].topology = topology;
222 local[i].id = i;
223 local[i].working_list = &pool->working_list[i];
224 pthread_cond_init(pool->cond_var +i, NULL);
225 local[i].cond_var = pool->cond_var +i;
226 pthread_mutex_init(pool->list_lock +i, NULL);
227 local[i].list_lock = pool->list_lock+i;
228 if (pthread_create (pool->thread_list+i, NULL, thread_loop, local+i) < 0) {
229 if(verbose_level>=CRITICAL)
230 fprintf(stderr, "pthread_create error for exec thread %d\n",i);
231 return NULL;
232 }
233 }
234 return pool;
235 }
236
237 thread_pool_t *get_thread_pool(){;
238 if (pool == NULL)
239 return create_threads();
240
241 return pool;
242 }
243
244 void terminate_thread_pool(){
245 int id;
246 int *ret=NULL;
247 work_t work;
248
249 if(pool){
250 work.task=NULL;
251 for (id=0;id<pool->nb_threads;id++){
252 submit_work(&work,id);
253 }
254
255
256 for (id=0;id<pool->nb_threads;id++){
257 pthread_join(pool->thread_list[id],(void **) &ret);
258 FREE(ret);
259 pthread_cond_destroy(pool->cond_var +id);
260 pthread_mutex_destroy(pool->list_lock +id);
261 if (pool->working_list[id].next != NULL)
262 if(verbose_level >= WARNING)
263 printf("Working list of thread %d not empty!\n",id);
264 }
265
266 hwloc_topology_destroy(pool->topology);
267 FREE(pool -> thread_list);
268 FREE(pool -> working_list);
269 FREE(pool -> cond_var);
270 FREE(pool -> list_lock);
271 FREE(pool -> local);
272 FREE(pool);
273 pool = NULL;
274 }
275 }
276
277
278
279
280 int get_nb_threads(){
281 pool = get_thread_pool();
282 return pool -> nb_threads;
283 }
284
285
286 work_t *create_work(int nb_args, void **args, void (*task) (int, void **, int)){
287 work_t *work;
288 work = MALLOC(sizeof(work_t));
289 work -> nb_args = nb_args;
290 work -> args = args;
291 work -> task = task;
292 work -> done = 0;
293 pthread_cond_init (&work->work_done, NULL);
294 pthread_mutex_init(&work->mutex, NULL);
295 if( verbose_level >= DEBUG)
296 printf("work %p created\n",(void *)work);
297 return work;
298 }
299
300
301 void destroy_work(work_t *work){
302 pthread_cond_destroy(&work->work_done);
303 pthread_mutex_destroy(&work->mutex);
304 FREE(work);
305 }
306
307
308
/*
 * Sample task: print the thread id, the argument count and the two integers
 * that args[0] and args[1] point to.
 */
void f1 (int nb_args, void **args, int thread_id)
{
  const int *first  = (const int *)args[0];
  const int *second = (const int *)args[1];
  const int lhs = *first;
  const int rhs = *second;

  printf("id: %d, nb_args=%d, a=%d, b=%d\n",thread_id, nb_args, lhs, rhs);
}
315
316
/*
 * Sample task: sum an integer array, repeating the computation one million
 * times to generate load, then print the thread id and the argument count.
 *
 * args[0] points to the element count, args[1] is the array, and args[2]
 * points to the int that receives the sum.
 */
void f2 (int nb_args, void **args, int thread_id)
{
  const int count = *(int*)args[0];
  int *values = (int*)args[1];
  int *sum = (int*)args[2];
  int round = 0;

  while (round < 1000000) {
    int k;

    /* The total is rebuilt from scratch, through the output pointer, on every round. */
    *sum = 0;
    for (k = 0; k < count; k++) {
      *sum += values[k];
    }
    round++;
  }

  printf("id: %d, done: %d!\n",thread_id, nb_args);
}
333
334
335
336 int test_main(void){
337
338 int a=3, c;
339 int b=-5;
340 void *args1[3];
341 void *args2[3];
342 int tab[100];
343 int i,res;
344 work_t *work1,*work2,*work3,*work4;
345 int nb_threads = get_nb_threads();
346
347
348 printf("nb_threads= %d\n", nb_threads);
349
350
351 args1[0] = &a;
352 args1[1] = &b;
353 work1 = create_work(2,args1,f1);
354
355
356 for (i=0;i<100;i++)
357 tab[i]=i;
358
359 c=100;
360 args2[0] = &c;
361 args2[1] = tab;
362 args2[2] = &res;
363
364 work2 = create_work(3, args2, f2);
365 work3 = create_work(4, args2, f2);
366 work4 = create_work(5, args2, f2);
367
368 submit_work(work1,0);
369 submit_work(work2,1);
370 submit_work(work3,1);
371 submit_work(work4,1);
372
373
374
375 terminate_thread_pool();
376 wait_work_completion(work1);
377 wait_work_completion(work2);
378 wait_work_completion(work3);
379 wait_work_completion(work4);
380
381 printf("res=%d\n",res);
382
383 destroy_work(work1);
384 destroy_work(work2);
385 destroy_work(work3);
386 destroy_work(work4);
387 return 0;
388 }