root/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ompi_osc_pt2pt_lock_self
  2. ompi_osc_pt2pt_unlock_self
  3. ompi_osc_pt2pt_lock_remote
  4. ompi_osc_pt2pt_unlock_remote
  5. ompi_osc_pt2pt_flush_remote
  6. ompi_osc_pt2pt_lock_internal_execute
  7. ompi_osc_pt2pt_lock_internal
  8. ompi_osc_pt2pt_unlock_internal
  9. ompi_osc_pt2pt_lock
  10. ompi_osc_pt2pt_unlock
  11. ompi_osc_pt2pt_lock_all
  12. ompi_osc_pt2pt_unlock_all
  13. ompi_osc_pt2pt_sync
  14. ompi_osc_pt2pt_flush_lock
  15. ompi_osc_pt2pt_flush
  16. ompi_osc_pt2pt_flush_all
  17. ompi_osc_pt2pt_flush_local
  18. ompi_osc_pt2pt_flush_local_all
  19. activate_lock
  20. queue_lock
  21. ompi_osc_pt2pt_lock_try_acquire
  22. ompi_osc_pt2pt_activate_next_lock
  23. ompi_osc_pt2pt_process_lock
  24. ompi_osc_pt2pt_process_lock_ack
  25. ompi_osc_pt2pt_process_flush_ack
  26. ompi_osc_pt2pt_process_unlock_ack
  27. ompi_osc_pt2pt_process_unlock
  28. ompi_osc_pt2pt_process_flush

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University.
   4  *                         All rights reserved.
   5  * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
   6  *                         All rights reserved.
   7  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   8  *                         University of Stuttgart.  All rights reserved.
   9  * Copyright (c) 2004-2005 The Regents of the University of California.
  10  *                         All rights reserved.
  11  * Copyright (c) 2007-2018 Los Alamos National Security, LLC.  All rights
  12  *                         reserved.
  13  * Copyright (c) 2010-2016 IBM Corporation.  All rights reserved.
  14  * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
  15  * Copyright (c) 2015      Intel, Inc. All rights reserved.
  16  * Copyright (c) 2015-2016 Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 
  25 #include "ompi_config.h"
  26 
  27 #include "osc_pt2pt.h"
  28 #include "osc_pt2pt_header.h"
  29 #include "osc_pt2pt_data_move.h"
  30 #include "osc_pt2pt_frag.h"
  31 
  32 #include "mpi.h"
  33 #include "opal/runtime/opal_progress.h"
  34 #include "opal/threads/mutex.h"
  35 #include "ompi/communicator/communicator.h"
  36 #include "ompi/mca/osc/base/base.h"
  37 #include "opal/include/opal_stdint.h"
  38 
  39 static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type,
  40                                              uint64_t lock_ptr);
  41 
/* target-side tracking of a lock request that could not be satisfied
 * immediately and was queued on module->locks_pending */
struct ompi_osc_pt2pt_pending_lock_t {
    opal_list_item_t super;
    int peer;            /* communicator rank of the requesting process */
    int lock_type;       /* MPI_LOCK_SHARED or MPI_LOCK_EXCLUSIVE */
    uint64_t lock_ptr;   /* initiator-side ompi_osc_pt2pt_sync_t pointer, echoed back in the ack */
};
typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
                   NULL, NULL);
  52 
  53 static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module);
  54 static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr);
  55 static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
  56                                       int target);
  57 
  58 static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
  59 {
  60     const int my_rank = ompi_comm_rank (module->comm);
  61     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
  62     int lock_type = lock->sync.lock.type;
  63     bool acquired = false;
  64 
  65     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
  66 
  67     (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);
  68 
  69     acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
  70     if (!acquired) {
  71         /* queue the lock */
  72         queue_lock (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
  73 
  74         /* If locking local, can't be non-blocking according to the
  75            standard.  We need to wait for the ack here. */
  76         ompi_osc_pt2pt_sync_wait_expected (lock);
  77     }
  78 
  79     ompi_osc_pt2pt_peer_set_locked (peer, true);
  80     ompi_osc_pt2pt_peer_set_eager_active (peer, true);
  81 
  82     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
  83                          "local lock aquired"));
  84 
  85     return OMPI_SUCCESS;
  86 }
  87 
/**
 * Release a passive-target lock held on the local (self) rank.
 *
 * @param[in] module  osc pt2pt module for the window
 * @param[in] lock    synchronization object tracking this lock epoch
 *
 * Updates module->lock_status and, once the window becomes unlocked,
 * activates the next queued lock request.
 */
