This source file includes the following definitions.
- opal_common_ucx_request_status
- opal_common_ucx_wait_request
- opal_common_ucx_ep_flush
- opal_common_ucx_worker_flush
- opal_common_ucx_atomic_fetch
- opal_common_ucx_atomic_fetch_nb
- opal_common_ucx_atomic_cswap
1
2
3
4
5
6
7
8
9
10
11
12
13 #ifndef _COMMON_UCX_H_
14 #define _COMMON_UCX_H_
15
16 #include "opal_config.h"
17
18 #include <stdint.h>
19
20 #include <ucp/api/ucp.h>
21
22 #include "opal/mca/mca.h"
23 #include "opal/util/output.h"
24 #include "opal/runtime/opal_progress.h"
25 #include "opal/include/opal/constants.h"
26 #include "opal/class/opal_list.h"
27
28 BEGIN_C_DECLS
29
/*
 * Debug configuration of the common UCX code.
 *
 * Debug builds (OPAL_ENABLE_DEBUG) allow verbosity levels up to 100 and turn
 * MCA_COMMON_UCX_ASSERT into assert(). Release builds cap verbosity at 2 and
 * compile the assertion away.
 */
#define MCA_COMMON_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG
#if MCA_COMMON_UCX_ENABLE_DEBUG
# define MCA_COMMON_UCX_MAX_VERBOSE 100
# define MCA_COMMON_UCX_ASSERT(_x) assert(_x)
#else
# define MCA_COMMON_UCX_MAX_VERBOSE 2
/* Expand to a void expression, like assert() under NDEBUG, so the macro is
 * valid in exactly the same contexts (statement or expression) in both build
 * modes. An empty expansion would only work as a statement. */
# define MCA_COMMON_UCX_ASSERT(_x) ((void)0)
#endif

/* NOTE(review): operation-count thresholds; their users are not visible in
 * this header, so confirm their exact semantics at the call sites. */
#define MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD 1000
#define MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD 1000

/* Encode a UCX version as major * 100 + minor; the build number is ignored. */
#define UCX_VERSION(_major, _minor, _build) (((_major) * 100) + (_minor))
43
44
/* Stringify the argument tokens as written (no macro expansion). */
#define _MCA_COMMON_UCX_QUOTE(_x) \
    # _x
/* Stringify after macro expansion, e.g. turns __LINE__ into "42". */
#define MCA_COMMON_UCX_QUOTE(_x) \
    _MCA_COMMON_UCX_QUOTE(_x)

/* Log an error message at verbosity level 0. */
#define MCA_COMMON_UCX_ERROR(...) \
    MCA_COMMON_UCX_VERBOSE(0, " Error: " __VA_ARGS__)

/* Log a warning message at verbosity level 0. */
#define MCA_COMMON_UCX_WARN(...) \
    MCA_COMMON_UCX_VERBOSE(0, " Warning: " __VA_ARGS__)

/*
 * Emit a message through opal_output_verbose() on opal_common_ucx.output.
 *
 * The message is emitted only when _level is within both the compile-time
 * cap (MCA_COMMON_UCX_MAX_VERBOSE) and the runtime opal_common_ucx.verbose
 * value. It is prefixed with "file:line ". The first variadic argument must
 * be a string literal, because it is concatenated with that prefix at compile
 * time.
 *
 * The body is wrapped in do { } while (0) so that the macro behaves as a
 * single statement. A bare `if` expansion cannot appear in an unbraced
 * if/else.
 */
