root/ompi/mca/osc/pt2pt/osc_pt2pt.h

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes following definitions.
  1. ompi_osc_pt2pt_peer_locked
  2. ompi_osc_pt2pt_peer_unex
  3. ompi_osc_pt2pt_peer_eager_active
  4. ompi_osc_pt2pt_peer_set_flag
  5. ompi_osc_pt2pt_peer_set_locked
  6. ompi_osc_pt2pt_peer_set_unex
  7. ompi_osc_pt2pt_peer_set_eager_active
  8. ompi_osc_pt2pt_peer_lookup
  9. mark_incoming_completion
  10. mark_outgoing_completion
  11. ompi_osc_signal_outgoing
  12. osc_pt2pt_copy_on_recv
  13. osc_pt2pt_copy_for_send
  14. osc_pt2pt_gc_clean
  15. osc_pt2pt_gc_add_buffer
  16. osc_pt2pt_add_pending
  17. get_tag
  18. tag_to_target
  19. tag_to_origin
  20. ompi_osc_pt2pt_accumulate_lock
  21. ompi_osc_pt2pt_accumulate_trylock
  22. ompi_osc_pt2pt_in_passive_epoch
  23. ompi_osc_pt2pt_accumulate_unlock
  24. ompi_osc_pt2pt_module_lock_find
  25. ompi_osc_pt2pt_module_lock_insert
  26. ompi_osc_pt2pt_module_lock_remove
  27. ompi_osc_pt2pt_module_sync_lookup
  28. ompi_osc_pt2pt_access_epoch_active
  29. ompi_osc_pt2pt_peer_sends_active

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University.
   4  *                         All rights reserved.
   5  * Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
   6  *                         All rights reserved.
   7  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   8  *                         University of Stuttgart.  All rights reserved.
   9  * Copyright (c) 2004-2005 The Regents of the University of California.
  10  *                         All rights reserved.
  11  * Copyright (c) 2007-2018 Los Alamos National Security, LLC.  All rights
  12  *                         reserved.
  13  * Copyright (c) 2010      Cisco Systems, Inc.  All rights reserved.
  14  * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
  15  * Copyright (c) 2015-2017 Research Organization for Information Science
  16  *                         and Technology (RIST). All rights reserved.
  17  * Copyright (c) 2016      FUJITSU LIMITED.  All rights reserved.
  18  * Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  */
  25 
  26 #ifndef OMPI_OSC_PT2PT_H
  27 #define OMPI_OSC_PT2PT_H
  28 
  29 #include "ompi_config.h"
  30 #include "opal/class/opal_list.h"
  31 #include "opal/class/opal_free_list.h"
  32 #include "opal/class/opal_hash_table.h"
  33 #include "opal/threads/threads.h"
  34 #include "opal/util/output.h"
  35 
  36 #include "ompi/win/win.h"
  37 #include "ompi/info/info.h"
  38 #include "ompi/communicator/communicator.h"
  39 #include "ompi/datatype/ompi_datatype.h"
  40 #include "ompi/request/request.h"
  41 #include "ompi/mca/osc/osc.h"
  42 #include "ompi/mca/osc/base/base.h"
  43 #include "ompi/memchecker.h"
  44 
  45 #include "osc_pt2pt_header.h"
  46 #include "osc_pt2pt_sync.h"
  47 
  48 BEGIN_C_DECLS
  49 
  50 struct ompi_osc_pt2pt_frag_t;
  51 struct ompi_osc_pt2pt_receive_t;
  52 
/**
 * OSC PT2PT component.  A single instance of this structure
 * (mca_osc_pt2pt_component) holds all state shared across the pt2pt
 * one-sided windows of this process.
 */
struct ompi_osc_pt2pt_component_t {
    /** Extend the basic osc component interface */
    ompi_osc_base_component_t super;

    /** lock access to modules */
    opal_mutex_t lock;

    /** cid -> module mapping */
    opal_hash_table_t modules;

    /** module count */
    int module_count;

    /** number of buffers per window */
    int receive_count;

    /** free list of ompi_osc_pt2pt_frag_t structures */
    opal_free_list_t frags;

    /** Free list of requests */
    opal_free_list_t requests;

    /** PT2PT component buffer size */
    unsigned int buffer_size;

    /** Lock for pending_operations */
    opal_mutex_t pending_operations_lock;

    /** List of operations that need to be processed */
    opal_list_t pending_operations;

    /** List of receives to be processed */
    opal_list_t pending_receives;

    /** Lock for pending_receives */
    opal_mutex_t pending_receives_lock;

    /** Is the progress function enabled? */
    bool progress_enable;
};
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
  94 