static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
    const int my_rank = ompi_comm_rank (module->comm);
    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
    int lock_type = lock->sync.lock.type;

    /* balance the ompi_osc_pt2pt_sync_expected() call at the end of this function */
    (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);

    assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_unlock_self: unlocking myself. lock state = %d", module->lock_status));

    if (MPI_LOCK_EXCLUSIVE == lock_type) {
        /* an exclusive lock set lock_status to -1 (see
         * ompi_osc_pt2pt_lock_try_acquire); adding 1 returns it to 0 (unlocked) */
        OPAL_THREAD_ADD_FETCH32(&module->lock_status, 1);
        ompi_osc_pt2pt_activate_next_lock (module);
    } else if (0 == OPAL_THREAD_ADD_FETCH32(&module->lock_status, -1)) {
        /* this was the last shared-lock holder; wake queued lock requests */
        ompi_osc_pt2pt_activate_next_lock (module);
    }

    /* need to ensure we make progress */
    opal_progress();

    ompi_osc_pt2pt_peer_set_locked (peer, false);
    ompi_osc_pt2pt_peer_set_eager_active (peer, false);

    /* deliver the (local) unlock ack */
    ompi_osc_pt2pt_sync_expected (lock);
}
 116 
/**
 * Send a lock request to a remote target.
 *
 * @param[in] module  osc pt2pt module for the window
 * @param[in] target  communicator rank to lock
 * @param[in] lock    synchronization object tracking this lock epoch
 *
 * @returns OMPI_SUCCESS if the request was sent (or the peer is already
 *          locked), an OMPI error code otherwise
 *
 * The ack will arrive asynchronously and is counted via lock->sync_expected.
 */
int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
    int lock_type = lock->sync.lock.type;
    ompi_osc_pt2pt_header_lock_t lock_req;

    int ret;

    OPAL_THREAD_LOCK(&peer->lock);
    /* already holding the peer lock (e.g. lock_all on-demand locking); nothing to do */
    if (ompi_osc_pt2pt_peer_locked (peer)) {
        OPAL_THREAD_UNLOCK(&peer->lock);
        return OMPI_SUCCESS;
    }

    /* expect one more lock ack from this target */
    (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);

    assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

    /* generate a lock request */
    lock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ;
    lock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT && OPAL_ENABLE_DEBUG
    /* keep debug/heterogeneous builds deterministic: zero the padding */
    lock_req.padding[0] = 0;
    lock_req.padding[1] = 0;
#endif
    lock_req.lock_type = lock_type;
    /* lock_ptr lets the target echo back which sync object to ack */
    lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
    OSC_PT2PT_HTON(&lock_req, module, target);

    ret = ompi_osc_pt2pt_control_send_unbuffered (module, target, &lock_req, sizeof (lock_req));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* send failed; no ack will come, so undo the expected count */
        OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, -1);
    } else {
        ompi_osc_pt2pt_peer_set_locked (peer, true);
    }

    OPAL_THREAD_UNLOCK(&peer->lock);

    return ret;
}
 157 
/**
 * Send an unlock request to a remote target and flush pending fragments.
 *
 * @param[in] module  osc pt2pt module for the window
 * @param[in] target  communicator rank to unlock
 * @param[in] lock    synchronization object tracking this lock epoch
 *
 * @returns OMPI_SUCCESS or the error from sending/flushing
 *
 * The unlock request carries the number of fragments sent this epoch so
 * the target knows when all data has arrived before releasing the lock.
 */
static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
    /* atomically grab the epoch fragment count for this target and reset it
     * to -1 (marks the epoch as closing) */
    int32_t frag_count = opal_atomic_swap_32 ((opal_atomic_int32_t *) module->epoch_outgoing_frag_count + target, -1);
    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
    int lock_type = lock->sync.lock.type;
    ompi_osc_pt2pt_header_unlock_t unlock_req;
    int ret;

    /* expect one more (unlock) ack from this target */
    (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);

    assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

    unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ;
    unlock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT && OPAL_ENABLE_DEBUG
    unlock_req.padding[0] = 0;
    unlock_req.padding[1] = 0;
#endif
    unlock_req.frag_count = frag_count;
    unlock_req.lock_type = lock_type;
    unlock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
    OSC_PT2PT_HTON(&unlock_req, module, target);

    /* if this header does not fit in the current active fragment it will be
     * sent in a new fragment, so the target must expect one more packet */
    if (peer->active_frag) {
        ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
        if  (active_frag->remain_len < sizeof (unlock_req)) {
            /* the peer should expect one more packet */
            ++unlock_req.frag_count;
            --module->epoch_outgoing_frag_count[target];
        }
    }

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: unlocking target %d, frag count: %d", target,
                         unlock_req.frag_count));

    /* send control message with unlock request and count */
    ret = ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    ompi_osc_pt2pt_peer_set_locked (peer, false);
    ompi_osc_pt2pt_peer_set_eager_active (peer, false);

    /* push any queued fragments for this target onto the wire */
    return ompi_osc_pt2pt_frag_flush_target(module, target);
}
 205 
