This source file includes the following definitions.
- check_config_value_bool
- component_register
- component_progress
- component_init
- component_finalize
- component_query
- component_select
- ompi_osc_pt2pt_set_info
- ompi_osc_pt2pt_get_info
- ompi_osc_pt2pt_receive_construct
- ompi_osc_pt2pt_receive_destruct
- ompi_osc_pt2pt_peer_construct
- ompi_osc_pt2pt_peer_destruct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include "ompi_config.h"
29 #include "opal/util/show_help.h"
30 #include "opal/util/printf.h"
31
32 #include <string.h>
33
34 #include "osc_pt2pt.h"
35 #include "osc_pt2pt_frag.h"
36 #include "osc_pt2pt_request.h"
37 #include "osc_pt2pt_data_move.h"
38
39 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
40
/*
 * Forward declarations of the component entry points that are wired into
 * mca_osc_pt2pt_component below. All of them have internal linkage.
 */
static int component_register(void);
static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
static int component_finalize(void);

static int component_query(struct ompi_win_t *win, void **base, size_t size,
                           int disp_unit, struct ompi_communicator_t *comm,
                           struct opal_info_t *info, int flavor);

static int component_select(struct ompi_win_t *win, void **base, size_t size,
                            int disp_unit, struct ompi_communicator_t *comm,
                            struct opal_info_t *info, int flavor, int *model);
50
51 ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
52 {
53 .osc_version = {
54 OMPI_OSC_BASE_VERSION_3_0_0,
55 .mca_component_name = "pt2pt",
56 MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
57 OMPI_RELEASE_VERSION),
58 .mca_register_component_params = component_register,
59 },
60 .osc_data = {
61
62 MCA_BASE_METADATA_PARAM_NONE
63 },
64 .osc_init = component_init,
65 .osc_query = component_query,
66 .osc_select = component_select,
67 .osc_finalize = component_finalize,
68 }
69 };
70
71
72 ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = {
73 .super = {
74 .osc_win_attach = ompi_osc_pt2pt_attach,
75 .osc_win_detach = ompi_osc_pt2pt_detach,
76 .osc_free = ompi_osc_pt2pt_free,
77
78 .osc_put = ompi_osc_pt2pt_put,
79 .osc_get = ompi_osc_pt2pt_get,
80 .osc_accumulate = ompi_osc_pt2pt_accumulate,
81 .osc_compare_and_swap = ompi_osc_pt2pt_compare_and_swap,
82 .osc_fetch_and_op = ompi_osc_pt2pt_fetch_and_op,
83 .osc_get_accumulate = ompi_osc_pt2pt_get_accumulate,
84
85 .osc_rput = ompi_osc_pt2pt_rput,
86 .osc_rget = ompi_osc_pt2pt_rget,
87 .osc_raccumulate = ompi_osc_pt2pt_raccumulate,
88 .osc_rget_accumulate = ompi_osc_pt2pt_rget_accumulate,
89
90 .osc_fence = ompi_osc_pt2pt_fence,
91
92 .osc_start = ompi_osc_pt2pt_start,
93 .osc_complete = ompi_osc_pt2pt_complete,
94 .osc_post = ompi_osc_pt2pt_post,
95 .osc_wait = ompi_osc_pt2pt_wait,
96 .osc_test = ompi_osc_pt2pt_test,
97
98 .osc_lock = ompi_osc_pt2pt_lock,
99 .osc_unlock = ompi_osc_pt2pt_unlock,
100 .osc_lock_all = ompi_osc_pt2pt_lock_all,
101 .osc_unlock_all = ompi_osc_pt2pt_unlock_all,
102
103 .osc_sync = ompi_osc_pt2pt_sync,
104 .osc_flush = ompi_osc_pt2pt_flush,
105 .osc_flush_all = ompi_osc_pt2pt_flush_all,
106 .osc_flush_local = ompi_osc_pt2pt_flush_local,
107 .osc_flush_local_all = ompi_osc_pt2pt_flush_local_all,
108 }
109 };
110
/* Storage for the "no_locks" MCA parameter registered in component_register();
 * component_select() uses it as the default for the per-window "no_locks"
 * info key. */
bool ompi_osc_pt2pt_no_locks = false;