/** Bit flags stored (atomically) in ompi_osc_pt2pt_peer_t::flags */
enum {
    /** peer has sent an unexpected post message (no matching start) */
    OMPI_OSC_PT2PT_PEER_FLAG_UNEX = 1,
    /** eager sends are active on this peer */
    OMPI_OSC_PT2PT_PEER_FLAG_EAGER = 2,
    /** peer has been locked (on-demand locking for lock_all) */
    OMPI_OSC_PT2PT_PEER_FLAG_LOCK = 4,
};
 103 
 104 
/** Per-peer state tracked by a pt2pt window module. */
struct ompi_osc_pt2pt_peer_t {
    /** make this an opal object */
    opal_object_t super;

    /** rank of this peer */
    int rank;

    /** pointer to the current send fragment for each outgoing target */
    opal_atomic_intptr_t active_frag;

    /** lock for this peer */
    opal_mutex_t lock;

    /** fragments queued to this target */
    opal_list_t queued_frags;

    /** number of fragments incoming (negative - expected, positive - unsynchronized) */
    opal_atomic_int32_t passive_incoming_frag_count;

    /** peer flags (OMPI_OSC_PT2PT_PEER_FLAG_* bits) */
    opal_atomic_int32_t flags;
};
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
 128 
 129 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
 130 
 131 static inline bool ompi_osc_pt2pt_peer_locked (ompi_osc_pt2pt_peer_t *peer)
 132 {
 133     return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_LOCK);
 134 }
 135 
 136 static inline bool ompi_osc_pt2pt_peer_unex (ompi_osc_pt2pt_peer_t *peer)
 137 {
 138     return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_UNEX);
 139 }
 140 
 141 static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer)
 142 {
 143     return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER);
 144 }
 145 
 146 static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value)
 147 {
 148     if (value) {
 149         OPAL_ATOMIC_OR_FETCH32 (&peer->flags, flag);
 150     } else {
 151         OPAL_ATOMIC_AND_FETCH32 (&peer->flags, ~flag);
 152     }
 153 }
 154 
/** Set or clear this peer's LOCK flag (on-demand locking for lock_all). */
static inline void ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value)
{
    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_LOCK, value);
}
 159 
/** Set or clear this peer's UNEX flag (unexpected post message received). */
static inline void ompi_osc_pt2pt_peer_set_unex (ompi_osc_pt2pt_peer_t *peer, bool value)
{
    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_UNEX, value);
}
 164 
/** Set or clear this peer's EAGER flag (eager sends active). */
static inline void ompi_osc_pt2pt_peer_set_eager_active (ompi_osc_pt2pt_peer_t *peer, bool value)
{
    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_EAGER, value);
}
 169 
 170 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
 171 
/** Module structure.  Exactly one of these is associated with each
    PT2PT window */
struct ompi_osc_pt2pt_module_t {
    /** Extend the basic osc module interface */
    ompi_osc_base_module_t super;

    /** window should have accumulate ordering... */
    bool accumulate_ordering;

    /** no locks info key value */
    bool no_locks;

    /** pointer to free on cleanup (may be NULL) */
    void *free_after;

    /** Base pointer for local window */
    void *baseptr;

    /** communicator created with this window.  This is the cid used
        in the component's modules mapping. */
    ompi_communicator_t *comm;

    /** Local displacement unit. */
    int disp_unit;

    /** Mutex lock protecting module data */
    opal_recursive_mutex_t lock;

    /** condition variable associated with lock */
    opal_condition_t cond;

    /** hash table of peer objects */
    opal_hash_table_t peer_hash;

    /** lock protecting peer_hash */
    opal_mutex_t peer_lock;

    /** Number of communication fragments started for this epoch, by
        peer.  Not in peer data to make fence more manageable. */
    opal_atomic_uint32_t *epoch_outgoing_frag_count;

    /** cyclic counter for a unique tag for long messages. */
    opal_atomic_uint32_t tag_counter;

    /** number of outgoing fragments still to be completed */
    opal_atomic_int32_t outgoing_frag_count;

    /** number of incoming fragments */
    opal_atomic_int32_t active_incoming_frag_count;

    /** Number of targets locked/being locked */
    unsigned int passive_target_access_epoch;

    /** Indicates the window is in a pscw or all access (fence, lock_all) epoch */
    ompi_osc_pt2pt_sync_t all_sync;

    /* ************** PSCW (post/start/complete/wait) data **************** */
    struct ompi_group_t *pw_group;

    /** Number of "count" messages from the remote complete group
        we've received */
    opal_atomic_int32_t num_complete_msgs;

    /* ********************* LOCK data ************************ */

    /** Status of the local window lock.  One of 0 (unlocked),
        MPI_LOCK_EXCLUSIVE, or MPI_LOCK_SHARED. */
    opal_atomic_int32_t lock_status;

    /** lock for locks_pending list */
    opal_mutex_t locks_pending_lock;

    /** target side list of lock requests we couldn't satisfy yet */
    opal_list_t locks_pending;

    /** origin side list of locks currently outstanding */
    opal_hash_table_t outstanding_locks;

