This source file includes the following definitions.
- _winfo_create
- _winfo_reset
- _winfo_release
- opal_common_ucx_wpool_allocate
- opal_common_ucx_wpool_free
- opal_common_ucx_wpool_init
- opal_common_ucx_wpool_finalize
- opal_common_ucx_wpool_progress
- _wpool_list_put
- _wpool_list_get
- _wpool_get_idle
- _wpool_add_active
- opal_common_ucx_wpctx_create
- opal_common_ucx_wpctx_release
- _common_ucx_wpctx_free
- _common_ucx_wpctx_append
- _common_ucx_wpctx_remove
- opal_common_ucx_wpmem_create
- opal_common_ucx_wpmem_free
- _comm_ucx_wpmem_map
- _common_ucx_wpmem_free
- _common_ucx_wpmem_signup
- _common_ucx_mem_signout
- _common_ucx_tls_init
- _tlocal_get_tls
- _tlocal_cleanup
- _common_ucx_tls_cleanup
- _tlocal_tls_ctxtbl_extend
- _tlocal_tls_memtbl_extend
- _tlocal_ctx_search
- _tlocal_ctx_record_cleanup
- _tlocal_add_ctx
- _tlocal_ctx_connect
- _tlocal_search_mem
- _tlocal_mem_record_cleanup
- _tlocal_add_mem
- _tlocal_mem_create_rkey
- opal_common_ucx_tlocal_fetch_spath
- opal_common_ucx_winfo_flush
- opal_common_ucx_wpmem_flush
- opal_common_ucx_wpmem_fence
- opal_common_ucx_req_init
- opal_common_ucx_req_completion
1 #include "opal_config.h"
2
3 #include "common_ucx.h"
4 #include "common_ucx_wpool.h"
5 #include "common_ucx_wpool_int.h"
6 #include "opal/mca/base/mca_base_var.h"
7 #include "opal/mca/base/mca_base_framework.h"
8 #include "opal/mca/pmix/pmix.h"
9 #include "opal/memoryhooks/memory.h"
10
11 #include <ucm/api/ucm.h>
12
13
14
15
16
17
18
19
20
21
/* OPAL class descriptors for the list-item types used by the worker pool.
 * Each derives from opal_list_item_t and registers neither a constructor
 * nor a destructor (both slots are NULL). */
OBJ_CLASS_INSTANCE(_winfo_list_item_t,
                   opal_list_item_t,
                   NULL, NULL);
OBJ_CLASS_INSTANCE(_ctx_record_list_item_t,
                   opal_list_item_t,
                   NULL, NULL);
OBJ_CLASS_INSTANCE(_mem_record_list_item_t,
                   opal_list_item_t,
                   NULL, NULL);
OBJ_CLASS_INSTANCE(_tlocal_table_t,
                   opal_list_item_t,
                   NULL, NULL);
26
27
#ifdef OPAL_COMMON_UCX_WPOOL_DBG
/* Per-thread debug state; only compiled in when the worker-pool debug
 * macro is defined. */