/**
 * Send a flush request to a remote target and start all queued fragments.
 *
 * @param[in] module  osc pt2pt module for the window
 * @param[in] target  communicator rank to flush
 * @param[in] lock    synchronization object tracking this lock epoch
 *
 * @returns OMPI_SUCCESS or the error from sending/flushing
 *
 * Like unlock, the request carries the epoch fragment count so the target
 * can ack only after all fragments have been processed.
 */
static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
    ompi_osc_pt2pt_header_flush_t flush_req;
    /* atomically grab and reset the epoch fragment count for this target */
    int32_t frag_count = opal_atomic_swap_32 ((opal_atomic_int32_t *) module->epoch_outgoing_frag_count + target, -1);
    int ret;

    /* expect one more (flush) ack from this target */
    (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);

    assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);

    flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
    flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
    flush_req.frag_count = frag_count;
    flush_req.lock_ptr = (uint64_t) (uintptr_t) lock;

    /* XXX -- TODO -- since fragment are always delivered in order we do not need to count anything but long
     * requests. once that is done this can be removed. */
    if (peer->active_frag) {
        ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
        if (active_frag->remain_len < sizeof (flush_req)) {
            /* the peer should expect one more packet */
            ++flush_req.frag_count;
            --module->epoch_outgoing_frag_count[target];
        }
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "flushing to target %d, frag_count: %d",
                         target, flush_req.frag_count));

    /* send control message with flush request and count */
    OSC_PT2PT_HTON(&flush_req, module, target);
    ret = ompi_osc_pt2pt_control_send (module, target, &flush_req, sizeof (flush_req));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    /* start all sendreqs to target */
    return ompi_osc_pt2pt_frag_flush_target (module, target);
}
 246 
 247 static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
 248 {
 249     int my_rank = ompi_comm_rank (module->comm);
 250     int target = lock->sync.lock.target;
 251     int assert = lock->sync.lock.assert;
 252     int ret;
 253 
 254     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 255 
 256     if (0 == (assert & MPI_MODE_NOCHECK)) {
 257         if (my_rank != target && target != -1) {
 258             ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
 259         } else {
 260             ret = ompi_osc_pt2pt_lock_self (module, lock);
 261         }
 262 
 263         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 264             /* return */
 265             return ret;
 266         }
 267 
 268         /* for lock_all there is nothing more to do. we will lock peer's on demand */
 269     } else {
 270         lock->eager_send_active = true;
 271     }
 272 
 273     return OMPI_SUCCESS;
 274 }
 275 
/**
 * Common implementation for MPI_Win_lock (target >= 0) and
 * MPI_Win_lock_all (target == -1).
 *
 * @param[in] lock_type  MPI_LOCK_SHARED or MPI_LOCK_EXCLUSIVE
 * @param[in] target     rank to lock, or -1 for all ranks
 * @param[in] assert     MPI assert flags (e.g. MPI_MODE_NOCHECK)
 * @param[in] win        MPI window
 *
 * @returns OMPI_SUCCESS, OMPI_ERR_RMA_SYNC on epoch conflicts,
 *          OMPI_ERR_RMA_CONFLICT on a conflicting outstanding lock, or
 *          OMPI_ERR_OUT_OF_RESOURCE
 */