    /** receive fragments */
    struct ompi_osc_pt2pt_receive_t *recv_frags;

    /** number of receive fragments */
    unsigned int recv_frag_count;

    /* enforce accumulate semantics */
    opal_atomic_lock_t accumulate_lock;

    /** accumulate operations pending the accumulation lock */
    opal_list_t pending_acc;

    /** lock for pending_acc */
    opal_mutex_t pending_acc_lock;

    /** Lock for garbage collection lists */
    opal_mutex_t gc_lock;

    /** List of buffers that need to be freed */
    opal_list_t buffer_gc;
};
typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;
 272 OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component;
 273 
/**
 * Look up (lazily creating if necessary) the peer object for a rank.
 *
 * Double-checked locking: a first lookup is done without holding
 * peer_lock; only on a miss is the lock taken and the lookup repeated
 * before inserting a new entry, so exactly one peer object is created
 * per rank.
 *
 * NOTE(review): the result of OBJ_NEW is not checked for NULL, so an
 * out-of-memory condition would crash on the following assignment --
 * confirm this is the intended allocation-failure policy.
 */
static inline ompi_osc_pt2pt_peer_t *ompi_osc_pt2pt_peer_lookup (ompi_osc_pt2pt_module_t *module,
                                                                 int rank)
{
    ompi_osc_pt2pt_peer_t *peer = NULL;
    (void) opal_hash_table_get_value_uint32 (&module->peer_hash, rank, (void **) &peer);

    if (OPAL_UNLIKELY(NULL == peer)) {
        OPAL_THREAD_LOCK(&module->peer_lock);
        /* re-check under the lock: another thread may have created the peer */
        (void) opal_hash_table_get_value_uint32 (&module->peer_hash, rank, (void **) &peer);

        if (NULL == peer) {
            peer = OBJ_NEW(ompi_osc_pt2pt_peer_t);
            peer->rank = rank;

            (void) opal_hash_table_set_value_uint32 (&module->peer_hash, rank, (void *) peer);
        }
        OPAL_THREAD_UNLOCK(&module->peer_lock);
    }

    return peer;
}
 295 
 296 
/** A deferred incoming operation: the header is saved with its source
    so it can be processed later (queued on the component's
    pending_operations list). */
struct ompi_osc_pt2pt_pending_t {
    opal_list_item_t super;
    /** module the operation arrived on */
    ompi_osc_pt2pt_module_t *module;
    /** rank the operation came from */
    int source;
    /** saved copy of the operation's header */
    ompi_osc_pt2pt_header_t header;
};
typedef struct ompi_osc_pt2pt_pending_t ompi_osc_pt2pt_pending_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_pending_t);
 305 
