This source file includes the following definitions:
- opal_common_ucx_tlocal_fetch
- opal_common_ucx_wait_request_mt
- _periodical_flush_nb
- opal_common_ucx_wpmem_putget
- opal_common_ucx_wpmem_cmpswp
- opal_common_ucx_wpmem_post
- opal_common_ucx_wpmem_fetch
- opal_common_ucx_wpmem_fetch_nb
1 #ifndef COMMON_UCX_WPOOL_H
2 #define COMMON_UCX_WPOOL_H
3
4
5 #include "opal_config.h"
6
7 #include "common_ucx.h"
8 #include <stdint.h>
9 #include <string.h>
10
11 #include <ucp/api/ucp.h>
12
13 #include "opal/mca/mca.h"
14 #include "opal/util/output.h"
15 #include "opal/runtime/opal_progress.h"
16 #include "opal/include/opal/constants.h"
17 #include "opal/class/opal_list.h"
18 #include "opal/threads/tsd.h"
19
20 BEGIN_C_DECLS
21
22
23
24
25
26
/**
 * Worker pool: holds the UCP context, a default worker and the bookkeeping
 * lists for additional workers.
 *
 * NOTE(review): the code that fills these fields lives in the implementation
 * file, not in this header; field descriptions below marked "presumably" are
 * inferred from names and should be confirmed there.
 */
typedef struct {
    /* Reference count (plain int; presumably protected by mutex -- confirm). */
    int refcnt;
    opal_recursive_mutex_t mutex;

    /* UCP context and default worker. recv_waddr/recv_waddr_len presumably
     * hold a packed worker address and its length -- confirm. */
    ucp_context_h ucp_ctx;
    ucp_worker_h dflt_worker;
    ucp_address_t *recv_waddr;
    size_t recv_waddr_len;

    /* Thread-specific-data key (opal_tsd API). */
    opal_tsd_key_t tls_key;

    /* Worker lists; presumably workers not currently in use vs. in use by
     * some thread -- confirm in the implementation. */
    opal_list_t idle_workers;
    opal_list_t active_workers;

    opal_list_t tls_list;
} opal_common_ucx_wpool_t;
48
49
50
51
52
53
54
55
56
57
/**
 * Communication context built on top of a worker pool; instances are
 * produced by opal_common_ucx_wpctx_create() and released with
 * opal_common_ucx_wpctx_release().
 */
typedef struct {
    opal_recursive_mutex_t mutex;
    opal_atomic_int32_t refcntr;

    /* Pool this context was created from. */
    opal_common_ucx_wpool_t *wpool;

    /* Per-thread worker records; element type is not visible in this header. */
    opal_list_t tls_workers;
    /* NOTE(review): volatile does not provide inter-thread ordering; confirm
     * how this flag is synchronized in the implementation. */
    volatile int released;

    /* Remote worker addresses: a packed buffer plus per-peer displacements
     * for comm_size peers. The char** / int** shape matches the recv_info /
     * disps outputs of opal_common_ucx_exchange_func_t, so these are
     * presumably filled by that hook -- confirm. */
    char *recv_worker_addrs;
    int *recv_worker_displs;
    size_t comm_size;
} opal_common_ucx_ctx_t;
75
76
77
78
79
80
81
82
/**
 * Memory region exposed through a context; instances are produced by
 * opal_common_ucx_wpmem_create() and released with
 * opal_common_ucx_wpmem_free().
 */
typedef struct {
    /* Owning context. */
    opal_common_ucx_ctx_t *ctx;

    /* NOTE(review): volatile flag; confirm its synchronization. */
    volatile int released;
    opal_atomic_int32_t refcntr;

    /* Local UCP memory handle, plus a packed buffer of per-peer data with
     * displacements (presumably the peers' packed remote keys gathered by
     * the exchange hook -- confirm). */
    ucp_mem_h memh;
    char *mem_addrs;
    int *mem_displs;

    /* TSD key whose per-thread value is an
     * opal_common_ucx_tlocal_fast_ptrs_t; read by
     * opal_common_ucx_tlocal_fetch(). */
    opal_tsd_key_t mem_tls_key;
} opal_common_ucx_wpmem_t;
101
102
103
104
105
106
107
/**
 * Per-worker state used by the RMA/atomic helpers in this header.
 */
