root/oshmem/mca/spml/ucx/spml_ucx_component.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_spml_ucx_param_register_int
  2. mca_spml_ucx_param_register_string
  3. mca_spml_ucx_param_register_bool
  4. mca_spml_ucx_component_register
  5. spml_ucx_ctx_progress
  6. spml_ucx_default_progress
  7. spml_ucx_progress_aux_ctx
  8. mca_spml_ucx_async_cb
  9. mca_spml_ucx_component_open
  10. mca_spml_ucx_component_close
  11. spml_ucx_init
  12. mca_spml_ucx_component_init
  13. _ctx_cleanup
  14. mca_spml_ucx_component_fini

   1 /*
   2  * Copyright (c) 2015      Mellanox Technologies, Inc.
   3  *                         All rights reserved.
   4  * Copyright (c) 2019      Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * $COPYRIGHT$
   7  * 
   8  * Additional copyrights may follow
   9  * 
  10  * $HEADER$
  11  */
  12 #define _GNU_SOURCE
  13 #include <stdio.h>
  14 
  15 #include <sys/types.h>
  16 #include <unistd.h>
  17 
  18 #include "oshmem_config.h"
  19 #include "shmem.h"
  20 #include "oshmem/runtime/params.h"
  21 #include "oshmem/mca/spml/spml.h"
  22 #include "oshmem/mca/spml/base/base.h"
  23 #include "spml_ucx_component.h"
  24 #include "oshmem/mca/spml/ucx/spml_ucx.h"
  25 
  26 #include "opal/util/opal_environ.h"
  27 #include "opal/runtime/opal_progress_threads.h"
  28 
  29 static int mca_spml_ucx_component_register(void);
  30 static int mca_spml_ucx_component_open(void);
  31 static int mca_spml_ucx_component_close(void);
  32 static mca_spml_base_module_t*
  33 mca_spml_ucx_component_init(int* priority,
  34                               bool enable_progress_threads,
  35                               bool enable_mpi_threads);
  36 static int mca_spml_ucx_component_fini(void);
  37 mca_spml_base_component_2_0_0_t mca_spml_ucx_component = {
  38 
  39     /* First, the mca_base_component_t struct containing meta
  40        information about the component itself */
  41 
  42     .spmlm_version = {
  43         MCA_SPML_BASE_VERSION_2_0_0,
  44 
  45         .mca_component_name            = "ucx",
  46         .mca_component_major_version   = OSHMEM_MAJOR_VERSION,
  47         .mca_component_minor_version   = OSHMEM_MINOR_VERSION,
  48         .mca_component_release_version = OSHMEM_RELEASE_VERSION,
  49         .mca_open_component            = mca_spml_ucx_component_open,
  50         .mca_close_component           = mca_spml_ucx_component_close,
  51         .mca_query_component           = NULL,
  52         .mca_register_component_params = mca_spml_ucx_component_register
  53     },
  54     .spmlm_data = {
  55         /* The component is checkpoint ready */
  56         .param_field                   = MCA_BASE_METADATA_PARAM_CHECKPOINT
  57     },
  58 
  59     .spmlm_init                        = mca_spml_ucx_component_init,
  60     .spmlm_finalize                    = mca_spml_ucx_component_fini
  61 };
  62 
  63 
  64 static inline void mca_spml_ucx_param_register_int(const char* param_name,
  65                                                     int default_value,
  66                                                     const char *help_msg,
  67                                                     int *storage)
  68 {
  69     *storage = default_value;
  70     (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
  71                                            param_name,
  72                                            help_msg,
  73                                            MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
  74                                            OPAL_INFO_LVL_9,
  75                                            MCA_BASE_VAR_SCOPE_READONLY,
  76                                            storage);
  77 }
  78 
  79 static inline void  mca_spml_ucx_param_register_string(const char* param_name,
  80                                                     char* default_value,
  81                                                     const char *help_msg,
  82                                                     char **storage)
  83 {
  84     *storage = default_value;
  85     (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
  86                                            param_name,
  87                                            help_msg,
  88                                            MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
  89                                            OPAL_INFO_LVL_9,
  90                                            MCA_BASE_VAR_SCOPE_READONLY,
  91                                            storage);
  92 }
  93 
  94 static inline void  mca_spml_ucx_param_register_bool(const char* param_name,
  95                                                      bool default_value,
  96                                                      const char *help_msg,
  97                                                      bool *storage)
  98 {
  99     *storage = default_value;
 100     (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
 101                                            param_name,
 102                                            help_msg,
 103                                            MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
 104                                            OPAL_INFO_LVL_9,
 105                                            MCA_BASE_VAR_SCOPE_READONLY,
 106                                            storage);
 107 }
 108 
 109 static int mca_spml_ucx_component_register(void)
 110 {
 111     mca_spml_ucx_param_register_int("priority", 21,
 112                                     "[integer] ucx priority",
 113                                     &mca_spml_ucx.priority);
 114 
 115     mca_spml_ucx_param_register_int("num_disconnect", 1,
 116                                     "How may disconnects go in parallel",
 117                                     &mca_spml_ucx.num_disconnect);
 118 
 119     mca_spml_ucx_param_register_int("heap_reg_nb", 0,
 120                                     "Use non-blocking memory registration for shared heap",
 121                                     &mca_spml_ucx.heap_reg_nb);
 122 
 123     mca_spml_ucx_param_register_bool("async_progress", 0,
 124                                      "Enable asynchronous progress thread",
 125                                      &mca_spml_ucx.async_progress);
 126 
 127     mca_spml_ucx_param_register_int("async_tick_usec", 3000,
 128                                     "Asynchronous progress tick granularity (in usec)",
 129                                     &mca_spml_ucx.async_tick);
 130 
 131     opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
 132 
 133     return OSHMEM_SUCCESS;
 134 }
 135 
 136 int spml_ucx_ctx_progress(void)
 137 {
 138     int i;
 139     for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
 140         ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
 141     }
 142     return 1;
 143 }
 144 
 145 int spml_ucx_default_progress(void)
 146 {
 147     ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
 148     return 1;
 149 }
 150 
 151 int spml_ucx_progress_aux_ctx(void)
 152 {
 153     unsigned count;
 154 
 155     if (OPAL_UNLIKELY(!mca_spml_ucx.aux_ctx)) {
 156         return 0;
 157     }
 158 
 159     if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
 160         return 0;
 161     }
 162 
 163     count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
 164     pthread_spin_unlock(&mca_spml_ucx.async_lock);
 165 
 166     return count;
 167 }
 168 
 169 void mca_spml_ucx_async_cb(int fd, short event, void *cbdata)
 170 {
 171     int count = 0;
 172 
 173     if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
 174         return;
 175     }
 176 
 177     do {
 178         count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
 179     }  while (count);
 180 
 181     pthread_spin_unlock(&mca_spml_ucx.async_lock);
 182 }
 183 
 184 static int mca_spml_ucx_component_open(void)
 185 {
 186     return OSHMEM_SUCCESS;
 187 }
 188 
 189 static int mca_spml_ucx_component_close(void)
 190 {
 191     return OSHMEM_SUCCESS;
 192 }
 193 
 194 static int spml_ucx_init(void)
 195 {
 196     ucs_status_t err;
 197     ucp_config_t *ucp_config;
 198     ucp_params_t params;
 199     ucp_context_attr_t attr;
 200     ucp_worker_params_t wkr_params;
 201     ucp_worker_attr_t wkr_attr;
 202 
 203     err = ucp_config_read("OSHMEM", NULL, &ucp_config);
 204     if (UCS_OK != err) {
 205         return OSHMEM_ERROR;
 206     }
 207 
 208     opal_common_ucx_mca_register();
 209 
 210     memset(&params, 0, sizeof(params));
 211     params.field_mask = UCP_PARAM_FIELD_FEATURES|UCP_PARAM_FIELD_ESTIMATED_NUM_EPS|UCP_PARAM_FIELD_MT_WORKERS_SHARED;
 212     params.features   = UCP_FEATURE_RMA|UCP_FEATURE_AMO32|UCP_FEATURE_AMO64;
 213     params.estimated_num_eps = ompi_proc_world_size();
 214     if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
 215         params.mt_workers_shared = 1;
 216     } else {
 217         params.mt_workers_shared = 0;
 218     }
 219 
 220     err = ucp_init(&params, ucp_config, &mca_spml_ucx.ucp_context);
 221     ucp_config_release(ucp_config);
 222     if (UCS_OK != err) {
 223         return OSHMEM_ERROR;
 224     }
 225 
 226     attr.field_mask = UCP_ATTR_FIELD_THREAD_MODE;
 227     err = ucp_context_query(mca_spml_ucx.ucp_context, &attr);
 228     if (err != UCS_OK) {
 229         return OSHMEM_ERROR;
 230     }
 231 
 232     if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE &&
 233         attr.thread_mode != UCS_THREAD_MODE_MULTI) {
 234         oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
 235     }
 236 
 237     mca_spml_ucx.active_array.ctxs_count = mca_spml_ucx.idle_array.ctxs_count = 0;
 238     mca_spml_ucx.active_array.ctxs_num = mca_spml_ucx.idle_array.ctxs_num = MCA_SPML_UCX_CTXS_ARRAY_SIZE;
 239     mca_spml_ucx.active_array.ctxs = calloc(mca_spml_ucx.active_array.ctxs_num,
 240                                             sizeof(mca_spml_ucx_ctx_t *));
 241     mca_spml_ucx.idle_array.ctxs = calloc(mca_spml_ucx.idle_array.ctxs_num,
 242                                           sizeof(mca_spml_ucx_ctx_t *));
 243 
 244     SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
 245     pthread_mutex_init(&mca_spml_ucx.ctx_create_mutex, NULL);
 246 
 247     wkr_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
 248     if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
 249         wkr_params.thread_mode = UCS_THREAD_MODE_MULTI;
 250     } else {
 251         wkr_params.thread_mode = UCS_THREAD_MODE_SINGLE;
 252     }
 253 
 254     err = ucp_worker_create(mca_spml_ucx.ucp_context, &wkr_params,
 255                             &mca_spml_ucx_ctx_default.ucp_worker);
 256     if (UCS_OK != err) {
 257         return OSHMEM_ERROR;
 258     }
 259 
 260     wkr_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
 261     err = ucp_worker_query(mca_spml_ucx_ctx_default.ucp_worker, &wkr_attr);
 262 
 263     if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE &&
 264         wkr_attr.thread_mode != UCS_THREAD_MODE_MULTI) {
 265         oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
 266     }
 267 
 268     if (mca_spml_ucx.async_progress) {
 269         pthread_spin_init(&mca_spml_ucx.async_lock, 0);
 270         mca_spml_ucx.async_event_base = opal_progress_thread_init(NULL);
 271         if (NULL == mca_spml_ucx.async_event_base) {
 272             SPML_UCX_ERROR("failed to init async progress thread");
 273             return OSHMEM_ERROR;
 274         }
 275 
 276         mca_spml_ucx.tick_event = opal_event_alloc();
 277         opal_event_set(mca_spml_ucx.async_event_base, mca_spml_ucx.tick_event,
 278                        -1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
 279     }
 280 
 281     mca_spml_ucx.aux_ctx    = NULL;
 282     mca_spml_ucx.aux_refcnt = 0;
 283 
 284     oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
 285 
 286     return OSHMEM_SUCCESS;
 287 }
 288 
 289 static mca_spml_base_module_t*
 290 mca_spml_ucx_component_init(int* priority,
 291                               bool enable_progress_threads,
 292                               bool enable_mpi_threads)
 293 {
 294     SPML_UCX_VERBOSE( 10, "in ucx, my priority is %d\n", mca_spml_ucx.priority);
 295 
 296     if ((*priority) > mca_spml_ucx.priority) {
 297         *priority = mca_spml_ucx.priority;
 298         return NULL ;
 299     }
 300     *priority = mca_spml_ucx.priority;
 301 
 302     if (OSHMEM_SUCCESS != spml_ucx_init())
 303         return NULL ;
 304 
 305     SPML_UCX_VERBOSE(50, "*** ucx initialized ****");
 306     return &mca_spml_ucx.super;
 307 }
 308 
 309 static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
 310 {
 311     int i, j, nprocs = oshmem_num_procs();
 312     opal_common_ucx_del_proc_t *del_procs;
 313 
 314     del_procs = malloc(sizeof(*del_procs) * nprocs);
 315 
 316     for (i = 0; i < nprocs; ++i) {
 317         for (j = 0; j < memheap_map->n_segments; j++) {
 318             if (ctx->ucp_peers[i].mkeys[j].key.rkey != NULL) {
 319                 ucp_rkey_destroy(ctx->ucp_peers[i].mkeys[j].key.rkey);
 320             }
 321         }
 322 
 323         del_procs[i].ep   = ctx->ucp_peers[i].ucp_conn;
 324         del_procs[i].vpid = i;
 325         ctx->ucp_peers[i].ucp_conn = NULL;
 326     }
 327 
 328     opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
 329                                       mca_spml_ucx.num_disconnect,
 330                                       ctx->ucp_worker);
 331     free(del_procs);
 332     free(ctx->ucp_peers);
 333 }
 334 
 335 static int mca_spml_ucx_component_fini(void)
 336 {
 337     int fenced = 0, i;
 338     int ret = OSHMEM_SUCCESS;
 339 
 340     opal_progress_unregister(spml_ucx_default_progress);
 341     if (mca_spml_ucx.active_array.ctxs_count) {
 342         opal_progress_unregister(spml_ucx_ctx_progress);
 343     }
 344 
 345     if(!mca_spml_ucx.enabled)
 346         return OSHMEM_SUCCESS; /* never selected.. return success.. */
 347 
 348     if (mca_spml_ucx.async_progress) {
 349         opal_progress_thread_finalize(NULL);
 350         opal_event_evtimer_del(mca_spml_ucx.tick_event);
 351         if (mca_spml_ucx.aux_ctx != NULL) {
 352             _ctx_cleanup(mca_spml_ucx.aux_ctx);
 353         }
 354         opal_progress_unregister(spml_ucx_progress_aux_ctx);
 355         pthread_spin_destroy(&mca_spml_ucx.async_lock);
 356     }
 357 
 358     /* delete context objects from list */
 359     for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
 360         _ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]);
 361     }
 362 
 363     for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
 364         _ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]);
 365     }
 366 
 367 
 368     ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
 369     if (OPAL_SUCCESS != ret) {
 370         return ret;
 371     }
 372 
 373     while (!fenced) {
 374         for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
 375             ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
 376         }
 377 
 378         for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
 379             ucp_worker_progress(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
 380         }
 381 
 382         ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
 383 
 384         if (mca_spml_ucx.aux_ctx != NULL) {
 385             ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
 386         }
 387     }
 388 
 389     /* delete all workers */
 390     for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
 391         ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
 392         free(mca_spml_ucx.active_array.ctxs[i]);
 393     }
 394 
 395     for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
 396         ucp_worker_destroy(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
 397         free(mca_spml_ucx.idle_array.ctxs[i]);
 398     }
 399 
 400     if (mca_spml_ucx_ctx_default.ucp_worker) {
 401         ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
 402     }
 403 
 404     if (mca_spml_ucx.aux_ctx != NULL) {
 405         ucp_worker_destroy(mca_spml_ucx.aux_ctx->ucp_worker);
 406     }
 407 
 408     mca_spml_ucx.enabled = false;  /* not anymore */
 409 
 410     free(mca_spml_ucx.active_array.ctxs);
 411     free(mca_spml_ucx.idle_array.ctxs);
 412     free(mca_spml_ucx.aux_ctx);
 413 
 414     SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
 415     pthread_mutex_destroy(&mca_spml_ucx.ctx_create_mutex);
 416 
 417     if (mca_spml_ucx.ucp_context) {
 418         ucp_cleanup(mca_spml_ucx.ucp_context);
 419         mca_spml_ucx.ucp_context = NULL;
 420     }
 421 
 422     return OSHMEM_SUCCESS;
 423 }
 424 

/* [<][>][^][v][top][bottom][index][help] */