This source file includes the following definitions.
- component_open
- component_register
- progress_callback
- component_init
- component_finalize
- component_query
- exchange_len_info
- ompi_osc_ucx_unregister_progress
- component_select
- ompi_osc_find_attached_region_position
- ompi_osc_ucx_win_attach
- ompi_osc_ucx_win_detach
- ompi_osc_ucx_free
1
2
3
4
5
6
7
8
9
10
11 #include "ompi_config.h"
12
13 #include "opal/util/printf.h"
14
15 #include "ompi/mca/osc/osc.h"
16 #include "ompi/mca/osc/base/base.h"
17 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
18 #include "opal/mca/common/ucx/common_ucx.h"
19
20 #include "osc_ucx.h"
21 #include "osc_ucx_request.h"
22
/*
 * Copy _len bytes from _src to (_dst + _off), then advance _off by _len.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement; without it, using the macro as the body of an unbraced
 * if/else would leave the offset update outside the conditional.
 *
 * _off must be a modifiable lvalue.  _off and _len are each evaluated twice,
 * so do not pass expressions with side effects.
 */
#define memcpy_off(_dst, _src, _len, _off)                  \
    do {                                                    \
        memcpy(((char *) (_dst)) + (_off), (_src), (_len)); \
        (_off) += (_len);                                   \
    } while (0)
26
/* MCA component life-cycle hooks (wired into mca_osc_ucx_component below). */
static int component_open(void);
static int component_register(void);
static int component_init(bool enable_progress_threads,
                          bool enable_mpi_threads);
static int component_finalize(void);

/* Window-creation hooks: query reports a priority, select builds the module. */
static int component_query(struct ompi_win_t *win,
                           void **base,
                           size_t size,
                           int disp_unit,
                           struct ompi_communicator_t *comm,
                           struct opal_info_t *info,
                           int flavor);
static int component_select(struct ompi_win_t *win,
                            void **base,
                            size_t size,
                            int disp_unit,
                            struct ompi_communicator_t *comm,
                            struct opal_info_t *info,
                            int flavor,
                            int *model);