typedef struct opal_common_ucx_winfo {
    /* Held by the helpers in this header around every use of worker. */
    opal_recursive_mutex_t mutex;
    volatile int released;
    ucp_worker_h worker;
    /* Endpoints indexed by target; a NULL entry sends
     * opal_common_ucx_tlocal_fetch() down its slow path. */
    ucp_ep_h *endpoints;
    size_t comm_size;
    /* Operations issued since the last flush: one counter per target
     * (comm_size entries) and a worker-wide total; maintained by
     * _periodical_flush_nb(). */
    short *inflight_ops;
    short global_inflight_ops;
    /* Outstanding non-blocking flush request, or UCS_OK when there is none. */
    ucs_status_ptr_t inflight_req;
} opal_common_ucx_winfo_t;

/* Per-thread cache, stored under opal_common_ucx_wpmem_t::mem_tls_key, of
 * the worker to use and the remote keys (indexed by target) for one memory
 * region. */
typedef struct {
    opal_common_ucx_winfo_t *winfo;
    ucp_rkey_h *rkeys;
} opal_common_ucx_tlocal_fast_ptrs_t;
123
/* User completion callback accepted by opal_common_ucx_wpmem_fetch_nb();
 * invoked with the caller's opaque request pointer. */
typedef void (*opal_common_ucx_user_req_handler_t)(void *request);

/**
 * Request object backing UCX non-blocking operations issued from this
 * header. opal_common_ucx_req_init() / opal_common_ucx_req_completion() are
 * declared below and implemented elsewhere.
 */
typedef struct {
    /* User request and callback recorded by opal_common_ucx_wpmem_fetch_nb();
     * presumably ext_cb(ext_req) is called by opal_common_ucx_req_completion()
     * -- confirm in the implementation. */
    void *ext_req;
    opal_common_ucx_user_req_handler_t ext_cb;
    /* Worker the request was issued on; opal_common_ucx_wait_request_mt()
     * progresses this worker while waiting. */
    opal_common_ucx_winfo_t *winfo;
} opal_common_ucx_request_t;
135
/* One-sided transfer direction for opal_common_ucx_wpmem_putget(). */
typedef enum {
    OPAL_COMMON_UCX_PUT = 0,    /* issued with ucp_put_nbi() */
    OPAL_COMMON_UCX_GET = 1     /* issued with ucp_get_nbi() */
} opal_common_ucx_op_t;

/* Flush granularity: a single endpoint (one target) or the whole worker. */
typedef enum {
    OPAL_COMMON_UCX_SCOPE_EP = 0,
    OPAL_COMMON_UCX_SCOPE_WORKER = 1
} opal_common_ucx_flush_scope_t;

/* Flush mode; presumably non-blocking, blocking, and "non-blocking if
 * possible" (the mode _periodical_flush_nb() requests) -- confirm in
 * opal_common_ucx_winfo_flush(). */
typedef enum {
    OPAL_COMMON_UCX_FLUSH_NB = 0,
    OPAL_COMMON_UCX_FLUSH_B = 1,
    OPAL_COMMON_UCX_FLUSH_NB_PREFERRED = 2
} opal_common_ucx_flush_type_t;

/* How opal_common_ucx_wpmem_create() obtains the memory; presumably
 * allocate-and-map vs. map caller-provided memory -- confirm. */
typedef enum {
    OPAL_COMMON_UCX_MEM_ALLOCATE_MAP = 0,
    OPAL_COMMON_UCX_MEM_MAP = 1
} opal_common_ucx_mem_type_t;
156
/**
 * Caller-supplied exchange hook passed to opal_common_ucx_wpctx_create() and
 * opal_common_ucx_wpmem_create() together with an exchange_metadata pointer.
 *
 * @param my_info      Local data to publish.
 * @param my_info_len  Size of my_info in bytes.
 * @param recv_info    [out] presumably a packed buffer of every peer's data.
 * @param disps        [out] presumably per-peer offsets into *recv_info.
 * @param metadata     Presumably the exchange_metadata given to the create
 *                     call, passed through unchanged -- confirm.
 * @return Presumably an OPAL status code -- confirm with implementations.
 */
typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len,
                                                char **recv_info, int **disps,
                                                void *metadata);
160
161
162
/* Worker-pool lifecycle; implemented outside this header.
 * NOTE(review): presumably proc_world_size sizes per-process tables and
 * enable_mt requests thread-safe operation -- confirm in the implementation. */
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
                                             int proc_world_size, bool enable_mt);
OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool);
OPAL_DECLSPEC void opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool);

/* Create a context for comm_size peers on top of wpool, returned through
 * ctx_ptr. exchange_func/exchange_metadata are the caller's exchange hook
 * (see opal_common_ucx_exchange_func_t). */
