root/opal/mca/common/ucx/common_ucx_wpool.h

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes following definitions.
  1. opal_common_ucx_tlocal_fetch
  2. opal_common_ucx_wait_request_mt
  3. _periodical_flush_nb
  4. opal_common_ucx_wpmem_putget
  5. opal_common_ucx_wpmem_cmpswp
  6. opal_common_ucx_wpmem_post
  7. opal_common_ucx_wpmem_fetch
  8. opal_common_ucx_wpmem_fetch_nb

   1 #ifndef COMMON_UCX_WPOOL_H
   2 #define COMMON_UCX_WPOOL_H
   3 
   4 
   5 #include "opal_config.h"
   6 
   7 #include "common_ucx.h"
   8 #include <stdint.h>
   9 #include <string.h>
  10 
  11 #include <ucp/api/ucp.h>
  12 
  13 #include "opal/mca/mca.h"
  14 #include "opal/util/output.h"
  15 #include "opal/runtime/opal_progress.h"
  16 #include "opal/include/opal/constants.h"
  17 #include "opal/class/opal_list.h"
  18 #include "opal/threads/tsd.h"
  19 
  20 BEGIN_C_DECLS
  21 
  22 /* Worker pool is a global object that is allocated per component or can be
  23  * shared between multiple compatible components.
  24  * The lifetime of this object is normally equal to the lifetime of a component[s].
  25  * It is expected to be initialized in MPI_Init and finalized in MPI_Finalize.
  26  */
  27 typedef struct {
  28     /* Ref counting & locking */
  29     int refcnt;
  30     opal_recursive_mutex_t mutex;
  31 
  32     /* UCX data */
  33     ucp_context_h ucp_ctx;
  34     ucp_worker_h dflt_worker;
  35     ucp_address_t *recv_waddr;
  36     size_t recv_waddr_len;
  37 
  38     /* Thread-local key to allow each thread to have
  39      * local information associated with this wpool */
  40     opal_tsd_key_t tls_key;
  41 
  42     /* Bookkeeping information: the worker wrappers
      *  (opal_common_ucx_winfo_t) are kept on one of these two lists */
  43     opal_list_t idle_workers;
  44     opal_list_t active_workers;
  45 
      /* presumably the per-thread records registered under tls_key - TODO confirm */
  46     opal_list_t tls_list;
  47 } opal_common_ucx_wpool_t;
  48 
  49 /* Worker Pool Context (wpctx) is an object that is comprised of a set of UCP
  50  * workers that are considered as one logical communication entity.
  51  * One UCP worker per "active" thread is used.
  52  * A thread is considered "active" if it performs communication operations on
  53  * this Wpool context.
  54  * The lifetime of this object is dynamic and determined by the application
  55  * (the object is created and destroyed with corresponding functions).
  56  * Context is bound to a particular Worker Pool object.
  57  */
  58 typedef struct {
  59     opal_recursive_mutex_t mutex;
  60     opal_atomic_int32_t refcntr;
  61 
  62     /* The reference to the Worker pool this context belongs to */
  63     opal_common_ucx_wpool_t *wpool;
  64     /* A list of references to TLS context records.
  65      * We need to keep track of them to be able to let a thread
  66      * know that this context is no longer valid */
  67     opal_list_t tls_workers;
  68     volatile int released;
  69 
  70     /* UCX addressing information
      * (presumably filled through the exchange callback given to
      * opal_common_ucx_wpctx_create() - TODO confirm) */
  71     char *recv_worker_addrs;
  72     int *recv_worker_displs;
  73     size_t comm_size;
  74 } opal_common_ucx_ctx_t;
  75 
  76 /* Worker Pool memory (wpmem) is an object that represents a remotely accessible
  77  * distributed memory.
  78  * It has dynamic lifetime (created and destroyed by corresponding functions).
  79  * It depends on a particular Wpool context.
  80  * Currently OSC is using one context per MPI Window, though in future it will
  81  * be possible to have one context for multiple windows.
  82  */
  83 typedef struct {
  84     /* reference to the context to which this memory region belongs */
  85     opal_common_ucx_ctx_t *ctx;
  86 
  87     /* object lifetime control */
  88     volatile int released;
  89     opal_atomic_int32_t refcntr;
  90 
  91     /* UCX memory handle */
  92     ucp_mem_h memh;
  93     char *mem_addrs;
  94     int *mem_displs;
  95 
  96     /* TLS key under which each thread stores a pointer to its
  97      * opal_common_ucx_tlocal_fast_ptrs_t (worker info and rkey array)
  98      * for faster access; read by opal_common_ucx_tlocal_fetch() */
  99     opal_tsd_key_t mem_tls_key;
 100 } opal_common_ucx_wpmem_t;
 101 
 102 /* The structure that wraps UCP worker and holds the state that is required
 103  * for its use.
 104  * The structure is allocated along with UCP worker on demand and is being held
 105  * in the Worker Pool lists (either active or idle).
 106  * One wpmem is intended per shared memory segment (i.e. MPI Window).
      * NOTE(review): the sentence above talks about wpmem rather than about this
      * structure - confirm whether it belongs to opal_common_ucx_wpmem_t.
 107  */
 108 typedef struct opal_common_ucx_winfo {
      /* taken by the inline helpers in this header around their UCX calls
       * on this worker */
 109     opal_recursive_mutex_t mutex;
 110     volatile int released;
 111     ucp_worker_h worker;
      /* endpoints, indexed by target */
 112     ucp_ep_h *endpoints;
      /* element count used when inflight_ops is cleared */
 113     size_t comm_size;
      /* per-target count of operations issued since that target was last
       * included in a flush (maintained by _periodical_flush_nb) */
 114     short *inflight_ops;
      /* total of the per-target counters */
 115     short global_inflight_ops;
      /* request of the last flush started by _periodical_flush_nb;
       * UCS_OK when nothing is pending */
 116     ucs_status_ptr_t inflight_req;
 117 } opal_common_ucx_winfo_t;
 118 
     /* Per-thread fast-path record. A pointer to it is stored under a wpmem's
      * mem_tls_key and read back by opal_common_ucx_tlocal_fetch(), which takes
      * the endpoint from winfo->endpoints[target] and the remote key from
      * rkeys[target].
      */
 119 typedef struct {
 120     opal_common_ucx_winfo_t *winfo;
 121     ucp_rkey_h *rkeys;
 122 } opal_common_ucx_tlocal_fast_ptrs_t;
 123 
     /* User completion callback; receives the user's own request pointer. */
 124 typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
 125 
 126 /* Per-request state: UCX request pointers are cast to this type.
 127  * opal_common_ucx_wpmem_fetch_nb() fills the fields of a pending request and
 128  * opal_common_ucx_wait_request_mt() reads winfo to find the worker to progress.
      * NOTE: the per-thread fast-path record reachable through wpmem's
      * mem_tls_key is opal_common_ucx_tlocal_fast_ptrs_t, not this structure.
 129  */
 130 typedef struct {
      /* user request pointer and callback supplied to fetch_nb; presumably used
       * by opal_common_ucx_req_completion() - TODO confirm */
 131     void *ext_req;
 132     opal_common_ucx_user_req_handler_t ext_cb;
      /* worker info the operation was issued on */
 133     opal_common_ucx_winfo_t *winfo;
 134 } opal_common_ucx_request_t;
 135 
     /* Operation selector for opal_common_ucx_wpmem_putget(). */
 136 typedef enum {
 137     OPAL_COMMON_UCX_PUT, /* issued with ucp_put_nbi() */
 138     OPAL_COMMON_UCX_GET  /* issued with ucp_get_nbi() */
 139 } opal_common_ucx_op_t;
 140 
     /* What a flush covers. _periodical_flush_nb() uses the EP scope when only the
      * per-target counter reached its threshold and the WORKER scope when the
      * global counter did. */
 141 typedef enum {
 142     OPAL_COMMON_UCX_SCOPE_EP,
 143     OPAL_COMMON_UCX_SCOPE_WORKER
 144 } opal_common_ucx_flush_scope_t;
 145 
     /* Flush mode passed to opal_common_ucx_winfo_flush().
      * NOTE(review): presumably NB = non-blocking and B = blocking; the exact
      * meaning of NB_PREFERRED is not visible here - confirm in the
      * implementation. */
 146 typedef enum {
 147     OPAL_COMMON_UCX_FLUSH_NB,
 148     OPAL_COMMON_UCX_FLUSH_B,
 149     OPAL_COMMON_UCX_FLUSH_NB_PREFERRED
 150 } opal_common_ucx_flush_type_t;
 151 
     /* Memory type argument of opal_common_ucx_wpmem_create().
      * NOTE(review): presumably selects between allocating-and-mapping new
      * memory and mapping caller-provided memory - confirm. */
 152 typedef enum {
 153     OPAL_COMMON_UCX_MEM_ALLOCATE_MAP,
 154     OPAL_COMMON_UCX_MEM_MAP
 155 } opal_common_ucx_mem_type_t;
 156 
     /* Callback type accepted by opal_common_ucx_wpctx_create() and
      * opal_common_ucx_wpmem_create() together with an opaque metadata pointer.
      * NOTE(review): presumably exchanges my_info with the peers and returns the
      * gathered data in *recv_info with per-peer offsets in *disps - confirm. */
 157 typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len,
 158                                                char **recv_info, int **disps,
 159                                                void *metadata);
 160 
 161 
 162 /* Manage Worker Pool (wpool) */
 163 OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
 164 OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
 165 OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
 166                                              int proc_world_size, bool enable_mt);
 167 OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool);
 168 OPAL_DECLSPEC void opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool);
 169 
 170 /* Manage Communication context (wpctx); the created context is returned
      * through ctx_ptr */
 171 OPAL_DECLSPEC int opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
 172                                              opal_common_ucx_exchange_func_t exchange_func,
 173                                              void *exchange_metadata,
 174                                              opal_common_ucx_ctx_t **ctx_ptr);
 175 OPAL_DECLSPEC void opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx);
 176 
 177 /* Request init / completion. opal_common_ucx_req_completion is the callback
      * that opal_common_ucx_wpmem_fetch_nb() passes to the non-blocking atomic
      * fetch. */
 178 OPAL_DECLSPEC void opal_common_ucx_req_init(void *request);
 179 OPAL_DECLSPEC void opal_common_ucx_req_completion(void *request, ucs_status_t status);
 180 
 181 /* Managing thread local storage.
      * opal_common_ucx_tlocal_fetch_spath is the slow path taken by
      * opal_common_ucx_tlocal_fetch() when the calling thread has no usable
      * fast-path record for the target yet. */
 182 OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target);
 183 static inline int
 184 opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
 185                                 ucp_ep_h *_ep, ucp_rkey_h *_rkey,
 186                                 opal_common_ucx_winfo_t **_winfo)
 187 {
 188     opal_common_ucx_tlocal_fast_ptrs_t *fp = NULL;
 189     int expr;
 190     int rc = OPAL_SUCCESS;
 191 
 192     /* First check the fast-path */
 193     rc = opal_tsd_getspecific(mem->mem_tls_key, (void**)&fp);
 194     if (OPAL_SUCCESS != rc) {
 195         return rc;
 196     }
 197     expr = fp && (NULL != fp->winfo) && (fp->winfo->endpoints[target]) &&
 198             (NULL != fp->rkeys[target]);
 199     if (OPAL_UNLIKELY(!expr)) {
 200         rc = opal_common_ucx_tlocal_fetch_spath(mem, target);
 201         if (OPAL_SUCCESS != rc) {
 202             return rc;
 203         }
 204         rc = opal_tsd_getspecific(mem->mem_tls_key, (void**)&fp);
 205         if (OPAL_SUCCESS != rc) {
 206             return rc;
 207         }
 208     }
 209     MCA_COMMON_UCX_ASSERT(fp && (NULL != fp->winfo) &&
 210                           (fp->winfo->endpoints[target])
 211                           && (NULL != fp->rkeys[target]));
 212 
 213     *_rkey = fp->rkeys[target];
 214     *_winfo = fp->winfo;
 215     *_ep = fp->winfo->endpoints[target];
 216     return OPAL_SUCCESS;
 217 }
 218 
 219 /* Management of, and operations on, the memory registrations (wpmem) */
 220 OPAL_DECLSPEC int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
 221                                void **mem_base, size_t mem_size,
 222                                opal_common_ucx_mem_type_t mem_type,
 223                                opal_common_ucx_exchange_func_t exchange_func,
 224                                void *exchange_metadata,
 225                                                char **my_mem_addr,
 226                                                int *my_mem_addr_size,
 227                                opal_common_ucx_wpmem_t **mem_ptr);
 228 OPAL_DECLSPEC int opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
 229 
 230 OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
 231                                             opal_common_ucx_flush_scope_t scope,
 232                                             int target);
 233 OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);
 234 
     /* Flush on a single worker info. _periodical_flush_nb() calls it with
      * &winfo->inflight_req as req_ptr and later waits on that request;
      * presumably a pending non-blocking flush is returned through req_ptr -
      * TODO confirm in the implementation. */
 235 OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,
 236                                               opal_common_ucx_flush_type_t type,
 237                                               opal_common_ucx_flush_scope_t scope,
 238                                               ucs_status_ptr_t *req_ptr);
 239 
 240 static inline
 241 int opal_common_ucx_wait_request_mt(ucs_status_ptr_t request, const char *msg)
 242 {
 243     ucs_status_t status;
 244     int ctr = 0, ret = 0;
 245     opal_common_ucx_winfo_t *winfo;
 246 
 247     /* check for request completed or failed */
 248     if (OPAL_LIKELY(UCS_OK == request)) {
 249         return OPAL_SUCCESS;
 250     } else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
 251         MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
 252                                UCS_PTR_STATUS(request),
 253                                ucs_status_string(UCS_PTR_STATUS(request)));
 254         return OPAL_ERROR;
 255     }
 256 
 257     winfo = ((opal_common_ucx_request_t *)request)->winfo;
 258     assert(winfo != NULL);
 259 
 260     do {
 261         ctr = opal_common_ucx.progress_iterations;
 262         opal_mutex_lock(&winfo->mutex);
 263         do {
 264             ret = ucp_worker_progress(winfo->worker);
 265             status = opal_common_ucx_request_status(request);
 266             if (status != UCS_INPROGRESS) {
 267                 ucp_request_free(request);
 268                 if (OPAL_UNLIKELY(UCS_OK != status)) {
 269                     MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s",
 270                                            msg ? msg : __func__,
 271                                            UCS_PTR_STATUS(request),
 272                                            ucs_status_string(UCS_PTR_STATUS(request)));
 273                    opal_mutex_unlock(&winfo->mutex);
 274                    return OPAL_ERROR;
 275                 }
 276                 break;
 277             }
 278             ctr--;
 279         } while (ctr > 0 && ret > 0 && status == UCS_INPROGRESS);
 280         opal_mutex_unlock(&winfo->mutex);
 281         opal_progress();
 282     } while (status == UCS_INPROGRESS);
 283 
 284     return OPAL_SUCCESS;
 285 }
 286 
 287 static inline int _periodical_flush_nb(opal_common_ucx_wpmem_t *mem,
 288                                        opal_common_ucx_winfo_t *winfo,
 289                                        int target) {
 290     int rc = OPAL_SUCCESS;
 291 
 292     winfo->inflight_ops[target]++;
 293     winfo->global_inflight_ops++;
 294 
 295     if (OPAL_UNLIKELY(winfo->inflight_ops[target] >= MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD) ||
 296         OPAL_UNLIKELY(winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD)) {
 297         opal_common_ucx_flush_scope_t scope;
 298 
 299         if (winfo->inflight_req != UCS_OK) {
 300             rc = opal_common_ucx_wait_request_mt(winfo->inflight_req,
 301                                                  "opal_common_ucx_flush_nb");
 302             if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 303                 MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_wait_request failed: %d", rc);
 304                 return rc;
 305             }
 306             winfo->inflight_req = UCS_OK;
 307         }
 308 
 309         if (winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD) {
 310             scope = OPAL_COMMON_UCX_SCOPE_WORKER;
 311             winfo->global_inflight_ops = 0;
 312             memset(winfo->inflight_ops, 0, winfo->comm_size * sizeof(short));
 313         } else {
 314             scope = OPAL_COMMON_UCX_SCOPE_EP;
 315             winfo->global_inflight_ops -= winfo->inflight_ops[target];
 316             winfo->inflight_ops[target] = 0;
 317         }
 318 
 319         rc = opal_common_ucx_winfo_flush(winfo, target, OPAL_COMMON_UCX_FLUSH_NB_PREFERRED,
 320                                          scope, &winfo->inflight_req);
 321         if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 322             MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_flush failed: %d", rc);
 323             return rc;
 324         }
 325     } else if (OPAL_UNLIKELY(winfo->inflight_req != UCS_OK)) {
 326         int ret;
 327         do {
 328             ret = ucp_worker_progress(winfo->worker);
 329         } while (ret);
 330     }
 331     return rc;
 332 }
 333 
 334 static inline int
 335 opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t op,
 336                            int target, void *buffer, size_t len,
 337                            uint64_t rem_addr)
 338 {
 339     ucp_ep_h ep;
 340     ucp_rkey_h rkey;
 341     ucs_status_t status;
 342     opal_common_ucx_winfo_t *winfo;
 343     int rc = OPAL_SUCCESS;
 344     char *called_func = "";
 345 
 346     rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
 347     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 348         MCA_COMMON_UCX_VERBOSE(1, "tlocal_fetch failed: %d", rc);
 349         return rc;
 350     }
 351 
 352     /* Perform the operation */
 353     opal_mutex_lock(&winfo->mutex);
 354     switch(op){
 355     case OPAL_COMMON_UCX_PUT:
 356         status = ucp_put_nbi(ep, buffer,len, rem_addr, rkey);
 357         called_func = "ucp_put_nbi";
 358         break;
 359     case OPAL_COMMON_UCX_GET:
 360         status = ucp_get_nbi(ep, buffer,len, rem_addr, rkey);
 361         called_func = "ucp_get_nbi";
 362         break;
 363     }
 364 
 365     if (OPAL_UNLIKELY(status != UCS_OK && status != UCS_INPROGRESS)) {
 366         MCA_COMMON_UCX_ERROR("%s failed: %d", called_func, status);
 367         rc = OPAL_ERROR;
 368     }
 369 
 370     rc = _periodical_flush_nb(mem, winfo, target);
 371     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 372         MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
 373         return rc;
 374     }
 375 
 376     opal_mutex_unlock(&winfo->mutex);
 377 
 378     return rc;
 379 }
 380 
 381 
 382 static inline int
 383 opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
 384                            uint64_t value, int target, void *buffer, size_t len,
 385                            uint64_t rem_addr)
 386 {
 387     ucp_ep_h ep;
 388     ucp_rkey_h rkey;
 389     opal_common_ucx_winfo_t *winfo = NULL;
 390     ucs_status_t status;
 391     int rc = OPAL_SUCCESS;
 392 
 393     rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
 394     if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
 395         MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc);
 396         return rc;
 397     }
 398 
 399     /* Perform the operation */
 400     opal_mutex_lock(&winfo->mutex);
 401     status = opal_common_ucx_atomic_cswap(ep, compare, value,
 402                                           buffer, len,
 403                                           rem_addr, rkey,
 404                                           winfo->worker);
 405     if (OPAL_UNLIKELY(status != UCS_OK)) {
 406         MCA_COMMON_UCX_ERROR("opal_common_ucx_atomic_cswap failed: %d", status);
 407         rc = OPAL_ERROR;
 408     }
 409 
 410     rc = _periodical_flush_nb(mem, winfo, target);
 411     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 412         MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
 413         return rc;
 414     }
 415 
 416     opal_mutex_unlock(&winfo->mutex);
 417 
 418     return rc;
 419 }
 420 
 421 static inline int
 422 opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode,
 423                          uint64_t value, int target, size_t len, uint64_t rem_addr)
 424 {
 425     ucp_ep_h ep;
 426     ucp_rkey_h rkey;
 427     opal_common_ucx_winfo_t *winfo = NULL;
 428     ucs_status_t status;
 429     int rc = OPAL_SUCCESS;
 430 
 431 
 432     rc =opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
 433     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 434         MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
 435         return rc;
 436     }
 437 
 438     /* Perform the operation */
 439     opal_mutex_lock(&winfo->mutex);
 440     status = ucp_atomic_post(ep, opcode, value,
 441                              len, rem_addr, rkey);
 442     if (OPAL_UNLIKELY(status != UCS_OK)) {
 443         MCA_COMMON_UCX_ERROR("ucp_atomic_post failed: %d", status);
 444         rc = OPAL_ERROR;
 445     }
 446 
 447     rc = _periodical_flush_nb(mem, winfo, target);
 448     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 449         MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
 450         return rc;
 451     }
 452 
 453     opal_mutex_unlock(&winfo->mutex);
 454     return rc;
 455 }
 456 
 457 static inline int
 458 opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem,
 459                             ucp_atomic_fetch_op_t opcode, uint64_t value,
 460                             int target, void *buffer, size_t len,
 461                             uint64_t rem_addr)
 462 {
 463     ucp_ep_h ep = NULL;
 464     ucp_rkey_h rkey = NULL;
 465     opal_common_ucx_winfo_t *winfo = NULL;
 466     ucs_status_t status;
 467     int rc = OPAL_SUCCESS;
 468 
 469     rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
 470     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 471         MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
 472         return rc;
 473     }
 474 
 475     /* Perform the operation */
 476     opal_mutex_lock(&winfo->mutex);
 477     status = opal_common_ucx_atomic_fetch(ep, opcode, value,
 478                                           buffer, len,
 479                                           rem_addr, rkey,
 480                                           winfo->worker);
 481     if (OPAL_UNLIKELY(status != UCS_OK)) {
 482         MCA_COMMON_UCX_ERROR("ucp_atomic_cswap64 failed: %d", status);
 483         rc = OPAL_ERROR;
 484     }
 485 
 486     rc = _periodical_flush_nb(mem, winfo, target);
 487     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 488         MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
 489         return rc;
 490     }
 491 
 492     opal_mutex_unlock(&winfo->mutex);
 493 
 494     return rc;
 495 }
 496 
 497 static inline int
 498 opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
 499                                ucp_atomic_fetch_op_t opcode,
 500                                uint64_t value,
 501                                int target, void *buffer, size_t len,
 502                                uint64_t rem_addr,
 503                                opal_common_ucx_user_req_handler_t user_req_cb,
 504                                void *user_req_ptr)
 505 {
 506     ucp_ep_h ep = NULL;
 507     ucp_rkey_h rkey = NULL;
 508     opal_common_ucx_winfo_t *winfo = NULL;
 509     int rc = OPAL_SUCCESS;
 510     opal_common_ucx_request_t *req;
 511 
 512     rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
 513     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 514         MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
 515         return rc;
 516     }
 517     /* Perform the operation */
 518     opal_mutex_lock(&winfo->mutex);
 519     req = opal_common_ucx_atomic_fetch_nb(ep, opcode, value, buffer, len,
 520                                           rem_addr, rkey, opal_common_ucx_req_completion,
 521                                           winfo->worker);
 522     if (UCS_PTR_IS_PTR(req)) {
 523         req->ext_req = user_req_ptr;
 524         req->ext_cb = user_req_cb;
 525         req->winfo = winfo;
 526     } else {
 527         if (user_req_cb != NULL) {
 528             (*user_req_cb)(user_req_ptr);
 529         }
 530     }
 531 
 532     rc = _periodical_flush_nb(mem, winfo, target);
 533     if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
 534         MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
 535         return rc;
 536     }
 537 
 538     opal_mutex_unlock(&winfo->mutex);
 539 
 540     return rc;
 541 }
 542 
 543 END_C_DECLS
 544 
 545 #endif // COMMON_UCX_WPOOL_H

/* [<][>][^][v][top][bottom][index][help] */