static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, ompi_win_t *win)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_osc_pt2pt_sync_t *lock;
    int ret = OMPI_SUCCESS;

    /* reject the lock if it conflicts with the current epoch. TODO: we also
     * need to track whether we are in an active target epoch. Fence can make
     * this tricky to track. */
    if (-1 == target) {
        /* lock_all is illegal while any access epoch is active */
        if (module->all_sync.epoch_active) {
            OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "osc/pt2pt: attempted "
                                 "to lock all when active target epoch is %s and lock all epoch is %s. type %d",
                                 (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type && module->all_sync.epoch_active) ?
                                 "active" : "inactive",
                                 (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) ? "active" : "inactive",
                                 module->all_sync.type));
            return OMPI_ERR_RMA_SYNC;
        }
    } else {
        if (module->all_sync.epoch_active && (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type || MPI_LOCK_EXCLUSIVE == lock_type)) {
            /* impossible to get an exclusive lock while holding a global shared lock or in a active
             * target access epoch */
            return OMPI_ERR_RMA_SYNC;
        }
    }

    /* second-level check: an exclusive lock or lock_all while a lock-all
     * epoch is already active is also a sync error */
    if (module->all_sync.epoch_active || (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type &&
                                          (MPI_LOCK_EXCLUSIVE == lock_type || -1 == target))) {
        OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: attempted "
                             "to acquire a lock on %d with type %d when active sync is %s and lock "
                             "all epoch is %s", target, lock_type, module->all_sync.epoch_active ? "active" : "inactive",
                             (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type &&
                              (MPI_LOCK_EXCLUSIVE == lock_type || -1 == target)) ? "active" : "inactive"));
        return OMPI_ERR_RMA_SYNC;
    }

    if (OMPI_OSC_PT2PT_SYNC_TYPE_FENCE == module->all_sync.type) {
        /* if no communication has occurred during a fence epoch then we can
         * enter a lock epoch; just need to clear the all access epoch */
        module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE;
    }

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: lock %d %d", target, lock_type));

    /* create lock item: allocate a fresh sync object for a single-target
     * lock; lock_all reuses the module's all_sync object */
    if (-1 != target) {
        lock = ompi_osc_pt2pt_sync_allocate (module);
        if (OPAL_UNLIKELY(NULL == lock)) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target);
    } else {
        lock = &module->all_sync;
    }

    lock->type = OMPI_OSC_PT2PT_SYNC_TYPE_LOCK;
    lock->sync.lock.target = target;
    lock->sync.lock.type = lock_type;
    lock->sync.lock.assert = assert;
    lock->num_peers = (-1 == target) ? ompi_comm_size (module->comm) : 1;
    lock->sync_expected = 0;

    /* delay all eager sends until we've heard back.. */
    OPAL_THREAD_LOCK(&module->lock);

    /* check for conflicting lock */
    if (ompi_osc_pt2pt_module_lock_find (module, target, NULL)) {
        if (&module->all_sync != lock) {
            ompi_osc_pt2pt_sync_return (lock);
        }
        OPAL_THREAD_UNLOCK(&module->lock);
        return OMPI_ERR_RMA_CONFLICT;
    }

    ++module->passive_target_access_epoch;

    ompi_osc_pt2pt_module_lock_insert (module, lock);

    OPAL_THREAD_UNLOCK(&module->lock);

    ret = ompi_osc_pt2pt_lock_internal_execute (module, lock);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* locking failed; undo the bookkeeping done above */
        OPAL_THREAD_SCOPED_LOCK(&module->lock, ompi_osc_pt2pt_module_lock_remove (module, lock));
        if (&module->all_sync != lock) {
            ompi_osc_pt2pt_sync_return (lock);
        }
    }

    return ret;
}
 370 
/**
 * Common implementation for MPI_Win_unlock (target >= 0) and
 * MPI_Win_unlock_all (target == -1).
 *
 * @param[in] target  rank to unlock, or -1 for all ranks
 * @param[in] win     MPI window
 *
 * @returns OMPI_SUCCESS, OMPI_ERR_RMA_SYNC if no matching lock exists,
 *          or the error from a remote unlock
 */
static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_osc_pt2pt_sync_t *lock = NULL;
    int my_rank = ompi_comm_rank (module->comm);
    int ret = OMPI_SUCCESS;

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_unlock_internal: unlocking target %d", target));

    OPAL_THREAD_LOCK(&module->lock);
    lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL);
    if (OPAL_UNLIKELY(NULL == lock)) {
        /* unlock without a matching lock is a synchronization error */
        OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                             "ompi_osc_pt2pt_unlock: target %d is not locked in window %s",
                             target, win->w_name));
        OPAL_THREAD_UNLOCK(&module->lock);
        return OMPI_ERR_RMA_SYNC;
    }

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d",
                         lock->sync_expected));
    OPAL_THREAD_UNLOCK(&module->lock);

    /* wait until ack has arrived from target */
    ompi_osc_pt2pt_sync_wait_expected (lock);

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_unlock_internal: all lock acks received"));

    if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) {
        if (my_rank != target) {
            if (-1 == target) {
                /* send unlock messages to all of my peers */
                for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
                    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, i);

                    /* self is handled below; peers never locked need no unlock */
                    if (my_rank == i || !ompi_osc_pt2pt_peer_locked (peer)) {
                        continue;
                    }

                    ret = ompi_osc_pt2pt_unlock_remote (module, i, lock);
                    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                        return ret;
                    }
                }

                ompi_osc_pt2pt_unlock_self (module, lock);
            } else {
                ret = ompi_osc_pt2pt_unlock_remote (module, target, lock);
                if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                    return ret;
                }
            }

            /* wait for unlock acks. this signals remote completion of fragments */
            ompi_osc_pt2pt_sync_wait_expected (lock);

            /* It is possible for the unlock to finish too early before the data
             * is actually present in the recv buffer (for non-contiguous datatypes)
             * So make sure to wait for all of the fragments to arrive.
             */
            OPAL_THREAD_LOCK(&module->lock);
            while (module->outgoing_frag_count < 0) {
                opal_condition_wait(&module->cond, &module->lock);
            }
            OPAL_THREAD_UNLOCK(&module->lock);

            OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                                 "ompi_osc_pt2pt_unlock: unlock of %d complete", target));
        } else {
            ompi_osc_pt2pt_unlock_self (module, lock);
        }
    } else {
        /* MPI_MODE_NOCHECK: no lock messages were sent; flush instead */
        ompi_osc_pt2pt_flush_lock (module, lock, target);
    }

    OPAL_THREAD_LOCK(&module->lock);
    ompi_osc_pt2pt_module_lock_remove (module, lock);

    /* single-target locks were allocated in lock_internal and are returned;
     * the lock_all sync object is module-owned and only reset */
    if (-1 != lock->sync.lock.target) {
        ompi_osc_pt2pt_sync_return (lock);
    } else {
        ompi_osc_pt2pt_sync_reset (lock);
    }

    --module->passive_target_access_epoch;

    OPAL_THREAD_UNLOCK(&module->lock);

    return ret;
}
 465 
 466 int ompi_osc_pt2pt_lock(int lock_type, int target, int assert, ompi_win_t *win)
 467 {
 468     assert(target >= 0);
 469 
 470     return ompi_osc_pt2pt_lock_internal (lock_type, target, assert, win);
 471 }
 472 
 473 int ompi_osc_pt2pt_unlock (int target, struct ompi_win_t *win)
 474 {
 475     return ompi_osc_pt2pt_unlock_internal (target, win);
 476 }
 477 
 478 int ompi_osc_pt2pt_lock_all(int assert, struct ompi_win_t *win)
 479 {
 480     return ompi_osc_pt2pt_lock_internal (MPI_LOCK_SHARED, -1, assert, win);
 481 }
 482 
 483 
 484 int ompi_osc_pt2pt_unlock_all (struct ompi_win_t *win)
 485 {
 486     return ompi_osc_pt2pt_unlock_internal (-1, win);
 487 }
 488 
 489 
 490 int ompi_osc_pt2pt_sync (struct ompi_win_t *win)
 491 {
 492     opal_progress();
 493     return OMPI_SUCCESS;
 494 }
 495 