/** A posted receive buffer and the PML request tracking it. */
struct ompi_osc_pt2pt_receive_t {
    opal_list_item_t super;
    /** module this receive belongs to */
    ompi_osc_pt2pt_module_t *module;
    /** PML request for the posted receive */
    ompi_request_t *pml_request;
    /** receive buffer */
    void *buffer;
};
typedef struct ompi_osc_pt2pt_receive_t ompi_osc_pt2pt_receive_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_receive_t);
 314 
 315 #define GET_MODULE(win) ((ompi_osc_pt2pt_module_t*) win->w_osc_module)
 316 
 317 extern bool ompi_osc_pt2pt_no_locks;
 318 
 319 int ompi_osc_pt2pt_attach(struct ompi_win_t *win, void *base, size_t len);
 320 int ompi_osc_pt2pt_detach(struct ompi_win_t *win, const void *base);
 321 
 322 int ompi_osc_pt2pt_free(struct ompi_win_t *win);
 323 
 324 int ompi_osc_pt2pt_put(const void *origin_addr,
 325                              int origin_count,
 326                              struct ompi_datatype_t *origin_dt,
 327                              int target,
 328                              ptrdiff_t target_disp,
 329                              int target_count,
 330                              struct ompi_datatype_t *target_dt,
 331                              struct ompi_win_t *win);
 332 
 333 int ompi_osc_pt2pt_accumulate(const void *origin_addr,
 334                               int origin_count,
 335                               struct ompi_datatype_t *origin_dt,
 336                               int target,
 337                               ptrdiff_t target_disp,
 338                               int target_count,
 339                               struct ompi_datatype_t *target_dt,
 340                               struct ompi_op_t *op,
 341                               struct ompi_win_t *win);
 342 
 343 int ompi_osc_pt2pt_get(void *origin_addr,
 344                        int origin_count,
 345                        struct ompi_datatype_t *origin_dt,
 346                        int target,
 347                        ptrdiff_t target_disp,
 348                        int target_count,
 349                        struct ompi_datatype_t *target_dt,
 350                        struct ompi_win_t *win);
 351 
 352 int ompi_osc_pt2pt_compare_and_swap(const void *origin_addr,
 353                                    const void *compare_addr,
 354                                    void *result_addr,
 355                                    struct ompi_datatype_t *dt,
 356                                    int target,
 357                                    ptrdiff_t target_disp,
 358                                    struct ompi_win_t *win);
 359 
 360 int ompi_osc_pt2pt_fetch_and_op(const void *origin_addr,
 361                                void *result_addr,
 362                                struct ompi_datatype_t *dt,
 363                                int target,
 364                                ptrdiff_t target_disp,
 365                                struct ompi_op_t *op,
 366                                struct ompi_win_t *win);
 367 
 368 int ompi_osc_pt2pt_get_accumulate(const void *origin_addr,
 369                                  int origin_count,
 370                                  struct ompi_datatype_t *origin_datatype,
 371                                  void *result_addr,
 372                                  int result_count,
 373                                  struct ompi_datatype_t *result_datatype,
 374                                  int target_rank,
 375                                  MPI_Aint target_disp,
 376                                  int target_count,
 377                                  struct ompi_datatype_t *target_datatype,
 378                                  struct ompi_op_t *op,
 379                                  struct ompi_win_t *win);
 380 
 381 int ompi_osc_pt2pt_rput(const void *origin_addr,
 382                        int origin_count,
 383                        struct ompi_datatype_t *origin_dt,
 384                        int target,
 385                        ptrdiff_t target_disp,
 386                        int target_count,
 387                        struct ompi_datatype_t *target_dt,
 388                        struct ompi_win_t *win,
 389                        struct ompi_request_t **request);
 390 
 391 int ompi_osc_pt2pt_rget(void *origin_addr,
 392                        int origin_count,
 393                        struct ompi_datatype_t *origin_dt,
 394                        int target,
 395                        ptrdiff_t target_disp,
 396                        int target_count,
 397                        struct ompi_datatype_t *target_dt,
 398                        struct ompi_win_t *win,
 399                        struct ompi_request_t **request);
 400 
 401 int ompi_osc_pt2pt_raccumulate(const void *origin_addr,
 402                               int origin_count,
 403                               struct ompi_datatype_t *origin_dt,
 404                               int target,
 405                               ptrdiff_t target_disp,
 406                               int target_count,
 407                               struct ompi_datatype_t *target_dt,
 408                               struct ompi_op_t *op,
 409                               struct ompi_win_t *win,
 410                               struct ompi_request_t **request);
 411 
 412 int ompi_osc_pt2pt_rget_accumulate(const void *origin_addr,
 413                                   int origin_count,
 414                                   struct ompi_datatype_t *origin_datatype,
 415                                   void *result_addr,
 416                                   int result_count,
 417                                   struct ompi_datatype_t *result_datatype,
 418                                   int target_rank,
 419                                   MPI_Aint target_disp,
 420                                   int target_count,
 421                                   struct ompi_datatype_t *target_datatype,
 422                                   struct ompi_op_t *op,
 423                                   struct ompi_win_t *win,
 424                                   struct ompi_request_t **request);
 425 
 426 int ompi_osc_pt2pt_fence(int assert, struct ompi_win_t *win);
 427 
 428 /* received a post message */
 429 void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source);
 430 
 431 /* received a complete message */
 432 void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count);
 433 
 434 int ompi_osc_pt2pt_start(struct ompi_group_t *group,
 435                         int assert,
 436                         struct ompi_win_t *win);
 437 int ompi_osc_pt2pt_complete(struct ompi_win_t *win);
 438 
 439 int ompi_osc_pt2pt_post(struct ompi_group_t *group,
 440                               int assert,
 441                               struct ompi_win_t *win);
 442 
 443 int ompi_osc_pt2pt_wait(struct ompi_win_t *win);
 444 
 445 int ompi_osc_pt2pt_test(struct ompi_win_t *win,
 446                               int *flag);
 447 
 448 int ompi_osc_pt2pt_lock(int lock_type,
 449                               int target,
 450                               int assert,
 451                               struct ompi_win_t *win);
 452 
 453 int ompi_osc_pt2pt_unlock(int target,
 454                                 struct ompi_win_t *win);
 455 
 456 int ompi_osc_pt2pt_lock_all(int assert,
 457                            struct ompi_win_t *win);
 458 
 459 int ompi_osc_pt2pt_unlock_all(struct ompi_win_t *win);
 460 
 461 int ompi_osc_pt2pt_sync(struct ompi_win_t *win);
 462 
 463 int ompi_osc_pt2pt_flush(int target,
 464                         struct ompi_win_t *win);
 465 int ompi_osc_pt2pt_flush_all(struct ompi_win_t *win);
 466 int ompi_osc_pt2pt_flush_local(int target,
 467                               struct ompi_win_t *win);
 468 int ompi_osc_pt2pt_flush_local_all(struct ompi_win_t *win);
 469 
 470 int ompi_osc_pt2pt_set_info(struct ompi_win_t *win, struct opal_info_t *info);
 471 int ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct opal_info_t **info_used);
 472 
 473 int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module,
 474                                   void *buf,
 475                                   size_t count,
 476                                   struct ompi_datatype_t *datatype,
 477                                   int src,
 478                                   int tag,
 479                                   struct ompi_communicator_t *comm);
 480 
 481 int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock);
 482 
 483 /**
 484  * ompi_osc_pt2pt_progress_pending_acc:
 485  *
 486  * @short Progress one pending accumulation or compare and swap operation.
 487  *
 488  * @param[in] module   - OSC PT2PT module
 489  *
 * @long If the accumulation lock can be acquired progress one pending
 491  *       accumulate or compare and swap operation.
 492  */
 493 int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module);
 494 
 495 
