root/opal/mca/common/ucx/common_ucx_wpool.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. _winfo_create
  2. _winfo_reset
  3. _winfo_release
  4. opal_common_ucx_wpool_allocate
  5. opal_common_ucx_wpool_free
  6. opal_common_ucx_wpool_init
  7. opal_common_ucx_wpool_finalize
  8. opal_common_ucx_wpool_progress
  9. _wpool_list_put
  10. _wpool_list_get
  11. _wpool_get_idle
  12. _wpool_add_active
  13. opal_common_ucx_wpctx_create
  14. opal_common_ucx_wpctx_release
  15. _common_ucx_wpctx_free
  16. _common_ucx_wpctx_append
  17. _common_ucx_wpctx_remove
  18. opal_common_ucx_wpmem_create
  19. opal_common_ucx_wpmem_free
  20. _comm_ucx_wpmem_map
  21. _common_ucx_wpmem_free
  22. _common_ucx_wpmem_signup
  23. _common_ucx_mem_signout
  24. _common_ucx_tls_init
  25. _tlocal_get_tls
  26. _tlocal_cleanup
  27. _common_ucx_tls_cleanup
  28. _tlocal_tls_ctxtbl_extend
  29. _tlocal_tls_memtbl_extend
  30. _tlocal_ctx_search
  31. _tlocal_ctx_record_cleanup
  32. _tlocal_add_ctx
  33. _tlocal_ctx_connect
  34. _tlocal_search_mem
  35. _tlocal_mem_record_cleanup
  36. _tlocal_add_mem
  37. _tlocal_mem_create_rkey
  38. opal_common_ucx_tlocal_fetch_spath
  39. opal_common_ucx_winfo_flush
  40. opal_common_ucx_wpmem_flush
  41. opal_common_ucx_wpmem_fence
  42. opal_common_ucx_req_init
  43. opal_common_ucx_req_completion

   1 #include "opal_config.h"
   2 
   3 #include "common_ucx.h"
   4 #include "common_ucx_wpool.h"
   5 #include "common_ucx_wpool_int.h"
   6 #include "opal/mca/base/mca_base_var.h"
   7 #include "opal/mca/base/mca_base_framework.h"
   8 #include "opal/mca/pmix/pmix.h"
   9 #include "opal/memoryhooks/memory.h"
  10 
  11 #include <ucm/api/ucm.h>
  12 
  13 /*******************************************************************************
  14  *******************************************************************************
  15  *
  16  * Worker Pool (wpool) framework
  17  * Used to manage multi-threaded implementation of UCX for ompi/OSC & OSHMEM
  18  *
  19  *******************************************************************************
  20  ******************************************************************************/
  21 
   /* OPAL class descriptors for the helper types used by the worker pool.
    * Each type is registered with opal_list_item_t as its parent, which is
    * what allows instances to be linked into an opal_list_t through their
    * `super` member.
    * NOTE(review): the two trailing NULL arguments are presumably the
    * constructor/destructor hooks (none supplied) -- confirm against
    * opal_object.h. */
   22 OBJ_CLASS_INSTANCE(_winfo_list_item_t, opal_list_item_t, NULL, NULL);
   23 OBJ_CLASS_INSTANCE(_ctx_record_list_item_t, opal_list_item_t, NULL, NULL);
   24 OBJ_CLASS_INSTANCE(_mem_record_list_item_t, opal_list_item_t, NULL, NULL);
   25 OBJ_CLASS_INSTANCE(_tlocal_table_t, opal_list_item_t, NULL, NULL);
  26 
   27 // TODO: Remove once debug is completed
   /* Thread-local debugging state, compiled in only when
    * OPAL_COMMON_UCX_WPOOL_DBG is defined.
    * NOTE(review): tls_pf is presumably a per-thread debug output stream and
    * `initialized` its set-up flag -- confirm against the code that uses them. */
   28 #ifdef OPAL_COMMON_UCX_WPOOL_DBG
   29 __thread FILE *tls_pf = NULL;
   30 __thread int initialized = 0;
   31 #endif
  32 
  33 /* -----------------------------------------------------------------------------
  34  * Worker information (winfo) management functionality
  35  *----------------------------------------------------------------------------*/
  36 
  37 static opal_common_ucx_winfo_t *
  38 _winfo_create(opal_common_ucx_wpool_t *wpool)
  39 {
  40     ucp_worker_params_t worker_params;
  41     ucp_worker_h worker;
  42     ucs_status_t status;
  43     opal_common_ucx_winfo_t *winfo = NULL;
  44 
  45     memset(&worker_params, 0, sizeof(worker_params));
  46     worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
  47     worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
  48     status = ucp_worker_create(wpool->ucp_ctx, &worker_params, &worker);
  49     if (UCS_OK != status) {
  50         MCA_COMMON_UCX_ERROR("ucp_worker_create failed: %d", status);
  51         goto exit;
  52     }
  53 
  54     winfo = calloc(1, sizeof(*winfo));
  55     if (NULL == winfo) {
  56         MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
  57         goto release_worker;
  58     }
  59 
  60     OBJ_CONSTRUCT(&winfo->mutex, opal_recursive_mutex_t);
  61     winfo->worker = worker;
  62     winfo->endpoints = NULL;
  63     winfo->comm_size = 0;
  64     winfo->released = 0;
  65     winfo->inflight_ops = NULL;
  66     winfo->global_inflight_ops = 0;
  67     winfo->inflight_req = UCS_OK;
  68 
  69     return winfo;
  70 
  71 release_worker:
  72     ucp_worker_destroy(worker);
  73 exit:
  74     return winfo;
  75 }
  76 
  77 static void
  78 _winfo_reset(opal_common_ucx_winfo_t *winfo)
  79 {
  80     if (winfo->inflight_req != UCS_OK) {
  81         opal_common_ucx_wait_request_mt(winfo->inflight_req,
  82                                         "opal_common_ucx_flush");
  83         winfo->inflight_req = UCS_OK;
  84     }
  85 
  86     assert(winfo->global_inflight_ops == 0);
  87 
  88     if(winfo->comm_size != 0) {
  89         size_t i;
  90         for (i = 0; i < winfo->comm_size; i++) {
  91             if (NULL != winfo->endpoints[i]){
  92                 ucp_ep_destroy(winfo->endpoints[i]);
  93             }
  94             assert(winfo->inflight_ops[i] == 0);
  95         }
  96         free(winfo->endpoints);
  97         free(winfo->inflight_ops);
  98     }
  99     winfo->endpoints = NULL;
 100     winfo->comm_size = 0;
 101     winfo->released = 0;
 102 }
 103 
  /* Destroy a worker-info descriptor: destruct its mutex, destroy the UCP
   * worker it wraps and free the descriptor itself.
   * The per-peer endpoint / in-flight tables are not touched here; in this
   * file they are freed by _winfo_reset(). */
  104 static void
  105 _winfo_release(opal_common_ucx_winfo_t *winfo)
  106 {
  107     OBJ_DESTRUCT(&winfo->mutex);
  108     ucp_worker_destroy(winfo->worker);
  109     free(winfo);
  110 }
 111 
 112 /* -----------------------------------------------------------------------------
 113  * Worker Pool management functionality
 114  *----------------------------------------------------------------------------*/
 115 
 116 OPAL_DECLSPEC opal_common_ucx_wpool_t *
 117 opal_common_ucx_wpool_allocate(void)
 118 {
 119     opal_common_ucx_wpool_t *ptr = calloc(1, sizeof(opal_common_ucx_wpool_t));
 120     ptr->refcnt = 0;
 121 
 122     return ptr;
 123 }
 124 
 125 OPAL_DECLSPEC void
 126 opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
 127 {
 128     assert(wpool->refcnt == 0);
 129     free(wpool);
 130 }
 131 
 132 OPAL_DECLSPEC int
 133 opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
 134                            int proc_world_size, bool enable_mt)
 135 {
 136     ucp_config_t *config = NULL;
 137     ucp_params_t context_params;
 138     opal_common_ucx_winfo_t *winfo;
 139     ucs_status_t status;
 140     int rc = OPAL_SUCCESS;
 141 
 142     wpool->refcnt++;
 143 
 144     if (1 < wpool->refcnt) {
 145         return rc;
 146     }
 147 
 148     OBJ_CONSTRUCT(&wpool->mutex, opal_recursive_mutex_t);
 149     OBJ_CONSTRUCT(&wpool->tls_list, opal_list_t);
 150 
 151     status = ucp_config_read("MPI", NULL, &config);
 152     if (UCS_OK != status) {
 153         MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
 154         return OPAL_ERROR;
 155     }
 156 
 157     /* initialize UCP context */
 158     memset(&context_params, 0, sizeof(context_params));
 159     context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
 160                                 UCP_PARAM_FIELD_MT_WORKERS_SHARED |
 161                                 UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
 162                                 UCP_PARAM_FIELD_REQUEST_INIT |
 163                                 UCP_PARAM_FIELD_REQUEST_SIZE;
 164     context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 |
 165                               UCP_FEATURE_AMO64;
 166     context_params.mt_workers_shared = (enable_mt ? 1 : 0);
 167     context_params.estimated_num_eps = proc_world_size;
 168     context_params.request_init = opal_common_ucx_req_init;
 169     context_params.request_size = sizeof(opal_common_ucx_request_t);
 170 
 171     status = ucp_init(&context_params, config, &wpool->ucp_ctx);
 172     ucp_config_release(config);
 173     if (UCS_OK != status) {
 174         MCA_COMMON_UCX_VERBOSE(1, "ucp_init failed: %d", status);
 175         rc = OPAL_ERROR;
 176         goto err_ucp_init;
 177     }
 178 
 179     /* create recv worker and add to idle pool */
 180     OBJ_CONSTRUCT(&wpool->idle_workers, opal_list_t);
 181     OBJ_CONSTRUCT(&wpool->active_workers, opal_list_t);
 182 
 183     winfo = _winfo_create(wpool);
 184     if (NULL == winfo) {
 185         MCA_COMMON_UCX_ERROR("Failed to create receive worker");
 186         rc = OPAL_ERROR;
 187         goto err_worker_create;
 188     }
 189     wpool->dflt_worker = winfo->worker;
 190 
 191     status = ucp_worker_get_address(wpool->dflt_worker,
 192                                     &wpool->recv_waddr, &wpool->recv_waddr_len);
 193     if (status != UCS_OK) {
 194         MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
 195         rc = OPAL_ERROR;
 196         goto err_get_addr;
 197     }
 198 
 199     rc = _wpool_list_put(wpool, &wpool->idle_workers, winfo);
 200     if (rc) {
 201         goto err_wpool_add;
 202     }
 203 
 204     opal_tsd_key_create(&wpool->tls_key, _tlocal_cleanup);
 205 
 206     return rc;
 207 
 208 err_wpool_add:
 209     free(wpool->recv_waddr);
 210 err_get_addr:
 211     if (NULL != wpool->dflt_worker) {
 212         ucp_worker_destroy(wpool->dflt_worker);
 213     }
 214  err_worker_create:
 215     ucp_cleanup(wpool->ucp_ctx);
 216  err_ucp_init:
 217     return rc;
 218 }
 219 
 220 OPAL_DECLSPEC
 221 void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
 222 {
 223     _tlocal_table_t *tls_item = NULL, *tls_next;
 224 
 225     wpool->refcnt--;
 226     if (wpool->refcnt > 0) {
 227         return;
 228     }
 229 
 230     /* After this have been called no thread cleanup callback
 231      * will be called */
 232     opal_tsd_key_delete(wpool->tls_key);
 233 
 234     /* Go over remaining TLS structures and release it */
 235     OPAL_LIST_FOREACH_SAFE(tls_item, tls_next, &wpool->tls_list,
 236                            _tlocal_table_t) {
 237         opal_list_remove_item(&wpool->tls_list, &tls_item->super);
 238         _common_ucx_tls_cleanup(tls_item);
 239     }
 240     OBJ_DESTRUCT(&wpool->tls_list);
 241 
 242     /* Release the address here. recv worker will be released
 243      * below along with other idle workers */
 244     ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
 245 
 246     /* Go over the list, free idle list items */
 247     if (!opal_list_is_empty(&wpool->idle_workers)) {
 248         _winfo_list_item_t *item, *next;
 249         OPAL_LIST_FOREACH_SAFE(item, next, &wpool->idle_workers,
 250                                _winfo_list_item_t) {
 251             opal_list_remove_item(&wpool->idle_workers, &item->super);
 252             _winfo_release(item->ptr);
 253             OBJ_RELEASE(item);
 254         }
 255     }
 256     OBJ_DESTRUCT(&wpool->idle_workers);
 257 
 258     /* Release active workers. They are no longer active actually
 259      * because opal_common_ucx_wpool_finalize is being called. */
 260     if (!opal_list_is_empty(&wpool->active_workers)) {
 261         _winfo_list_item_t *item, *next;
 262         OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
 263                                _winfo_list_item_t) {
 264             opal_list_remove_item(&wpool->active_workers, &item->super);
 265             _winfo_reset(item->ptr);
 266             _winfo_release(item->ptr);
 267             OBJ_RELEASE(item);
 268         }
 269     }
 270     OBJ_DESTRUCT(&wpool->active_workers);
 271 
 272     OBJ_DESTRUCT(&wpool->mutex);
 273     ucp_cleanup(wpool->ucp_ctx);
 274     return;
 275 }
 276 
 277 OPAL_DECLSPEC void
 278 opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
 279 {
 280     _winfo_list_item_t *item = NULL, *next = NULL;
 281 
 282     /* Go over all active workers and progress them
 283      * TODO: may want to have some partitioning to progress only part of
 284      * workers */
 285     if (!opal_mutex_trylock (&wpool->mutex)) {
 286         OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
 287                                _winfo_list_item_t) {
 288             opal_common_ucx_winfo_t *winfo = item->ptr;
 289             opal_mutex_lock(&winfo->mutex);
 290             if( OPAL_UNLIKELY(winfo->released) ) {
 291                 /* Do garbage collection of worker info's if needed */
 292                 opal_list_remove_item(&wpool->active_workers, &item->super);
 293                 _winfo_reset(winfo);
 294                 opal_list_append(&wpool->idle_workers, &item->super);
 295             } else {
 296                 /* Progress worker until there are existing events */
 297                 while(ucp_worker_progress(winfo->worker));
 298             }
 299             opal_mutex_unlock(&winfo->mutex);
 300         }
 301         opal_mutex_unlock(&wpool->mutex);
 302     }
 303 }
 304 
 305 static int
 306 _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
 307                 opal_common_ucx_winfo_t *winfo)
 308 {
 309     _winfo_list_item_t *item;
 310 
 311     item = OBJ_NEW(_winfo_list_item_t);
 312     if (NULL == item) {
 313         MCA_COMMON_UCX_ERROR("Cannot allocate memory for winfo list item");
 314         return OPAL_ERR_OUT_OF_RESOURCE;
 315     }
 316     item->ptr = winfo;
 317 
 318     opal_mutex_lock(&wpool->mutex);
 319     opal_list_append(list, &item->super);
 320     opal_mutex_unlock(&wpool->mutex);
 321 
 322     return OPAL_SUCCESS;
 323 }
 324 
 325 static opal_common_ucx_winfo_t*
 326 _wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list)
 327 {
 328     opal_common_ucx_winfo_t *winfo = NULL;
 329     _winfo_list_item_t *item = NULL;
 330 
 331     opal_mutex_lock(&wpool->mutex);
 332     if (!opal_list_is_empty(list)) {
 333         item = (_winfo_list_item_t *)opal_list_get_first(list);
 334         opal_list_remove_item(list, &item->super);
 335     }
 336     opal_mutex_unlock(&wpool->mutex);
 337 
 338     if (item != NULL) {
 339         winfo = item->ptr;
 340         OBJ_RELEASE(item);
 341     }
 342     return winfo;
 343 }
 344 
 345 static opal_common_ucx_winfo_t *
 346 _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
 347 {
 348     opal_common_ucx_winfo_t *winfo;
 349     winfo = _wpool_list_get(wpool, &wpool->idle_workers);
 350     if (!winfo) {
 351         winfo = _winfo_create(wpool);
 352         if (!winfo) {
 353             MCA_COMMON_UCX_ERROR("Failed to allocate worker info structure");
 354             return NULL;
 355         }
 356     }
 357 
 358     winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h));
 359     winfo->inflight_ops = calloc(comm_size, sizeof(short));
 360     winfo->comm_size = comm_size;
 361     return winfo;
 362 }
 363 
 364 static int
 365 _wpool_add_active(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo)
 366 {
 367     return _wpool_list_put(wpool, &wpool->active_workers, winfo);
 368 }
 369 
 370 /* -----------------------------------------------------------------------------
 371  * Worker Pool Communication context management functionality
 372  *----------------------------------------------------------------------------*/
 373 
 374 OPAL_DECLSPEC int
 375 opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
 376                              opal_common_ucx_exchange_func_t exchange_func,
 377                              void *exchange_metadata,
 378                              opal_common_ucx_ctx_t **ctx_ptr)
 379 {
 380     opal_common_ucx_ctx_t *ctx = calloc(1, sizeof(*ctx));
 381     int ret = OPAL_SUCCESS;
 382 
 383     OBJ_CONSTRUCT(&ctx->mutex, opal_recursive_mutex_t);
 384     OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
 385     ctx->released = 0;
 386     ctx->refcntr = 1; /* application holding the context */
 387     ctx->wpool = wpool;
 388     ctx->comm_size = comm_size;
 389 
 390     ctx->recv_worker_addrs = NULL;
 391     ctx->recv_worker_displs = NULL;
 392     ret = exchange_func(wpool->recv_waddr, wpool->recv_waddr_len,
 393                         &ctx->recv_worker_addrs,
 394                         &ctx->recv_worker_displs, exchange_metadata);
 395     if (ret != OPAL_SUCCESS) {
 396         goto error;
 397     }
 398 
 399     (*ctx_ptr) = ctx;
 400     return ret;
 401 error:
 402     OBJ_DESTRUCT(&ctx->mutex);
 403     OBJ_DESTRUCT(&ctx->tls_workers);
 404     free(ctx);
 405     (*ctx_ptr) = NULL;
 406     return ret;
 407 }
 408 
 409 OPAL_DECLSPEC void
 410 opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
 411 {
 412     int my_refcntr = -1;
 413 
 414     /* Application is expected to guarantee that no operation
 415      * is performed on the context that is being released */
 416 
 417     /* Mark that this context was released by application
 418      * Threads will use this flag to perform deferred cleanup */
 419     ctx->released = 1;
 420 
 421     /* Decrement the reference counter */
 422     my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
 423 
 424     /* Make sure that all the loads/stores are complete */
 425     opal_atomic_mb();
 426 
 427     /* If there is no more references to this handler
 428      * we can release it */
 429     if (0 == my_refcntr) {
 430         _common_ucx_wpctx_free(ctx);
 431     }
 432 }
 433 
 434 /* Final cleanup of the context structure
 435  * once all references cleared */
 436 static void
 437 _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
 438 {
 439     free(ctx->recv_worker_addrs);
 440     free(ctx->recv_worker_displs);
 441     OBJ_DESTRUCT(&ctx->mutex);
 442     OBJ_DESTRUCT(&ctx->tls_workers);
 443     free(ctx);
 444 }
 445 
 446 /* Subscribe a new TLS to this context */
 447 static int
 448 _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
 449                          opal_common_ucx_winfo_t *winfo)
 450 {
 451     _ctx_record_list_item_t *item = OBJ_NEW(_ctx_record_list_item_t);
 452     if (NULL == item) {
 453         return OPAL_ERR_OUT_OF_RESOURCE;
 454     }
 455     /* Increment the reference counter */
 456     OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, 1);
 457 
 458     /* Add new worker to the context */
 459     item->ptr = winfo;
 460     opal_mutex_lock(&ctx->mutex);
 461     opal_list_append(&ctx->tls_workers, &item->super);
 462     opal_mutex_unlock(&ctx->mutex);
 463 
 464     return OPAL_SUCCESS;
 465 }
 466 
 467 /* Unsubscribe a particular TLS to this context */
 468 static void
 469 _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
 470                          opal_common_ucx_winfo_t *winfo)
 471 {
 472     _ctx_record_list_item_t *item = NULL, *next;
 473     int my_refcntr = -1;
 474 
 475     opal_mutex_lock(&ctx->mutex);
 476 
 477     OPAL_LIST_FOREACH_SAFE(item, next, &ctx->tls_workers,
 478                            _ctx_record_list_item_t) {
 479         if (winfo == item->ptr) {
 480             opal_list_remove_item(&ctx->tls_workers, &item->super);
 481             opal_mutex_lock(&winfo->mutex);
 482             winfo->released = 1;
 483             opal_mutex_unlock(&winfo->mutex);
 484             OBJ_RELEASE(item);
 485             break;
 486         }
 487     }
 488     opal_mutex_unlock(&ctx->mutex);
 489 
 490     /* Make sure that all the loads/stores are complete */
 491     opal_atomic_rmb();
 492 
 493     /* Decrement the reference counter */
 494     my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
 495 
 496     /* a counterpart to the rmb above */
 497     opal_atomic_wmb();
 498 
 499     if (0 == my_refcntr) {
 500         /* All references to this data structure were removed
 501          * We can safely release communication context structure */
 502         _common_ucx_wpctx_free(ctx);
 503     }
 504     return;
 505 }
 506 
 507 /* -----------------------------------------------------------------------------
 508  * Worker Pool Memory management functionality
 509  *----------------------------------------------------------------------------*/
 510 
 511 OPAL_DECLSPEC
 512 int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
 513                                void **mem_base, size_t mem_size,
 514                                opal_common_ucx_mem_type_t mem_type,
 515                                opal_common_ucx_exchange_func_t exchange_func,
 516                                void *exchange_metadata,
 517                                  char **my_mem_addr,
 518                                  int *my_mem_addr_size,
 519                                opal_common_ucx_wpmem_t **mem_ptr)
 520 {
 521     opal_common_ucx_wpmem_t *mem = calloc(1, sizeof(*mem));
 522     void *rkey_addr = NULL;
 523     size_t rkey_addr_len;
 524     ucs_status_t status;
 525     int ret = OPAL_SUCCESS;
 526 
 527     mem->released = 0;
 528     mem->refcntr = 1; /* application holding this memory handler */
 529     mem->ctx = ctx;
 530     mem->mem_addrs = NULL;
 531     mem->mem_displs = NULL;
 532 
 533     ret = _comm_ucx_wpmem_map(ctx->wpool, mem_base, mem_size, &mem->memh,
 534                             mem_type);
 535     if (ret != OPAL_SUCCESS) {
 536         MCA_COMMON_UCX_VERBOSE(1, "_comm_ucx_mem_map failed: %d", ret);
 537         goto error_mem_map;
 538     }
 539 
 540     status = ucp_rkey_pack(ctx->wpool->ucp_ctx, mem->memh,
 541                            &rkey_addr, &rkey_addr_len);
 542     if (status != UCS_OK) {
 543         MCA_COMMON_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
 544         ret = OPAL_ERROR;
 545         goto error_rkey_pack;
 546     }
 547 
 548     ret = exchange_func(rkey_addr, rkey_addr_len,
 549                         &mem->mem_addrs, &mem->mem_displs, exchange_metadata);
 550     if (ret != OPAL_SUCCESS) {
 551         goto error_rkey_pack;
 552     }
 553 
 554     /* Dont need the destructor here, will use
 555      * wpool-level destructor */
 556     opal_tsd_key_create(&mem->mem_tls_key, NULL);
 557 
 558     (*mem_ptr) = mem;
 559     (*my_mem_addr) = rkey_addr;
 560     (*my_mem_addr_size) = rkey_addr_len;
 561 
 562     return ret;
 563 
 564  error_rkey_pack:
 565     ucp_mem_unmap(ctx->wpool->ucp_ctx, mem->memh);
 566  error_mem_map:
 567     free(mem);
 568     (*mem_ptr) = NULL;
 569     return ret;
 570 }
 571 
 572 OPAL_DECLSPEC int
 573 opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
 574 {
 575     int my_refcntr = -1;
 576 
 577     /* Mark that this memory handler has been called */
 578     mem->released = 1;
 579 
 580     /* Decrement the reference counter */
 581     my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
 582 
 583     /* Make sure that all the loads/stores are complete */
 584     opal_atomic_wmb();
 585 
 586     if (0 == my_refcntr) {
 587         _common_ucx_wpmem_free(mem);
 588     }
 589     return OPAL_SUCCESS;
 590 }
 591 
 592 
 593 static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
 594                              void **base, size_t size, ucp_mem_h *memh_ptr,
 595                              opal_common_ucx_mem_type_t mem_type)
 596 {
 597     ucp_mem_map_params_t mem_params;
 598     ucp_mem_attr_t mem_attrs;
 599     ucs_status_t status;
 600     int ret = OPAL_SUCCESS;
 601 
 602     memset(&mem_params, 0, sizeof(ucp_mem_map_params_t));
 603     mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
 604                             UCP_MEM_MAP_PARAM_FIELD_LENGTH |
 605                             UCP_MEM_MAP_PARAM_FIELD_FLAGS;
 606     mem_params.length = size;
 607     if (mem_type == OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
 608         mem_params.address = NULL;
 609         mem_params.flags = UCP_MEM_MAP_ALLOCATE;
 610     } else {
 611         mem_params.address = (*base);
 612     }
 613 
 614     status = ucp_mem_map(wpool->ucp_ctx, &mem_params, memh_ptr);
 615     if (status != UCS_OK) {
 616         MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_map failed: %d", status);
 617         ret = OPAL_ERROR;
 618         return ret;
 619     }
 620 
 621     mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH;
 622     status = ucp_mem_query((*memh_ptr), &mem_attrs);
 623     if (status != UCS_OK) {
 624         MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_query failed: %d", status);
 625         ret = OPAL_ERROR;
 626         goto error;
 627     }
 628 
 629     assert(mem_attrs.length >= size);
 630     if (mem_type != OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
 631         assert(mem_attrs.address == (*base));
 632     } else {
 633         (*base) = mem_attrs.address;
 634     }
 635 
 636     return ret;
 637  error:
 638     ucp_mem_unmap(wpool->ucp_ctx, (*memh_ptr));
 639     return ret;
 640 }
 641 
 642 static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
 643 {
 644     opal_tsd_key_delete(mem->mem_tls_key);
 645     free(mem->mem_addrs);
 646     free(mem->mem_displs);
 647     ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh);
 648     free(mem);
 649 }
 650 
 651 static int
 652 _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem)
 653 {
 654     /* Increment the reference counter */
 655     OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1);
 656     return OPAL_SUCCESS;
 657 }
 658 
 659 static void
 660 _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
 661 {
 662     int my_refcntr = -1;
 663 
 664     /* Make sure that all the loads are complete at this
 665      * point so if somebody else will see refcntr ==0
 666      * and release the structure we would have all we need
 667      */
 668     opal_atomic_rmb();
 669 
 670     /* Decrement the reference counter */
 671     my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
 672 
 673     /* a counterpart to the rmb above */
 674     opal_atomic_wmb();
 675 
 676     if (0 == my_refcntr) {
 677         _common_ucx_wpmem_free(mem);
 678     }
 679 
 680     return;
 681 }
 682 
 683 /* -----------------------------------------------------------------------------
 684  * Worker Pool TLS management functions management functionality
 685  *----------------------------------------------------------------------------*/
 686 
 687 static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
 688 {
 689     _tlocal_table_t *tls = OBJ_NEW(_tlocal_table_t);
 690 
 691     if (tls == NULL) {
 692         // return OPAL_ERR_OUT_OF_RESOURCE
 693         return NULL;
 694     }
 695 
 696     tls->ctx_tbl = NULL;
 697     tls->ctx_tbl_size = 0;
 698     tls->mem_tbl = NULL;
 699     tls->mem_tbl_size = 0;
 700 
 701     /* Add this TLS to the global wpool structure for future
 702      * cleanup purposes */
 703     tls->wpool = wpool;
 704     opal_mutex_lock(&wpool->mutex);
 705     opal_list_append(&wpool->tls_list, &tls->super);
 706     opal_mutex_unlock(&wpool->mutex);
 707 
 708     if(_tlocal_tls_ctxtbl_extend(tls, 4)){
 709         MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool context table");
 710         return NULL;
 711     }
 712     if(_tlocal_tls_memtbl_extend(tls, 4)) {
 713         MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool memory table");
 714         return NULL;
 715     }
 716 
 717     opal_tsd_setspecific(wpool->tls_key, tls);
 718 
 719     return tls;
 720 }
 721 
 722 static inline _tlocal_table_t *
 723 _tlocal_get_tls(opal_common_ucx_wpool_t *wpool){
 724     _tlocal_table_t *tls;
 725     int rc = opal_tsd_getspecific(wpool->tls_key, (void**)&tls);
 726 
 727     if (OPAL_SUCCESS != rc) {
 728         return NULL;
 729     }
 730 
 731     if (OPAL_UNLIKELY(NULL == tls)) {
 732         tls = _common_ucx_tls_init(wpool);
 733     }
 734     return tls;
 735 }
 736 
 737 static void _tlocal_cleanup(void *arg)
 738 {
 739     _tlocal_table_t *item = NULL, *next;
 740     _tlocal_table_t *tls = (_tlocal_table_t *)arg;
 741     opal_common_ucx_wpool_t *wpool = NULL;
 742 
 743     if (NULL == tls) {
 744         return;
 745     }
 746     wpool = tls->wpool;
 747 
 748     /* 1. Remove us from tls_list */
 749     tls->wpool = wpool;
 750     opal_mutex_lock(&wpool->mutex);
 751     OPAL_LIST_FOREACH_SAFE(item, next, &wpool->tls_list, _tlocal_table_t) {
 752         if (item == tls) {
 753             opal_list_remove_item(&wpool->tls_list, &item->super);
 754             break;
 755         }
 756     }
 757     opal_mutex_unlock(&wpool->mutex);
 758     _common_ucx_tls_cleanup(tls);
 759 }
 760 
 761 // TODO: don't want to inline this function
 762 static void _common_ucx_tls_cleanup(_tlocal_table_t *tls)
 763 {
 764     size_t i, size;
 765 
 766     // Cleanup memory table
 767     size = tls->mem_tbl_size;
 768     for (i = 0; i < size; i++) {
 769         if (NULL != tls->mem_tbl[i]->gmem){
 770             _tlocal_mem_record_cleanup(tls->mem_tbl[i]);
 771         }
 772 
 773         free(tls->mem_tbl[i]);
 774     }
 775 
 776     // Cleanup ctx table
 777     size = tls->ctx_tbl_size;
 778     for (i = 0; i < size; i++) {
 779         if (NULL != tls->ctx_tbl[i]->gctx){
 780             assert(tls->ctx_tbl[i]->refcnt == 0);
 781             _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
 782         }
 783         free(tls->ctx_tbl[i]);
 784     }
 785 
 786     opal_tsd_setspecific(tls->wpool->tls_key, NULL);
 787 
 788     OBJ_RELEASE(tls);
 789     return;
 790 }
 791 
 792 static int
 793 _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append)
 794 {
 795     size_t i;
 796     size_t newsize = (tbl->ctx_tbl_size + append);
 797     tbl->ctx_tbl = realloc(tbl->ctx_tbl, newsize * sizeof(*tbl->ctx_tbl));
 798     for (i = tbl->ctx_tbl_size; i < newsize; i++) {
 799         tbl->ctx_tbl[i] = calloc(1, sizeof(*tbl->ctx_tbl[i]));
 800         if (NULL == tbl->ctx_tbl[i]) {
 801             return OPAL_ERR_OUT_OF_RESOURCE;
 802         }
 803 
 804     }
 805     tbl->ctx_tbl_size = newsize;
 806     return OPAL_SUCCESS;
 807 }
 808 
 809 static int
 810 _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append)
 811 {
 812     size_t i;
 813     size_t newsize = (tbl->mem_tbl_size + append);
 814 
 815     tbl->mem_tbl = realloc(tbl->mem_tbl, newsize * sizeof(*tbl->mem_tbl));
 816     for (i = tbl->mem_tbl_size; i < tbl->mem_tbl_size + append; i++) {
 817         tbl->mem_tbl[i] = calloc(1, sizeof(*tbl->mem_tbl[i]));
 818         if (NULL == tbl->mem_tbl[i]) {
 819             return OPAL_ERR_OUT_OF_RESOURCE;
 820         }
 821     }
 822     tbl->mem_tbl_size = newsize;
 823     return OPAL_SUCCESS;
 824 }
 825 
 826 
 827 static inline _tlocal_ctx_t *
 828 _tlocal_ctx_search(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
 829 {
 830     size_t i;
 831     for(i=0; i<tls->ctx_tbl_size; i++) {
 832         if (tls->ctx_tbl[i]->gctx == ctx){
 833             return tls->ctx_tbl[i];
 834         }
 835     }
 836     return NULL;
 837 }
 838 
 839 static int
 840 _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
 841 {
 842     if (NULL == ctx_rec->gctx) {
 843         return OPAL_SUCCESS;
 844     }
 845 
 846     if (ctx_rec->refcnt > 0) {
 847         return OPAL_SUCCESS;
 848     }
 849 
 850     /* Remove myself from the communication context structure
 851      * This may result in context release as we are using
 852      * delayed cleanup */
 853     _common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo);
 854 
 855     /* Erase the record so it can be reused */
 856     memset(ctx_rec, 0, sizeof(*ctx_rec));
 857 
 858     return OPAL_SUCCESS;
 859 }
 860 
 861 // TODO: Don't want to inline this (slow path)
 862 static _tlocal_ctx_t *
 863 _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
 864 {
 865     size_t i, free_idx = -1;
 866     int rc, found = 0;
 867 
 868     /* Try to find available record in the TLS table
 869      * In parallel perform deferred cleanups */
 870     for (i=0; i<tls->ctx_tbl_size; i++) {
 871         if (NULL != tls->ctx_tbl[i]->gctx && tls->ctx_tbl[i]->refcnt == 0) {
 872             if (tls->ctx_tbl[i]->gctx->released ) {
 873                 /* Found dirty record, need to clean first */
 874                 _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
 875             }
 876         }
 877         if ((NULL == tls->ctx_tbl[i]->gctx) && !found) {
 878             /* Found clean record */
 879             free_idx = i;
 880             found = 1;
 881         }
 882     }
 883 
 884     /* if needed - extend the table */
 885     if (!found) {
 886         free_idx = tls->ctx_tbl_size;
 887         rc = _tlocal_tls_ctxtbl_extend(tls, 4);
 888         if (rc) {
 889             //TODO: error out
 890             return NULL;
 891         }
 892     }
 893 
 894     tls->ctx_tbl[free_idx]->gctx = ctx;
 895     tls->ctx_tbl[free_idx]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
 896     if (NULL == tls->ctx_tbl[free_idx]->winfo) {
 897         MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
 898         return NULL;
 899     }
 900 
 901     /* Make sure that we completed all the data structures before
 902      * placing the item to the list
 903      * NOTE: essentially we don't need this as list append is an
 904      * operation protected by mutex
 905      */
 906     opal_atomic_wmb();
 907 
 908     /* Add this worker to the active worker list */
 909     _wpool_add_active(tls->wpool, tls->ctx_tbl[free_idx]->winfo);
 910 
 911     /* add this worker into the context list */
 912     rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[free_idx]->winfo);
 913     if (rc) {
 914         //TODO: error out
 915         return NULL;
 916     }
 917 
 918     /* All good - return the record */
 919     return tls->ctx_tbl[free_idx];
 920 }
 921 
 922 static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
 923 {
 924     ucp_ep_params_t ep_params;
 925     opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
 926     opal_common_ucx_ctx_t *gctx = ctx_rec->gctx;
 927     ucs_status_t status;
 928     int displ;
 929 
 930     memset(&ep_params, 0, sizeof(ucp_ep_params_t));
 931     ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
 932 
 933     opal_mutex_lock(&winfo->mutex);
 934     displ = gctx->recv_worker_displs[target];
 935     ep_params.address = (ucp_address_t *)&(gctx->recv_worker_addrs[displ]);
 936     status = ucp_ep_create(winfo->worker, &ep_params, &winfo->endpoints[target]);
 937     if (status != UCS_OK) {
 938         opal_mutex_unlock(&winfo->mutex);
 939         MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status);
 940         return OPAL_ERROR;
 941     }
 942     opal_mutex_unlock(&winfo->mutex);
 943     return OPAL_SUCCESS;
 944 }
 945 
 946 /* TLS memory management */
 947 
 948 static inline _tlocal_mem_t *
 949 _tlocal_search_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *gmem)
 950 {
 951     size_t i;
 952     for(i=0; i<tls->mem_tbl_size; i++) {
 953         if( tls->mem_tbl[i]->gmem == gmem){
 954             return tls->mem_tbl[i];
 955         }
 956     }
 957     return NULL;
 958 }
 959 
 960 static void
 961 _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
 962 {
 963     size_t i;
 964 
 965     for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
 966         if (mem_rec->mem->rkeys[i]) {
 967             ucp_rkey_destroy(mem_rec->mem->rkeys[i]);
 968         }
 969     }
 970     free(mem_rec->mem->rkeys);
 971 
 972     /* Remove myself from the memory context structure
 973      * This may result in context release as we are using
 974      * delayed cleanup */
 975     _common_ucx_mem_signout(mem_rec->gmem);
 976 
 977     /* Release fast-path pointers */
 978     if (NULL != mem_rec->mem_tls_ptr) {
 979         free(mem_rec->mem_tls_ptr);
 980     }
 981 
 982     assert(mem_rec->ctx_rec != NULL);
 983     OPAL_ATOMIC_ADD_FETCH32(&mem_rec->ctx_rec->refcnt, -1);
 984     assert(mem_rec->ctx_rec->refcnt >= 0);
 985 
 986     free(mem_rec->mem);
 987 
 988     memset(mem_rec, 0, sizeof(*mem_rec));
 989 }
 990 
 991 static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
 992                                        opal_common_ucx_wpmem_t *mem)
 993 {
 994     size_t i, free_idx = -1;
 995     _tlocal_ctx_t *ctx_rec = NULL;
 996     int rc = OPAL_SUCCESS, found = 0;
 997 
 998     /* Try to find available spot in the table */
 999     for (i=0; i<tls->mem_tbl_size; i++) {
1000         if (NULL != tls->mem_tbl[i]->gmem) {
1001             if (tls->mem_tbl[i]->gmem->released) {
1002                 /* Found a dirty record. Need to clean it first */
1003                 _tlocal_mem_record_cleanup(tls->mem_tbl[i]);
1004             }
1005         }
1006         if ((NULL == tls->mem_tbl[i]->gmem) && !found) {
1007             /* Found a clear record */
1008             free_idx = i;
1009             found = 1;
1010         }
1011     }
1012 
1013     if (!found){
1014         free_idx = tls->mem_tbl_size;
1015         rc = _tlocal_tls_memtbl_extend(tls, 4);
1016         if (rc != OPAL_SUCCESS) {
1017             //TODO: error out
1018             return NULL;
1019         }
1020     }
1021 
1022     tls->mem_tbl[free_idx]->gmem = mem;
1023     tls->mem_tbl[free_idx]->mem = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem));
1024 
1025     ctx_rec = _tlocal_ctx_search(tls, mem->ctx);
1026     if (NULL == ctx_rec) {
1027         // TODO: act accordingly - cleanup
1028         return NULL;
1029     }
1030 
1031     tls->mem_tbl[free_idx]->ctx_rec = ctx_rec;
1032     OPAL_ATOMIC_ADD_FETCH32(&ctx_rec->refcnt, 1);
1033 
1034     tls->mem_tbl[free_idx]->mem->worker = ctx_rec->winfo;
1035     tls->mem_tbl[free_idx]->mem->rkeys = calloc(mem->ctx->comm_size,
1036                                          sizeof(*tls->mem_tbl[free_idx]->mem->rkeys));
1037 
1038     tls->mem_tbl[free_idx]->mem_tls_ptr =
1039             calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem_tls_ptr));
1040     tls->mem_tbl[free_idx]->mem_tls_ptr->winfo = ctx_rec->winfo;
1041     tls->mem_tbl[free_idx]->mem_tls_ptr->rkeys = tls->mem_tbl[free_idx]->mem->rkeys;
1042     opal_tsd_setspecific(mem->mem_tls_key, tls->mem_tbl[free_idx]->mem_tls_ptr);
1043 
1044     /* Make sure that we completed all the data structures before
1045      * placing the item to the list
1046      * NOTE: essentially we don't need this as list append is an
1047      * operation protected by mutex
1048      */
1049     opal_atomic_wmb();
1050 
1051     rc = _common_ucx_wpmem_signup(mem);
1052     if (rc) {
1053         // TODO: error handling
1054         return NULL;
1055     }
1056 
1057     return tls->mem_tbl[free_idx];
1058 }
1059 
1060 static int
1061 _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
1062 {
1063     _mem_info_t *minfo = mem_rec->mem;
1064     opal_common_ucx_wpmem_t *gmem = mem_rec->gmem;
1065     int displ = gmem->mem_displs[target];
1066     ucs_status_t status;
1067 
1068     status = ucp_ep_rkey_unpack(ep, &gmem->mem_addrs[displ],
1069                                 &minfo->rkeys[target]);
1070     if (status != UCS_OK) {
1071         MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
1072         return OPAL_ERROR;
1073     }
1074 
1075     return OPAL_SUCCESS;
1076 }
1077 
1078 /* Get the TLS in case of slow path (not everything has been yet initialized */
1079 OPAL_DECLSPEC int
1080 opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
1081 {
1082     _tlocal_table_t *tls = NULL;
1083     _tlocal_ctx_t *ctx_rec = NULL;
1084     opal_common_ucx_winfo_t *winfo = NULL;
1085     _tlocal_mem_t *mem_rec = NULL;
1086     _mem_info_t *mem_info = NULL;
1087     ucp_ep_h ep;
1088     int rc = OPAL_SUCCESS;
1089 
1090     tls = _tlocal_get_tls(mem->ctx->wpool);
1091 
1092     /* Obtain the worker structure */
1093     ctx_rec = _tlocal_ctx_search(tls, mem->ctx);
1094 
1095     if (OPAL_UNLIKELY(NULL == ctx_rec)) {
1096         ctx_rec = _tlocal_add_ctx(tls, mem->ctx);
1097         if (NULL == ctx_rec) {
1098             return OPAL_ERR_OUT_OF_RESOURCE;
1099         }
1100     }
1101     winfo = ctx_rec->winfo;
1102 
1103     /* Obtain the endpoint */
1104     if (OPAL_UNLIKELY(NULL == winfo->endpoints[target])) {
1105         rc = _tlocal_ctx_connect(ctx_rec, target);
1106         if (rc != OPAL_SUCCESS) {
1107             return rc;
1108         }
1109     }
1110     ep = winfo->endpoints[target];
1111 
1112     /* Obtain the memory region info */
1113     mem_rec = _tlocal_search_mem(tls, mem);
1114     if (OPAL_UNLIKELY(mem_rec == NULL)) {
1115         mem_rec = _tlocal_add_mem(tls, mem);
1116         if (NULL == mem_rec) {
1117             return OPAL_ERR_OUT_OF_RESOURCE;
1118         }
1119     }
1120     mem_info = mem_rec->mem;
1121 
1122     /* Obtain the rkey */
1123     if (OPAL_UNLIKELY(NULL == mem_info->rkeys[target])) {
1124         /* Create the rkey */
1125         rc = _tlocal_mem_create_rkey(mem_rec, ep, target);
1126         if (rc) {
1127             return rc;
1128         }
1129     }
1130 
1131     return OPAL_SUCCESS;
1132 }
1133 
1134 OPAL_DECLSPEC int
1135 opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,
1136                             opal_common_ucx_flush_type_t type,
1137                             opal_common_ucx_flush_scope_t scope,
1138                             ucs_status_ptr_t *req_ptr)
1139 {
1140     ucs_status_ptr_t req;
1141     ucs_status_t status = UCS_OK;
1142     int rc = OPAL_SUCCESS;
1143 
1144 #if HAVE_DECL_UCP_EP_FLUSH_NB
1145     if (scope == OPAL_COMMON_UCX_SCOPE_EP) {
1146         req = ucp_ep_flush_nb(winfo->endpoints[target], 0, opal_common_ucx_empty_complete_cb);
1147     } else {
1148         req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_empty_complete_cb);
1149     }
1150     if (UCS_PTR_IS_PTR(req)) {
1151         ((opal_common_ucx_request_t *)req)->winfo = winfo;
1152     }
1153 
1154     if(OPAL_COMMON_UCX_FLUSH_B) {
1155         rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
1156     } else {
1157         *req_ptr = req;
1158     }
1159     return rc;
1160 #endif
1161     switch (type) {
1162     case OPAL_COMMON_UCX_FLUSH_NB_PREFERRED:
1163     case OPAL_COMMON_UCX_FLUSH_B:
1164         if (scope == OPAL_COMMON_UCX_SCOPE_EP) {
1165             status = ucp_ep_flush(winfo->endpoints[target]);
1166         } else {
1167             status = ucp_worker_flush(winfo->worker);
1168         }
1169         rc = (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
1170     case OPAL_COMMON_UCX_FLUSH_NB:
1171     default:
1172         rc = OPAL_ERROR;
1173     }
1174     return rc;
1175 }
1176 
1177 
1178 OPAL_DECLSPEC int
1179 opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
1180                           opal_common_ucx_flush_scope_t scope,
1181                           int target)
1182 {
1183     _ctx_record_list_item_t *item;
1184     opal_common_ucx_ctx_t *ctx = mem->ctx;
1185     int rc = OPAL_SUCCESS;
1186 
1187     opal_mutex_lock(&ctx->mutex);
1188 
1189     OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
1190         if ((scope == OPAL_COMMON_UCX_SCOPE_EP) &&
1191                 (NULL == item->ptr->endpoints[target])) {
1192             continue;
1193         }
1194         opal_mutex_lock(&item->ptr->mutex);
1195         rc = opal_common_ucx_winfo_flush(item->ptr, target, OPAL_COMMON_UCX_FLUSH_B,
1196                                          scope, NULL);
1197         switch (scope) {
1198         case OPAL_COMMON_UCX_SCOPE_WORKER:
1199             item->ptr->global_inflight_ops = 0;
1200             memset(item->ptr->inflight_ops, 0, item->ptr->comm_size * sizeof(short));
1201             break;
1202         case OPAL_COMMON_UCX_SCOPE_EP:
1203             item->ptr->global_inflight_ops -= item->ptr->inflight_ops[target];
1204             item->ptr->inflight_ops[target] = 0;
1205             break;
1206         }
1207         opal_mutex_unlock(&item->ptr->mutex);
1208 
1209         if (rc != OPAL_SUCCESS) {
1210             MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d",
1211                                  rc);
1212             rc = OPAL_ERROR;
1213         }
1214     }
1215     opal_mutex_unlock(&ctx->mutex);
1216 
1217     return rc;
1218 }
1219 
1220 OPAL_DECLSPEC int
1221 opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem) {
1222     /* TODO */
1223     return OPAL_SUCCESS;
1224 }
1225 
1226 OPAL_DECLSPEC void
1227 opal_common_ucx_req_init(void *request) {
1228     opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
1229     req->ext_req = NULL;
1230     req->ext_cb = NULL;
1231     req->winfo = NULL;
1232 }
1233 
1234 OPAL_DECLSPEC void
1235 opal_common_ucx_req_completion(void *request, ucs_status_t status) {
1236     opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
1237     if (req->ext_cb != NULL) {
1238         (*req->ext_cb)(req->ext_req);
1239     }
1240     ucp_request_release(req);
1241 }

/* [<][>][^][v][top][bottom][index][help] */