/**
 * Flush all operations associated with a lock to their target(s) and wait
 * for remote completion.
 *
 * @param[in] module  osc pt2pt module for the window
 * @param[in] lock    synchronization object for the lock epoch
 * @param[in] target  rank to flush, or -1 to flush every locked peer
 *
 * @returns OMPI_SUCCESS or the error from a remote flush
 */
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
                                      int target)
{
    int ret;
    int my_rank = ompi_comm_rank (module->comm);

    /* wait until ack has arrived from target, since we need to be
       able to eager send before we can transfer all the data... */
    ompi_osc_pt2pt_sync_wait_expected (lock);

    if (-1 == target) {
        /* NTH: no local flush */
        for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
            if (i == my_rank) {
                continue;
            }

            ret = ompi_osc_pt2pt_flush_remote (module, i, lock);
            if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                return ret;
            }
        }
    } else {
        /* send control message with flush request and count */
        ret = ompi_osc_pt2pt_flush_remote (module, target, lock);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            return ret;
        }
    }

    /* wait for all flush acks (meaning remote completion) */
    ompi_osc_pt2pt_sync_wait_expected (lock);
    /* wake any threads blocked on module->cond */
    opal_condition_broadcast (&module->cond);

    return OMPI_SUCCESS;
}
 532 
 533 int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win)
 534 {
 535     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 536     ompi_osc_pt2pt_sync_t *lock;
 537     int ret;
 538 
 539     assert (0 <= target);
 540 
 541     /* flush is only allowed from within a passive target epoch */
 542     if (!module->passive_target_access_epoch) {
 543         return OMPI_ERR_RMA_SYNC;
 544     }
 545 
 546     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 547                          "ompi_osc_pt2pt_flush starting..."));
 548 
 549     if (ompi_comm_rank (module->comm) == target) {
 550         /* nothing to flush */
 551         opal_progress ();
 552         return OMPI_SUCCESS;
 553     }
 554 
 555     OPAL_THREAD_LOCK(&module->lock);
 556     lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL);
 557     if (NULL == lock) {
 558         if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) {
 559             lock = &module->all_sync;
 560         }
 561     }
 562     OPAL_THREAD_UNLOCK(&module->lock);
 563     if (OPAL_UNLIKELY(NULL == lock)) {
 564         OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 565                              "ompi_osc_pt2pt_flush: target %d is not locked in window %s",
 566                              target, win->w_name));
 567         ret = OMPI_ERR_RMA_SYNC;
 568     } else {
 569         ret = ompi_osc_pt2pt_flush_lock (module, lock, target);
 570     }
 571 
 572     return ret;
 573 }
 574 
 575 
 576 int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win)
 577 {
 578     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 579     ompi_osc_pt2pt_sync_t *lock;
 580     int target, ret;
 581     void *node;
 582 
 583     /* flush is only allowed from within a passive target epoch */
 584     if (OPAL_UNLIKELY(!module->passive_target_access_epoch)) {
 585         OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 586                              "ompi_osc_pt2pt_flush_all: no targets are locked in window %s",
 587                              win->w_name));
 588         return OMPI_ERR_RMA_SYNC;
 589     }
 590 
 591     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 592                          "ompi_osc_pt2pt_flush_all entering..."));
 593 
 594     /* flush all locks */
 595     ret = opal_hash_table_get_first_key_uint32 (&module->outstanding_locks, (uint32_t *) &target,
 596                                                 (void **) &lock, &node);
 597     if (OPAL_SUCCESS == ret) {
 598         do {
 599             ret = ompi_osc_pt2pt_flush_lock (module, lock, lock->sync.lock.target);
 600             if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 601                 break;
 602             }
 603 
 604             ret = opal_hash_table_get_next_key_uint32 (&module->outstanding_locks, (uint32_t *) &target,
 605                                                        (void **) lock, node, &node);
 606             if (OPAL_SUCCESS != ret) {
 607                 ret = OPAL_SUCCESS;
 608                 break;
 609             }
 610         } while (1);
 611     }
 612 
 613     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 614                          "ompi_osc_pt2pt_flush_all complete"));
 615 
 616     return ret;
 617 }
 618 
 619 
 620 int ompi_osc_pt2pt_flush_local (int target, struct ompi_win_t *win)
 621 {
 622     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 623     int ret;
 624 
 625     /* flush is only allowed from within a passive target epoch */
 626     if (!module->passive_target_access_epoch) {
 627         return OMPI_ERR_RMA_SYNC;
 628     }
 629 
 630     ret = ompi_osc_pt2pt_frag_flush_target(module, target);
 631     if (OMPI_SUCCESS != ret) {
 632         return ret;
 633     }
 634 
 635     /* wait for all the requests */
 636     OPAL_THREAD_LOCK(&module->lock);
 637     while (module->outgoing_frag_count < 0) {
 638         opal_condition_wait(&module->cond, &module->lock);
 639     }
 640     OPAL_THREAD_UNLOCK(&module->lock);
 641 
 642     /* make some progress */
 643     opal_progress ();
 644 
 645     return OMPI_SUCCESS;
 646 }
 647 
 648 
 649 int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win)
 650 {
 651     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 652     int ret = OMPI_SUCCESS;
 653 
 654     /* flush is only allowed from within a passive target epoch */
 655     if (!module->passive_target_access_epoch) {
 656         return OMPI_ERR_RMA_SYNC;
 657     }
 658 
 659     ret = ompi_osc_pt2pt_frag_flush_all(module);
 660     if (OMPI_SUCCESS != ret) {
 661         return ret;
 662     }
 663 
 664     /* wait for all the requests */
 665     OPAL_THREAD_LOCK(&module->lock);
 666     while (module->outgoing_frag_count < 0) {
 667         opal_condition_wait(&module->cond, &module->lock);
 668     }
 669     OPAL_THREAD_UNLOCK(&module->lock);
 670 
 671     /* make some progress */
 672     opal_progress ();
 673 
 674     return OMPI_SUCCESS;
 675 }
 676 
 677 /* target side operation to acknowledge to initiator side that the
 678    lock is now held by the initiator */
 679 static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
 680                                  uint64_t lock_ptr)
 681 {
 682     ompi_osc_pt2pt_sync_t *lock;
 683 
 684     if (ompi_comm_rank (module->comm) != requestor) {
 685         ompi_osc_pt2pt_header_lock_ack_t lock_ack;
 686 
 687         lock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK;
 688         lock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
 689         lock_ack.source = ompi_comm_rank(module->comm);
 690         lock_ack.lock_ptr = lock_ptr;
 691         OSC_PT2PT_HTON(&lock_ack, module, requestor);
 692 
 693         OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 694                              "osc pt2pt: sending lock to %d", requestor));
 695 
 696         /* we don't want to send any data, since we're the exposure
 697            epoch only, so use an unbuffered send */
 698         return ompi_osc_pt2pt_control_send_unbuffered (module, requestor, &lock_ack, sizeof (lock_ack));
 699     }
 700 
 701 
 702     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 703                          "osc pt2pt: releasing local lock"));
 704 
 705     lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ptr;
 706     if (OPAL_UNLIKELY(NULL == lock)) {
 707         OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output,
 708                              "lock could not be located"));
 709     }
 710 
 711     ompi_osc_pt2pt_sync_expected (lock);
 712 
 713     return OMPI_SUCCESS;
 714 }
 715 
 716 
 717 /* target side operation to create a pending lock request for a lock
 718    request that could not be satisfied */
 719 static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor,
 720                               int lock_type, uint64_t lock_ptr)
 721 {
 722     ompi_osc_pt2pt_pending_lock_t *pending =
 723         OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);
 724     if (NULL == pending) {
 725         return OMPI_ERR_OUT_OF_RESOURCE;
 726     }
 727 
 728     pending->peer = requestor;
 729     pending->lock_type = lock_type;
 730     pending->lock_ptr = lock_ptr;
 731 
 732     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 733                          "osc pt2pt: queueing lock request from %d", requestor));
 734 
 735     OPAL_THREAD_SCOPED_LOCK(&module->locks_pending_lock, opal_list_append(&module->locks_pending, &pending->super));
 736 
 737     return OMPI_SUCCESS;
 738 }
 739 