/**
 * mark_incoming_completion:
 *
 * @short Increment incoming completion count.
 *
 * @param[in] module - OSC PT2PT module
 * @param[in] source - Passive target source or MPI_PROC_NULL (active target)
 *
 * @long This function increments either the passive or active incoming count.
 *       If the count reaches the signal count we signal the module's condition.
 *       This function uses atomics if necessary so it is not necessary to hold
 *       the module lock before calling this function.
 */
static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, int source)
{
    int32_t new_value;

    if (MPI_PROC_NULL == source) {
        /* active target: completion tracked on a single module-wide counter.
         * NOTE(review): the counter is presumably primed negative with the
         * expected fragment count elsewhere (>= 0 means all arrived) --
         * confirm against the osc pt2pt module source. */
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "mark_incoming_completion marking active incoming complete. module %p, count = %d",
                             (void *) module, (int) module->active_incoming_frag_count + 1));
        new_value = OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, 1);
        if (new_value >= 0) {
            OPAL_THREAD_LOCK(&module->lock);
            opal_condition_broadcast(&module->cond);
            OPAL_THREAD_UNLOCK(&module->lock);
        }
    } else {
        /* passive target: completion tracked per peer.  The per-peer counter
         * is negative while fragments are expected (see
         * passive_incoming_frag_count), so hitting exactly 0 means all
         * expected fragments have arrived. */
        ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);

        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "mark_incoming_completion marking passive incoming complete. module %p, source = %d, count = %d",
                             (void *) module, source, (int) peer->passive_incoming_frag_count + 1));
        new_value = OPAL_THREAD_ADD_FETCH32((opal_atomic_int32_t *) &peer->passive_incoming_frag_count, 1);
        if (0 == new_value) {
            OPAL_THREAD_LOCK(&module->lock);
            opal_condition_broadcast(&module->cond);
            OPAL_THREAD_UNLOCK(&module->lock);
        }
    }
}
 537 
/**
 * mark_outgoing_completion:
 *
 * @short Increment outgoing count.
 *
 * @param[in] module - OSC PT2PT module
 *
 * @long This function is used to signal that an outgoing send is complete. It
 *       increments only the outgoing fragment count and signals the module
 *       condition when the fragment count is >= the signal count. This function
 *       uses atomics if necessary so it is not necessary to hold the module
 *       lock before calling this function.
 */
static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module)
{
    /* the counter is driven negative by ompi_osc_signal_outgoing() when sends
       are started, so reaching >= 0 means all signalled sends completed */
    int32_t new_value = OPAL_THREAD_ADD_FETCH32((opal_atomic_int32_t *) &module->outgoing_frag_count, 1);
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "mark_outgoing_completion: outgoing_frag_count = %d", new_value));
    if (new_value >= 0) {
        OPAL_THREAD_LOCK(&module->lock);
        opal_condition_broadcast(&module->cond);
        OPAL_THREAD_UNLOCK(&module->lock);
    }
}
 562 
/**
 * ompi_osc_signal_outgoing:
 *
 * @short Increment outgoing signal counters.
 *
 * @param[in] module - OSC PT2PT module
 * @param[in] target - Passive target rank or MPI_PROC_NULL (active target)
 * @param[in] count  - Number of outgoing messages to signal.
 *
 * @long This function uses atomics if necessary so it is not necessary to hold
 *       the module lock before calling this function.
 */