/* Set by component_init() when MPI threads are enabled; component_select()
 * refuses to create windows while it is set. */
static bool using_thread_multiple = false;
113
114
115
116
117 static bool check_config_value_bool(char *key, opal_info_t *info, bool result)
118 {
119 int flag;
120
121 (void) opal_info_get_bool (info, key, &result, &flag);
122 return result;
123 }
124
125 static int component_register (void)
126 {
127 ompi_osc_pt2pt_no_locks = false;
128 (void) mca_base_component_var_register(&mca_osc_pt2pt_component.super.osc_version,
129 "no_locks",
130 "Enable optimizations available only if MPI_LOCK is "
131 "not used. "
132 "Info key of same name overrides this value.",
133 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
134 OPAL_INFO_LVL_9,
135 MCA_BASE_VAR_SCOPE_READONLY,
136 &ompi_osc_pt2pt_no_locks);
137
138 mca_osc_pt2pt_component.buffer_size = 8192;
139 (void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "buffer_size",
140 "Data transfers smaller than this limit may be coalesced before "
141 "being transferred (default: 8k)", MCA_BASE_VAR_TYPE_UNSIGNED_INT,
142 NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
143 &mca_osc_pt2pt_component.buffer_size);
144
145 mca_osc_pt2pt_component.receive_count = 4;
146 (void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "receive_count",
147 "Number of receives to post for each window for incoming fragments "
148 "(default: 4)", MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, OPAL_INFO_LVL_4,
149 MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_pt2pt_component.receive_count);
150
151 return OMPI_SUCCESS;
152 }
153
154 static int component_progress (void)
155 {
156 int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
157 int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
158 ompi_osc_pt2pt_pending_t *pending, *next;
159
160 if (recv_count) {
161 for (int i = 0 ; i < recv_count ; ++i) {
162 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
163 ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) opal_list_remove_first (&mca_osc_pt2pt_component.pending_receives);
164 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
165 if (NULL == recv) {
166 break;
167 }
168
169 (void) ompi_osc_pt2pt_process_receive (recv);
170 }
171 }
172
173
174 if (pending_count) {
175 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
176 OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
177 int ret;
178
179 switch (pending->header.base.type) {
180 case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
181 ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
182 &pending->header.flush);
183 break;
184 case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
185 ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
186 &pending->header.unlock);
187 break;
188 default:
189
190 assert (0);
191 abort ();
192 }
193
194 if (OMPI_SUCCESS == ret) {
195 opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
196 OBJ_RELEASE(pending);
197 }
198 }
199 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
200 }
201
202 return 1;
203 }
204
205 static int
206 component_init(bool enable_progress_threads,
207 bool enable_mpi_threads)
208 {
209 int ret;
210
211 if (enable_mpi_threads) {
212 using_thread_multiple = true;
213 }
214
215 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
216 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
217 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
218 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives, opal_list_t);
219 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives_lock, opal_mutex_t);
220
221 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
222 opal_hash_table_t);
223 opal_hash_table_init(&mca_osc_pt2pt_component.modules, 2);
224
225 mca_osc_pt2pt_component.progress_enable = false;
226 mca_osc_pt2pt_component.module_count = 0;
227
228 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, opal_free_list_t);
229 ret = opal_free_list_init (&mca_osc_pt2pt_component.frags,
230 sizeof(ompi_osc_pt2pt_frag_t), 8,
231 OBJ_CLASS(ompi_osc_pt2pt_frag_t),
232 mca_osc_pt2pt_component.buffer_size +
233 sizeof (ompi_osc_pt2pt_frag_header_t),
234 8, 1, -1, 1, NULL, 0, NULL, NULL, NULL);
235 if (OMPI_SUCCESS != ret) {
236 opal_output_verbose(1, ompi_osc_base_framework.framework_output,
237 "%s:%d: opal_free_list_init failed: %d",
238 __FILE__, __LINE__, ret);
239 return ret;
240 }
241
242 OBJ_CONSTRUCT(&mca_osc_pt2pt_component.requests, opal_free_list_t);
243 ret = opal_free_list_init (&mca_osc_pt2pt_component.requests,
244 sizeof(ompi_osc_pt2pt_request_t), 8,
245 OBJ_CLASS(ompi_osc_pt2pt_request_t),
246 0, 0, 0, -1, 32, NULL, 0, NULL, NULL, NULL);
247 if (OMPI_SUCCESS != ret) {
248 opal_output_verbose(1, ompi_osc_base_framework.framework_output,
249 "%s:%d: opal_free_list_init failed: %d\n",
250 __FILE__, __LINE__, ret);
251 return ret;
252 }
253
254 return ret;
255 }
256
257
258 int
259 component_finalize(void)
260 {
261 size_t num_modules;
262
263 if (mca_osc_pt2pt_component.progress_enable) {
264 opal_progress_unregister (component_progress);
265 }
266
267 if (0 !=
268 (num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.modules))) {
269 opal_output(ompi_osc_base_framework.framework_output,
270 "WARNING: There were %d Windows created but not freed.",
271 (int) num_modules);
272 }
273
274 OBJ_DESTRUCT(&mca_osc_pt2pt_component.frags);
275 OBJ_DESTRUCT(&mca_osc_pt2pt_component.modules);
276 OBJ_DESTRUCT(&mca_osc_pt2pt_component.lock);
277 OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
278 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
279 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
280 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives);
281 OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives_lock);
282
283 return OMPI_SUCCESS;
284 }
285
286
287 static int
288 component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
289 struct ompi_communicator_t *comm, struct opal_info_t *info,
290 int flavor)
291 {
292 if (MPI_WIN_FLAVOR_SHARED == flavor) return -1;
293
294 return 10;
295 }
296
297
298 static int
299 component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
300 struct ompi_communicator_t *comm, struct opal_info_t *info,
301 int flavor, int *model)
302 {
303 ompi_osc_pt2pt_module_t *module = NULL;
304 int ret;
305 char *name;
306
307
308
309 if (MPI_WIN_FLAVOR_SHARED == flavor) return OMPI_ERR_NOT_SUPPORTED;
310
311
312
313
314
315 if (using_thread_multiple) {
316 opal_show_help("help-osc-pt2pt.txt", "mpi-thread-multiple-not-supported", true);
317 return OMPI_ERR_NOT_SUPPORTED;
318 }
319
320
321 module = (ompi_osc_pt2pt_module_t*)
322 calloc(1, sizeof(ompi_osc_pt2pt_module_t));
323 if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
324
325
326 memcpy(module, &ompi_osc_pt2pt_module_template,
327 sizeof(ompi_osc_base_module_t));
328
329
330 OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t);
331 OBJ_CONSTRUCT(&module->cond, opal_condition_t);
332 OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
333 OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
334 OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
335 OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
336 OBJ_CONSTRUCT(&module->pending_acc_lock, opal_mutex_t);
337 OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
338 OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
339 OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
340 OBJ_CONSTRUCT(&module->peer_hash, opal_hash_table_t);
341 OBJ_CONSTRUCT(&module->peer_lock, opal_mutex_t);
342
343 ret = opal_hash_table_init (&module->outstanding_locks, 64);
344 if (OPAL_SUCCESS != ret) {
345 goto cleanup;
346 }
347
348 ret = opal_hash_table_init (&module->peer_hash, 128);
349 if (OPAL_SUCCESS != ret) {
350 goto cleanup;
351 }
352
353
354
355 #if 1
356 module->accumulate_ordering = 1;
357 #else
358 ompi_osc_base_config_value_equal("accumulate_ordering", info, "none");
359 #endif
360
361
362 if (MPI_WIN_FLAVOR_ALLOCATE == flavor && size) {
363 module->free_after = *base = malloc(size);
364 if (NULL == *base) {
365 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
366 goto cleanup;
367 }
368 }
369
370
371 if (MPI_WIN_FLAVOR_DYNAMIC != flavor) {
372 module->baseptr = *base;
373 }
374
375 ret = ompi_comm_dup(comm, &module->comm);
376 if (OMPI_SUCCESS != ret) goto cleanup;
377
378 OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
379 "pt2pt component creating window with id %d",
380 ompi_comm_get_cid(module->comm)));
381
382
383 module->disp_unit = disp_unit;
384
385
386 module->epoch_outgoing_frag_count = calloc (ompi_comm_size(comm), sizeof(uint32_t));
387 if (NULL == module->epoch_outgoing_frag_count) {
388 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
389 goto cleanup;
390 }
391
392
393
394 #if 0
395
396
397
398 module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
399 module->all_sync.eager_send_active = true;
400 #endif
401
402
403 module->no_locks = check_config_value_bool ("no_locks", info, ompi_osc_pt2pt_no_locks);
404
405
406 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
407 ret = opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.modules,
408 ompi_comm_get_cid(module->comm),
409 module);
410 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
411 if (OMPI_SUCCESS != ret) goto cleanup;
412
413
414 *model = MPI_WIN_UNIFIED;
415 win->w_osc_module = (ompi_osc_base_module_t*) module;
416 opal_asprintf(&name, "pt2pt window %d", ompi_comm_get_cid(module->comm));
417 ompi_win_set_name(win, name);
418 free(name);
419
420
421 opal_atomic_mb();
422
423 ret = ompi_osc_pt2pt_frag_start_receive (module);
424 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
425 goto cleanup;
426 }
427
428
429
430 ret = module->comm->c_coll->coll_barrier(module->comm,
431 module->comm->c_coll->coll_barrier_module);
432 if (OMPI_SUCCESS != ret) goto cleanup;
433
434 if (!mca_osc_pt2pt_component.progress_enable) {
435 opal_progress_register (component_progress);
436 mca_osc_pt2pt_component.progress_enable = true;
437 }
438
439 if (module->no_locks) {
440 win->w_flags |= OMPI_WIN_NO_LOCKS;
441 }
442
443 OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
444 "done creating pt2pt window %d", ompi_comm_get_cid(module->comm)));
445
446 return OMPI_SUCCESS;
447
448 cleanup:
449
450 win->w_osc_module = (ompi_osc_base_module_t*) module;
451 ompi_osc_pt2pt_free (win);
452
453 return ret;
454 }
455
456
457 int
458 ompi_osc_pt2pt_set_info(struct ompi_win_t *win, struct opal_info_t *info)
459 {
460 ompi_osc_pt2pt_module_t *module =
461 (ompi_osc_pt2pt_module_t*) win->w_osc_module;
462
463
464 return module->comm->c_coll->coll_barrier(module->comm,
465 module->comm->c_coll->coll_barrier_module);
466 }
467
468
469 int
470 ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct opal_info_t **info_used)
471 {
472 opal_info_t *info = OBJ_NEW(opal_info_t);
473 if (NULL == info) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
474
475 *info_used = info;
476
477 return OMPI_SUCCESS;
478 }
479
/* Class for queued flush/unlock requests retried by component_progress():
 * a plain list item with no constructor or destructor of its own. */
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t,
                   opal_list_item_t,
                   NULL,   /* constructor */
                   NULL);  /* destructor */
