This source file includes following definitions.
- mca_atomic_ucx_startup
- mca_atomic_ucx_finalize
- mca_atomic_ucx_op
- mca_atomic_ucx_fop
- mca_atomic_ucx_add
- mca_atomic_ucx_and
- mca_atomic_ucx_or
- mca_atomic_ucx_xor
- mca_atomic_ucx_fadd
- mca_atomic_ucx_fand
- mca_atomic_ucx_for
- mca_atomic_ucx_fxor
- mca_atomic_ucx_swap
- mca_atomic_ucx_query
1
2
3
4
5
6
7
8
9
10
11 #include "oshmem_config.h"
12 #include <stdio.h>
13
14 #include "oshmem/constants.h"
15 #include "oshmem/mca/atomic/atomic.h"
16 #include "oshmem/mca/spml/spml.h"
17 #include "oshmem/mca/memheap/memheap.h"
18 #include "oshmem/proc/proc.h"
19 #include "atomic_ucx.h"
20
21
22
23
24
25 int mca_atomic_ucx_startup(bool enable_progress_threads, bool enable_threads)
26 {
27 return OSHMEM_SUCCESS;
28 }
29
30 int mca_atomic_ucx_finalize(void)
31 {
32 return OSHMEM_SUCCESS;
33 }
34
35 static inline
36 int mca_atomic_ucx_op(shmem_ctx_t ctx,
37 void *target,
38 uint64_t value,
39 size_t size,
40 int pe,
41 ucp_atomic_post_op_t op)
42 {
43 ucs_status_t status;
44 spml_ucx_mkey_t *ucx_mkey;
45 uint64_t rva;
46 mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
47
48 assert((8 == size) || (4 == size));
49
50 ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
51 status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn,
52 op, value, size, rva,
53 ucx_mkey->rkey);
54 return ucx_status_to_oshmem(status);
55 }
56
57 static inline
58 int mca_atomic_ucx_fop(shmem_ctx_t ctx,
59 void *target,
60 void *prev,
61 uint64_t value,
62 size_t size,
63 int pe,
64 ucp_atomic_fetch_op_t op)
65 {
66 ucs_status_ptr_t status_ptr;
67 spml_ucx_mkey_t *ucx_mkey;
68 uint64_t rva;
69 mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
70
71 assert((8 == size) || (4 == size));
72
73 ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
74 status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn,
75 op, value, prev, size,
76 rva, ucx_mkey->rkey,
77 opal_common_ucx_empty_complete_cb);
78 return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker,
79 "ucp_atomic_fetch_nb");
80 }
81
82 static int mca_atomic_ucx_add(shmem_ctx_t ctx,
83 void *target,
84 uint64_t value,
85 size_t size,
86 int pe)
87 {
88 return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_ADD);
89 }
90
91 static int mca_atomic_ucx_and(shmem_ctx_t ctx,
92 void *target,
93 uint64_t value,
94 size_t size,
95 int pe)
96 {
97 #if HAVE_DECL_UCP_ATOMIC_POST_OP_AND
98 return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_AND);
99 #else
100 return OSHMEM_ERR_NOT_IMPLEMENTED;
101 #endif
102 }
103
104 static int mca_atomic_ucx_or(shmem_ctx_t ctx,
105 void *target,
106 uint64_t value,
107 size_t size,
108 int pe)
109 {
110 #if HAVE_DECL_UCP_ATOMIC_POST_OP_OR
111 return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_OR);
112 #else
113 return OSHMEM_ERR_NOT_IMPLEMENTED;
114 #endif
115 }
116
117 static int mca_atomic_ucx_xor(shmem_ctx_t ctx,
118 void *target,
119 uint64_t value,
120 size_t size,
121 int pe)
122 {
123 #if HAVE_DECL_UCP_ATOMIC_POST_OP_XOR
124 return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_XOR);
125 #else
126 return OSHMEM_ERR_NOT_IMPLEMENTED;
127 #endif
128 }
129
130 static int mca_atomic_ucx_fadd(shmem_ctx_t ctx,
131 void *target,
132 void *prev,
133 uint64_t value,
134 size_t size,
135 int pe)
136 {
137 return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FADD);
138 }
139
140 static int mca_atomic_ucx_fand(shmem_ctx_t ctx,
141 void *target,
142 void *prev,
143 uint64_t value,
144 size_t size,
145 int pe)
146 {
147 #if HAVE_DECL_UCP_ATOMIC_FETCH_OP_FAND
148 return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FAND);
149 #else
150 return OSHMEM_ERR_NOT_IMPLEMENTED;
151 #endif
152 }
153
154 static int mca_atomic_ucx_for(shmem_ctx_t ctx,
155 void *target,
156 void *prev,
157 uint64_t value,
158 size_t size,
159 int pe)
160 {
161 #if HAVE_DECL_UCP_ATOMIC_FETCH_OP_FOR
162 return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FOR);
163 #else
164 return OSHMEM_ERR_NOT_IMPLEMENTED;
165 #endif
166 }
167
168 static int mca_atomic_ucx_fxor(shmem_ctx_t ctx,
169 void *target,
170 void *prev,
171 uint64_t value,
172 size_t size,
173 int pe)
174 {
175 #if HAVE_DECL_UCP_ATOMIC_FETCH_OP_FXOR
176 return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FXOR);
177 #else
178 return OSHMEM_ERR_NOT_IMPLEMENTED;
179 #endif
180 }
181
182 static int mca_atomic_ucx_swap(shmem_ctx_t ctx,
183 void *target,
184 void *prev,
185 uint64_t value,
186 size_t size,
187 int pe)
188 {
189 return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_SWAP);
190 }
191
192
193 mca_atomic_base_module_t *
194 mca_atomic_ucx_query(int *priority)
195 {
196 mca_atomic_ucx_module_t *module;
197
198 *priority = mca_atomic_ucx_component.priority;
199
200 module = OBJ_NEW(mca_atomic_ucx_module_t);
201 if (module) {
202 module->super.atomic_add = mca_atomic_ucx_add;
203 module->super.atomic_and = mca_atomic_ucx_and;
204 module->super.atomic_or = mca_atomic_ucx_or;
205 module->super.atomic_xor = mca_atomic_ucx_xor;
206 module->super.atomic_fadd = mca_atomic_ucx_fadd;
207 module->super.atomic_fand = mca_atomic_ucx_fand;
208 module->super.atomic_for = mca_atomic_ucx_for;
209 module->super.atomic_fxor = mca_atomic_ucx_fxor;
210 module->super.atomic_swap = mca_atomic_ucx_swap;
211 module->super.atomic_cswap = mca_atomic_ucx_cswap;
212 return &(module->super);
213 }
214
215 return NULL ;
216 }