static inline void ompi_osc_signal_outgoing (ompi_osc_pt2pt_module_t *module, int target, int count)
{
    /* pre-charge the completion counter: each completed send adds 1 back in
       mark_outgoing_completion(), so the count returns to >= 0 when done */
    OPAL_THREAD_ADD_FETCH32((opal_atomic_int32_t *) &module->outgoing_frag_count, -count);
    if (MPI_PROC_NULL != target) {
        /* passive target: also track the number of fragments sent to this
           peer during the current epoch */
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "ompi_osc_signal_outgoing_passive: target = %d, count = %d, total = %d", target,
                             count, module->epoch_outgoing_frag_count[target] + count));
        OPAL_THREAD_ADD_FETCH32((opal_atomic_int32_t *) (module->epoch_outgoing_frag_count + target), count);
    }
}
 585 
 586 /**
 587  * osc_pt2pt_copy_on_recv:
 588  *
 589  * @short Helper function. Copies data from source to target through the
 590  * convertor.
 591  *
 592  * @param[in] target     - destination for the data
 593  * @param[in] source     - packed data
 594  * @param[in] source_len - length of source buffer
 595  * @param[in] proc       - proc that packed the source data
 596  * @param[in] count      - count of datatype items
 597  * @param[in] datatype   - datatype used for unpacking
 598  *
 599  * @long This functions unpacks data from the source buffer into the target
 600  *       buffer. The copy is done with a convertor generated from proc,
 601  *       datatype, and count.
 602  */
 603 static inline void osc_pt2pt_copy_on_recv (void *target, void *source, size_t source_len, ompi_proc_t *proc,
 604                                           int count, ompi_datatype_t *datatype)
 605 {
 606     opal_convertor_t convertor;
 607     uint32_t iov_count = 1;
 608     struct iovec iov;
 609     size_t max_data;
 610 
 611     /* create convertor */
 612     OBJ_CONSTRUCT(&convertor, opal_convertor_t);
 613 
 614     /* initialize convertor */
 615     opal_convertor_copy_and_prepare_for_recv(proc->super.proc_convertor, &datatype->super, count, target,
 616                                              0, &convertor);
 617 
 618     iov.iov_len  = source_len;
 619     iov.iov_base = (IOVBASE_TYPE *) source;
 620     max_data     = iov.iov_len;
 621     MEMCHECKER(memchecker_convertor_call(&opal_memchecker_base_mem_defined, &convertor));
 622 
 623     opal_convertor_unpack (&convertor, &iov, &iov_count, &max_data);
 624 
 625     MEMCHECKER(memchecker_convertor_call(&opal_memchecker_base_mem_noaccess, &convertor));
 626 
 627     OBJ_DESTRUCT(&convertor);
 628 }
 629 
 630 /**
 631  * osc_pt2pt_copy_for_send:
 632  *
 633  * @short: Helper function. Copies data from source to target through the
 634  * convertor.
 635  *
 636  * @param[in] target     - destination for the packed data
 637  * @param[in] target_len - length of the target buffer
 638  * @param[in] source     - original data
 639  * @param[in] proc       - proc this data will be sent to
 640  * @param[in] count      - count of datatype items
 641  * @param[in] datatype   - datatype used for packing
 642  *
 643  * @long This functions packs data from the source buffer into the target
 644  *       buffer. The copy is done with a convertor generated from proc,
 645  *       datatype, and count.
 646  */
 647 static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, const void *source, ompi_proc_t *proc,
 648                                            int count, ompi_datatype_t *datatype)
 649 {
 650     opal_convertor_t convertor;
 651     uint32_t iov_count = 1;
 652     struct iovec iov;
 653     size_t max_data;
 654 
 655     OBJ_CONSTRUCT(&convertor, opal_convertor_t);
 656 
 657     opal_convertor_copy_and_prepare_for_send(proc->super.proc_convertor, &datatype->super,
 658                                              count, source, 0, &convertor);
 659 
 660     iov.iov_len = target_len;
 661     iov.iov_base = (IOVBASE_TYPE *) target;
 662     opal_convertor_pack(&convertor, &iov, &iov_count, &max_data);
 663 
 664     OBJ_DESTRUCT(&convertor);
 665 }
 666 
 667 /**
 668  * osc_pt2pt_gc_clean:
 669  *
 670  * @short Release finished PML requests and accumulate buffers.
 671  *
 672  * @long This function exists because it is not possible to free a buffer from
 673  *     a request completion callback. We instead put requests and buffers on the
 674  *     module's garbage collection lists and release then at a later time.
 675  */
 676 static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
 677 {
 678     opal_list_item_t *item;
 679 
 680     OPAL_THREAD_LOCK(&module->gc_lock);
 681     while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
 682         OBJ_RELEASE(item);
 683     }
 684     OPAL_THREAD_UNLOCK(&module->gc_lock);
 685 }
 686 
 687 static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
 688 {
 689     OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
 690                             opal_list_append (&module->buffer_gc, buffer));
 691 }
 692 
 693 static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
 694 {
 695     OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.pending_operations_lock,
 696                             opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
 697 }
 698 
 699 #define OSC_PT2PT_FRAG_TAG   0x10000
 700 #define OSC_PT2PT_FRAG_MASK  0x0ffff
 701 
 702 /**
 703  * get_tag:
 704  *
 705  * @short Get a send/recv base tag for large memory operations.
 706  *
 707  * @param[in] module - OSC PT2PT module
 708  *
 709  * @long This function acquires a 16-bit tag for use with large memory operations. The
 710  *       tag will be odd or even depending on if this is in a passive target access
 711  *       or not. An actual tag that will be passed to PML send/recv function is given
 712  *       by tag_to_target or tag_to_origin function depending on the communication
 713  *       direction.
 714  */
 715 static inline int get_tag(ompi_osc_pt2pt_module_t *module)
 716 {
 717     /* the LSB of the tag is used be the receiver to determine if the
 718        message is a passive or active target (ie, where to mark
 719        completion). */
 720     int32_t tmp = OPAL_THREAD_ADD_FETCH32((opal_atomic_int32_t *) &module->tag_counter, 4);
 721     return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch);
 722 }
 723 
 724 /**
 725  * tag_to_target:
 726  *
 727  * @short Get a tag used for PML send/recv communication from an origin to a target.
 728  *
 729  * @param[in] tag - base tag given by get_tag function.
 730  */
 731 static inline int tag_to_target(int tag)
 732 {
 733     /* (returned_tag >> 1) & 0x1 == 0 */
 734     return tag + 0;
 735 }
 736 
 737 /**
 738  * tag_to_origin:
 739  *
 740  * @short Get a tag used for PML send/recv communication from a target to an origin.
 741  *
 742  * @param[in] tag - base tag given by get_tag function.
 743  */
 744 static inline int tag_to_origin(int tag)
 745 {
 746     /* (returned_tag >> 1) & 0x1 == 1 */
 747     return tag + 2;
 748 }
 749 
 750 /**
 751  * ompi_osc_pt2pt_accumulate_lock:
 752  *
 753  * @short Internal function that spins until the accumulation lock has
 754  *        been aquired.
 755  *
 756  * @param[in] module - OSC PT2PT module
 757  *
 758  * @returns 0
 759  *
 * @long This function blocks until the accumulation lock has been acquired. This
 761  *       behavior is only acceptable from a user-level call as blocking in a
 762  *       callback may cause deadlock. If a callback needs the accumulate lock and
 763  *       it is not available it should be placed on the pending_acc list of the
 764  *       module. It will be released by ompi_osc_pt2pt_accumulate_unlock().
 765  */
 766 static inline int ompi_osc_pt2pt_accumulate_lock (ompi_osc_pt2pt_module_t *module)
 767 {
 768     while (opal_atomic_trylock (&module->accumulate_lock)) {
 769         opal_progress ();
 770     }
 771 
 772     return 0;
 773 }
 774 
 775 /**
 776  * ompi_osc_pt2pt_accumulate_trylock:
 777  *
 * @short Try to acquire the accumulation lock.
 *
 * @param[in] module - OSC PT2PT module
 *
 * @returns 0 if the accumulation lock was acquired
 * @returns 1 if the lock was not available
 *
 * @long This function will try to acquire the accumulation lock. This function
 786  *       is safe to call from a callback.
 787  */
