This source file includes the following definitions:
- component_register
- component_open
- component_init
- component_finalize
- check_win_ok
- component_query
- component_select
- ompi_osc_sm_shared_query
- ompi_osc_sm_attach
- ompi_osc_sm_detach
- ompi_osc_sm_free
- ompi_osc_sm_set_info
- component_set_blocking_fence_info
- component_set_alloc_shared_noncontig_info
- ompi_osc_sm_get_info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23
24 #include "ompi/mca/osc/osc.h"
25 #include "ompi/mca/osc/base/base.h"
26 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
27 #include "ompi/request/request.h"
28 #include "opal/util/sys_limits.h"
29 #include "opal/include/opal/align.h"
30 #include "opal/util/info_subscriber.h"
31 #include "opal/util/printf.h"
32
33 #include "osc_sm.h"
34
35 static int component_open(void);
36 static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
37 static int component_finalize(void);
38 static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
39 struct ompi_communicator_t *comm, struct opal_info_t *info,
40 int flavor);
41 static int component_register (void);
42 static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
43 struct ompi_communicator_t *comm, struct opal_info_t *info,
44 int flavor, int *model);
45 static char* component_set_blocking_fence_info(opal_infosubscriber_t *obj, char *key, char *val);
46 static char* component_set_alloc_shared_noncontig_info(opal_infosubscriber_t *obj, char *key, char *val);
47
48
49 ompi_osc_sm_component_t mca_osc_sm_component = {
50 {
51 .osc_version = {
52 OMPI_OSC_BASE_VERSION_3_0_0,
53 .mca_component_name = "sm",
54 MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
55 OMPI_RELEASE_VERSION),
56 .mca_open_component = component_open,
57 .mca_register_component_params = component_register,
58 },
59 .osc_data = {
60
61 MCA_BASE_METADATA_PARAM_NONE
62 },
63 .osc_init = component_init,
64 .osc_query = component_query,
65 .osc_select = component_select,
66 .osc_finalize = component_finalize,
67 }
68 };
69
70
71 ompi_osc_sm_module_t ompi_osc_sm_module_template = {
72 {
73 .osc_win_shared_query = ompi_osc_sm_shared_query,
74
75 .osc_win_attach = ompi_osc_sm_attach,
76 .osc_win_detach = ompi_osc_sm_detach,
77 .osc_free = ompi_osc_sm_free,
78
79 .osc_put = ompi_osc_sm_put,
80 .osc_get = ompi_osc_sm_get,
81 .osc_accumulate = ompi_osc_sm_accumulate,
82 .osc_compare_and_swap = ompi_osc_sm_compare_and_swap,
83 .osc_fetch_and_op = ompi_osc_sm_fetch_and_op,
84 .osc_get_accumulate = ompi_osc_sm_get_accumulate,
85
86 .osc_rput = ompi_osc_sm_rput,
87 .osc_rget = ompi_osc_sm_rget,
88 .osc_raccumulate = ompi_osc_sm_raccumulate,
89 .osc_rget_accumulate = ompi_osc_sm_rget_accumulate,
90
91 .osc_fence = ompi_osc_sm_fence,
92
93 .osc_start = ompi_osc_sm_start,
94 .osc_complete = ompi_osc_sm_complete,
95 .osc_post = ompi_osc_sm_post,
96 .osc_wait = ompi_osc_sm_wait,
97 .osc_test = ompi_osc_sm_test,
98
99 .osc_lock = ompi_osc_sm_lock,
100 .osc_unlock = ompi_osc_sm_unlock,
101 .osc_lock_all = ompi_osc_sm_lock_all,
102 .osc_unlock_all = ompi_osc_sm_unlock_all,
103
104 .osc_sync = ompi_osc_sm_sync,
105 .osc_flush = ompi_osc_sm_flush,
106 .osc_flush_all = ompi_osc_sm_flush_all,
107 .osc_flush_local = ompi_osc_sm_flush_local,
108 .osc_flush_local_all = ompi_osc_sm_flush_local_all,
109 }
110 };
111
112 static int component_register (void)
113 {
114 if (0 == access ("/dev/shm", W_OK)) {
115 mca_osc_sm_component.backing_directory = "/dev/shm";
116 } else {
117 mca_osc_sm_component.backing_directory = ompi_process_info.proc_session_dir;
118 }
119
120 (void) mca_base_component_var_register (&mca_osc_sm_component.super.osc_version, "backing_directory",
121 "Directory to place backing files for shared memory windows. "
122 "This directory should be on a local filesystem such as /tmp or "
123 "/dev/shm (default: (linux) /dev/shm, (others) session directory)",
124 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_3,
125 MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_sm_component.backing_directory);
126
127 return OPAL_SUCCESS;
128 }
129
130 static int
131 component_open(void)
132 {
133 return OMPI_SUCCESS;
134 }
135
136
137 static int
138 component_init(bool enable_progress_threads, bool enable_mpi_threads)
139 {
140 return OMPI_SUCCESS;
141 }
142
143
144 static int
145 component_finalize(void)
146 {
147
148
149 return OMPI_SUCCESS;
150 }
151
152
153 static int
154 check_win_ok(ompi_communicator_t *comm, int flavor)
155 {
156 if (! (MPI_WIN_FLAVOR_SHARED == flavor
157 || MPI_WIN_FLAVOR_ALLOCATE == flavor) ) {
158 return OMPI_ERR_NOT_SUPPORTED;
159 }
160
161 if (ompi_group_have_remote_peers (comm->c_local_group)) {
162 return OMPI_ERR_RMA_SHARED;
163 }
164
165 return OMPI_SUCCESS;
166 }
167
168
169 static int
170 component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
171 struct ompi_communicator_t *comm, struct opal_info_t *info,
172 int flavor)
173 {
174 int ret;
175 if (OMPI_SUCCESS != (ret = check_win_ok(comm, flavor))) {
176 if (OMPI_ERR_NOT_SUPPORTED == ret) {
177 return -1;
178 }
179 return ret;
180 }
181
182 return 100;
183 }
184
185
186 static int
187 component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
188 struct ompi_communicator_t *comm, struct opal_info_t *info,
189 int flavor, int *model)
190 {
191 ompi_osc_sm_module_t *module = NULL;
192 int comm_size = ompi_comm_size (comm);
193 bool unlink_needed = false;
194 int ret = OMPI_ERROR;
195
196 if (OMPI_SUCCESS != (ret = check_win_ok(comm, flavor))) {
197 return ret;
198 }
199
200
201 module = (ompi_osc_sm_module_t*)
202 calloc(1, sizeof(ompi_osc_sm_module_t));
203 if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
204
205 win->w_osc_module = &module->super;
206
207 OBJ_CONSTRUCT(&module->lock, opal_mutex_t);
208
209 ret = opal_infosubscribe_subscribe(&(win->super), "alloc_shared_contig", "false", component_set_alloc_shared_noncontig_info);
210
211 if (OPAL_SUCCESS != ret) goto error;
212
213
214 memcpy(module, &ompi_osc_sm_module_template,
215 sizeof(ompi_osc_base_module_t));
216
217
218 ret = ompi_comm_dup(comm, &module->comm);
219 if (OMPI_SUCCESS != ret) goto error;
220
221 module->flavor = flavor;
222
223
224 if (1 == comm_size) {
225 module->segment_base = NULL;
226 module->sizes = malloc(sizeof(size_t));
227 if (NULL == module->sizes) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
228 module->bases = malloc(sizeof(void*));
229 if (NULL == module->bases) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
230
231 module->sizes[0] = size;
232 module->bases[0] = malloc(size);
233 if (NULL == module->bases[0]) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
234
235 module->global_state = malloc(sizeof(ompi_osc_sm_global_state_t));
236 if (NULL == module->global_state) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
237 module->node_states = malloc(sizeof(ompi_osc_sm_node_state_t));
238 if (NULL == module->node_states) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
239 module->posts = calloc (1, sizeof(module->posts[0]) + sizeof (module->posts[0][0]));
240 if (NULL == module->posts) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
241 module->posts[0] = (osc_sm_post_atomic_type_t *) (module->posts + 1);
242 } else {
243 unsigned long total, *rbuf;
244 int i, flag;
245 size_t pagesize;
246 size_t state_size;
247 size_t posts_size, post_size = (comm_size + 63) / 64;
248
249 OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
250 "allocating shared memory region of size %ld\n", (long) size));
251
252
253 pagesize = opal_getpagesize();
254
255 rbuf = malloc(sizeof(unsigned long) * comm_size);
256 if (NULL == rbuf) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
257
258 module->noncontig = false;
259 if (OMPI_SUCCESS != opal_info_get_bool(info, "alloc_shared_noncontig",
260 &module->noncontig, &flag)) {
261 goto error;
262 }
263
264 if (module->noncontig) {
265 total = ((size - 1) / pagesize + 1) * pagesize;
266 } else {
267 total = size;
268 }
269 ret = module->comm->c_coll->coll_allgather(&total, 1, MPI_UNSIGNED_LONG,
270 rbuf, 1, MPI_UNSIGNED_LONG,
271 module->comm,
272 module->comm->c_coll->coll_allgather_module);
273 if (OMPI_SUCCESS != ret) return ret;
274
275 total = 0;
276 for (i = 0 ; i < comm_size ; ++i) {
277 total += rbuf[i];
278 }
279
280
281 state_size = sizeof(ompi_osc_sm_global_state_t) + sizeof(ompi_osc_sm_node_state_t) * comm_size;
282 state_size += OPAL_ALIGN_PAD_AMOUNT(state_size, 64);
283 posts_size = comm_size * post_size * sizeof (module->posts[0][0]);
284 posts_size += OPAL_ALIGN_PAD_AMOUNT(posts_size, 64);
285 if (0 == ompi_comm_rank (module->comm)) {
286 char *data_file;
287 ret = opal_asprintf (&data_file, "%s" OPAL_PATH_SEP "osc_sm.%s.%x.%d.%d",
288 mca_osc_sm_component.backing_directory, ompi_process_info.nodename,
289 OMPI_PROC_MY_NAME->jobid, (int) OMPI_PROC_MY_NAME->vpid, ompi_comm_get_cid(module->comm));
290 if (ret < 0) {
291 return OMPI_ERR_OUT_OF_RESOURCE;
292 }
293
294 ret = opal_shmem_segment_create (&module->seg_ds, data_file, total + pagesize + state_size + posts_size);
295 free(data_file);
296 if (OPAL_SUCCESS != ret) {
297 goto error;
298 }
299
300 unlink_needed = true;
301 }
302
303 ret = module->comm->c_coll->coll_bcast (&module->seg_ds, sizeof (module->seg_ds), MPI_BYTE, 0,
304 module->comm, module->comm->c_coll->coll_bcast_module);
305 if (OMPI_SUCCESS != ret) {
306 goto error;
307 }
308
309 module->segment_base = opal_shmem_segment_attach (&module->seg_ds);
310 if (NULL == module->segment_base) {
311 goto error;
312 }
313
314
315 ret = module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);
316 if (OMPI_SUCCESS != ret) {
317 goto error;
318 }
319
320 if (0 == ompi_comm_rank (module->comm)) {
321 opal_shmem_unlink (&module->seg_ds);
322 unlink_needed = false;
323 }
324
325 module->sizes = malloc(sizeof(size_t) * comm_size);
326 if (NULL == module->sizes) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
327 module->bases = malloc(sizeof(void*) * comm_size);
328 if (NULL == module->bases) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
329 module->posts = calloc (comm_size, sizeof (module->posts[0]));
330 if (NULL == module->posts) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
331
332
333 module->posts[0] = (osc_sm_post_atomic_type_t *) (module->segment_base);
334 module->global_state = (ompi_osc_sm_global_state_t *) (module->posts[0] + comm_size * post_size);
335 module->node_states = (ompi_osc_sm_node_state_t *) (module->global_state + 1);
336
337 for (i = 0, total = state_size + posts_size ; i < comm_size ; ++i) {
338 if (i > 0) {
339 module->posts[i] = module->posts[i - 1] + post_size;
340 }
341
342 module->sizes[i] = rbuf[i];
343 if (module->sizes[i]) {
344 module->bases[i] = ((char *) module->segment_base) + total;
345 total += rbuf[i];
346 } else {
347 module->bases[i] = NULL;
348 }
349 }
350
351 free(rbuf);
352 }
353
354
355 module->my_node_state = &module->node_states[ompi_comm_rank(module->comm)];
356 memset (module->my_node_state, 0, sizeof(*module->my_node_state));
357
358 *base = module->bases[ompi_comm_rank(module->comm)];
359
360 opal_atomic_lock_init(&module->my_node_state->accumulate_lock, OPAL_ATOMIC_LOCK_UNLOCKED);
361
362
363 module->disp_units = malloc(sizeof(int) * comm_size);
364 ret = module->comm->c_coll->coll_allgather(&disp_unit, 1, MPI_INT,
365 module->disp_units, 1, MPI_INT,
366 module->comm,
367 module->comm->c_coll->coll_allgather_module);
368 if (OMPI_SUCCESS != ret) goto error;
369
370 module->start_group = NULL;
371 module->post_group = NULL;
372
373
374 module->my_sense = 1;
375
376 module->outstanding_locks = calloc(comm_size, sizeof(enum ompi_osc_sm_locktype_t));
377 if (NULL == module->outstanding_locks) {
378 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
379 goto error;
380 }
381
382 if (0 == ompi_comm_rank(module->comm)) {
383 #if HAVE_PTHREAD_CONDATTR_SETPSHARED && HAVE_PTHREAD_MUTEXATTR_SETPSHARED
384 pthread_mutexattr_t mattr;
385 pthread_condattr_t cattr;
386 bool blocking_fence=false;
387 int flag;
388
389 if (OMPI_SUCCESS != opal_info_get_bool(info, "blocking_fence",
390 &blocking_fence, &flag)) {
391 goto error;
392 }
393
394 if (flag && blocking_fence) {
395 ret = pthread_mutexattr_init(&mattr);
396 ret = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
397 if (ret != 0) {
398 module->global_state->use_barrier_for_fence = 1;
399 } else {
400 ret = pthread_mutex_init(&module->global_state->mtx, &mattr);
401 if (ret != 0) {
402 module->global_state->use_barrier_for_fence = 1;
403 } else {
404 pthread_condattr_init(&cattr);
405 pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
406 ret = pthread_cond_init(&module->global_state->cond, &cattr);
407 if (ret != 0) return OMPI_ERROR;
408 pthread_condattr_destroy(&cattr);
409 }
410 }
411 module->global_state->use_barrier_for_fence = 0;
412 module->global_state->sense = module->my_sense;
413 module->global_state->count = comm_size;
414 pthread_mutexattr_destroy(&mattr);
415 } else {
416 module->global_state->use_barrier_for_fence = 1;
417 }
418 #else
419 module->global_state->use_barrier_for_fence = 1;
420 #endif
421 }
422
423 ret = opal_infosubscribe_subscribe(&(win->super), "blocking_fence", "false",
424 component_set_blocking_fence_info);
425
426 if (OPAL_SUCCESS != ret) goto error;
427
428 ret = module->comm->c_coll->coll_barrier(module->comm,
429 module->comm->c_coll->coll_barrier_module);
430 if (OMPI_SUCCESS != ret) goto error;
431
432 *model = MPI_WIN_UNIFIED;
433
434 return OMPI_SUCCESS;
435
436 error:
437
438 if (0 == ompi_comm_rank (module->comm) && unlink_needed) {
439 opal_shmem_unlink (&module->seg_ds);
440 }
441
442 ompi_osc_sm_free (win);
443
444 return ret;
445 }
446
447
448 int
449 ompi_osc_sm_shared_query(struct ompi_win_t *win, int rank, size_t *size, int *disp_unit, void *baseptr)
450 {
451 ompi_osc_sm_module_t *module =
452 (ompi_osc_sm_module_t*) win->w_osc_module;
453
454 if (module->flavor != MPI_WIN_FLAVOR_SHARED) {
455 return MPI_ERR_WIN;
456 }
457
458 if (MPI_PROC_NULL != rank) {
459 *size = module->sizes[rank];
460 *((void**) baseptr) = module->bases[rank];
461 *disp_unit = module->disp_units[rank];
462 } else {
463 int i = 0;
464
465 *size = 0;
466 *((void**) baseptr) = NULL;
467 *disp_unit = 0;
468 for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
469 if (0 != module->sizes[i]) {
470 *size = module->sizes[i];
471 *((void**) baseptr) = module->bases[i];
472 *disp_unit = module->disp_units[i];
473 break;
474 }
475 }
476 }
477
478 return OMPI_SUCCESS;
479 }
480
481
482 int
483 ompi_osc_sm_attach(struct ompi_win_t *win, void *base, size_t len)
484 {
485 ompi_osc_sm_module_t *module =
486 (ompi_osc_sm_module_t*) win->w_osc_module;
487
488 if (module->flavor != MPI_WIN_FLAVOR_DYNAMIC) {
489 return MPI_ERR_RMA_ATTACH;
490 }
491 return OMPI_SUCCESS;
492 }
493
494
495 int
496 ompi_osc_sm_detach(struct ompi_win_t *win, const void *base)
497 {
498 ompi_osc_sm_module_t *module =
499 (ompi_osc_sm_module_t*) win->w_osc_module;
500
501 if (module->flavor != MPI_WIN_FLAVOR_DYNAMIC) {
502 return MPI_ERR_RMA_ATTACH;
503 }
504 return OMPI_SUCCESS;
505 }
506
507
508 int
509 ompi_osc_sm_free(struct ompi_win_t *win)
510 {
511 ompi_osc_sm_module_t *module =
512 (ompi_osc_sm_module_t*) win->w_osc_module;
513
514
515 if (NULL != module->segment_base) {
516
517 module->comm->c_coll->coll_barrier(module->comm,
518 module->comm->c_coll->coll_barrier_module);
519
520 opal_shmem_segment_detach (&module->seg_ds);
521 } else {
522 free(module->node_states);
523 free(module->global_state);
524 if (NULL != module->bases) {
525 free(module->bases[0]);
526 }
527 }
528 free(module->disp_units);
529 free(module->outstanding_locks);
530 free(module->sizes);
531 free(module->bases);
532
533 free (module->posts);
534
535
536 ompi_comm_free(&module->comm);
537
538 OBJ_DESTRUCT(&module->lock);
539
540 free(module);
541
542 return OMPI_SUCCESS;
543 }
544
545
546 int
547 ompi_osc_sm_set_info(struct ompi_win_t *win, struct opal_info_t *info)
548 {
549 ompi_osc_sm_module_t *module =
550 (ompi_osc_sm_module_t*) win->w_osc_module;
551
552
553 return module->comm->c_coll->coll_barrier(module->comm,
554 module->comm->c_coll->coll_barrier_module);
555 }
556
557
558 static char*
559 component_set_blocking_fence_info(opal_infosubscriber_t *obj, char *key, char *val)
560 {
561 ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) ((struct ompi_win_t*) obj)->w_osc_module;
562
563
564
565 return module->global_state->use_barrier_for_fence ? "true" : "false";
566 }
567
568
569 static char*
570 component_set_alloc_shared_noncontig_info(opal_infosubscriber_t *obj, char *key, char *val)
571 {
572
573 ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) ((struct ompi_win_t*) obj)->w_osc_module;
574
575
576
577 return module->noncontig ? "true" : "false";
578 }
579
580
581 int
582 ompi_osc_sm_get_info(struct ompi_win_t *win, struct opal_info_t **info_used)
583 {
584 ompi_osc_sm_module_t *module =
585 (ompi_osc_sm_module_t*) win->w_osc_module;
586
587 opal_info_t *info = OBJ_NEW(opal_info_t);
588 if (NULL == info) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
589
590 if (module->flavor == MPI_WIN_FLAVOR_SHARED) {
591 opal_info_set(info, "blocking_fence",
592 (1 == module->global_state->use_barrier_for_fence) ? "true" : "false");
593 opal_info_set(info, "alloc_shared_noncontig",
594 (module->noncontig) ? "true" : "false");
595 }
596
597 *info_used = info;
598
599 return OMPI_SUCCESS;
600 }