OPAL_DECLSPEC int opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
                                               opal_common_ucx_exchange_func_t exchange_func,
                                               void *exchange_metadata,
                                               opal_common_ucx_ctx_t **ctx_ptr);
OPAL_DECLSPEC void opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx);

/* UCX request hooks. opal_common_ucx_req_completion is the callback that
 * opal_common_ucx_wpmem_fetch_nb() passes to
 * opal_common_ucx_atomic_fetch_nb(). */
OPAL_DECLSPEC void opal_common_ucx_req_init(void *request);
OPAL_DECLSPEC void opal_common_ucx_req_completion(void *request, ucs_status_t status);

/* Slow path of opal_common_ucx_tlocal_fetch(): called when the calling
 * thread's cached pointers for mem are missing or lack an endpoint/rkey for
 * target; the caller then re-reads mem->mem_tls_key. */
OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target);
183 static inline int
184 opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
185 ucp_ep_h *_ep, ucp_rkey_h *_rkey,
186 opal_common_ucx_winfo_t **_winfo)
187 {
188 opal_common_ucx_tlocal_fast_ptrs_t *fp = NULL;
189 int expr;
190 int rc = OPAL_SUCCESS;
191
192
193 rc = opal_tsd_getspecific(mem->mem_tls_key, (void**)&fp);
194 if (OPAL_SUCCESS != rc) {
195 return rc;
196 }
197 expr = fp && (NULL != fp->winfo) && (fp->winfo->endpoints[target]) &&
198 (NULL != fp->rkeys[target]);
199 if (OPAL_UNLIKELY(!expr)) {
200 rc = opal_common_ucx_tlocal_fetch_spath(mem, target);
201 if (OPAL_SUCCESS != rc) {
202 return rc;
203 }
204 rc = opal_tsd_getspecific(mem->mem_tls_key, (void**)&fp);
205 if (OPAL_SUCCESS != rc) {
206 return rc;
207 }
208 }
209 MCA_COMMON_UCX_ASSERT(fp && (NULL != fp->winfo) &&
210 (fp->winfo->endpoints[target])
211 && (NULL != fp->rkeys[target]));
212
213 *_rkey = fp->rkeys[target];
214 *_winfo = fp->winfo;
215 *_ep = fp->winfo->endpoints[target];
216 return OPAL_SUCCESS;
217 }
218
219
/* Create a memory region for ctx, returned through mem_ptr. mem_type is an
 * opal_common_ucx_mem_type_t; mem_base/mem_size describe the local memory
 * (presumably *mem_base is an output when the region is allocated -- confirm).
 * exchange_func/exchange_metadata are the caller's exchange hook.
 * my_mem_addr/my_mem_addr_size presumably return the local packed address
 * data -- confirm in the implementation. */
OPAL_DECLSPEC int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
                                               void **mem_base, size_t mem_size,
                                               opal_common_ucx_mem_type_t mem_type,
                                               opal_common_ucx_exchange_func_t exchange_func,
                                               void *exchange_metadata,
                                               char **my_mem_addr,
                                               int *my_mem_addr_size,
                                               opal_common_ucx_wpmem_t **mem_ptr);
OPAL_DECLSPEC int opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);

/* Flush operations on mem at endpoint (target) or worker scope, and fence. */
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
                                              opal_common_ucx_flush_scope_t scope,
                                              int target);
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);

/* Flush one worker. type selects the flush mode, scope selects endpoint or
 * worker. _periodical_flush_nb() passes &winfo->inflight_req as req_ptr and
 * later waits on any request left there (UCS_OK meaning none). */
OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,
                                              opal_common_ucx_flush_type_t type,
                                              opal_common_ucx_flush_scope_t scope,
                                              ucs_status_ptr_t *req_ptr);
