root/ompi/mca/osc/ucx/osc_ucx_active_target.c


DEFINITIONS

This source file includes the following definitions:
  1. ompi_osc_ucx_handle_incoming_post
  2. ompi_osc_ucx_fence
  3. ompi_osc_ucx_start
  4. ompi_osc_ucx_complete
  5. ompi_osc_ucx_post
  6. ompi_osc_ucx_wait
  7. ompi_osc_ucx_test

/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007-2015 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2010      IBM Corporation.  All rights reserved.
 * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
 * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2017      The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "opal/mca/common/ucx/common_ucx.h"

#include "osc_ucx.h"

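/*
 * Active-target synchronization (fence and post/start/complete/wait)
 * for the UCX one-sided component.
 *
 * A post notification is delivered by writing (origin rank + 1) into a
 * slot of the target's state.post_state array, so 0 means "slot empty".
 * A post that arrives before the matching MPI_Win_start is parked on the
 * pending_posts list below and consumed by a later start epoch.
 */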
typedef struct ompi_osc_ucx_pending_post {
    opal_list_item_t super;
    int rank;
} ompi_osc_ucx_pending_post_t;

OBJ_CLASS_INSTANCE(ompi_osc_ucx_pending_post_t, opal_list_item_t, NULL, NULL);

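/*
 * Consume one incoming post notification: if the origin belongs to the
 * current start group, count it toward post_count; otherwise park it on
 * the pending_posts list for a later start epoch. The slot is cleared in
 * either case.
 */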
static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *module, volatile uint64_t *post_ptr, int ranks_in_win_grp[], int grp_size) {
    int i, post_rank = (*post_ptr) - 1;
    ompi_osc_ucx_pending_post_t *pending_post = NULL;

    (*post_ptr) = 0;

    for (i = 0; i < grp_size; i++) {
        if (post_rank == ranks_in_win_grp[i]) {
            module->post_count++;
            return;
        }
    }

    /* post does not belong to this start epoch. save it for later */
    pending_post = OBJ_NEW(ompi_osc_ucx_pending_post_t);
    pending_post->rank = post_rank;
    opal_list_append(&module->pending_posts, &pending_post->super);
}

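/*
 * MPI_Win_fence: unless MPI_MODE_NOPRECEDE is asserted, flush all
 * outstanding RMA operations on this worker, then synchronize all ranks
 * with a barrier on the window's communicator.
 */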
int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != NONE_EPOCH &&
        module->epoch_type.access != FENCE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    if (assert & MPI_MODE_NOSUCCEED) {
        module->epoch_type.access = NONE_EPOCH;
    } else {
        module->epoch_type.access = FENCE_EPOCH;
    }

    if (!(assert & MPI_MODE_NOPRECEDE)) {
        ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
        if (ret != OMPI_SUCCESS) {
            return ret;
        }
    }

    return module->comm->c_coll->coll_barrier(module->comm,
                                              module->comm->c_coll->coll_barrier_module);
}

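/*
 * MPI_Win_start: open an access epoch to the given group. Unless
 * MPI_MODE_NOCHECK is asserted, first claim matching posts already parked
 * on pending_posts, then spin on state.post_state until every member of
 * the group has posted. The translated ranks are kept in
 * module->start_grp_ranks for use by ompi_osc_ucx_complete.
 */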
int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int i, size, *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
    ompi_group_t *win_group = NULL;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != NONE_EPOCH &&
        module->epoch_type.access != FENCE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    module->epoch_type.access = START_COMPLETE_EPOCH;

    OBJ_RETAIN(group);
    module->start_group = group;
    size = ompi_group_size(module->start_group);

    ranks_in_grp = malloc(sizeof(int) * size);
    ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm));

    for (i = 0; i < size; i++) {
        ranks_in_grp[i] = i;
    }

    ret = ompi_comm_group(module->comm, &win_group);
    if (ret != OMPI_SUCCESS) {
        free(ranks_in_grp);
        free(ranks_in_win_grp);
        return OMPI_ERROR;
    }

    ret = ompi_group_translate_ranks(module->start_group, size, ranks_in_grp,
                                     win_group, ranks_in_win_grp);
    if (ret != OMPI_SUCCESS) {
        free(ranks_in_grp);
        free(ranks_in_win_grp);
        ompi_group_free(&win_group);
        return OMPI_ERROR;
    }

    if ((assert & MPI_MODE_NOCHECK) == 0) {
        ompi_osc_ucx_pending_post_t *pending_post, *next;

        /* first look through the pending list for posts that match this group */
        OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_ucx_pending_post_t) {
            for (i = 0; i < size; i++) {
                if (pending_post->rank == ranks_in_win_grp[i]) {
                    opal_list_remove_item(&module->pending_posts, &pending_post->super);
                    OBJ_RELEASE(pending_post);
                    module->post_count++;
                    break;
                }
            }
        }

        /* wait for the remaining post requests to arrive */
        while (module->post_count != size) {
            for (i = 0; i < OMPI_OSC_UCX_POST_PEER_MAX; i++) {
                if (0 == module->state.post_state[i]) {
                    continue;
                }

                ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[i]), ranks_in_win_grp, size);
            }
            opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
        }

        module->post_count = 0;
    }

    free(ranks_in_grp);
    ompi_group_free(&win_group);

    /* ownership of ranks_in_win_grp passes to the module; freed in complete */
    module->start_grp_ranks = ranks_in_win_grp;

    return ret;
}

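/*
 * MPI_Win_complete: close the access epoch. Flush all outstanding RMA
 * operations, then atomically increment state.complete_count on each
 * member of the start group so its MPI_Win_wait/MPI_Win_test can finish.
 */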
int ompi_osc_ucx_complete(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int i, size;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.access != START_COMPLETE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    module->epoch_type.access = NONE_EPOCH;

    ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0/*ignore*/);
    if (ret != OMPI_SUCCESS) {
        return ret;
    }

    size = ompi_group_size(module->start_group);
    for (i = 0; i < size; i++) {
        /* write to state.complete_count on the remote side */
        uint64_t remote_addr = module->state_addrs[module->start_grp_ranks[i]] + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET;

        ret = opal_common_ucx_wpmem_post(module->mem, UCP_ATOMIC_POST_OP_ADD,
                                         1, module->start_grp_ranks[i], sizeof(uint64_t),
                                         remote_addr);
        if (ret != OMPI_SUCCESS) {
            OSC_UCX_VERBOSE(1, "opal_common_ucx_wpmem_post failed: %d", ret);
        }

        ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP,
                                          module->start_grp_ranks[i]);
        if (ret != OMPI_SUCCESS) {
            return ret;
        }
    }

    OBJ_RELEASE(module->start_group);
    module->start_group = NULL;
    free(module->start_grp_ranks);
    module->start_grp_ranks = NULL;

    return ret;
}

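/*
 * MPI_Win_post: open an exposure epoch to the given group. Unless
 * MPI_MODE_NOCHECK is asserted, notify each origin in two steps: fetch-add
 * on the origin's state.post_index to reserve a slot, then compare-and-swap
 * the post message into that slot of state.post_state. While the slot is
 * occupied, drain our own incoming posts to avoid a circular wait and retry.
 */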
int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int ret = OMPI_SUCCESS;

    if (module->epoch_type.exposure != NONE_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    OBJ_RETAIN(group);
    module->post_group = group;

    if ((assert & MPI_MODE_NOCHECK) == 0) {
        int i, j, size;
        ompi_group_t *win_group = NULL;
        int *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
        int myrank = ompi_comm_rank(module->comm);

        size = ompi_group_size(module->post_group);
        ranks_in_grp = malloc(sizeof(int) * size);
        ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm));

        for (i = 0; i < size; i++) {
            ranks_in_grp[i] = i;
        }

        ret = ompi_comm_group(module->comm, &win_group);
        if (ret != OMPI_SUCCESS) {
            free(ranks_in_grp);
            free(ranks_in_win_grp);
            return OMPI_ERROR;
        }

        ret = ompi_group_translate_ranks(module->post_group, size, ranks_in_grp,
                                         win_group, ranks_in_win_grp);
        if (ret != OMPI_SUCCESS) {
            free(ranks_in_grp);
            free(ranks_in_win_grp);
            ompi_group_free(&win_group);
            return OMPI_ERROR;
        }

        for (i = 0; i < size; i++) {
            /* write to state.post_index on the remote side */
            uint64_t remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_INDEX_OFFSET;
            uint64_t curr_idx = 0, result = 0;

            /* do the fetch-add first to get a post index */
            ret = opal_common_ucx_wpmem_fetch(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
                                              1, ranks_in_win_grp[i], &result,
                                              sizeof(result), remote_addr);
            if (ret != OMPI_SUCCESS) {
                free(ranks_in_grp);
                free(ranks_in_win_grp);
                ompi_group_free(&win_group);
                return OMPI_ERROR;
            }

            curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1);

            remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx;

            /* do the cas to send the post message */
            do {
                ret = opal_common_ucx_wpmem_cmpswp(module->mem, 0, result,
                                                   myrank + 1, &result, sizeof(result),
                                                   remote_addr);
                if (ret != OMPI_SUCCESS) {
                    free(ranks_in_grp);
                    free(ranks_in_win_grp);
                    ompi_group_free(&win_group);
                    return OMPI_ERROR;
                }

                if (result == 0)
                    break;

                /* the slot is taken: drain our own incoming posts to
                 * prevent a circular wait, then retry */
                for (j = 0; j < OMPI_OSC_UCX_POST_PEER_MAX; j++) {
                    /* no post at this index (yet) */
                    if (0 == module->state.post_state[j]) {
                        continue;
                    }

                    ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
                }

                ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
                usleep(100);
            } while (1);
        }

        free(ranks_in_grp);
        free(ranks_in_win_grp);
        ompi_group_free(&win_group);
    }

    module->epoch_type.exposure = POST_WAIT_EPOCH;

    return ret;
}

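/*
 * MPI_Win_wait: block until every member of the post group has called
 * MPI_Win_complete, i.e. until state.complete_count reaches the group size.
 */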
int ompi_osc_ucx_wait(struct ompi_win_t *win) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int size;

    if (module->epoch_type.exposure != POST_WAIT_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    size = ompi_group_size(module->post_group);

    while (module->state.complete_count != (uint64_t)size) {
        /* possibly redundant, but progress the worker pool so the remote
         * atomic updates to complete_count are observed */
        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
    }

    module->state.complete_count = 0;

    OBJ_RELEASE(module->post_group);
    module->post_group = NULL;

    module->epoch_type.exposure = NONE_EPOCH;

    return OMPI_SUCCESS;
}

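/*
 * MPI_Win_test: non-blocking variant of MPI_Win_wait; sets *flag to 1 and
 * closes the exposure epoch if all completes have arrived, 0 otherwise.
 */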
int ompi_osc_ucx_test(struct ompi_win_t *win, int *flag) {
    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
    int size;

    if (module->epoch_type.exposure != POST_WAIT_EPOCH) {
        return OMPI_ERR_RMA_SYNC;
    }

    size = ompi_group_size(module->post_group);

    opal_progress();

    if (module->state.complete_count == (uint64_t)size) {
        OBJ_RELEASE(module->post_group);
        module->post_group = NULL;

        module->state.complete_count = 0;

        module->epoch_type.exposure = NONE_EPOCH;
        *flag = 1;
    } else {
        *flag = 0;
    }

    return OMPI_SUCCESS;
}