static inline int ompi_osc_pt2pt_accumulate_trylock (ompi_osc_pt2pt_module_t *module)
{
    /* non-blocking: per the contract above, returns 0 when the lock was
       taken and 1 when it is already held elsewhere */
    return opal_atomic_trylock (&module->accumulate_lock);
}
 792 
 793 /**
 * @brief check if this process is in a passive target access epoch
 795  *
 796  * @param[in] module          osc pt2pt module
 797  */
 798 static inline bool ompi_osc_pt2pt_in_passive_epoch (ompi_osc_pt2pt_module_t *module)
 799 {
 800     return 0 != module->passive_target_access_epoch;
 801 }
 802 
 803 /**
 804  * ompi_osc_pt2pt_accumulate_unlock:
 805  *
 806  * @short Unlock the accumulation lock and release a pending accumulation operation.
 807  *
 808  * @param[in] module - OSC PT2PT module
 809  *
 * @long This function unlocks the accumulation lock and releases a single pending
 *       accumulation operation if one exists. This function may be called recursively.
 812  */
 813 static inline void ompi_osc_pt2pt_accumulate_unlock (ompi_osc_pt2pt_module_t *module)
 814 {
 815     opal_atomic_unlock (&module->accumulate_lock);
 816     if (0 != opal_list_get_size (&module->pending_acc)) {
 817         ompi_osc_pt2pt_progress_pending_acc (module);
 818     }
 819 }
 820 
 821 /**
 822  * Find the first outstanding lock of the target.
 823  *
 824  * @param[in]  module   osc pt2pt module
 825  * @param[in]  target   target rank
 826  * @param[out] peer     peer object associated with the target
 827  *
 828  * @returns an outstanding lock on success
 829  *
 830  * This function looks for an outstanding lock to the target. If a lock exists it is returned.
 831  */
 832 static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_lock_find (ompi_osc_pt2pt_module_t *module, int target,
 833                                                                       ompi_osc_pt2pt_peer_t **peer)
 834 {
 835     ompi_osc_pt2pt_sync_t *outstanding_lock = NULL;
 836 
 837     (void) opal_hash_table_get_value_uint32 (&module->outstanding_locks, (uint32_t) target, (void **) &outstanding_lock);
 838     if (NULL != outstanding_lock && peer) {
 839         *peer = outstanding_lock->peer_list.peer;
 840     }
 841 
 842     return outstanding_lock;
 843 }
 844 
 845 /**
 846  * Add an outstanding lock
 847  *
 848  * @param[in] module   osc pt2pt module
 849  * @param[in] lock     lock object
 850  *
 851  * This function inserts a lock object to the list of outstanding locks. The caller must be holding the module
 852  * lock.
 853  */
