root/ompi/mca/osc/ucx/osc_ucx_passive_target.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. start_shared
  2. end_shared
  3. start_exclusive
  4. end_exclusive
  5. ompi_osc_ucx_lock
  6. ompi_osc_ucx_unlock
  7. ompi_osc_ucx_lock_all
  8. ompi_osc_ucx_unlock_all
  9. ompi_osc_ucx_sync
  10. ompi_osc_ucx_flush
  11. ompi_osc_ucx_flush_all
  12. ompi_osc_ucx_flush_local
  13. ompi_osc_ucx_flush_local_all

   1 /*
   2  * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
   3  * $COPYRIGHT$
   4  *
   5  * Additional copyrights may follow
   6  *
   7  * $HEADER$
   8  */
   9 
  10 #include "ompi_config.h"
  11 
  12 #include "ompi/mca/osc/osc.h"
  13 #include "ompi/mca/osc/base/base.h"
  14 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
  15 #include "opal/mca/common/ucx/common_ucx.h"
  16 
  17 #include "osc_ucx.h"
  18 
/* Class descriptor for ompi_osc_ucx_lock_t: a subclass of opal_object_t with
 * no type-specific constructor or destructor (both arguments are NULL), so
 * code that creates one with OBJ_NEW(ompi_osc_ucx_lock_t) must fill in the
 * fields itself (ompi_osc_ucx_lock sets target_rank, is_nocheck and type). */
  19 OBJ_CLASS_INSTANCE(ompi_osc_ucx_lock_t, opal_object_t, NULL, NULL);
  20 
  21 static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
  22     uint64_t result_value = -1;
  23     uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
  24     int ret = OMPI_SUCCESS;
  25 
  26     while (true) {
  27         ret = opal_common_ucx_wpmem_fetch(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD, 1,
  28                                         target, &result_value, sizeof(result_value),
  29                                         remote_addr);
  30         if (OMPI_SUCCESS != ret) {
  31             return ret;
  32         }
  33 
  34         assert((int64_t)result_value >= 0);
  35         if (result_value >= TARGET_LOCK_EXCLUSIVE) {
  36             ret = opal_common_ucx_wpmem_post(module->state_mem,
  37                                            UCP_ATOMIC_POST_OP_ADD, (-1), target,
  38                                            sizeof(uint64_t), remote_addr);
  39             if (OMPI_SUCCESS != ret) {
  40                 return ret;
  41             }
  42         } else {
  43             break;
  44         }
  45         ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
  46     }
  47 
  48     return ret;
  49 }
  50 
  51 static inline int end_shared(ompi_osc_ucx_module_t *module, int target) {
  52     uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
  53     return opal_common_ucx_wpmem_post(module->state_mem, UCP_ATOMIC_POST_OP_ADD,
  54                                     (-1), target, sizeof(uint64_t), remote_addr);
  55 }
  56 
  57 static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
  58     uint64_t result_value = -1;
  59     uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
  60     int ret = OMPI_SUCCESS;
  61 
  62     for (;;) {
  63         ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
  64                                          TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
  65                                          target, &result_value, sizeof(result_value),
  66                                          remote_addr);
  67         if (OMPI_SUCCESS != ret) {
  68             return ret;
  69         }
  70         if (result_value == TARGET_LOCK_UNLOCKED) {
  71             return OMPI_SUCCESS;
  72         }
  73 
  74         ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
  75     }
  76 }
  77 
  78 static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) {
  79     uint64_t result_value = 0;
  80     uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
  81     int ret = OMPI_SUCCESS;
  82 
  83     ret = opal_common_ucx_wpmem_fetch(module->state_mem,
  84                                     UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
  85                                     target, &result_value, sizeof(result_value),
  86                                     remote_addr);
  87     if (OMPI_SUCCESS != ret) {
  88         return ret;
  89     }
  90 
  91     assert(result_value >= TARGET_LOCK_EXCLUSIVE);
  92 
  93     return ret;
  94 }
  95 
  96 int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *win) {
  97     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
  98     ompi_osc_ucx_lock_t *lock = NULL;
  99     ompi_osc_ucx_epoch_t original_epoch = module->epoch_type.access;
 100     int ret = OMPI_SUCCESS;
 101 
 102     if (module->lock_count == 0) {
 103         if (module->epoch_type.access != NONE_EPOCH &&
 104             module->epoch_type.access != FENCE_EPOCH) {
 105             return OMPI_ERR_RMA_SYNC;
 106         }
 107     } else {
 108         ompi_osc_ucx_lock_t *item = NULL;
 109         assert(module->epoch_type.access == PASSIVE_EPOCH);
 110         opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
 111         if (item != NULL) {
 112             return OMPI_ERR_RMA_SYNC;
 113         }
 114     }
 115 
 116     module->epoch_type.access = PASSIVE_EPOCH;
 117     module->lock_count++;
 118     assert(module->lock_count <= ompi_comm_size(module->comm));
 119 
 120     lock = OBJ_NEW(ompi_osc_ucx_lock_t);
 121     lock->target_rank = target;
 122 
 123     if ((assert & MPI_MODE_NOCHECK) == 0) {
 124         lock->is_nocheck = false;
 125         if (lock_type == MPI_LOCK_EXCLUSIVE) {
 126             ret = start_exclusive(module, target);
 127             lock->type = LOCK_EXCLUSIVE;
 128         } else {
 129             ret = start_shared(module, target);
 130             lock->type = LOCK_SHARED;
 131         }
 132     } else {
 133         lock->is_nocheck = true;
 134     }
 135 
 136     if (ret == OMPI_SUCCESS) {
 137         opal_hash_table_set_value_uint32(&module->outstanding_locks, (uint32_t)target, (void *)lock);
 138     } else {
 139         OBJ_RELEASE(lock);
 140         module->epoch_type.access = original_epoch;
 141     }
 142 
 143     return ret;
 144 }
 145 
 146 int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
 147     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
 148     ompi_osc_ucx_lock_t *lock = NULL;
 149     int ret = OMPI_SUCCESS;
 150 
 151     if (module->epoch_type.access != PASSIVE_EPOCH) {
 152         return OMPI_ERR_RMA_SYNC;
 153     }
 154 
 155     opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &lock);
 156     if (lock == NULL) {
 157         return OMPI_ERR_RMA_SYNC;
 158     }
 159 
 160     opal_hash_table_remove_value_uint32(&module->outstanding_locks,
 161                                         (uint32_t)target);
 162 
 163     ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
 164     if (ret != OMPI_SUCCESS) {
 165         return ret;
 166     }
 167 
 168     if (lock->is_nocheck == false) {
 169         if (lock->type == LOCK_EXCLUSIVE) {
 170             ret = end_exclusive(module, target);
 171         } else {
 172             ret = end_shared(module, target);
 173         }
 174     }
 175 
 176     OBJ_RELEASE(lock);
 177 
 178     module->lock_count--;
 179     assert(module->lock_count >= 0);
 180     if (module->lock_count == 0) {
 181         module->epoch_type.access = NONE_EPOCH;
 182     }
 183 
 184     return ret;
 185 }
 186 
 187 int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) {
 188     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
 189     int ret = OMPI_SUCCESS;
 190 
 191     if (module->epoch_type.access != NONE_EPOCH &&
 192         module->epoch_type.access != FENCE_EPOCH) {
 193         return OMPI_ERR_RMA_SYNC;
 194     }
 195 
 196     module->epoch_type.access = PASSIVE_ALL_EPOCH;
 197 
 198     if (0 == (assert & MPI_MODE_NOCHECK)) {
 199         int i, comm_size;
 200         module->lock_all_is_nocheck = false;
 201         comm_size = ompi_comm_size(module->comm);
 202         for (i = 0; i < comm_size; i++) {
 203             ret = start_shared(module, i);
 204             if (ret != OMPI_SUCCESS) {
 205                 int j;
 206                 for (j = 0; j < i; j++) {
 207                     end_shared(module, j);
 208                 }
 209                 return ret;
 210             }
 211         }
 212     } else {
 213         module->lock_all_is_nocheck = true;
 214     }
 215     assert(OMPI_SUCCESS == ret);
 216     return OMPI_SUCCESS;
 217 }
 218 
 219 int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
 220     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*)win->w_osc_module;
 221     int comm_size = ompi_comm_size(module->comm);
 222     int ret = OMPI_SUCCESS;
 223 
 224     if (module->epoch_type.access != PASSIVE_ALL_EPOCH) {
 225         return OMPI_ERR_RMA_SYNC;
 226     }
 227 
 228     assert(module->lock_count == 0);
 229 
 230     ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
 231     if (ret != OMPI_SUCCESS) {
 232         return ret;
 233     }
 234 
 235     if (!module->lock_all_is_nocheck) {
 236         int i;
 237         for (i = 0; i < comm_size; i++) {
 238             ret |= end_shared(module, i);
 239         }
 240     }
 241 
 242     module->epoch_type.access = NONE_EPOCH;
 243 
 244     return ret;
 245 }
 246 
 247 int ompi_osc_ucx_sync(struct ompi_win_t *win) {
 248     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
 249     int ret = OMPI_SUCCESS;
 250 
 251     if (module->epoch_type.access != PASSIVE_EPOCH &&
 252         module->epoch_type.access != PASSIVE_ALL_EPOCH) {
 253         return OMPI_ERR_RMA_SYNC;
 254     }
 255 
 256     opal_atomic_mb();
 257 
 258     ret = opal_common_ucx_wpmem_fence(module->mem);
 259     if (ret != OMPI_SUCCESS) {
 260         OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
 261     }
 262 
 263     return ret;
 264 }
 265 
 266 int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
 267     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
 268     int ret = OMPI_SUCCESS;
 269 
 270     if (module->epoch_type.access != PASSIVE_EPOCH &&
 271         module->epoch_type.access != PASSIVE_ALL_EPOCH) {
 272         return OMPI_ERR_RMA_SYNC;
 273     }
 274 
 275     ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
 276     if (ret != OMPI_SUCCESS) {
 277         return ret;
 278     }
 279 
 280     return OMPI_SUCCESS;
 281 }
 282 
 283 int ompi_osc_ucx_flush_all(struct ompi_win_t *win) {
 284     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
 285     int ret = OMPI_SUCCESS;
 286 
 287     if (module->epoch_type.access != PASSIVE_EPOCH &&
 288         module->epoch_type.access != PASSIVE_ALL_EPOCH) {
 289         return OMPI_ERR_RMA_SYNC;
 290     }
 291 
 292     ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
 293     if (ret != OMPI_SUCCESS) {
 294         return ret;
 295     }
 296 
 297     return OMPI_SUCCESS;
 298 }
 299 
 300 int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win) {
 301     /* TODO: currently euqals to ompi_osc_ucx_flush, should find a way
 302      * to implement local completion */
 303     return ompi_osc_ucx_flush(target, win);
 304 }
 305 
 306 int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win) {
 307     /* TODO: currently euqals to ompi_osc_ucx_flush_all, should find a way
 308      * to implement local completion */
 309     return ompi_osc_ucx_flush_all(win);
 310 }

/* [<][>][^][v][top][bottom][index][help] */