This source file includes the following definitions.
- mca_spml_ucx_param_register_int
- mca_spml_ucx_param_register_string
- mca_spml_ucx_param_register_bool
- mca_spml_ucx_component_register
- spml_ucx_ctx_progress
- spml_ucx_default_progress
- spml_ucx_progress_aux_ctx
- mca_spml_ucx_async_cb
- mca_spml_ucx_component_open
- mca_spml_ucx_component_close
- spml_ucx_init
- mca_spml_ucx_component_init
- _ctx_cleanup
- mca_spml_ucx_component_fini
1
2
3
4
5
6
7
8
9
10
11
12 #define _GNU_SOURCE
13 #include <stdio.h>
14
15 #include <sys/types.h>
16 #include <unistd.h>
17
18 #include "oshmem_config.h"
19 #include "shmem.h"
20 #include "oshmem/runtime/params.h"
21 #include "oshmem/mca/spml/spml.h"
22 #include "oshmem/mca/spml/base/base.h"
23 #include "spml_ucx_component.h"
24 #include "oshmem/mca/spml/ucx/spml_ucx.h"
25
26 #include "opal/util/opal_environ.h"
27 #include "opal/runtime/opal_progress_threads.h"
28
29 static int mca_spml_ucx_component_register(void);
30 static int mca_spml_ucx_component_open(void);
31 static int mca_spml_ucx_component_close(void);
32 static mca_spml_base_module_t*
33 mca_spml_ucx_component_init(int* priority,
34 bool enable_progress_threads,
35 bool enable_mpi_threads);
36 static int mca_spml_ucx_component_fini(void);
37 mca_spml_base_component_2_0_0_t mca_spml_ucx_component = {
38
39
40
41
42 .spmlm_version = {
43 MCA_SPML_BASE_VERSION_2_0_0,
44
45 .mca_component_name = "ucx",
46 .mca_component_major_version = OSHMEM_MAJOR_VERSION,
47 .mca_component_minor_version = OSHMEM_MINOR_VERSION,
48 .mca_component_release_version = OSHMEM_RELEASE_VERSION,
49 .mca_open_component = mca_spml_ucx_component_open,
50 .mca_close_component = mca_spml_ucx_component_close,
51 .mca_query_component = NULL,
52 .mca_register_component_params = mca_spml_ucx_component_register
53 },
54 .spmlm_data = {
55
56 .param_field = MCA_BASE_METADATA_PARAM_CHECKPOINT
57 },
58
59 .spmlm_init = mca_spml_ucx_component_init,
60 .spmlm_finalize = mca_spml_ucx_component_fini
61 };
62
63
64 static inline void mca_spml_ucx_param_register_int(const char* param_name,
65 int default_value,
66 const char *help_msg,
67 int *storage)
68 {
69 *storage = default_value;
70 (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
71 param_name,
72 help_msg,
73 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
74 OPAL_INFO_LVL_9,
75 MCA_BASE_VAR_SCOPE_READONLY,
76 storage);
77 }
78
79 static inline void mca_spml_ucx_param_register_string(const char* param_name,
80 char* default_value,
81 const char *help_msg,
82 char **storage)
83 {
84 *storage = default_value;
85 (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
86 param_name,
87 help_msg,
88 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
89 OPAL_INFO_LVL_9,
90 MCA_BASE_VAR_SCOPE_READONLY,
91 storage);
92 }
93
94 static inline void mca_spml_ucx_param_register_bool(const char* param_name,
95 bool default_value,
96 const char *help_msg,
97 bool *storage)
98 {
99 *storage = default_value;
100 (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
101 param_name,
102 help_msg,
103 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
104 OPAL_INFO_LVL_9,
105 MCA_BASE_VAR_SCOPE_READONLY,
106 storage);
107 }
108
109 static int mca_spml_ucx_component_register(void)
110 {
111 mca_spml_ucx_param_register_int("priority", 21,
112 "[integer] ucx priority",
113 &mca_spml_ucx.priority);
114
115 mca_spml_ucx_param_register_int("num_disconnect", 1,
116 "How may disconnects go in parallel",
117 &mca_spml_ucx.num_disconnect);
118
119 mca_spml_ucx_param_register_int("heap_reg_nb", 0,
120 "Use non-blocking memory registration for shared heap",
121 &mca_spml_ucx.heap_reg_nb);
122
123 mca_spml_ucx_param_register_bool("async_progress", 0,
124 "Enable asynchronous progress thread",
125 &mca_spml_ucx.async_progress);
126
127 mca_spml_ucx_param_register_int("async_tick_usec", 3000,
128 "Asynchronous progress tick granularity (in usec)",
129 &mca_spml_ucx.async_tick);
130
131 opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
132
133 return OSHMEM_SUCCESS;
134 }
135
136 int spml_ucx_ctx_progress(void)
137 {
138 int i;
139 for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
140 ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
141 }
142 return 1;
143 }
144
145 int spml_ucx_default_progress(void)
146 {
147 ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
148 return 1;
149 }
150
151 int spml_ucx_progress_aux_ctx(void)
152 {
153 unsigned count;
154
155 if (OPAL_UNLIKELY(!mca_spml_ucx.aux_ctx)) {
156 return 0;
157 }
158
159 if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
160 return 0;
161 }
162
163 count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
164 pthread_spin_unlock(&mca_spml_ucx.async_lock);
165
166 return count;
167 }
168
169 void mca_spml_ucx_async_cb(int fd, short event, void *cbdata)
170 {
171 int count = 0;
172
173 if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
174 return;
175 }
176
177 do {
178 count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
179 } while (count);
180
181 pthread_spin_unlock(&mca_spml_ucx.async_lock);
182 }
183
184 static int mca_spml_ucx_component_open(void)
185 {
186 return OSHMEM_SUCCESS;
187 }
188
189 static int mca_spml_ucx_component_close(void)
190 {
191 return OSHMEM_SUCCESS;
192 }
193
194 static int spml_ucx_init(void)
195 {
196 ucs_status_t err;
197 ucp_config_t *ucp_config;
198 ucp_params_t params;
199 ucp_context_attr_t attr;
200 ucp_worker_params_t wkr_params;
201 ucp_worker_attr_t wkr_attr;
202
203 err = ucp_config_read("OSHMEM", NULL, &ucp_config);
204 if (UCS_OK != err) {
205 return OSHMEM_ERROR;
206 }
207
208 opal_common_ucx_mca_register();
209
210 memset(¶ms, 0, sizeof(params));
211 params.field_mask = UCP_PARAM_FIELD_FEATURES|UCP_PARAM_FIELD_ESTIMATED_NUM_EPS|UCP_PARAM_FIELD_MT_WORKERS_SHARED;
212 params.features = UCP_FEATURE_RMA|UCP_FEATURE_AMO32|UCP_FEATURE_AMO64;
213 params.estimated_num_eps = ompi_proc_world_size();
214 if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
215 params.mt_workers_shared = 1;
216 } else {
217 params.mt_workers_shared = 0;
218 }
219
220 err = ucp_init(¶ms, ucp_config, &mca_spml_ucx.ucp_context);
221 ucp_config_release(ucp_config);
222 if (UCS_OK != err) {
223 return OSHMEM_ERROR;
224 }
225
226 attr.field_mask = UCP_ATTR_FIELD_THREAD_MODE;
227 err = ucp_context_query(mca_spml_ucx.ucp_context, &attr);
228 if (err != UCS_OK) {
229 return OSHMEM_ERROR;
230 }
231
232 if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE &&
233 attr.thread_mode != UCS_THREAD_MODE_MULTI) {
234 oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
235 }
236
237 mca_spml_ucx.active_array.ctxs_count = mca_spml_ucx.idle_array.ctxs_count = 0;
238 mca_spml_ucx.active_array.ctxs_num = mca_spml_ucx.idle_array.ctxs_num = MCA_SPML_UCX_CTXS_ARRAY_SIZE;
239 mca_spml_ucx.active_array.ctxs = calloc(mca_spml_ucx.active_array.ctxs_num,
240 sizeof(mca_spml_ucx_ctx_t *));
241 mca_spml_ucx.idle_array.ctxs = calloc(mca_spml_ucx.idle_array.ctxs_num,
242 sizeof(mca_spml_ucx_ctx_t *));
243
244 SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
245 pthread_mutex_init(&mca_spml_ucx.ctx_create_mutex, NULL);
246
247 wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
248 if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
249 wkr_params.thread_mode = UCS_THREAD_MODE_MULTI;
250 } else {
251 wkr_params.thread_mode = UCS_THREAD_MODE_SINGLE;
252 }
253
254 err = ucp_worker_create(mca_spml_ucx.ucp_context, &wkr_params,
255 &mca_spml_ucx_ctx_default.ucp_worker);
256 if (UCS_OK != err) {
257 return OSHMEM_ERROR;
258 }
259
260 wkr_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
261 err = ucp_worker_query(mca_spml_ucx_ctx_default.ucp_worker, &wkr_attr);
262
263 if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE &&
264 wkr_attr.thread_mode != UCS_THREAD_MODE_MULTI) {
265 oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
266 }
267
268 if (mca_spml_ucx.async_progress) {
269 pthread_spin_init(&mca_spml_ucx.async_lock, 0);
270 mca_spml_ucx.async_event_base = opal_progress_thread_init(NULL);
271 if (NULL == mca_spml_ucx.async_event_base) {
272 SPML_UCX_ERROR("failed to init async progress thread");
273 return OSHMEM_ERROR;
274 }
275
276 mca_spml_ucx.tick_event = opal_event_alloc();
277 opal_event_set(mca_spml_ucx.async_event_base, mca_spml_ucx.tick_event,
278 -1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
279 }
280
281 mca_spml_ucx.aux_ctx = NULL;
282 mca_spml_ucx.aux_refcnt = 0;
283
284 oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
285
286 return OSHMEM_SUCCESS;
287 }
288
289 static mca_spml_base_module_t*
290 mca_spml_ucx_component_init(int* priority,
291 bool enable_progress_threads,
292 bool enable_mpi_threads)
293 {
294 SPML_UCX_VERBOSE( 10, "in ucx, my priority is %d\n", mca_spml_ucx.priority);
295
296 if ((*priority) > mca_spml_ucx.priority) {
297 *priority = mca_spml_ucx.priority;
298 return NULL ;
299 }
300 *priority = mca_spml_ucx.priority;
301
302 if (OSHMEM_SUCCESS != spml_ucx_init())
303 return NULL ;
304
305 SPML_UCX_VERBOSE(50, "*** ucx initialized ****");
306 return &mca_spml_ucx.super;
307 }
308
309 static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
310 {
311 int i, j, nprocs = oshmem_num_procs();
312 opal_common_ucx_del_proc_t *del_procs;
313
314 del_procs = malloc(sizeof(*del_procs) * nprocs);
315
316 for (i = 0; i < nprocs; ++i) {
317 for (j = 0; j < memheap_map->n_segments; j++) {
318 if (ctx->ucp_peers[i].mkeys[j].key.rkey != NULL) {
319 ucp_rkey_destroy(ctx->ucp_peers[i].mkeys[j].key.rkey);
320 }
321 }
322
323 del_procs[i].ep = ctx->ucp_peers[i].ucp_conn;
324 del_procs[i].vpid = i;
325 ctx->ucp_peers[i].ucp_conn = NULL;
326 }
327
328 opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
329 mca_spml_ucx.num_disconnect,
330 ctx->ucp_worker);
331 free(del_procs);
332 free(ctx->ucp_peers);
333 }
334
335 static int mca_spml_ucx_component_fini(void)
336 {
337 int fenced = 0, i;
338 int ret = OSHMEM_SUCCESS;
339
340 opal_progress_unregister(spml_ucx_default_progress);
341 if (mca_spml_ucx.active_array.ctxs_count) {
342 opal_progress_unregister(spml_ucx_ctx_progress);
343 }
344
345 if(!mca_spml_ucx.enabled)
346 return OSHMEM_SUCCESS;
347
348 if (mca_spml_ucx.async_progress) {
349 opal_progress_thread_finalize(NULL);
350 opal_event_evtimer_del(mca_spml_ucx.tick_event);
351 if (mca_spml_ucx.aux_ctx != NULL) {
352 _ctx_cleanup(mca_spml_ucx.aux_ctx);
353 }
354 opal_progress_unregister(spml_ucx_progress_aux_ctx);
355 pthread_spin_destroy(&mca_spml_ucx.async_lock);
356 }
357
358
359 for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
360 _ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]);
361 }
362
363 for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
364 _ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]);
365 }
366
367
368 ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
369 if (OPAL_SUCCESS != ret) {
370 return ret;
371 }
372
373 while (!fenced) {
374 for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
375 ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
376 }
377
378 for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
379 ucp_worker_progress(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
380 }
381
382 ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
383
384 if (mca_spml_ucx.aux_ctx != NULL) {
385 ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
386 }
387 }
388
389
390 for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
391 ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
392 free(mca_spml_ucx.active_array.ctxs[i]);
393 }
394
395 for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
396 ucp_worker_destroy(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
397 free(mca_spml_ucx.idle_array.ctxs[i]);
398 }
399
400 if (mca_spml_ucx_ctx_default.ucp_worker) {
401 ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
402 }
403
404 if (mca_spml_ucx.aux_ctx != NULL) {
405 ucp_worker_destroy(mca_spml_ucx.aux_ctx->ucp_worker);
406 }
407
408 mca_spml_ucx.enabled = false;
409
410 free(mca_spml_ucx.active_array.ctxs);
411 free(mca_spml_ucx.idle_array.ctxs);
412 free(mca_spml_ucx.aux_ctx);
413
414 SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
415 pthread_mutex_destroy(&mca_spml_ucx.ctx_create_mutex);
416
417 if (mca_spml_ucx.ucp_context) {
418 ucp_cleanup(mca_spml_ucx.ucp_context);
419 mca_spml_ucx.ucp_context = NULL;
420 }
421
422 return OSHMEM_SUCCESS;
423 }
424