/* Drops one module reference and unregisters the progress hook on the last. */
static void ompi_osc_ucx_unregister_progress(void);
37
38 ompi_osc_ucx_component_t mca_osc_ucx_component = {
39 {
40 .osc_version = {
41 OMPI_OSC_BASE_VERSION_3_0_0,
42 .mca_component_name = "ucx",
43 MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
44 OMPI_RELEASE_VERSION),
45 .mca_open_component = component_open,
46 .mca_register_component_params = component_register,
47 },
48 .osc_data = {
49
50 MCA_BASE_METADATA_PARAM_NONE
51 },
52 .osc_init = component_init,
53 .osc_query = component_query,
54 .osc_select = component_select,
55 .osc_finalize = component_finalize,
56 },
57 .wpool = NULL,
58 .env_initialized = false,
59 .num_incomplete_req_ops = 0,
60 .num_modules = 0
61 };
62
63 ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
64 {
65 .osc_win_attach = ompi_osc_ucx_win_attach,
66 .osc_win_detach = ompi_osc_ucx_win_detach,
67 .osc_free = ompi_osc_ucx_free,
68
69 .osc_put = ompi_osc_ucx_put,
70 .osc_get = ompi_osc_ucx_get,
71 .osc_accumulate = ompi_osc_ucx_accumulate,
72 .osc_compare_and_swap = ompi_osc_ucx_compare_and_swap,
73 .osc_fetch_and_op = ompi_osc_ucx_fetch_and_op,
74 .osc_get_accumulate = ompi_osc_ucx_get_accumulate,
75
76 .osc_rput = ompi_osc_ucx_rput,
77 .osc_rget = ompi_osc_ucx_rget,
78 .osc_raccumulate = ompi_osc_ucx_raccumulate,
79 .osc_rget_accumulate = ompi_osc_ucx_rget_accumulate,
80
81 .osc_fence = ompi_osc_ucx_fence,
82
83 .osc_start = ompi_osc_ucx_start,
84 .osc_complete = ompi_osc_ucx_complete,
85 .osc_post = ompi_osc_ucx_post,
86 .osc_wait = ompi_osc_ucx_wait,
87 .osc_test = ompi_osc_ucx_test,
88
89 .osc_lock = ompi_osc_ucx_lock,
90 .osc_unlock = ompi_osc_ucx_unlock,
91 .osc_lock_all = ompi_osc_ucx_lock_all,
92 .osc_unlock_all = ompi_osc_ucx_unlock_all,
93
94 .osc_sync = ompi_osc_ucx_sync,
95 .osc_flush = ompi_osc_ucx_flush,
96 .osc_flush_all = ompi_osc_ucx_flush_all,
97 .osc_flush_local = ompi_osc_ucx_flush_local,
98 .osc_flush_local_all = ompi_osc_ucx_flush_local_all,
99 }
100 };
101
102 static int component_open(void) {
103 return OMPI_SUCCESS;
104 }
105
106 static int component_register(void) {
107 unsigned major = 0;
108 unsigned minor = 0;
109 unsigned release_number = 0;
110 char *description_str;
111
112 ucp_get_version(&major, &minor, &release_number);
113
114 mca_osc_ucx_component.priority = UCX_VERSION(major, minor, release_number) >= UCX_VERSION(1, 5, 0) ? 60 : 0;
115
116 opal_asprintf(&description_str, "Priority of the osc/ucx component (default: %d)",
117 mca_osc_ucx_component.priority);
118 (void) mca_base_component_var_register(&mca_osc_ucx_component.super.osc_version, "priority", description_str,
119 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, OPAL_INFO_LVL_3,
120 MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.priority);
121 free(description_str);
122
123 opal_common_ucx_mca_var_register(&mca_osc_ucx_component.super.osc_version);
124
125 return OMPI_SUCCESS;
126 }
127
128 static int progress_callback(void) {
129 if (mca_osc_ucx_component.wpool != NULL) {
130 opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
131 }
132 return 0;
133 }
134
135 static int component_init(bool enable_progress_threads, bool enable_mpi_threads) {
136 mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads;
137 mca_osc_ucx_component.wpool = opal_common_ucx_wpool_allocate();
138 opal_common_ucx_mca_register();
139 return OMPI_SUCCESS;
140 }
141
142 static int component_finalize(void) {
143 opal_common_ucx_mca_deregister();
144 opal_common_ucx_wpool_free(mca_osc_ucx_component.wpool);
145 return OMPI_SUCCESS;
146 }
147
148 static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
149 struct ompi_communicator_t *comm, struct opal_info_t *info, int flavor) {
150 if (MPI_WIN_FLAVOR_SHARED == flavor) return -1;
151 return mca_osc_ucx_component.priority;
152 }
153
154 static int exchange_len_info(void *my_info, size_t my_info_len, char **recv_info_ptr,
155 int **disps_ptr, void *metadata)
156 {
157 int ret = OMPI_SUCCESS;
158 struct ompi_communicator_t *comm = (struct ompi_communicator_t *)metadata;
159 int comm_size = ompi_comm_size(comm);
160 int lens[comm_size];
161 int total_len, i;
162
163 ret = comm->c_coll->coll_allgather(&my_info_len, 1, MPI_INT,
164 lens, 1, MPI_INT, comm,
165 comm->c_coll->coll_allgather_module);
166 if (OMPI_SUCCESS != ret) {
167 return ret;
168 }
169
170 total_len = 0;
171 (*disps_ptr) = (int *)calloc(comm_size, sizeof(int));
172 for (i = 0; i < comm_size; i++) {
173 (*disps_ptr)[i] = total_len;
174 total_len += lens[i];
175 }
176
177 (*recv_info_ptr) = (char *)calloc(total_len, sizeof(char));
178 ret = comm->c_coll->coll_allgatherv(my_info, my_info_len, MPI_BYTE,
179 (void *)(*recv_info_ptr), lens, (*disps_ptr), MPI_BYTE,
180 comm, comm->c_coll->coll_allgatherv_module);
181 if (OMPI_SUCCESS != ret) {
182 return ret;
183 }
184
185 return ret;
186 }
187
188 static void ompi_osc_ucx_unregister_progress()
189 {
190 int ret;
191
192 mca_osc_ucx_component.num_modules--;
193 OSC_UCX_ASSERT(mca_osc_ucx_component.num_modules >= 0);
194 if (0 == mca_osc_ucx_component.num_modules) {
195 ret = opal_progress_unregister(progress_callback);
196 if (OMPI_SUCCESS != ret) {
197 OSC_UCX_VERBOSE(1, "opal_progress_unregister failed: %d", ret);
198 }
199 }
200 }
201
202 static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit,
203 struct ompi_communicator_t *comm, struct opal_info_t *info,
204 int flavor, int *model) {
205 ompi_osc_ucx_module_t *module = NULL;
206 char *name = NULL;
207 long values[2];
208 int ret = OMPI_SUCCESS;
209
210 int i, comm_size = ompi_comm_size(comm);
211 bool env_initialized = false;
212 void *state_base = NULL;
213 opal_common_ucx_mem_type_t mem_type;
214 uint64_t zero = 0;
215 char *my_mem_addr;
216 int my_mem_addr_size;
217 void * my_info = NULL;
218 char *recv_buf = NULL;
219
220
221
222 if (flavor == MPI_WIN_FLAVOR_SHARED) {
223 return OMPI_ERR_NOT_SUPPORTED;
224 }
225
226 if (mca_osc_ucx_component.env_initialized == false) {
227
228 OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t);
229 ret = opal_free_list_init (&mca_osc_ucx_component.requests,
230 sizeof(ompi_osc_ucx_request_t),
231 opal_cache_line_size,
232 OBJ_CLASS(ompi_osc_ucx_request_t),
233 0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL);
234 if (OMPI_SUCCESS != ret) {
235 OSC_UCX_VERBOSE(1, "opal_free_list_init failed: %d", ret);
236 goto error;
237 }
238
239 ret = opal_common_ucx_wpool_init(mca_osc_ucx_component.wpool,
240 ompi_proc_world_size(),
241 mca_osc_ucx_component.enable_mpi_threads);
242 if (OMPI_SUCCESS != ret) {
243 OSC_UCX_VERBOSE(1, "opal_common_ucx_wpool_init failed: %d", ret);
244 goto error;
245 }
246
247 mca_osc_ucx_component.env_initialized = true;
248 env_initialized = true;
249 }
250
251
252 module = (ompi_osc_ucx_module_t *)calloc(1, sizeof(ompi_osc_ucx_module_t));
253 if (module == NULL) {
254 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
255 goto error_nomem;
256 }
257
258 mca_osc_ucx_component.num_modules++;
259
260
261 memcpy(module, &ompi_osc_ucx_module_template, sizeof(ompi_osc_base_module_t));
262
263 ret = ompi_comm_dup(comm, &module->comm);
264 if (ret != OMPI_SUCCESS) {
265 goto error;
266 }
267
268 *model = MPI_WIN_UNIFIED;
269 opal_asprintf(&name, "ucx window %d", ompi_comm_get_cid(module->comm));
270 ompi_win_set_name(win, name);
271 free(name);
272
273 module->flavor = flavor;
274 module->size = size;
275
276
277
278 values[0] = disp_unit;
279 values[1] = -disp_unit;
280
281 ret = module->comm->c_coll->coll_allreduce(MPI_IN_PLACE, values, 2, MPI_LONG,
282 MPI_MIN, module->comm,
283 module->comm->c_coll->coll_allreduce_module);
284 if (OMPI_SUCCESS != ret) {
285 goto error;
286 }
287
288 if (values[0] == -values[1]) {
289 module->disp_unit = disp_unit;
290 } else {
291 module->disp_unit = -1;
292 module->disp_units = calloc(comm_size, sizeof(int));
293 if (module->disp_units == NULL) {
294 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
295 goto error;
296 }
297
298 ret = module->comm->c_coll->coll_allgather(&disp_unit, 1, MPI_INT,
299 module->disp_units, 1, MPI_INT,
300 module->comm,
301 module->comm->c_coll->coll_allgather_module);
302 if (OMPI_SUCCESS != ret) {
303 goto error;
304 }
305 }
306
307 ret = opal_common_ucx_wpctx_create(mca_osc_ucx_component.wpool, comm_size,
308 &exchange_len_info, (void *)module->comm,
309 &module->ctx);
310 if (OMPI_SUCCESS != ret) {
311 goto error;
312 }
313
314 if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) {
315 switch (flavor) {
316 case MPI_WIN_FLAVOR_ALLOCATE:
317 mem_type = OPAL_COMMON_UCX_MEM_ALLOCATE_MAP;
318 break;
319 case MPI_WIN_FLAVOR_CREATE:
320 mem_type = OPAL_COMMON_UCX_MEM_MAP;
321 break;
322 }
323
324 ret = opal_common_ucx_wpmem_create(module->ctx, base, size,
325 mem_type, &exchange_len_info,
326 (void *)module->comm,
327 &my_mem_addr, &my_mem_addr_size,
328 &module->mem);
329 if (ret != OMPI_SUCCESS) {
330 goto error;
331 }
332
333 }
334
335 state_base = (void *)&(module->state);
336 ret = opal_common_ucx_wpmem_create(module->ctx, &state_base,
337 sizeof(ompi_osc_ucx_state_t),
338 OPAL_COMMON_UCX_MEM_MAP, &exchange_len_info,
339 (void *)module->comm,
340 &my_mem_addr, &my_mem_addr_size,
341 &module->state_mem);
342 if (ret != OMPI_SUCCESS) {
343 goto error;
344 }
345
346
347 my_info = malloc(2 * sizeof(uint64_t));
348 if (my_info == NULL) {
349 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
350 goto error;
351 }
352
353 if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) {
354 memcpy(my_info, base, sizeof(uint64_t));
355 } else {
356 memcpy(my_info, &zero, sizeof(uint64_t));
357 }
358 memcpy((char*)my_info + sizeof(uint64_t), &state_base, sizeof(uint64_t));
359
360 recv_buf = (char *)calloc(comm_size, 2 * sizeof(uint64_t));
361 ret = comm->c_coll->coll_allgather((void *)my_info, 2 * sizeof(uint64_t),
362 MPI_BYTE, recv_buf, 2 * sizeof(uint64_t),
363 MPI_BYTE, comm, comm->c_coll->coll_allgather_module);
364 if (ret != OMPI_SUCCESS) {
365 goto error;
366 }
367
368 module->addrs = calloc(comm_size, sizeof(uint64_t));
369 module->state_addrs = calloc(comm_size, sizeof(uint64_t));
370 for (i = 0; i < comm_size; i++) {
371 memcpy(&(module->addrs[i]), recv_buf + i * 2 * sizeof(uint64_t), sizeof(uint64_t));
372 memcpy(&(module->state_addrs[i]), recv_buf + i * 2 * sizeof(uint64_t) + sizeof(uint64_t), sizeof(uint64_t));
373 }
374 free(recv_buf);
375
376
377 module->state.lock = TARGET_LOCK_UNLOCKED;
378 module->state.post_index = 0;
379 memset((void *)module->state.post_state, 0, sizeof(uint64_t) * OMPI_OSC_UCX_POST_PEER_MAX);
380 module->state.complete_count = 0;
381 module->state.req_flag = 0;
382 module->state.acc_lock = TARGET_LOCK_UNLOCKED;
383 module->state.dynamic_win_count = 0;
384 for (i = 0; i < OMPI_OSC_UCX_ATTACH_MAX; i++) {
385 module->local_dynamic_win_info[i].refcnt = 0;
386 }
387 module->epoch_type.access = NONE_EPOCH;
388 module->epoch_type.exposure = NONE_EPOCH;
389 module->lock_count = 0;
390 module->post_count = 0;
391 module->start_group = NULL;
392 module->post_group = NULL;
393 OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
394 OBJ_CONSTRUCT(&module->pending_posts, opal_list_t);
395 module->start_grp_ranks = NULL;
396 module->lock_all_is_nocheck = false;
397
398 ret = opal_hash_table_init(&module->outstanding_locks, comm_size);
399 if (ret != OPAL_SUCCESS) {
400 goto error;
401 }
402
403 win->w_osc_module = &module->super;
404
405
406
407 ret = module->comm->c_coll->coll_barrier(module->comm,
408 module->comm->c_coll->coll_barrier_module);
409 if (ret != OMPI_SUCCESS) {
410 goto error;
411 }
412
413 OSC_UCX_ASSERT(mca_osc_ucx_component.num_modules > 0);
414 if (1 == mca_osc_ucx_component.num_modules) {
415 ret = opal_progress_register(progress_callback);
416 if (OMPI_SUCCESS != ret) {
417 OSC_UCX_VERBOSE(1, "opal_progress_register failed: %d", ret);
418 goto error;
419 }
420 }
421 return ret;
422
423 error:
424 if (module->disp_units) free(module->disp_units);
425 if (module->comm) ompi_comm_free(&module->comm);
426 if (module) {
427 free(module);
428 ompi_osc_ucx_unregister_progress();
429 }
430
431 error_nomem:
432 if (env_initialized == true) {
433 opal_common_ucx_wpool_finalize(mca_osc_ucx_component.wpool);
434 OBJ_DESTRUCT(&mca_osc_ucx_component.requests);
435 mca_osc_ucx_component.env_initialized = false;
436 }
437 return ret;
438 }
439
440 int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins,
441 int min_index, int max_index,
442 uint64_t base, size_t len, int *insert) {
443 int mid_index = (max_index + min_index) >> 1;
444
445 if (min_index > max_index) {
446 (*insert) = min_index;
447 return -1;
448 }
449
450 if (dynamic_wins[mid_index].base > base) {
451 return ompi_osc_find_attached_region_position(dynamic_wins, min_index, mid_index-1,
452 base, len, insert);
453 } else if (base + len < dynamic_wins[mid_index].base + dynamic_wins[mid_index].size) {
454 return mid_index;
455 } else {
456 return ompi_osc_find_attached_region_position(dynamic_wins, mid_index+1, max_index,
457 base, len, insert);
458 }
459 }
460
461 int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) {
462 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
463 int insert_index = -1, contain_index;
464 int ret = OMPI_SUCCESS;
465
466 if (module->state.dynamic_win_count >= OMPI_OSC_UCX_ATTACH_MAX) {
467 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
468 }
469
470 if (module->state.dynamic_win_count > 0) {
471 contain_index = ompi_osc_find_attached_region_position((ompi_osc_dynamic_win_info_t *)module->state.dynamic_wins,
472 0, (int)module->state.dynamic_win_count,
473 (uint64_t)base, len, &insert_index);
474 if (contain_index >= 0) {
475 module->local_dynamic_win_info[contain_index].refcnt++;
476 return ret;
477 }
478
479 assert(insert_index >= 0 && (uint64_t)insert_index < module->state.dynamic_win_count);
480
481 memmove((void *)&module->local_dynamic_win_info[insert_index+1],
482 (void *)&module->local_dynamic_win_info[insert_index],
483 (OMPI_OSC_UCX_ATTACH_MAX - (insert_index + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t));
484 memmove((void *)&module->state.dynamic_wins[insert_index+1],
485 (void *)&module->state.dynamic_wins[insert_index],
486 (OMPI_OSC_UCX_ATTACH_MAX - (insert_index + 1)) * sizeof(ompi_osc_dynamic_win_info_t));
487 } else {
488 insert_index = 0;
489 }
490
491 ret = opal_common_ucx_wpmem_create(module->ctx, &base, len,
492 OPAL_COMMON_UCX_MEM_MAP, &exchange_len_info,
493 (void *)module->comm,
494 &(module->local_dynamic_win_info[insert_index].my_mem_addr),
495 &(module->local_dynamic_win_info[insert_index].my_mem_addr_size),
496 &(module->local_dynamic_win_info[insert_index].mem));
497 if (ret != OMPI_SUCCESS) {
498 return ret;
499 }
500
501 module->state.dynamic_wins[insert_index].base = (uint64_t)base;
502 module->state.dynamic_wins[insert_index].size = len;
503
504 memcpy((char *)(module->state.dynamic_wins[insert_index].mem_addr),
505 (char *)module->local_dynamic_win_info[insert_index].my_mem_addr,
506 module->local_dynamic_win_info[insert_index].my_mem_addr_size);
507
508 module->local_dynamic_win_info[insert_index].refcnt++;
509 module->state.dynamic_win_count++;
510
511 return ret;
512 }
513
514 int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
515 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
516 int insert, contain;
517 int ret = OMPI_SUCCESS;
518
519 assert(module->state.dynamic_win_count > 0);
520
521 contain = ompi_osc_find_attached_region_position((ompi_osc_dynamic_win_info_t *)module->state.dynamic_wins,
522 0, (int)module->state.dynamic_win_count,
523 (uint64_t)base, 1, &insert);
524 assert(contain >= 0 && (uint64_t)contain < module->state.dynamic_win_count);
525
526
527 if (contain < 0) {
528 return OMPI_SUCCESS;
529 }
530
531 module->local_dynamic_win_info[contain].refcnt--;
532 if (module->local_dynamic_win_info[contain].refcnt == 0) {
533 ret = opal_common_ucx_wpmem_free(module->local_dynamic_win_info[contain].mem);
534 memmove((void *)&(module->local_dynamic_win_info[contain]),
535 (void *)&(module->local_dynamic_win_info[contain+1]),
536 (OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t));
537 memmove((void *)&module->state.dynamic_wins[contain],
538 (void *)&module->state.dynamic_wins[contain+1],
539 (OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_dynamic_win_info_t));
540
541 module->state.dynamic_win_count--;
542 }
543
544 return ret;
545 }
546
547 int ompi_osc_ucx_free(struct ompi_win_t *win) {
548 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
549 int ret;
550
551 assert(module->lock_count == 0);
552 assert(opal_list_is_empty(&module->pending_posts) == true);
553 OBJ_DESTRUCT(&module->outstanding_locks);
554 OBJ_DESTRUCT(&module->pending_posts);
555
556 opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
557
558 ret = module->comm->c_coll->coll_barrier(module->comm,
559 module->comm->c_coll->coll_barrier_module);
560 if (ret != OMPI_SUCCESS) {
561 return ret;
562 }
563
564 free(module->addrs);
565 free(module->state_addrs);
566
567 ret = opal_common_ucx_wpmem_free(module->state_mem);
568 if (ret != OMPI_SUCCESS) {
569 return ret;
570 }
571
572 ret = opal_common_ucx_wpmem_free(module->mem);
573 if (ret != OMPI_SUCCESS) {
574 return ret;
575 }
576
577 opal_common_ucx_wpctx_release(module->ctx);
578
579 opal_common_ucx_wpool_finalize(mca_osc_ucx_component.wpool);
580
581 if (module->disp_units) free(module->disp_units);
582 ompi_comm_free(&module->comm);
583
584 free(module);
585 ompi_osc_ucx_unregister_progress();
586
587 return ret;
588 }