#define MCA_COMMON_UCX_VERBOSE(_level, ...)                                     \
    do {                                                                        \
        if (((_level) <= MCA_COMMON_UCX_MAX_VERBOSE) &&                         \
            ((_level) <= opal_common_ucx.verbose)) {                            \
            opal_output_verbose(_level, opal_common_ucx.output,                 \
                                __FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " \
                                __VA_ARGS__);                                   \
        }                                                                       \
    } while (0)
63
64
65
/*
 * Loop header for an endless polling loop.
 *
 * Usage: MCA_COMMON_UCX_PROGRESS_LOOP(worker) { ... }
 *
 * The caller's body runs first, with no progress call before the first
 * pass. After each pass the counter `iter` is pre-incremented. If the new
 * value is not a multiple of opal_common_ucx.progress_iterations, only the
 * UCX worker is progressed (ucp_worker_progress); otherwise opal_progress()
 * is called instead. The loop never ends on its own, so the body must
 * `break` or `return`.
 *
 * The counter is named `iter` and is in scope inside the body.
 * NOTE(review): progress_iterations must be non-zero (modulo by zero is
 * undefined); presumably it is validated where the MCA parameter is
 * registered. Confirm this.
 */
#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker)                                   \
    for (unsigned iter = 0;;                                                    \
         ((++iter % opal_common_ucx.progress_iterations) != 0)                  \
             ? (void)ucp_worker_progress(_worker)                               \
             : opal_progress())
69
/*
 * Poll _request until UCX reports it is no longer UCS_INPROGRESS.
 *
 * Progress is driven through MCA_COMMON_UCX_PROGRESS_LOOP(_worker). Once the
 * request has left UCS_INPROGRESS, the statement _completed is evaluated
 * (e.g. to release the request). The macro then *returns from the enclosing
 * function*: OPAL_SUCCESS if the final status is UCS_OK, otherwise OPAL_ERROR
 * after logging at verbosity level 1. It can therefore only be used inside
 * functions returning int.
 *
 * _msg names the operation in the log message; __func__ is used when it is
 * NULL.
 *
 * The failure message reports the completion status returned by
 * opal_common_ucx_request_status(). UCS_PTR_STATUS(_request) must not be
 * used here: _request is a live request handle, not an error pointer, so
 * converting it to a status code yields a meaningless number.
 */
#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed)           \
    do {                                                                        \
        ucs_status_t status;                                                    \
                                                                                \
        MCA_COMMON_UCX_PROGRESS_LOOP(_worker) {                                 \
            status = opal_common_ucx_request_status(_request);                  \
            if (UCS_INPROGRESS != status) {                                     \
                _completed;                                                     \
                if (OPAL_LIKELY(UCS_OK == status)) {                            \
                    return OPAL_SUCCESS;                                        \
                }                                                               \
                MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s",                  \
                                       (_msg) ? (_msg) : __func__,              \
                                       (int)status,                             \
                                       ucs_status_string(status));              \
                return OPAL_ERROR;                                              \
            }                                                                   \
        }                                                                       \
    } while (0)
90
/*
 * State shared by the common UCX code. Its single global instance is
 * `opal_common_ucx`, which the logging and progress macros in this header
 * read.
 */
struct opal_common_ucx_module {
    int  output;              /* stream passed to opal_output_verbose() */
    int  verbose;             /* runtime cap compared against log levels */
    int  progress_iterations; /* every N-th PROGRESS_LOOP step calls opal_progress() */
    int  registered;          /* NOTE(review): semantics not visible here — confirm */
    bool opal_mem_hooks;      /* NOTE(review): presumably enables OPAL memory hooks — confirm */
};
typedef struct opal_common_ucx_module opal_common_ucx_module_t;
98
99 typedef struct opal_common_ucx_del_proc {
100 ucp_ep_h ep;
101 size_t vpid;
102 } opal_common_ucx_del_proc_t;
103
104 extern opal_common_ucx_module_t opal_common_ucx;
105
106 OPAL_DECLSPEC void opal_common_ucx_mca_register(void);
107 OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
108 OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void);
109 OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
110 OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
111 OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced);
112 OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
113 size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
114 OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
115 size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
116 OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
117
118 static inline
119 ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
120 {
121 #if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
122 ucp_tag_recv_info_t info;
123
124 return ucp_request_test(request, &info);
125 #else
126 return ucp_request_check_status(request);
127 #endif
128 }
129
130 static inline
131 int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
132 const char *msg)
133 {
134
135 if (OPAL_LIKELY(UCS_OK == request)) {
136 return OPAL_SUCCESS;
137 } else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
138 MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __func__,
139 UCS_PTR_STATUS(request),
140 ucs_status_string(UCS_PTR_STATUS(request)));
141 return OPAL_ERROR;
142 }
143
144 MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
145 }
146
147 static inline
148 int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
149 {
150 #if HAVE_DECL_UCP_EP_FLUSH_NB
151 ucs_status_ptr_t request;
152
153 request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
154 return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
155 #else
156 ucs_status_t status;
157
158 status = ucp_ep_flush(ep);
159 return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
160 #endif
161 }
162
163 static inline
164 int opal_common_ucx_worker_flush(ucp_worker_h worker)
165 {
166 #if HAVE_DECL_UCP_WORKER_FLUSH_NB
167 ucs_status_ptr_t request;
168
169 request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
170 return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
171 #else
172 ucs_status_t status;
173
174 status = ucp_worker_flush(worker);
175 return (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR;
176 #endif
177 }
178
179 static inline
180 int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
181 uint64_t value, void *result, size_t op_size,
182 uint64_t remote_addr, ucp_rkey_h rkey,
183 ucp_worker_h worker)
184 {
185 ucs_status_ptr_t request;
186
187 request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
188 remote_addr, rkey, opal_common_ucx_empty_complete_cb);
189 return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
190 }
191
192 static inline
193 ucs_status_ptr_t opal_common_ucx_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
194 uint64_t value, void *result, size_t op_size,
195 uint64_t remote_addr, ucp_rkey_h rkey,
196 ucp_send_callback_t req_handler,
197 ucp_worker_h worker)
198 {
199 return ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
200 remote_addr, rkey, req_handler);
201 }
202
203 static inline
204 int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
205 uint64_t value, void *result, size_t op_size,
206 uint64_t remote_addr, ucp_rkey_h rkey,
207 ucp_worker_h worker)
208 {
209 uint64_t tmp = value;
210 int ret;
211
212 ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
213 op_size, remote_addr, rkey, worker);
214 if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
215
216
217 if (op_size == sizeof(uint64_t)) {
218 *(uint64_t*)result = tmp;
219 } else {
220 assert(op_size == sizeof(uint32_t));
221 *(uint32_t*)result = tmp;
222 }
223 }
224 return ret;
225 }
226
227 END_C_DECLS
228
229 #endif