481
482 static void ompi_osc_pt2pt_receive_construct (ompi_osc_pt2pt_receive_t *recv)
483 {
484 recv->buffer = NULL;
485 recv->pml_request = NULL;
486 }
487
488 static void ompi_osc_pt2pt_receive_destruct (ompi_osc_pt2pt_receive_t *recv)
489 {
490 free (recv->buffer);
491 if (recv->pml_request && MPI_REQUEST_NULL != recv->pml_request) {
492 recv->pml_request->req_complete_cb = NULL;
493 ompi_request_cancel (recv->pml_request);
494 ompi_request_free (&recv->pml_request);
495 }
496 }
497
498 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_receive_t, opal_list_item_t,
499 ompi_osc_pt2pt_receive_construct,
500 ompi_osc_pt2pt_receive_destruct);
501
502 static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
503 {
504 OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
505 OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
506 peer->active_frag = 0;
507 peer->passive_incoming_frag_count = 0;
508 peer->flags = 0;
509 }
510
511 static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
512 {
513 OBJ_DESTRUCT(&peer->queued_frags);
514 OBJ_DESTRUCT(&peer->lock);
515 }
516
517 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_peer_t, opal_object_t,
518 ompi_osc_pt2pt_peer_construct,
519 ompi_osc_pt2pt_peer_destruct);