static inline void ompi_osc_pt2pt_module_lock_insert (struct ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
    /* outstanding locks are keyed by the lock's target rank; caller must
       hold the module lock (see function documentation above) */
    (void) opal_hash_table_set_value_uint32 (&module->outstanding_locks, (uint32_t) lock->sync.lock.target, (void *) lock);
}
 858 
 859 
 860 /**
 861  * Remove an outstanding lock
 862  *
 863  * @param[in] module   osc pt2pt module
 864  * @param[in] lock     lock object
 865  *
 * This function removes a lock object from the list of outstanding locks. The caller must be holding the module
 867  * lock.
 868  */
 869 static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
 870 {
 871 
 872     (void) opal_hash_table_remove_value_uint32 (&module->outstanding_locks, (uint32_t) lock->sync.lock.target);
 873 }
 874 
 875 /**
 876  * Lookup a synchronization object associated with the target
 877  *
 878  * @param[in] module   osc pt2pt module
 879  * @param[in] target   target rank
 880  * @param[out] peer    peer object
 881  *
 882  * @returns NULL if the target is not locked, fenced, or part of a pscw sync
 883  * @returns synchronization object on success
 884  *
 885  * This function returns the synchronization object associated with an access epoch for
 886  * the target. If the target is not part of any current access epoch then NULL is returned.
 887  */
static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target,
                                                                        struct ompi_osc_pt2pt_peer_t **peer)
{
    ompi_osc_pt2pt_peer_t *tmp;

    /* callers that do not care about the peer may pass NULL; redirect the
       output to a local so the code below can assign unconditionally */
    if (NULL == peer) {
        peer = &tmp;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc/pt2pt: looking for synchronization object for target %d", target));

    switch (module->all_sync.type) {
    case OMPI_OSC_PT2PT_SYNC_TYPE_NONE:
        /* no window-wide epoch; a per-target passive lock may still cover
           this rank unless locks were disabled at window creation */
        if (!module->no_locks) {
            return ompi_osc_pt2pt_module_lock_find (module, target, peer);
        }

        return NULL;
    case OMPI_OSC_PT2PT_SYNC_TYPE_FENCE:
    case OMPI_OSC_PT2PT_SYNC_TYPE_LOCK:
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "osc/pt2pt: found fence/lock_all access epoch for target %d", target));

        /* fence epoch is now active */
        module->all_sync.epoch_active = true;
        *peer = ompi_osc_pt2pt_peer_lookup (module, target);
        /* in a lock_all epoch the remote lock is acquired lazily: issue it
           the first time this target is actually used */
        if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && !ompi_osc_pt2pt_peer_locked (*peer)) {
            (void) ompi_osc_pt2pt_lock_remote (module, target, &module->all_sync);
        }

        return &module->all_sync;
    case OMPI_OSC_PT2PT_SYNC_TYPE_PSCW:
        if (ompi_osc_pt2pt_sync_pscw_peer (module, target, peer)) {
            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                 "osc/pt2pt: found PSCW access epoch target for %d", target));
            return &module->all_sync;
        }
        /* target not part of the PSCW group: fall out of the switch and
           return NULL below */
    }

    return NULL;
}
 930 
 931 /**
 932  * @brief check if an access epoch is active
 933  *
 934  * @param[in] module        osc pt2pt module
 935  *
 936  * @returns true if any type of access epoch is active
 937  * @returns false otherwise
 938  *
 939  * This function is used to check for conflicting access epochs.
 940  */
 941 static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *module)
 942 {
 943     return (module->all_sync.epoch_active || ompi_osc_pt2pt_in_passive_epoch (module));
 944 }
 945 
 946 static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank)
 947 {
 948     ompi_osc_pt2pt_sync_t *sync;
 949     ompi_osc_pt2pt_peer_t *peer;
 950 
 951     sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer);
 952     if (!sync) {
 953         return false;
 954     }
 955 
 956     return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer);
 957 }
 958 
 959 END_C_DECLS
 960 
 961 #endif /* OMPI_OSC_PT2PT_H */

/* [<][>][^][v][top][bottom][index][help] */