__thread int initialized = 0;
__thread FILE *tls_pf = NULL;
#endif
32
33
34
35
36
37 static opal_common_ucx_winfo_t *
38 _winfo_create(opal_common_ucx_wpool_t *wpool)
39 {
40 ucp_worker_params_t worker_params;
41 ucp_worker_h worker;
42 ucs_status_t status;
43 opal_common_ucx_winfo_t *winfo = NULL;
44
45 memset(&worker_params, 0, sizeof(worker_params));
46 worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
47 worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
48 status = ucp_worker_create(wpool->ucp_ctx, &worker_params, &worker);
49 if (UCS_OK != status) {
50 MCA_COMMON_UCX_ERROR("ucp_worker_create failed: %d", status);
51 goto exit;
52 }
53
54 winfo = calloc(1, sizeof(*winfo));
55 if (NULL == winfo) {
56 MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
57 goto release_worker;
58 }
59
60 OBJ_CONSTRUCT(&winfo->mutex, opal_recursive_mutex_t);
61 winfo->worker = worker;
62 winfo->endpoints = NULL;
63 winfo->comm_size = 0;
64 winfo->released = 0;
65 winfo->inflight_ops = NULL;
66 winfo->global_inflight_ops = 0;
67 winfo->inflight_req = UCS_OK;
68
69 return winfo;
70
71 release_worker:
72 ucp_worker_destroy(worker);
73 exit:
74 return winfo;
75 }
76
77 static void
78 _winfo_reset(opal_common_ucx_winfo_t *winfo)
79 {
80 if (winfo->inflight_req != UCS_OK) {
81 opal_common_ucx_wait_request_mt(winfo->inflight_req,
82 "opal_common_ucx_flush");
83 winfo->inflight_req = UCS_OK;
84 }
85
86 assert(winfo->global_inflight_ops == 0);
87
88 if(winfo->comm_size != 0) {
89 size_t i;
90 for (i = 0; i < winfo->comm_size; i++) {
91 if (NULL != winfo->endpoints[i]){
92 ucp_ep_destroy(winfo->endpoints[i]);
93 }
94 assert(winfo->inflight_ops[i] == 0);
95 }
96 free(winfo->endpoints);
97 free(winfo->inflight_ops);
98 }
99 winfo->endpoints = NULL;
100 winfo->comm_size = 0;
101 winfo->released = 0;
102 }
103
104 static void
105 _winfo_release(opal_common_ucx_winfo_t *winfo)
106 {
107 OBJ_DESTRUCT(&winfo->mutex);
108 ucp_worker_destroy(winfo->worker);
109 free(winfo);
110 }
111
112
113
114
115
116 OPAL_DECLSPEC opal_common_ucx_wpool_t *
117 opal_common_ucx_wpool_allocate(void)
118 {
119 opal_common_ucx_wpool_t *ptr = calloc(1, sizeof(opal_common_ucx_wpool_t));
120 ptr->refcnt = 0;
121
122 return ptr;
123 }
124
125 OPAL_DECLSPEC void
126 opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
127 {
128 assert(wpool->refcnt == 0);
129 free(wpool);
130 }
131
132 OPAL_DECLSPEC int
133 opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
134 int proc_world_size, bool enable_mt)
135 {
136 ucp_config_t *config = NULL;
137 ucp_params_t context_params;
138 opal_common_ucx_winfo_t *winfo;
139 ucs_status_t status;
140 int rc = OPAL_SUCCESS;
141
142 wpool->refcnt++;
143
144 if (1 < wpool->refcnt) {
145 return rc;
146 }
147
148 OBJ_CONSTRUCT(&wpool->mutex, opal_recursive_mutex_t);
149 OBJ_CONSTRUCT(&wpool->tls_list, opal_list_t);
150
151 status = ucp_config_read("MPI", NULL, &config);
152 if (UCS_OK != status) {
153 MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
154 return OPAL_ERROR;
155 }
156
157
158 memset(&context_params, 0, sizeof(context_params));
159 context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
160 UCP_PARAM_FIELD_MT_WORKERS_SHARED |
161 UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
162 UCP_PARAM_FIELD_REQUEST_INIT |
163 UCP_PARAM_FIELD_REQUEST_SIZE;
164 context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 |
165 UCP_FEATURE_AMO64;
166 context_params.mt_workers_shared = (enable_mt ? 1 : 0);
167 context_params.estimated_num_eps = proc_world_size;
168 context_params.request_init = opal_common_ucx_req_init;
169 context_params.request_size = sizeof(opal_common_ucx_request_t);
170
171 status = ucp_init(&context_params, config, &wpool->ucp_ctx);
172 ucp_config_release(config);
173 if (UCS_OK != status) {
174 MCA_COMMON_UCX_VERBOSE(1, "ucp_init failed: %d", status);
175 rc = OPAL_ERROR;
176 goto err_ucp_init;
177 }
178
179
180 OBJ_CONSTRUCT(&wpool->idle_workers, opal_list_t);
181 OBJ_CONSTRUCT(&wpool->active_workers, opal_list_t);
182
183 winfo = _winfo_create(wpool);
184 if (NULL == winfo) {
185 MCA_COMMON_UCX_ERROR("Failed to create receive worker");
186 rc = OPAL_ERROR;
187 goto err_worker_create;
188 }
189 wpool->dflt_worker = winfo->worker;
190
191 status = ucp_worker_get_address(wpool->dflt_worker,
192 &wpool->recv_waddr, &wpool->recv_waddr_len);
193 if (status != UCS_OK) {
194 MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
195 rc = OPAL_ERROR;
196 goto err_get_addr;
197 }
198
199 rc = _wpool_list_put(wpool, &wpool->idle_workers, winfo);
200 if (rc) {
201 goto err_wpool_add;
202 }
203
204 opal_tsd_key_create(&wpool->tls_key, _tlocal_cleanup);
205
206 return rc;
207
208 err_wpool_add:
209 free(wpool->recv_waddr);
210 err_get_addr:
211 if (NULL != wpool->dflt_worker) {
212 ucp_worker_destroy(wpool->dflt_worker);
213 }
214 err_worker_create:
215 ucp_cleanup(wpool->ucp_ctx);
216 err_ucp_init:
217 return rc;
218 }
219
220 OPAL_DECLSPEC
221 void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
222 {
223 _tlocal_table_t *tls_item = NULL, *tls_next;
224
225 wpool->refcnt--;
226 if (wpool->refcnt > 0) {
227 return;
228 }
229
230
231
232 opal_tsd_key_delete(wpool->tls_key);
233
234
235 OPAL_LIST_FOREACH_SAFE(tls_item, tls_next, &wpool->tls_list,
236 _tlocal_table_t) {
237 opal_list_remove_item(&wpool->tls_list, &tls_item->super);
238 _common_ucx_tls_cleanup(tls_item);
239 }
240 OBJ_DESTRUCT(&wpool->tls_list);
241
242
243
244 ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
245
246
247 if (!opal_list_is_empty(&wpool->idle_workers)) {
248 _winfo_list_item_t *item, *next;
249 OPAL_LIST_FOREACH_SAFE(item, next, &wpool->idle_workers,
250 _winfo_list_item_t) {
251 opal_list_remove_item(&wpool->idle_workers, &item->super);
252 _winfo_release(item->ptr);
253 OBJ_RELEASE(item);
254 }
255 }
256 OBJ_DESTRUCT(&wpool->idle_workers);
257
258
259
260 if (!opal_list_is_empty(&wpool->active_workers)) {
261 _winfo_list_item_t *item, *next;
262 OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
263 _winfo_list_item_t) {
264 opal_list_remove_item(&wpool->active_workers, &item->super);
265 _winfo_reset(item->ptr);
266 _winfo_release(item->ptr);
267 OBJ_RELEASE(item);
268 }
269 }
270 OBJ_DESTRUCT(&wpool->active_workers);
271
272 OBJ_DESTRUCT(&wpool->mutex);
273 ucp_cleanup(wpool->ucp_ctx);
274 return;
275 }
276
277 OPAL_DECLSPEC void
278 opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
279 {
280 _winfo_list_item_t *item = NULL, *next = NULL;
281
282
283
284
285 if (!opal_mutex_trylock (&wpool->mutex)) {
286 OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
287 _winfo_list_item_t) {
288 opal_common_ucx_winfo_t *winfo = item->ptr;
289 opal_mutex_lock(&winfo->mutex);
290 if( OPAL_UNLIKELY(winfo->released) ) {
291
292 opal_list_remove_item(&wpool->active_workers, &item->super);
293 _winfo_reset(winfo);
294 opal_list_append(&wpool->idle_workers, &item->super);
295 } else {
296
297 while(ucp_worker_progress(winfo->worker));
298 }
299 opal_mutex_unlock(&winfo->mutex);
300 }
301 opal_mutex_unlock(&wpool->mutex);
302 }
303 }
304
305 static int
306 _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
307 opal_common_ucx_winfo_t *winfo)
308 {
309 _winfo_list_item_t *item;
310
311 item = OBJ_NEW(_winfo_list_item_t);
312 if (NULL == item) {
313 MCA_COMMON_UCX_ERROR("Cannot allocate memory for winfo list item");
314 return OPAL_ERR_OUT_OF_RESOURCE;
315 }
316 item->ptr = winfo;
317
318 opal_mutex_lock(&wpool->mutex);
319 opal_list_append(list, &item->super);
320 opal_mutex_unlock(&wpool->mutex);
321
322 return OPAL_SUCCESS;
323 }
324
325 static opal_common_ucx_winfo_t*
326 _wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list)
327 {
328 opal_common_ucx_winfo_t *winfo = NULL;
329 _winfo_list_item_t *item = NULL;
330
331 opal_mutex_lock(&wpool->mutex);
332 if (!opal_list_is_empty(list)) {
333 item = (_winfo_list_item_t *)opal_list_get_first(list);
334 opal_list_remove_item(list, &item->super);
335 }
336 opal_mutex_unlock(&wpool->mutex);
337
338 if (item != NULL) {
339 winfo = item->ptr;
340 OBJ_RELEASE(item);
341 }
342 return winfo;
343 }
344
345 static opal_common_ucx_winfo_t *
346 _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
347 {
348 opal_common_ucx_winfo_t *winfo;
349 winfo = _wpool_list_get(wpool, &wpool->idle_workers);
350 if (!winfo) {
351 winfo = _winfo_create(wpool);
352 if (!winfo) {
353 MCA_COMMON_UCX_ERROR("Failed to allocate worker info structure");
354 return NULL;
355 }
356 }
357
358 winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h));
359 winfo->inflight_ops = calloc(comm_size, sizeof(short));
360 winfo->comm_size = comm_size;
361 return winfo;
362 }
363
364 static int
365 _wpool_add_active(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo)
366 {
367 return _wpool_list_put(wpool, &wpool->active_workers, winfo);
368 }
369
370
371
372
373
374 OPAL_DECLSPEC int
375 opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
376 opal_common_ucx_exchange_func_t exchange_func,
377 void *exchange_metadata,
378 opal_common_ucx_ctx_t **ctx_ptr)
379 {
380 opal_common_ucx_ctx_t *ctx = calloc(1, sizeof(*ctx));
381 int ret = OPAL_SUCCESS;
382
383 OBJ_CONSTRUCT(&ctx->mutex, opal_recursive_mutex_t);
384 OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
385 ctx->released = 0;
386 ctx->refcntr = 1;
387 ctx->wpool = wpool;
388 ctx->comm_size = comm_size;
389
390 ctx->recv_worker_addrs = NULL;
391 ctx->recv_worker_displs = NULL;
392 ret = exchange_func(wpool->recv_waddr, wpool->recv_waddr_len,
393 &ctx->recv_worker_addrs,
394 &ctx->recv_worker_displs, exchange_metadata);
395 if (ret != OPAL_SUCCESS) {
396 goto error;
397 }
398
399 (*ctx_ptr) = ctx;
400 return ret;
401 error:
402 OBJ_DESTRUCT(&ctx->mutex);
403 OBJ_DESTRUCT(&ctx->tls_workers);
404 free(ctx);
405 (*ctx_ptr) = NULL;
406 return ret;
407 }
408
409 OPAL_DECLSPEC void
410 opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
411 {
412 int my_refcntr = -1;
413
414
415
416
417
418
419 ctx->released = 1;
420
421
422 my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
423
424
425 opal_atomic_mb();
426
427
428
429 if (0 == my_refcntr) {
430 _common_ucx_wpctx_free(ctx);
431 }
432 }
433
434
435
436 static void
437 _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
438 {
439 free(ctx->recv_worker_addrs);
440 free(ctx->recv_worker_displs);
441 OBJ_DESTRUCT(&ctx->mutex);
442 OBJ_DESTRUCT(&ctx->tls_workers);
443 free(ctx);
444 }
445
446
447 static int
448 _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
449 opal_common_ucx_winfo_t *winfo)
450 {
451 _ctx_record_list_item_t *item = OBJ_NEW(_ctx_record_list_item_t);
452 if (NULL == item) {
453 return OPAL_ERR_OUT_OF_RESOURCE;
454 }
455
456 OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, 1);
457
458
459 item->ptr = winfo;
460 opal_mutex_lock(&ctx->mutex);
461 opal_list_append(&ctx->tls_workers, &item->super);
462 opal_mutex_unlock(&ctx->mutex);
463
464 return OPAL_SUCCESS;
465 }
466
467
468 static void
469 _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
470 opal_common_ucx_winfo_t *winfo)
471 {
472 _ctx_record_list_item_t *item = NULL, *next;
473 int my_refcntr = -1;
474
475 opal_mutex_lock(&ctx->mutex);
476
477 OPAL_LIST_FOREACH_SAFE(item, next, &ctx->tls_workers,
478 _ctx_record_list_item_t) {
479 if (winfo == item->ptr) {
480 opal_list_remove_item(&ctx->tls_workers, &item->super);
481 opal_mutex_lock(&winfo->mutex);
482 winfo->released = 1;
483 opal_mutex_unlock(&winfo->mutex);
484 OBJ_RELEASE(item);
485 break;
486 }
487 }
488 opal_mutex_unlock(&ctx->mutex);
489
490
491 opal_atomic_rmb();
492
493
494 my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
495
496
497 opal_atomic_wmb();
498
499 if (0 == my_refcntr) {
500
501
502 _common_ucx_wpctx_free(ctx);
503 }
504 return;
505 }
506
507
508
509
510
511 OPAL_DECLSPEC
512 int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
513 void **mem_base, size_t mem_size,
514 opal_common_ucx_mem_type_t mem_type,
515 opal_common_ucx_exchange_func_t exchange_func,
516 void *exchange_metadata,
517 char **my_mem_addr,
518 int *my_mem_addr_size,
519 opal_common_ucx_wpmem_t **mem_ptr)
520 {
521 opal_common_ucx_wpmem_t *mem = calloc(1, sizeof(*mem));
522 void *rkey_addr = NULL;
523 size_t rkey_addr_len;
524 ucs_status_t status;
525 int ret = OPAL_SUCCESS;
526
527 mem->released = 0;
528 mem->refcntr = 1;
529 mem->ctx = ctx;
530 mem->mem_addrs = NULL;
531 mem->mem_displs = NULL;
532
533 ret = _comm_ucx_wpmem_map(ctx->wpool, mem_base, mem_size, &mem->memh,
534 mem_type);
535 if (ret != OPAL_SUCCESS) {
536 MCA_COMMON_UCX_VERBOSE(1, "_comm_ucx_mem_map failed: %d", ret);
537 goto error_mem_map;
538 }
539
540 status = ucp_rkey_pack(ctx->wpool->ucp_ctx, mem->memh,
541 &rkey_addr, &rkey_addr_len);
542 if (status != UCS_OK) {
543 MCA_COMMON_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
544 ret = OPAL_ERROR;
545 goto error_rkey_pack;
546 }
547
548 ret = exchange_func(rkey_addr, rkey_addr_len,
549 &mem->mem_addrs, &mem->mem_displs, exchange_metadata);
550 if (ret != OPAL_SUCCESS) {
551 goto error_rkey_pack;
552 }
553
554
555
556 opal_tsd_key_create(&mem->mem_tls_key, NULL);
557
558 (*mem_ptr) = mem;
559 (*my_mem_addr) = rkey_addr;
560 (*my_mem_addr_size) = rkey_addr_len;
561
562 return ret;
563
564 error_rkey_pack:
565 ucp_mem_unmap(ctx->wpool->ucp_ctx, mem->memh);
566 error_mem_map:
567 free(mem);
568 (*mem_ptr) = NULL;
569 return ret;
570 }
571
572 OPAL_DECLSPEC int
573 opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
574 {
575 int my_refcntr = -1;
576
577
578 mem->released = 1;
579
580
581 my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
582
583
584 opal_atomic_wmb();
585
586 if (0 == my_refcntr) {
587 _common_ucx_wpmem_free(mem);
588 }
589 return OPAL_SUCCESS;
590 }
591
592
593 static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
594 void **base, size_t size, ucp_mem_h *memh_ptr,
595 opal_common_ucx_mem_type_t mem_type)
596 {
597 ucp_mem_map_params_t mem_params;
598 ucp_mem_attr_t mem_attrs;
599 ucs_status_t status;
600 int ret = OPAL_SUCCESS;
601
602 memset(&mem_params, 0, sizeof(ucp_mem_map_params_t));
603 mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
604 UCP_MEM_MAP_PARAM_FIELD_LENGTH |
605 UCP_MEM_MAP_PARAM_FIELD_FLAGS;
606 mem_params.length = size;
607 if (mem_type == OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
608 mem_params.address = NULL;
609 mem_params.flags = UCP_MEM_MAP_ALLOCATE;
610 } else {
611 mem_params.address = (*base);
612 }
613
614 status = ucp_mem_map(wpool->ucp_ctx, &mem_params, memh_ptr);
615 if (status != UCS_OK) {
616 MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_map failed: %d", status);
617 ret = OPAL_ERROR;
618 return ret;
619 }
620
621 mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH;
622 status = ucp_mem_query((*memh_ptr), &mem_attrs);
623 if (status != UCS_OK) {
624 MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_query failed: %d", status);
625 ret = OPAL_ERROR;
626 goto error;
627 }
628
629 assert(mem_attrs.length >= size);
630 if (mem_type != OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
631 assert(mem_attrs.address == (*base));
632 } else {
633 (*base) = mem_attrs.address;
634 }
635
636 return ret;
637 error:
638 ucp_mem_unmap(wpool->ucp_ctx, (*memh_ptr));
639 return ret;
640 }
641
642 static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
643 {
644 opal_tsd_key_delete(mem->mem_tls_key);
645 free(mem->mem_addrs);
646 free(mem->mem_displs);
647 ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh);
648 free(mem);
649 }
650
651 static int
652 _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem)
653 {
654
655 OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1);
656 return OPAL_SUCCESS;
657 }
658
659 static void
660 _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
661 {
662 int my_refcntr = -1;
663
664
665
666
667
668 opal_atomic_rmb();
669
670
671 my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
672
673
674 opal_atomic_wmb();
675
676 if (0 == my_refcntr) {
677 _common_ucx_wpmem_free(mem);
678 }
679
680 return;
681 }
682
683
684
685
686
687 static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
688 {
689 _tlocal_table_t *tls = OBJ_NEW(_tlocal_table_t);
690
691 if (tls == NULL) {
692
693 return NULL;
694 }
695
696 tls->ctx_tbl = NULL;
697 tls->ctx_tbl_size = 0;
698 tls->mem_tbl = NULL;
699 tls->mem_tbl_size = 0;
700
701
702
703 tls->wpool = wpool;
704 opal_mutex_lock(&wpool->mutex);
705 opal_list_append(&wpool->tls_list, &tls->super);
706 opal_mutex_unlock(&wpool->mutex);
707
708 if(_tlocal_tls_ctxtbl_extend(tls, 4)){
709 MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool context table");
710 return NULL;
711 }
712 if(_tlocal_tls_memtbl_extend(tls, 4)) {
713 MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool memory table");
714 return NULL;
715 }
716
717 opal_tsd_setspecific(wpool->tls_key, tls);
718
719 return tls;
720 }
721
722 static inline _tlocal_table_t *
723 _tlocal_get_tls(opal_common_ucx_wpool_t *wpool){
724 _tlocal_table_t *tls;
725 int rc = opal_tsd_getspecific(wpool->tls_key, (void**)&tls);
726
727 if (OPAL_SUCCESS != rc) {
728 return NULL;
729 }
730
731 if (OPAL_UNLIKELY(NULL == tls)) {
732 tls = _common_ucx_tls_init(wpool);
733 }
734 return tls;
735 }
736
737 static void _tlocal_cleanup(void *arg)
738 {
739 _tlocal_table_t *item = NULL, *next;
740 _tlocal_table_t *tls = (_tlocal_table_t *)arg;
741 opal_common_ucx_wpool_t *wpool = NULL;
742
743 if (NULL == tls) {
744 return;
745 }
746 wpool = tls->wpool;
747
748
749 tls->wpool = wpool;
750 opal_mutex_lock(&wpool->mutex);
751 OPAL_LIST_FOREACH_SAFE(item, next, &wpool->tls_list, _tlocal_table_t) {
752 if (item == tls) {
753 opal_list_remove_item(&wpool->tls_list, &item->super);
754 break;
755 }
756 }
757 opal_mutex_unlock(&wpool->mutex);
758 _common_ucx_tls_cleanup(tls);
759 }
760
761
762 static void _common_ucx_tls_cleanup(_tlocal_table_t *tls)
763 {
764 size_t i, size;
765
766
767 size = tls->mem_tbl_size;
768 for (i = 0; i < size; i++) {
769 if (NULL != tls->mem_tbl[i]->gmem){
770 _tlocal_mem_record_cleanup(tls->mem_tbl[i]);
771 }
772
773 free(tls->mem_tbl[i]);
774 }
775
776
777 size = tls->ctx_tbl_size;
778 for (i = 0; i < size; i++) {
779 if (NULL != tls->ctx_tbl[i]->gctx){
780 assert(tls->ctx_tbl[i]->refcnt == 0);
781 _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
782 }
783 free(tls->ctx_tbl[i]);
784 }
785
786 opal_tsd_setspecific(tls->wpool->tls_key, NULL);
787
788 OBJ_RELEASE(tls);
789 return;
790 }
791
792 static int
793 _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append)
794 {
795 size_t i;
796 size_t newsize = (tbl->ctx_tbl_size + append);
797 tbl->ctx_tbl = realloc(tbl->ctx_tbl, newsize * sizeof(*tbl->ctx_tbl));
798 for (i = tbl->ctx_tbl_size; i < newsize; i++) {
799 tbl->ctx_tbl[i] = calloc(1, sizeof(*tbl->ctx_tbl[i]));
800 if (NULL == tbl->ctx_tbl[i]) {
801 return OPAL_ERR_OUT_OF_RESOURCE;
802 }
803
804 }
805 tbl->ctx_tbl_size = newsize;
806 return OPAL_SUCCESS;
807 }
808
809 static int
810 _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append)
811 {
812 size_t i;
813 size_t newsize = (tbl->mem_tbl_size + append);
814
815 tbl->mem_tbl = realloc(tbl->mem_tbl, newsize * sizeof(*tbl->mem_tbl));
816 for (i = tbl->mem_tbl_size; i < tbl->mem_tbl_size + append; i++) {
817 tbl->mem_tbl[i] = calloc(1, sizeof(*tbl->mem_tbl[i]));
818 if (NULL == tbl->mem_tbl[i]) {
819 return OPAL_ERR_OUT_OF_RESOURCE;
820 }
821 }
822 tbl->mem_tbl_size = newsize;
823 return OPAL_SUCCESS;
824 }
825
826
827 static inline _tlocal_ctx_t *
828 _tlocal_ctx_search(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
829 {
830 size_t i;
831 for(i=0; i<tls->ctx_tbl_size; i++) {
832 if (tls->ctx_tbl[i]->gctx == ctx){
833 return tls->ctx_tbl[i];
834 }
835 }
836 return NULL;
837 }
838
839 static int
840 _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
841 {
842 if (NULL == ctx_rec->gctx) {
843 return OPAL_SUCCESS;
844 }
845
846 if (ctx_rec->refcnt > 0) {
847 return OPAL_SUCCESS;
848 }
849
850
851
852
853 _common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo);
854
855
856 memset(ctx_rec, 0, sizeof(*ctx_rec));
857
858 return OPAL_SUCCESS;
859 }
860
861
862 static _tlocal_ctx_t *
863 _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
864 {
865 size_t i, free_idx = -1;
866 int rc, found = 0;
867
868
869
870 for (i=0; i<tls->ctx_tbl_size; i++) {
871 if (NULL != tls->ctx_tbl[i]->gctx && tls->ctx_tbl[i]->refcnt == 0) {
872 if (tls->ctx_tbl[i]->gctx->released ) {
873
874 _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
875 }
876 }
877 if ((NULL == tls->ctx_tbl[i]->gctx) && !found) {
878
879 free_idx = i;
880 found = 1;
881 }
882 }
883
884
885 if (!found) {
886 free_idx = tls->ctx_tbl_size;
887 rc = _tlocal_tls_ctxtbl_extend(tls, 4);
888 if (rc) {
889
890 return NULL;
891 }
892 }
893
894 tls->ctx_tbl[free_idx]->gctx = ctx;
895 tls->ctx_tbl[free_idx]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
896 if (NULL == tls->ctx_tbl[free_idx]->winfo) {
897 MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
898 return NULL;
899 }
900
901
902
903
904
905
906 opal_atomic_wmb();
907
908
909 _wpool_add_active(tls->wpool, tls->ctx_tbl[free_idx]->winfo);
910
911
912 rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[free_idx]->winfo);
913 if (rc) {
914
915 return NULL;
916 }
917
918
919 return tls->ctx_tbl[free_idx];
920 }
921
922 static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
923 {
924 ucp_ep_params_t ep_params;
925 opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
926 opal_common_ucx_ctx_t *gctx = ctx_rec->gctx;
927 ucs_status_t status;
928 int displ;
929
930 memset(&ep_params, 0, sizeof(ucp_ep_params_t));
931 ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
932
933 opal_mutex_lock(&winfo->mutex);
934 displ = gctx->recv_worker_displs[target];
935 ep_params.address = (ucp_address_t *)&(gctx->recv_worker_addrs[displ]);
936 status = ucp_ep_create(winfo->worker, &ep_params, &winfo->endpoints[target]);
937 if (status != UCS_OK) {
938 opal_mutex_unlock(&winfo->mutex);
939 MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status);
940 return OPAL_ERROR;
941 }
942 opal_mutex_unlock(&winfo->mutex);
943 return OPAL_SUCCESS;
944 }
945
946
947
948 static inline _tlocal_mem_t *
949 _tlocal_search_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *gmem)
950 {
951 size_t i;
952 for(i=0; i<tls->mem_tbl_size; i++) {
953 if( tls->mem_tbl[i]->gmem == gmem){
954 return tls->mem_tbl[i];
955 }
956 }
957 return NULL;
958 }
959
960 static void
961 _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
962 {
963 size_t i;
964
965 for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
966 if (mem_rec->mem->rkeys[i]) {
967 ucp_rkey_destroy(mem_rec->mem->rkeys[i]);
968 }
969 }
970 free(mem_rec->mem->rkeys);
971
972
973
974
975 _common_ucx_mem_signout(mem_rec->gmem);
976
977
978 if (NULL != mem_rec->mem_tls_ptr) {
979 free(mem_rec->mem_tls_ptr);
980 }
981
982 assert(mem_rec->ctx_rec != NULL);
983 OPAL_ATOMIC_ADD_FETCH32(&mem_rec->ctx_rec->refcnt, -1);
984 assert(mem_rec->ctx_rec->refcnt >= 0);
985
986 free(mem_rec->mem);
987
988 memset(mem_rec, 0, sizeof(*mem_rec));
989 }
990
991 static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
992 opal_common_ucx_wpmem_t *mem)
993 {
994 size_t i, free_idx = -1;
995 _tlocal_ctx_t *ctx_rec = NULL;
996 int rc = OPAL_SUCCESS, found = 0;
997
998
999 for (i=0; i<tls->mem_tbl_size; i++) {
1000 if (NULL != tls->mem_tbl[i]->gmem) {
1001 if (tls->mem_tbl[i]->gmem->released) {
1002
1003 _tlocal_mem_record_cleanup(tls->mem_tbl[i]);
1004 }
1005 }
1006 if ((NULL == tls->mem_tbl[i]->gmem) && !found) {
1007
1008 free_idx = i;
1009 found = 1;
1010 }
1011 }
1012
1013 if (!found){
1014 free_idx = tls->mem_tbl_size;
1015 rc = _tlocal_tls_memtbl_extend(tls, 4);
1016 if (rc != OPAL_SUCCESS) {
1017
1018 return NULL;
1019 }
1020 }
1021
1022 tls->mem_tbl[free_idx]->gmem = mem;
1023 tls->mem_tbl[free_idx]->mem = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem));
1024
1025 ctx_rec = _tlocal_ctx_search(tls, mem->ctx);
1026 if (NULL == ctx_rec) {
1027
1028 return NULL;
1029 }
1030
1031 tls->mem_tbl[free_idx]->ctx_rec = ctx_rec;
1032 OPAL_ATOMIC_ADD_FETCH32(&ctx_rec->refcnt, 1);
1033
1034 tls->mem_tbl[free_idx]->mem->worker = ctx_rec->winfo;
1035 tls->mem_tbl[free_idx]->mem->rkeys = calloc(mem->ctx->comm_size,
1036 sizeof(*tls->mem_tbl[free_idx]->mem->rkeys));
1037
1038 tls->mem_tbl[free_idx]->mem_tls_ptr =
1039 calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem_tls_ptr));
1040 tls->mem_tbl[free_idx]->mem_tls_ptr->winfo = ctx_rec->winfo;
1041 tls->mem_tbl[free_idx]->mem_tls_ptr->rkeys = tls->mem_tbl[free_idx]->mem->rkeys;
1042 opal_tsd_setspecific(mem->mem_tls_key, tls->mem_tbl[free_idx]->mem_tls_ptr);
1043
1044
1045
1046
1047
1048
1049 opal_atomic_wmb();
1050
1051 rc = _common_ucx_wpmem_signup(mem);
1052 if (rc) {
1053
1054 return NULL;
1055 }
1056
1057 return tls->mem_tbl[free_idx];
1058 }
1059
1060 static int
1061 _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
1062 {
1063 _mem_info_t *minfo = mem_rec->mem;
1064 opal_common_ucx_wpmem_t *gmem = mem_rec->gmem;
1065 int displ = gmem->mem_displs[target];
1066 ucs_status_t status;
1067
1068 status = ucp_ep_rkey_unpack(ep, &gmem->mem_addrs[displ],
1069 &minfo->rkeys[target]);
1070 if (status != UCS_OK) {
1071 MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
1072 return OPAL_ERROR;
1073 }
1074
1075 return OPAL_SUCCESS;
1076 }
1077
1078
1079 OPAL_DECLSPEC int
1080 opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
1081 {
1082 _tlocal_table_t *tls = NULL;
1083 _tlocal_ctx_t *ctx_rec = NULL;
1084 opal_common_ucx_winfo_t *winfo = NULL;
1085 _tlocal_mem_t *mem_rec = NULL;
1086 _mem_info_t *mem_info = NULL;
1087 ucp_ep_h ep;
1088 int rc = OPAL_SUCCESS;
1089
1090 tls = _tlocal_get_tls(mem->ctx->wpool);
1091
1092
1093 ctx_rec = _tlocal_ctx_search(tls, mem->ctx);
1094
1095 if (OPAL_UNLIKELY(NULL == ctx_rec)) {
1096 ctx_rec = _tlocal_add_ctx(tls, mem->ctx);
1097 if (NULL == ctx_rec) {
1098 return OPAL_ERR_OUT_OF_RESOURCE;
1099 }
1100 }
1101 winfo = ctx_rec->winfo;
1102
1103
1104 if (OPAL_UNLIKELY(NULL == winfo->endpoints[target])) {
1105 rc = _tlocal_ctx_connect(ctx_rec, target);
1106 if (rc != OPAL_SUCCESS) {
1107 return rc;
1108 }
1109 }
1110 ep = winfo->endpoints[target];
1111
1112
1113 mem_rec = _tlocal_search_mem(tls, mem);
1114 if (OPAL_UNLIKELY(mem_rec == NULL)) {
1115 mem_rec = _tlocal_add_mem(tls, mem);
1116 if (NULL == mem_rec) {
1117 return OPAL_ERR_OUT_OF_RESOURCE;
1118 }
1119 }
1120 mem_info = mem_rec->mem;
1121
1122
1123 if (OPAL_UNLIKELY(NULL == mem_info->rkeys[target])) {
1124
1125 rc = _tlocal_mem_create_rkey(mem_rec, ep, target);
1126 if (rc) {
1127 return rc;
1128 }
1129 }
1130
1131 return OPAL_SUCCESS;
1132 }
1133
1134 OPAL_DECLSPEC int
1135 opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,
1136 opal_common_ucx_flush_type_t type,
1137 opal_common_ucx_flush_scope_t scope,
1138 ucs_status_ptr_t *req_ptr)
1139 {
1140 ucs_status_ptr_t req;
1141 ucs_status_t status = UCS_OK;
1142 int rc = OPAL_SUCCESS;
1143
1144 #if HAVE_DECL_UCP_EP_FLUSH_NB
1145 if (scope == OPAL_COMMON_UCX_SCOPE_EP) {
1146 req = ucp_ep_flush_nb(winfo->endpoints[target], 0, opal_common_ucx_empty_complete_cb);
1147 } else {
1148 req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_empty_complete_cb);
1149 }
1150 if (UCS_PTR_IS_PTR(req)) {
1151 ((opal_common_ucx_request_t *)req)->winfo = winfo;
1152 }
1153
1154 if(OPAL_COMMON_UCX_FLUSH_B) {
1155 rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
1156 } else {
1157 *req_ptr = req;
1158 }
1159 return rc;
1160 #endif
1161 switch (type) {
1162 case OPAL_COMMON_UCX_FLUSH_NB_PREFERRED:
1163 case OPAL_COMMON_UCX_FLUSH_B:
1164 if (scope == OPAL_COMMON_UCX_SCOPE_EP) {
1165 status = ucp_ep_flush(winfo->endpoints[target]);
1166 } else {
1167 status = ucp_worker_flush(winfo->worker);
1168 }
1169 rc = (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
1170 case OPAL_COMMON_UCX_FLUSH_NB:
1171 default:
1172 rc = OPAL_ERROR;
1173 }
1174 return rc;
1175 }
1176
1177
1178 OPAL_DECLSPEC int
1179 opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
1180 opal_common_ucx_flush_scope_t scope,
1181 int target)
1182 {
1183 _ctx_record_list_item_t *item;
1184 opal_common_ucx_ctx_t *ctx = mem->ctx;
1185 int rc = OPAL_SUCCESS;
1186
1187 opal_mutex_lock(&ctx->mutex);
1188
1189 OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
1190 if ((scope == OPAL_COMMON_UCX_SCOPE_EP) &&
1191 (NULL == item->ptr->endpoints[target])) {
1192 continue;
1193 }
1194 opal_mutex_lock(&item->ptr->mutex);
1195 rc = opal_common_ucx_winfo_flush(item->ptr, target, OPAL_COMMON_UCX_FLUSH_B,
1196 scope, NULL);
1197 switch (scope) {
1198 case OPAL_COMMON_UCX_SCOPE_WORKER:
1199 item->ptr->global_inflight_ops = 0;
1200 memset(item->ptr->inflight_ops, 0, item->ptr->comm_size * sizeof(short));
1201 break;
1202 case OPAL_COMMON_UCX_SCOPE_EP:
1203 item->ptr->global_inflight_ops -= item->ptr->inflight_ops[target];
1204 item->ptr->inflight_ops[target] = 0;
1205 break;
1206 }
1207 opal_mutex_unlock(&item->ptr->mutex);
1208
1209 if (rc != OPAL_SUCCESS) {
1210 MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d",
1211 rc);
1212 rc = OPAL_ERROR;
1213 }
1214 }
1215 opal_mutex_unlock(&ctx->mutex);
1216
1217 return rc;
1218 }
1219
1220 OPAL_DECLSPEC int
1221 opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem) {
1222
1223 return OPAL_SUCCESS;
1224 }
1225
1226 OPAL_DECLSPEC void
1227 opal_common_ucx_req_init(void *request) {
1228 opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
1229 req->ext_req = NULL;
1230 req->ext_cb = NULL;
1231 req->winfo = NULL;
1232 }
1233
1234 OPAL_DECLSPEC void
1235 opal_common_ucx_req_completion(void *request, ucs_status_t status) {
1236 opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
1237 if (req->ext_cb != NULL) {
1238 (*req->ext_cb)(req->ext_req);
1239 }
1240 ucp_request_release(req);
1241 }