/**
 * Attempt to acquire the window lock on behalf of a requestor.
 *
 * @param[in] module     osc pt2pt module for the window
 * @param[in] source     communicator rank requesting the lock
 * @param[in] lock_type  MPI_LOCK_SHARED or MPI_LOCK_EXCLUSIVE
 * @param[in] lock_ptr   initiator-side sync object pointer to echo back
 *
 * @returns true if the lock was acquired (and activated), false if the
 *          caller must queue the request
 *
 * lock_status encoding: 0 = unlocked, > 0 = number of shared-lock holders,
 * -1 = exclusively locked.
 */
static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type, uint64_t lock_ptr)
{
    bool queue = false;

    if (MPI_LOCK_SHARED == lock_type) {
        int32_t lock_status = module->lock_status;

        /* CAS loop: increment the shared-holder count unless an exclusive
         * lock (negative status) is held */
        do {
            if (lock_status < 0) {
                queue = true;
                break;
            }

            /* on failure the CAS reloads lock_status, so just retry */
            if (opal_atomic_compare_exchange_strong_32 (&module->lock_status, &lock_status, lock_status + 1)) {
                break;
            }
        } while (1);
    } else {
        /* exclusive: only acquirable when completely unlocked (0 -> -1) */
        int32_t _tmp_value = 0;
        queue = !opal_atomic_compare_exchange_strong_32 (&module->lock_status, &_tmp_value, -1);
    }

    if (queue) {
        return false;
    }

    /* acquired: ack the requestor (remote message or local signal) */
    activate_lock(module, source, lock_ptr);

    /* activated the lock */
    return true;
}
 771 
 772 static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module) {
 773     /* release any other pending locks we can */
 774     ompi_osc_pt2pt_pending_lock_t *pending_lock, *next;
 775     int ret = OMPI_SUCCESS;
 776 
 777     OPAL_THREAD_LOCK(&module->locks_pending_lock);
 778     OPAL_LIST_FOREACH_SAFE(pending_lock, next, &module->locks_pending,
 779                            ompi_osc_pt2pt_pending_lock_t) {
 780         bool acquired = ompi_osc_pt2pt_lock_try_acquire (module, pending_lock->peer, pending_lock->lock_type,
 781                                                          pending_lock->lock_ptr);
 782         if (!acquired) {
 783             break;
 784         }
 785 
 786         opal_list_remove_item (&module->locks_pending, &pending_lock->super);
 787         OBJ_RELEASE(pending_lock);
 788     }
 789     OPAL_THREAD_UNLOCK(&module->locks_pending_lock);
 790 
 791     return ret;
 792 }
 793 
 794 
 795 /* target side function called when the initiator sends a lock
 796    request.  Lock will either be activated and acknowledged or
 797    queued. */
 798 int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source,
 799                                 ompi_osc_pt2pt_header_lock_t* lock_header)
 800 {
 801     bool acquired;
 802 
 803     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 804                          "ompi_osc_pt2pt_process_lock: processing lock request from %d. current lock state = %d",
 805                          source, module->lock_status));
 806 
 807     acquired = ompi_osc_pt2pt_lock_try_acquire (module, source, lock_header->lock_type, lock_header->lock_ptr);
 808 
 809     if (!acquired) {
 810         queue_lock(module, source, lock_header->lock_type, lock_header->lock_ptr);
 811     }
 812 
 813     return OMPI_SUCCESS;
 814 }
 815 
 816 
 817 /* initiator-side function called when the target acks the lock
 818    request. */
 819 void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
 820                                       ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header)
 821 {
 822     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, lock_ack_header->source);
 823     ompi_osc_pt2pt_sync_t *lock;
 824 
 825     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 826                          "ompi_osc_pt2pt_process_lock_ack: processing lock ack from %d for lock %" PRIu64,
 827                          lock_ack_header->source, lock_ack_header->lock_ptr));
 828 
 829     lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr;
 830     assert (NULL != lock);
 831 
 832     ompi_osc_pt2pt_peer_set_eager_active (peer, true);
 833     ompi_osc_pt2pt_frag_flush_pending (module, peer->rank);
 834 
 835     ompi_osc_pt2pt_sync_expected (lock);
 836 }
 837 
 838 void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int source,
 839                                       ompi_osc_pt2pt_header_flush_ack_t *flush_ack_header) {
 840     ompi_osc_pt2pt_sync_t *lock;
 841 
 842     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 843                          "ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock 0x%" PRIx64,
 844                          source, flush_ack_header->lock_ptr));
 845 
 846     lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) flush_ack_header->lock_ptr;
 847     assert (NULL != lock);
 848 
 849     ompi_osc_pt2pt_sync_expected (lock);
 850 }
 851 
 852 void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int source,
 853                                         ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header)
 854 {
 855     ompi_osc_pt2pt_sync_t *lock;
 856 
 857     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 858                          "ompi_osc_pt2pt_process_unlock_ack: processing unlock ack from %d",
 859                          source));
 860 
 861     /* NTH: need to verify that this will work as expected */
 862     lock = (ompi_osc_pt2pt_sync_t *) (intptr_t) unlock_ack_header->lock_ptr;
 863     assert (NULL != lock);
 864 
 865     ompi_osc_pt2pt_sync_expected (lock);
 866 }
 867 
 868 /**
 869  * Process an unlock request.
 870  *
 871  * @param[in] module        - OSC PT2PT module
 872  * @param[in] source        - Source rank
 873  * @param[in] unlock_header - Incoming unlock header
 874  *
 875  * This functions is the target-side function for handling an unlock
 876  * request. Once all pending operations from the target are complete
 877  * this functions sends an unlock acknowledgement then attempts to
 878  * active a pending lock if the lock becomes free.
 879  */
