This source file includes the following definitions.
- mca_coll_sm_module_construct
- mca_coll_sm_module_destruct
- mca_coll_sm_module_disable
- mca_coll_sm_init_query
- mca_coll_sm_comm_query
- sm_module_enable
- ompi_coll_sm_lazy_enable
- bootstrap_comm
- mca_coll_sm_ft_event
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 #include "ompi_config.h"
37
38 #include <stdio.h>
39 #include <string.h>
40 #ifdef HAVE_SCHED_H
41 #include <sched.h>
42 #endif
43 #include <sys/types.h>
44 #ifdef HAVE_SYS_MMAN_H
45 #include <sys/mman.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include "mpi.h"
52 #include "opal_stdint.h"
53 #include "opal/mca/hwloc/base/base.h"
54 #include "opal/util/os_path.h"
55 #include "opal/util/printf.h"
56
57 #include "ompi/communicator/communicator.h"
58 #include "ompi/group/group.h"
59 #include "ompi/mca/coll/coll.h"
60 #include "ompi/mca/coll/base/base.h"
61 #include "ompi/mca/rte/rte.h"
62 #include "ompi/proc/proc.h"
63 #include "coll_sm.h"
64
65 #include "ompi/mca/coll/base/coll_tags.h"
66 #include "ompi/mca/pml/pml.h"
67
68
69
70
71 uint32_t mca_coll_sm_one = 1;
72
73
74
75
76
77 static int sm_module_enable(mca_coll_base_module_t *module,
78 struct ompi_communicator_t *comm);
79 static int bootstrap_comm(ompi_communicator_t *comm,
80 mca_coll_sm_module_t *module);
81 static int mca_coll_sm_module_disable(mca_coll_base_module_t *module,
82 struct ompi_communicator_t *comm);
83
84
85
86
87 static void mca_coll_sm_module_construct(mca_coll_sm_module_t *module)
88 {
89 module->enabled = false;
90 module->sm_comm_data = NULL;
91 module->previous_reduce = NULL;
92 module->previous_reduce_module = NULL;
93 module->super.coll_module_disable = mca_coll_sm_module_disable;
94 }
95
96
97
98
99 static void mca_coll_sm_module_destruct(mca_coll_sm_module_t *module)
100 {
101 mca_coll_sm_comm_t *c = module->sm_comm_data;
102
103 if (NULL != c) {
104
105 if (NULL != c->sm_bootstrap_meta) {
106
107
108 mca_common_sm_fini(c->sm_bootstrap_meta);
109 OBJ_RELEASE(c->sm_bootstrap_meta);
110 }
111 free(c);
112 }
113
114
115 if (NULL != module->previous_reduce_module) {
116 OBJ_RELEASE(module->previous_reduce_module);
117 }
118
119 module->enabled = false;
120 }
121
122
123
124
125 static int mca_coll_sm_module_disable(mca_coll_base_module_t *module, struct ompi_communicator_t *comm)
126 {
127 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
128 if (NULL != sm_module->previous_reduce_module) {
129 sm_module->previous_reduce = NULL;
130 OBJ_RELEASE(sm_module->previous_reduce_module);
131 sm_module->previous_reduce_module = NULL;
132 }
133 return OMPI_SUCCESS;
134 }
135
136
/*
 * Class instance for mca_coll_sm_module_t: derives from
 * mca_coll_base_module_t and uses the constructor/destructor defined above.
 */
OBJ_CLASS_INSTANCE(mca_coll_sm_module_t, mca_coll_base_module_t,
                   mca_coll_sm_module_construct, mca_coll_sm_module_destruct);