239
240 static inline
241 int opal_common_ucx_wait_request_mt(ucs_status_ptr_t request, const char *msg)
242 {
243 ucs_status_t status;
244 int ctr = 0, ret = 0;
245 opal_common_ucx_winfo_t *winfo;
246
247
248 if (OPAL_LIKELY(UCS_OK == request)) {
249 return OPAL_SUCCESS;
250 } else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
251 MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
252 UCS_PTR_STATUS(request),
253 ucs_status_string(UCS_PTR_STATUS(request)));
254 return OPAL_ERROR;
255 }
256
257 winfo = ((opal_common_ucx_request_t *)request)->winfo;
258 assert(winfo != NULL);
259
260 do {
261 ctr = opal_common_ucx.progress_iterations;
262 opal_mutex_lock(&winfo->mutex);
263 do {
264 ret = ucp_worker_progress(winfo->worker);
265 status = opal_common_ucx_request_status(request);
266 if (status != UCS_INPROGRESS) {
267 ucp_request_free(request);
268 if (OPAL_UNLIKELY(UCS_OK != status)) {
269 MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s",
270 msg ? msg : __func__,
271 UCS_PTR_STATUS(request),
272 ucs_status_string(UCS_PTR_STATUS(request)));
273 opal_mutex_unlock(&winfo->mutex);
274 return OPAL_ERROR;
275 }
276 break;
277 }
278 ctr--;
279 } while (ctr > 0 && ret > 0 && status == UCS_INPROGRESS);
280 opal_mutex_unlock(&winfo->mutex);
281 opal_progress();
282 } while (status == UCS_INPROGRESS);
283
284 return OPAL_SUCCESS;
285 }
286
287 static inline int _periodical_flush_nb(opal_common_ucx_wpmem_t *mem,
288 opal_common_ucx_winfo_t *winfo,
289 int target) {
290 int rc = OPAL_SUCCESS;
291
292 winfo->inflight_ops[target]++;
293 winfo->global_inflight_ops++;
294
295 if (OPAL_UNLIKELY(winfo->inflight_ops[target] >= MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD) ||
296 OPAL_UNLIKELY(winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD)) {
297 opal_common_ucx_flush_scope_t scope;
298
299 if (winfo->inflight_req != UCS_OK) {
300 rc = opal_common_ucx_wait_request_mt(winfo->inflight_req,
301 "opal_common_ucx_flush_nb");
302 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
303 MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_wait_request failed: %d", rc);
304 return rc;
305 }
306 winfo->inflight_req = UCS_OK;
307 }
308
309 if (winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD) {
310 scope = OPAL_COMMON_UCX_SCOPE_WORKER;
311 winfo->global_inflight_ops = 0;
312 memset(winfo->inflight_ops, 0, winfo->comm_size * sizeof(short));
313 } else {
314 scope = OPAL_COMMON_UCX_SCOPE_EP;
315 winfo->global_inflight_ops -= winfo->inflight_ops[target];
316 winfo->inflight_ops[target] = 0;
317 }
318
319 rc = opal_common_ucx_winfo_flush(winfo, target, OPAL_COMMON_UCX_FLUSH_NB_PREFERRED,
320 scope, &winfo->inflight_req);
321 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
322 MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_flush failed: %d", rc);
323 return rc;
324 }
325 } else if (OPAL_UNLIKELY(winfo->inflight_req != UCS_OK)) {
326 int ret;
327 do {
328 ret = ucp_worker_progress(winfo->worker);
329 } while (ret);
330 }
331 return rc;
332 }
333
334 static inline int
335 opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t op,
336 int target, void *buffer, size_t len,
337 uint64_t rem_addr)
338 {
339 ucp_ep_h ep;
340 ucp_rkey_h rkey;
341 ucs_status_t status;
342 opal_common_ucx_winfo_t *winfo;
343 int rc = OPAL_SUCCESS;
344 char *called_func = "";
345
346 rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
347 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
348 MCA_COMMON_UCX_VERBOSE(1, "tlocal_fetch failed: %d", rc);
349 return rc;
350 }
351
352
353 opal_mutex_lock(&winfo->mutex);
354 switch(op){
355 case OPAL_COMMON_UCX_PUT:
356 status = ucp_put_nbi(ep, buffer,len, rem_addr, rkey);
357 called_func = "ucp_put_nbi";
358 break;
359 case OPAL_COMMON_UCX_GET:
360 status = ucp_get_nbi(ep, buffer,len, rem_addr, rkey);
361 called_func = "ucp_get_nbi";
362 break;
363 }
364
365 if (OPAL_UNLIKELY(status != UCS_OK && status != UCS_INPROGRESS)) {
366 MCA_COMMON_UCX_ERROR("%s failed: %d", called_func, status);
367 rc = OPAL_ERROR;
368 }
369
370 rc = _periodical_flush_nb(mem, winfo, target);
371 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
372 MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
373 return rc;
374 }
375
376 opal_mutex_unlock(&winfo->mutex);
377
378 return rc;
379 }
380
381
382 static inline int
383 opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
384 uint64_t value, int target, void *buffer, size_t len,
385 uint64_t rem_addr)
386 {
387 ucp_ep_h ep;
388 ucp_rkey_h rkey;
389 opal_common_ucx_winfo_t *winfo = NULL;
390 ucs_status_t status;
391 int rc = OPAL_SUCCESS;
392
393 rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
394 if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
395 MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc);
396 return rc;
397 }
398
399
400 opal_mutex_lock(&winfo->mutex);
401 status = opal_common_ucx_atomic_cswap(ep, compare, value,
402 buffer, len,
403 rem_addr, rkey,
404 winfo->worker);
405 if (OPAL_UNLIKELY(status != UCS_OK)) {
406 MCA_COMMON_UCX_ERROR("opal_common_ucx_atomic_cswap failed: %d", status);
407 rc = OPAL_ERROR;
408 }
409
410 rc = _periodical_flush_nb(mem, winfo, target);
411 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
412 MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
413 return rc;
414 }
415
416 opal_mutex_unlock(&winfo->mutex);
417
418 return rc;
419 }
420
421 static inline int
422 opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode,
423 uint64_t value, int target, size_t len, uint64_t rem_addr)
424 {
425 ucp_ep_h ep;
426 ucp_rkey_h rkey;
427 opal_common_ucx_winfo_t *winfo = NULL;
428 ucs_status_t status;
429 int rc = OPAL_SUCCESS;
430
431
432 rc =opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
433 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
434 MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
435 return rc;
436 }
437
438
439 opal_mutex_lock(&winfo->mutex);
440 status = ucp_atomic_post(ep, opcode, value,
441 len, rem_addr, rkey);
442 if (OPAL_UNLIKELY(status != UCS_OK)) {
443 MCA_COMMON_UCX_ERROR("ucp_atomic_post failed: %d", status);
444 rc = OPAL_ERROR;
445 }
446
447 rc = _periodical_flush_nb(mem, winfo, target);
448 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
449 MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
450 return rc;
451 }
452
453 opal_mutex_unlock(&winfo->mutex);
454 return rc;
455 }
456
457 static inline int
458 opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem,
459 ucp_atomic_fetch_op_t opcode, uint64_t value,
460 int target, void *buffer, size_t len,
461 uint64_t rem_addr)
462 {
463 ucp_ep_h ep = NULL;
464 ucp_rkey_h rkey = NULL;
465 opal_common_ucx_winfo_t *winfo = NULL;
466 ucs_status_t status;
467 int rc = OPAL_SUCCESS;
468
469 rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
470 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
471 MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
472 return rc;
473 }
474
475
476 opal_mutex_lock(&winfo->mutex);
477 status = opal_common_ucx_atomic_fetch(ep, opcode, value,
478 buffer, len,
479 rem_addr, rkey,
480 winfo->worker);
481 if (OPAL_UNLIKELY(status != UCS_OK)) {
482 MCA_COMMON_UCX_ERROR("ucp_atomic_cswap64 failed: %d", status);
483 rc = OPAL_ERROR;
484 }
485
486 rc = _periodical_flush_nb(mem, winfo, target);
487 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
488 MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
489 return rc;
490 }
491
492 opal_mutex_unlock(&winfo->mutex);
493
494 return rc;
495 }
496
497 static inline int
498 opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
499 ucp_atomic_fetch_op_t opcode,
500 uint64_t value,
501 int target, void *buffer, size_t len,
502 uint64_t rem_addr,
503 opal_common_ucx_user_req_handler_t user_req_cb,
504 void *user_req_ptr)
505 {
506 ucp_ep_h ep = NULL;
507 ucp_rkey_h rkey = NULL;
508 opal_common_ucx_winfo_t *winfo = NULL;
509 int rc = OPAL_SUCCESS;
510 opal_common_ucx_request_t *req;
511
512 rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
513 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
514 MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
515 return rc;
516 }
517
518 opal_mutex_lock(&winfo->mutex);
519 req = opal_common_ucx_atomic_fetch_nb(ep, opcode, value, buffer, len,
520 rem_addr, rkey, opal_common_ucx_req_completion,
521 winfo->worker);
522 if (UCS_PTR_IS_PTR(req)) {
523 req->ext_req = user_req_ptr;
524 req->ext_cb = user_req_cb;
525 req->winfo = winfo;
526 } else {
527 if (user_req_cb != NULL) {
528 (*user_req_cb)(user_req_ptr);
529 }
530 }
531
532 rc = _periodical_flush_nb(mem, winfo, target);
533 if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
534 MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
535 return rc;
536 }
537
538 opal_mutex_unlock(&winfo->mutex);
539
540 return rc;
541 }
542
543 END_C_DECLS
544
545 #endif