int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
                                   ompi_osc_pt2pt_header_unlock_t *unlock_header)
{
    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
    ompi_osc_pt2pt_header_unlock_ack_t unlock_ack;
    int ret;

    assert (NULL != peer);

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_process_unlock entering (passive_incoming_frag_count: %d)...",
                         peer->passive_incoming_frag_count));

    /* we cannot block when processing an incoming request */
    if (0 != peer->passive_incoming_frag_count) {
        /* fragments from this peer are still outstanding; the caller is
         * expected to retry this handler once they have drained */
        return OMPI_ERR_WOULD_BLOCK;
    }

    /* build the unlock acknowledgement, echoing back the origin's sync
     * object address (lock_ptr) so it can match the ack to its lock */
    unlock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK;
    unlock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT && OPAL_ENABLE_DEBUG
    /* initialize the header padding so debug/memory-checker builds do not
     * see uninitialized bytes going over the wire */
    unlock_ack.padding[0] = 0;
    unlock_ack.padding[1] = 0;
    unlock_ack.padding[2] = 0;
    unlock_ack.padding[3] = 0;
    unlock_ack.padding[4] = 0;
    unlock_ack.padding[5] = 0;
#endif
    unlock_ack.lock_ptr = unlock_header->lock_ptr;
    OSC_PT2PT_HTON(&unlock_ack, module, source);

    /* NOTE(review): unbuffered send -- presumably because the passive epoch
     * with this origin is ending and no further buffered traffic to it is
     * guaranteed (see the similar comment in activate_lock) */
    ret = ompi_osc_pt2pt_control_send_unbuffered (module, source, &unlock_ack, sizeof (unlock_ack));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    /* release the lock: a status of -1 means an exclusive lock (reset to 0),
     * otherwise decrement the shared-lock count. whenever the lock becomes
     * free, try to activate the next queued lock request(s). */
    if (-1 == module->lock_status) {
        OPAL_THREAD_ADD_FETCH32(&module->lock_status, 1);
        ompi_osc_pt2pt_activate_next_lock (module);
    } else if (0 == OPAL_THREAD_ADD_FETCH32(&module->lock_status, -1)) {
        ompi_osc_pt2pt_activate_next_lock (module);
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: finished processing unlock fragment"));

    return ret;
}
 928 
 929 int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source,
 930                                   ompi_osc_pt2pt_header_flush_t *flush_header)
 931 {
 932     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
 933     ompi_osc_pt2pt_header_flush_ack_t flush_ack;
 934 
 935     assert (NULL != peer);
 936 
 937     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 938                          "ompi_osc_pt2pt_process_flush entering (passive_incoming_frag_count: %d)...",
 939                          peer->passive_incoming_frag_count));
 940 
 941     /* we cannot block when processing an incoming request */
 942     if (0 != peer->passive_incoming_frag_count) {
 943         return OMPI_ERR_WOULD_BLOCK;
 944     }
 945 
 946     flush_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK;
 947     flush_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
 948     flush_ack.lock_ptr = flush_header->lock_ptr;
 949     OSC_PT2PT_HTON(&flush_ack, module, source);
 950 
 951     return ompi_osc_pt2pt_control_send_unbuffered (module, source, &flush_ack, sizeof (flush_ack));
 952 }

/* [<][>][^][v][top][bottom][index][help] */