141
142
143
144
145
146
147
148 int mca_coll_sm_init_query(bool enable_progress_threads,
149 bool enable_mpi_threads)
150 {
151
152 if (NULL == ompi_process_info.job_session_dir) {
153 return OMPI_ERR_OUT_OF_RESOURCE;
154 }
155
156
157 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
158 "coll:sm:init_query: pick me! pick me!");
159 return OMPI_SUCCESS;
160 }
161
162
163
164
165
166
167
168 mca_coll_base_module_t *
169 mca_coll_sm_comm_query(struct ompi_communicator_t *comm, int *priority)
170 {
171 mca_coll_sm_module_t *sm_module;
172
173
174
175
176 if (OMPI_COMM_IS_INTER(comm) || 1 == ompi_comm_size(comm) || ompi_group_have_remote_peers (comm->c_local_group)) {
177 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
178 "coll:sm:comm_query (%d/%s): intercomm, comm is too small, or not all peers local; disqualifying myself", comm->c_contextid, comm->c_name);
179 return NULL;
180 }
181
182
183
184 *priority = mca_coll_sm_component.sm_priority;
185 if (mca_coll_sm_component.sm_priority <= 0) {
186 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
187 "coll:sm:comm_query (%d/%s): priority too low; disqualifying myself", comm->c_contextid, comm->c_name);
188 return NULL;
189 }
190
191 sm_module = OBJ_NEW(mca_coll_sm_module_t);
192 if (NULL == sm_module) {
193 return NULL;
194 }
195
196
197 sm_module->super.coll_module_enable = sm_module_enable;
198 sm_module->super.ft_event = mca_coll_sm_ft_event;
199 sm_module->super.coll_allgather = NULL;
200 sm_module->super.coll_allgatherv = NULL;
201 sm_module->super.coll_allreduce = mca_coll_sm_allreduce_intra;
202 sm_module->super.coll_alltoall = NULL;
203 sm_module->super.coll_alltoallv = NULL;
204 sm_module->super.coll_alltoallw = NULL;
205 sm_module->super.coll_barrier = mca_coll_sm_barrier_intra;
206 sm_module->super.coll_bcast = mca_coll_sm_bcast_intra;
207 sm_module->super.coll_exscan = NULL;
208 sm_module->super.coll_gather = NULL;
209 sm_module->super.coll_gatherv = NULL;
210 sm_module->super.coll_reduce = mca_coll_sm_reduce_intra;
211 sm_module->super.coll_reduce_scatter = NULL;
212 sm_module->super.coll_scan = NULL;
213 sm_module->super.coll_scatter = NULL;
214 sm_module->super.coll_scatterv = NULL;
215
216 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
217 "coll:sm:comm_query (%d/%s): pick me! pick me!",
218 comm->c_contextid, comm->c_name);
219 return &(sm_module->super);
220 }
221
222
223
224
225
226 static int sm_module_enable(mca_coll_base_module_t *module,
227 struct ompi_communicator_t *comm)
228 {
229 if (NULL == comm->c_coll->coll_reduce ||
230 NULL == comm->c_coll->coll_reduce_module) {
231 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
232 "coll:sm:enable (%d/%s): no underlying reduce; disqualifying myself",
233 comm->c_contextid, comm->c_name);
234 return OMPI_ERROR;
235 }
236
237
238 return OMPI_SUCCESS;
239 }
240
241 int ompi_coll_sm_lazy_enable(mca_coll_base_module_t *module,
242 struct ompi_communicator_t *comm)
243 {
244 int i, j, root, ret;
245 int rank = ompi_comm_rank(comm);
246 int size = ompi_comm_size(comm);
247 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
248 mca_coll_sm_comm_t *data = NULL;
249 size_t control_size, frag_size;
250 mca_coll_sm_component_t *c = &mca_coll_sm_component;
251 opal_hwloc_base_memory_segment_t *maffinity;
252 int parent, min_child, num_children;
253 unsigned char *base = NULL;
254 const int num_barrier_buffers = 2;
255
256
257 if (sm_module->enabled) {
258 return OMPI_SUCCESS;
259 }
260 sm_module->enabled = true;
261
262
263
264 maffinity = (opal_hwloc_base_memory_segment_t*)
265 malloc(sizeof(opal_hwloc_base_memory_segment_t) *
266 c->sm_comm_num_segments * 3);
267 if (NULL == maffinity) {
268 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
269 "coll:sm:enable (%d/%s): malloc failed (1)",
270 comm->c_contextid, comm->c_name);
271 return OMPI_ERR_OUT_OF_RESOURCE;
272 }
273
274
275
276
277
278
279
280
281
282
283
284
285
286 sm_module->sm_comm_data = data = (mca_coll_sm_comm_t*)
287 malloc(sizeof(mca_coll_sm_comm_t) +
288 (c->sm_comm_num_segments *
289 sizeof(mca_coll_sm_data_index_t)) +
290 (size *
291 (sizeof(mca_coll_sm_tree_node_t) +
292 (sizeof(mca_coll_sm_tree_node_t*) * c->sm_tree_degree))));
293 if (NULL == data) {
294 free(maffinity);
295 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
296 "coll:sm:enable (%d/%s): malloc failed (2)",
297 comm->c_contextid, comm->c_name);
298 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
299 }
300 data->mcb_operation_count = 0;
301
302
303
304 data->mcb_data_index = (mca_coll_sm_data_index_t*) (data + 1);
305
306 data->mcb_tree = (mca_coll_sm_tree_node_t*)
307 (data->mcb_data_index + c->sm_comm_num_segments);
308
309
310 data->mcb_tree[0].mcstn_children = (mca_coll_sm_tree_node_t**)
311 (data->mcb_tree + size);
312 for (i = 1; i < size; ++i) {
313 data->mcb_tree[i].mcstn_children =
314 data->mcb_tree[i - 1].mcstn_children + c->sm_tree_degree;
315 }
316
317
318
319
320 for (root = 0; root < size; ++root) {
321 parent = (root - 1) / mca_coll_sm_component.sm_tree_degree;
322 num_children = mca_coll_sm_component.sm_tree_degree;
323
324
325
326 if ((root * num_children) + 1 >= size) {
327
328 min_child = -1;
329 num_children = 0;
330 } else {
331
332 int max_child;
333 min_child = root * num_children + 1;
334 max_child = root * num_children + num_children;
335 if (max_child >= size) {
336 max_child = size - 1;
337 }
338 num_children = max_child - min_child + 1;
339 }
340
341
342 data->mcb_tree[root].mcstn_id = root;
343 if (root == 0 && parent == 0) {
344 data->mcb_tree[root].mcstn_parent = NULL;
345 } else {
346 data->mcb_tree[root].mcstn_parent = &data->mcb_tree[parent];
347 }
348 data->mcb_tree[root].mcstn_num_children = num_children;
349 for (i = 0; i < c->sm_tree_degree; ++i) {
350 data->mcb_tree[root].mcstn_children[i] =
351 (i < num_children) ?
352 &data->mcb_tree[min_child + i] : NULL;
353 }
354 }
355
356
357 if (OMPI_SUCCESS != (ret = bootstrap_comm(comm, sm_module))) {
358 free(data);
359 free(maffinity);
360 sm_module->sm_comm_data = NULL;
361 return ret;
362 }
363
364
365
366
367
368
369
370
371
372 control_size = c->sm_control_size;
373 base = data->sm_bootstrap_meta->module_data_addr;
374 data->mcb_barrier_control_me = (uint32_t*)
375 (base + (rank * control_size * num_barrier_buffers * 2));
376 if (data->mcb_tree[rank].mcstn_parent) {
377 data->mcb_barrier_control_parent = (opal_atomic_uint32_t*)
378 (base +
379 (data->mcb_tree[rank].mcstn_parent->mcstn_id * control_size *
380 num_barrier_buffers * 2));
381 } else {
382 data->mcb_barrier_control_parent = NULL;
383 }
384 if (data->mcb_tree[rank].mcstn_num_children > 0) {
385 data->mcb_barrier_control_children = (uint32_t*)
386 (base +
387 (data->mcb_tree[rank].mcstn_children[0]->mcstn_id * control_size *
388 num_barrier_buffers * 2));
389 } else {
390 data->mcb_barrier_control_children = NULL;
391 }
392 data->mcb_barrier_count = 0;
393
394
395
396
397 base += (c->sm_control_size * size * num_barrier_buffers * 2);
398 data->mcb_in_use_flags = (mca_coll_sm_in_use_flag_t*) base;
399
400
401
402
403 j = 0;
404 if (0 == rank) {
405 maffinity[j].mbs_start_addr = base;
406 maffinity[j].mbs_len = c->sm_control_size *
407 c->sm_comm_num_in_use_flags;
408
409
410
411
412
413 for (i = 0; i < mca_coll_sm_component.sm_comm_num_in_use_flags; ++i) {
414 ((mca_coll_sm_in_use_flag_t *)base)[i].mcsiuf_operation_count = 1;
415 ((mca_coll_sm_in_use_flag_t *)base)[i].mcsiuf_num_procs_using = 0;
416 }
417 ++j;
418 }
419
420
421
422 base += (c->sm_comm_num_in_use_flags * c->sm_control_size);
423 control_size = size * c->sm_control_size;
424 frag_size = size * c->sm_fragment_size;
425 for (i = 0; i < c->sm_comm_num_segments; ++i) {
426 data->mcb_data_index[i].mcbmi_control = (uint32_t*)
427 (base + (i * (control_size + frag_size)));
428 data->mcb_data_index[i].mcbmi_data =
429 (((char*) data->mcb_data_index[i].mcbmi_control) +
430 control_size);
431
432
433
434 maffinity[j].mbs_len = c->sm_control_size;
435 maffinity[j].mbs_start_addr = (void *)
436 (((char*) data->mcb_data_index[i].mcbmi_control) +
437 (rank * c->sm_control_size));
438 ++j;
439
440
441
442 maffinity[j].mbs_len = c->sm_fragment_size;
443 maffinity[j].mbs_start_addr =
444 ((char*) data->mcb_data_index[i].mcbmi_data) +
445 (rank * c->sm_control_size);
446 ++j;
447 }
448
449
450
451 opal_hwloc_base_memory_set(maffinity, j);
452 free(maffinity);
453
454
455 memset(data->mcb_barrier_control_me, 0,
456 num_barrier_buffers * 2 * c->sm_control_size);
457 for (i = 0; i < c->sm_comm_num_segments; ++i) {
458 memset((void *) data->mcb_data_index[i].mcbmi_control, 0,
459 c->sm_control_size);
460 }
461
462
463 sm_module->previous_reduce = comm->c_coll->coll_reduce;
464 sm_module->previous_reduce_module = comm->c_coll->coll_reduce_module;
465 OBJ_RETAIN(sm_module->previous_reduce_module);
466
467
468 opal_atomic_add (&(data->sm_bootstrap_meta->module_seg->seg_inited), 1);
469
470
471 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
472 "coll:sm:enable (%d/%s): waiting for peers to attach",
473 comm->c_contextid, comm->c_name);
474 SPIN_CONDITION(size == data->sm_bootstrap_meta->module_seg->seg_inited, seg_init_exit);
475
476
477 if (0 == rank) {
478 unlink(data->sm_bootstrap_meta->shmem_ds.seg_name);
479 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
480 "coll:sm:enable (%d/%s): removed mmap file %s",
481 comm->c_contextid, comm->c_name,
482 data->sm_bootstrap_meta->shmem_ds.seg_name);
483 }
484
485
486
487 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
488 "coll:sm:enable (%d/%s): success!",
489 comm->c_contextid, comm->c_name);
490 return OMPI_SUCCESS;
491 }
492
493 static int bootstrap_comm(ompi_communicator_t *comm,
494 mca_coll_sm_module_t *module)
495 {
496 int i;
497 char *shortpath, *fullpath;
498 mca_coll_sm_component_t *c = &mca_coll_sm_component;
499 mca_coll_sm_comm_t *data = module->sm_comm_data;
500 int comm_size = ompi_comm_size(comm);
501 int num_segments = c->sm_comm_num_segments;
502 int num_in_use = c->sm_comm_num_in_use_flags;
503 int frag_size = c->sm_fragment_size;
504 int control_size = c->sm_control_size;
505 ompi_process_name_t *lowest_name = NULL;
506 size_t size;
507 ompi_proc_t *proc;
508
509
510
511
512
513 proc = ompi_group_peer_lookup(comm->c_local_group, 0);
514 lowest_name = OMPI_CAST_RTE_NAME(&proc->super.proc_name);
515 for (i = 1; i < comm_size; ++i) {
516 proc = ompi_group_peer_lookup(comm->c_local_group, i);
517 if (ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
518 OMPI_CAST_RTE_NAME(&proc->super.proc_name),
519 lowest_name) < 0) {
520 lowest_name = OMPI_CAST_RTE_NAME(&proc->super.proc_name);
521 }
522 }
523 opal_asprintf(&shortpath, "coll-sm-cid-%d-name-%s.mmap", comm->c_contextid,
524 OMPI_NAME_PRINT(lowest_name));
525 if (NULL == shortpath) {
526 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
527 "coll:sm:enable:bootstrap comm (%d/%s): asprintf failed",
528 comm->c_contextid, comm->c_name);
529 return OMPI_ERR_OUT_OF_RESOURCE;
530 }
531 fullpath = opal_os_path(false, ompi_process_info.job_session_dir,
532 shortpath, NULL);
533 free(shortpath);
534 if (NULL == fullpath) {
535 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
536 "coll:sm:enable:bootstrap comm (%d/%s): opal_os_path failed",
537 comm->c_contextid, comm->c_name);
538 return OMPI_ERR_OUT_OF_RESOURCE;
539 }
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562 size = 4 * control_size +
563 (num_in_use * control_size) +
564 (num_segments * (comm_size * control_size * 2)) +
565 (num_segments * (comm_size * frag_size));
566 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
567 "coll:sm:enable:bootstrap comm (%d/%s): attaching to %" PRIsize_t " byte mmap: %s",
568 comm->c_contextid, comm->c_name, size, fullpath);
569 if (0 == ompi_comm_rank (comm)) {
570 data->sm_bootstrap_meta = mca_common_sm_module_create_and_attach (size, fullpath, sizeof(mca_common_sm_seg_header_t), 8);
571 if (NULL == data->sm_bootstrap_meta) {
572 opal_output_verbose(10, ompi_coll_base_framework.framework_output,
573 "coll:sm:enable:bootstrap comm (%d/%s): mca_common_sm_init_group failed",
574 comm->c_contextid, comm->c_name);
575 free(fullpath);
576 return OMPI_ERR_OUT_OF_RESOURCE;
577 }
578
579 for (int i = 1 ; i < ompi_comm_size (comm) ; ++i) {
580 MCA_PML_CALL(send(&data->sm_bootstrap_meta->shmem_ds, sizeof (data->sm_bootstrap_meta->shmem_ds), MPI_BYTE,
581 i, MCA_COLL_BASE_TAG_BCAST, MCA_PML_BASE_SEND_STANDARD, comm));
582 }
583 } else {
584 opal_shmem_ds_t shmem_ds;
585 MCA_PML_CALL(recv(&shmem_ds, sizeof (shmem_ds), MPI_BYTE, 0, MCA_COLL_BASE_TAG_BCAST, comm, MPI_STATUS_IGNORE));
586 data->sm_bootstrap_meta = mca_common_sm_module_attach (&shmem_ds, sizeof(mca_common_sm_seg_header_t), 8);
587 }
588
589
590 free(fullpath);
591 return OMPI_SUCCESS;
592 }
593
594
595 int mca_coll_sm_ft_event(int state) {
596 if(OPAL_CRS_CHECKPOINT == state) {
597 ;
598 }
599 else if(OPAL_CRS_CONTINUE == state) {
600 ;
601 }
602 else if(OPAL_CRS_RESTART == state) {
603 ;
604 }
605 else if(OPAL_CRS_TERM == state ) {
606 ;
607 }
608 else {
609 ;
610 }
611
612 return OMPI_SUCCESS;
613 }