root/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c


DEFINITIONS

This source file includes the following definitions:
  1. compare_ranks
  2. ompi_osc_pt2pt_get_peers
  3. ompi_osc_pt2pt_release_peers
  4. ompi_osc_pt2pt_fence
  5. ompi_osc_pt2pt_start
  6. ompi_osc_pt2pt_complete
  7. ompi_osc_pt2pt_post
  8. ompi_osc_pt2pt_wait
  9. ompi_osc_pt2pt_test
  10. osc_pt2pt_incoming_complete
  11. osc_pt2pt_incoming_post
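
For orientation, a minimal sketch of the MPI-level active-target calls that
map onto the functions in this file (fence synchronization and
post/start/complete/wait). The window, buffer and group names below are
illustrative only, and the origin and target sides are shown together for
brevity; in practice the post/wait side and the start/complete side run on
different processes:

    MPI_Win win;
    MPI_Group group;                   /* peers to synchronize with, e.g. from
                                          MPI_Comm_group + MPI_Group_incl */
    int buf[16] = {0};
    MPI_Win_create (buf, sizeof (buf), sizeof (int), MPI_INFO_NULL,
                    MPI_COMM_WORLD, &win);

    /* fence synchronization -> ompi_osc_pt2pt_fence */
    MPI_Win_fence (MPI_MODE_NOPRECEDE, win);
    /* ... MPI_Put / MPI_Get / MPI_Accumulate ... */
    MPI_Win_fence (MPI_MODE_NOSUCCEED, win);

    /* generalized active target (PSCW) */
    MPI_Win_post (group, 0, win);      /* exposure -> ompi_osc_pt2pt_post     */
    MPI_Win_start (group, 0, win);     /* access   -> ompi_osc_pt2pt_start    */
    /* ... RMA operations targeting the peers in group ... */
    MPI_Win_complete (win);            /* access   -> ompi_osc_pt2pt_complete */
    MPI_Win_wait (win);                /* exposure -> ompi_osc_pt2pt_wait     */

    MPI_Win_free (&win);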

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007-2018 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2010-2016 IBM Corporation.  All rights reserved.
 * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
 * Copyright (c) 2015      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "osc_pt2pt.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_frag.h"

#include "mpi.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/base/base.h"

/**
 * compare_ranks:
 *
 * @param[in] ptra    Pointer to integer item
 * @param[in] ptrb    Pointer to integer item
 *
 * @returns 0 if *ptra == *ptrb
 * @returns -1 if *ptra < *ptrb
 * @returns 1 otherwise
 *
 * This function is used to sort the rank list. It can be removed if
 * groups are always in order.
 */
static int compare_ranks (const void *ptra, const void *ptrb)
{
    int a = *((int *) ptra);
    int b = *((int *) ptrb);

    if (a < b) {
        return -1;
    } else if (a > b) {
        return 1;
    }

    return 0;
}

/**
 * ompi_osc_pt2pt_get_peers:
 *
 * @param[in] module    - OSC PT2PT module
 * @param[in] sub_group - Group with ranks to translate
 *
 * @returns an array of peer objects on success or NULL on failure
 *
 * Translate the ranks given in {sub_group} into ranks in the
 * communicator used to create {module} and look up the associated
 * peer objects.
 */
static ompi_osc_pt2pt_peer_t **ompi_osc_pt2pt_get_peers (ompi_osc_pt2pt_module_t *module, ompi_group_t *sub_group)
{
    int size = ompi_group_size(sub_group);
    ompi_osc_pt2pt_peer_t **peers;
    int *ranks1, *ranks2;
    int ret;

    ranks1 = calloc (size, sizeof(int));
    ranks2 = calloc (size, sizeof(int));
    peers = calloc (size, sizeof (ompi_osc_pt2pt_peer_t *));
    if (NULL == ranks1 || NULL == ranks2 || NULL == peers) {
        free (ranks1);
        free (ranks2);
        free (peers);
        return NULL;
    }

    for (int i = 0 ; i < size ; ++i) {
        ranks1[i] = i;
    }

    ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group,
                                      ranks2);
    free (ranks1);
    if (OMPI_SUCCESS != ret) {
        free (ranks2);
        free (peers);
        return NULL;
    }

    qsort (ranks2, size, sizeof (int), compare_ranks);
    for (int i = 0 ; i < size ; ++i) {
        peers[i] = ompi_osc_pt2pt_peer_lookup (module, ranks2[i]);
        OBJ_RETAIN(peers[i]);
    }
    free (ranks2);

    return peers;
}

static void ompi_osc_pt2pt_release_peers (ompi_osc_pt2pt_peer_t **peers, int npeers)
{
    if (peers) {
        for (int i = 0 ; i < npeers ; ++i) {
            OBJ_RELEASE(peers[i]);
        }

        free (peers);
    }
}

int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    uint32_t incoming_reqs;
    int ret = OMPI_SUCCESS;

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: fence start"));

    /* can't enter an active target epoch when in a passive target epoch */
    if (ompi_osc_pt2pt_in_passive_epoch (module)) {
        OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                             "osc pt2pt: could not enter fence. already in an access epoch"));
        return OMPI_ERR_RMA_SYNC;
    }

    /* eager sends are now active (we will close the epoch if NOSUCCEED is specified) */
    if (0 == (assert & MPI_MODE_NOSUCCEED)) {
        module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
        module->all_sync.eager_send_active = true;
    }

    /* short-circuit the noprecede case */
    if (0 != (assert & MPI_MODE_NOPRECEDE)) {
        module->comm->c_coll->coll_barrier (module->comm,  module->comm->c_coll->coll_barrier_module);
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "osc pt2pt: fence end (short circuit)"));
        return ret;
    }

    /* try to start all requests.  */
    ret = ompi_osc_pt2pt_frag_flush_all(module);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: fence done sending"));

    /* find out how much data everyone is going to send us.  */
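    /* Each rank contributes its per-destination vector epoch_outgoing_frag_count;
     * the element-wise MPI_SUM combined with the block scatter leaves each rank
     * with the total number of fragments that will be sent to it. For example,
     * with three ranks contributing {0,2,1}, {1,0,3} and {2,2,0}, ranks 0, 1 and
     * 2 learn to expect 3, 4 and 4 incoming fragments respectively. */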
    ret = module->comm->c_coll->coll_reduce_scatter_block ((void *) module->epoch_outgoing_frag_count,
                                                           &incoming_reqs, 1, MPI_UINT32_T,
                                                           MPI_SUM, module->comm,
                                                           module->comm->c_coll->coll_reduce_scatter_block_module);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    OPAL_THREAD_LOCK(&module->lock);
    bzero ((void *) module->epoch_outgoing_frag_count, sizeof(uint32_t) * ompi_comm_size(module->comm));

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: fence expects %d requests",
                         incoming_reqs));

    /* set our complete condition for incoming requests */
    OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -incoming_reqs);

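    /* active_incoming_frag_count is incremented as incoming fragments are
     * processed, so after subtracting the expected total it climbs back to
     * zero (from below) once all expected fragments have arrived; the loop
     * below treats a negative value in either counter as outstanding work */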
    /* wait for completion */
    while (module->outgoing_frag_count < 0 || module->active_incoming_frag_count < 0) {
        opal_condition_wait(&module->cond, &module->lock);
    }

    if (assert & MPI_MODE_NOSUCCEED) {
        /* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
         * stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */
        ompi_osc_pt2pt_sync_reset (&module->all_sync);
    }

    module->all_sync.epoch_active = false;
    OPAL_THREAD_UNLOCK(&module->lock);

    module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "osc pt2pt: fence end: %d", ret));

    return OMPI_SUCCESS;
}


int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_osc_pt2pt_sync_t *sync = &module->all_sync;

    OPAL_THREAD_LOCK(&sync->lock);

    /* check if we are already in an access epoch */
    if (ompi_osc_pt2pt_access_epoch_active (module)) {
        OPAL_THREAD_UNLOCK(&sync->lock);
        return OMPI_ERR_RMA_SYNC;
    }

    /* mark all procs in this group as being in an access epoch */
    sync->num_peers = ompi_group_size (group);
    sync->sync.pscw.group = group;

    /* haven't processed any post messages yet */
    sync->sync_expected = sync->num_peers;
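    /* sync_expected counts post messages still outstanding: it is decremented
     * below for posts that arrived before this start and by
     * osc_pt2pt_incoming_post() for those that arrive later; eager sends are
     * enabled once it reaches zero */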

    /* If the previous epoch was a fence epoch then eager_send_active is still
     * set to true at this time, but it shouldn't be true until we get our
     * incoming posts. So reset it to false for this new epoch.
     */
    sync->eager_send_active = false;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_start entering with group size %d...",
                         sync->num_peers));

    sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_PSCW;

    /* prevent us from entering a passive-target, fence, or another pscw access epoch until
     * the matching complete is called */
    sync->epoch_active = true;

    /* save the group */
    OBJ_RETAIN(group);

    if (0 == ompi_group_size (group)) {
        /* nothing more to do. this is an empty start epoch */
        sync->eager_send_active = true;
        OPAL_THREAD_UNLOCK(&sync->lock);
        return OMPI_SUCCESS;
    }

    opal_atomic_wmb ();

    /* translate the group ranks into the communicator */
    sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
    if (NULL == sync->peer_list.peers) {
        OPAL_THREAD_UNLOCK(&sync->lock);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    if (!(assert & MPI_MODE_NOCHECK)) {
        for (int i = 0 ; i < sync->num_peers ; ++i) {
            ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];

            if (ompi_osc_pt2pt_peer_unex (peer)) {
                /* the peer already sent a post message for this pscw access epoch */
                OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                     "found unexpected post from %d",
                                     peer->rank));
                OPAL_THREAD_ADD_FETCH32 (&sync->sync_expected, -1);
                ompi_osc_pt2pt_peer_set_unex (peer, false);
            }
        }
    } else {
        sync->sync_expected = 0;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "post messages still needed: %d", sync->sync_expected));

    /* if we've already received all the post messages, we can eager
       send.  Otherwise, eager send will be enabled when
       sync_expected reaches 0 */
    if (0 == sync->sync_expected) {
        sync->eager_send_active = true;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_start complete. eager sends active: %d",
                         sync->eager_send_active));

    OPAL_THREAD_UNLOCK(&sync->lock);
    return OMPI_SUCCESS;
}


int ompi_osc_pt2pt_complete (ompi_win_t *win)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
    int my_rank = ompi_comm_rank (module->comm);
    ompi_osc_pt2pt_peer_t **peers;
    int ret = OMPI_SUCCESS;
    ompi_group_t *group;
    size_t group_size;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_complete entering..."));

    OPAL_THREAD_LOCK(&sync->lock);
    if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) {
        OPAL_THREAD_UNLOCK(&sync->lock);
        return OMPI_ERR_RMA_SYNC;
    }

    /* wait for all the post messages */
    ompi_osc_pt2pt_sync_wait_nolock (sync);

    /* phase 1 cleanup sync object */
    group = sync->sync.pscw.group;
    group_size = sync->num_peers;

    peers = sync->peer_list.peers;

    /* need to reset the sync here to avoid processing incorrect post messages */
    ompi_osc_pt2pt_sync_reset (sync);
    OPAL_THREAD_UNLOCK(&sync->lock);

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_complete all posts received. sending complete messages..."));

    /* for each process in group, send a control message with number
       of updates coming, then start all the requests.  Note that the
       control send is processed as another message in a fragment, so
       this might get queued until the flush_all (which is fine).

       At the same time, clean out the outgoing count for the next
       round. */
    for (size_t i = 0 ; i < group_size ; ++i) {
        ompi_osc_pt2pt_header_complete_t complete_req;
        int rank = peers[i]->rank;

        if (my_rank == rank) {
            /* shortcut for self */
            osc_pt2pt_incoming_complete (module, rank, 0);
            continue;
        }

        complete_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE;
        complete_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
        complete_req.frag_count = module->epoch_outgoing_frag_count[rank];
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
#if OPAL_ENABLE_DEBUG
        complete_req.padding[0] = 0;
        complete_req.padding[1] = 0;
#endif
        osc_pt2pt_hton(&complete_req, ompi_comm_peer_lookup (module->comm, rank));
#endif

        ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, rank);

        /* XXX -- TODO -- since fragments are always delivered in order we do not need to count anything but long
         * requests. once that is done this can be removed. */
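        /* if the complete header does not fit in the remaining space of the
         * peer's active fragment it will go out in a new fragment, so the
         * target must expect one more fragment than counted so far */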
        if (peer->active_frag) {
            ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
            if (active_frag->remain_len < sizeof (complete_req)) {
                ++complete_req.frag_count;
            }
        }

        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "ompi_osc_pt2pt_complete sending complete message to %d. frag_count: %u",
                             rank, complete_req.frag_count));

        ret = ompi_osc_pt2pt_control_send (module, rank, &complete_req,
                                           sizeof(ompi_osc_pt2pt_header_complete_t));
        if (OMPI_SUCCESS != ret) {
            break;
        }

        ret = ompi_osc_pt2pt_frag_flush_target (module, rank);
        if (OMPI_SUCCESS != ret) {
            break;
        }

        /* zero the fragment counts here to ensure they are zeroed */
        module->epoch_outgoing_frag_count[rank] = 0;
    }

    if (peers) {
        /* release our reference to peers in this group */
        ompi_osc_pt2pt_release_peers (peers, group_size);
    }

    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    OPAL_THREAD_LOCK(&module->lock);
    /* wait for outgoing requests to complete.  Don't wait for incoming, as
       we're only completing the access epoch, not the exposure epoch */
    while (module->outgoing_frag_count < 0) {
        opal_condition_wait(&module->cond, &module->lock);
    }

    /* unlock here, as group cleanup can take a while... */
    OPAL_THREAD_UNLOCK(&module->lock);

    /* phase 2 cleanup group */
    OBJ_RELEASE(group);

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_complete complete"));

    return OMPI_SUCCESS;
}


int ompi_osc_pt2pt_post (ompi_group_t *group, int assert, ompi_win_t *win)
{
    int ret = OMPI_SUCCESS;
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_osc_pt2pt_header_post_t post_req;
    ompi_osc_pt2pt_peer_t **peers;

    /* can't check whether an access epoch is active here because a fence epoch may be in progress */
    if (module->pw_group) {
        return OMPI_ERR_RMA_SYNC;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_post entering with group size %d...",
                         ompi_group_size (group)));

    OPAL_THREAD_LOCK(&module->lock);

    /* ensure we're not already in a post */
    if (NULL != module->pw_group) {
        OPAL_THREAD_UNLOCK(&(module->lock));
        return OMPI_ERR_RMA_SYNC;
    }

    /* save the group */
    OBJ_RETAIN(group);

    module->pw_group = group;

    /* Update completion counter.  Can't have received any completion
       messages yet; complete won't send a completion header until
       we've sent a post header. */
    module->num_complete_msgs = -ompi_group_size(module->pw_group);
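    /* num_complete_msgs starts at -group_size and is incremented by
     * osc_pt2pt_incoming_complete() as complete messages are processed;
     * wait and test treat 0 as "all complete messages have arrived" */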

    OPAL_THREAD_UNLOCK(&(module->lock));

    if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) {
        return OMPI_SUCCESS;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "sending post messages"));

    /* translate group ranks into the communicator */
    peers = ompi_osc_pt2pt_get_peers (module, module->pw_group);
    if (OPAL_UNLIKELY(NULL == peers)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* send a post message to everyone in the group */
    for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
        ompi_osc_pt2pt_peer_t *peer = peers[i];
        int rank = peer->rank;

        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", rank));
        ompi_proc_t *proc = ompi_comm_peer_lookup (module->comm, rank);

        /* shortcut for self */
        if (ompi_proc_local() == proc) {
            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_post self post"));
            osc_pt2pt_incoming_post (module, ompi_comm_rank(module->comm));
            continue;
        }

        post_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_POST;
        post_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
        osc_pt2pt_hton(&post_req, proc);

        /* we don't want to send any data since we're only in the exposure
           epoch, so use an unbuffered send */
        ret = ompi_osc_pt2pt_control_send_unbuffered(module, rank, &post_req,
                                                     sizeof(ompi_osc_pt2pt_header_post_t));
        if (OMPI_SUCCESS != ret) {
            break;
        }
    }

    ompi_osc_pt2pt_release_peers (peers, ompi_group_size(module->pw_group));

    return ret;
}


int ompi_osc_pt2pt_wait (ompi_win_t *win)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_group_t *group;

    if (NULL == module->pw_group) {
        return OMPI_ERR_RMA_SYNC;
    }

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_wait entering... module %p", (void *) module));

    OPAL_THREAD_LOCK(&module->lock);
    while (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
        OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "module %p, num_complete_msgs = %d, "
                             "active_incoming_frag_count = %d", (void *) module, module->num_complete_msgs,
                             module->active_incoming_frag_count));
        opal_condition_wait(&module->cond, &module->lock);
    }

    group = module->pw_group;
    module->pw_group = NULL;
    OPAL_THREAD_UNLOCK(&module->lock);

    OBJ_RELEASE(group);

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_wait complete"));

    return OMPI_SUCCESS;
}


int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag)
{
    ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
    ompi_group_t *group;
    int ret = OMPI_SUCCESS;

#if !OPAL_ENABLE_PROGRESS_THREADS
    opal_progress();
#endif

    if (NULL == module->pw_group) {
        return OMPI_ERR_RMA_SYNC;
    }

    OPAL_THREAD_LOCK(&(module->lock));

    if (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
        *flag = 0;
    } else {
        *flag = 1;

        group = module->pw_group;
        module->pw_group = NULL;

        OBJ_RELEASE(group);
    }

    OPAL_THREAD_UNLOCK(&(module->lock));

    return ret;
}

void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count)
{
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc pt2pt:  process_complete got complete message from %d. expected fragment count %d. "
                         "current incoming count: %d. expected complete msgs: %d", source,
                         frag_count, module->active_incoming_frag_count, module->num_complete_msgs));

    /* the current fragment is not part of the frag_count so we need to add it here */
    OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -frag_count);

    /* make sure the signal count is written before changing the complete message count */
    opal_atomic_wmb ();

    if (0 == OPAL_THREAD_ADD_FETCH32(&module->num_complete_msgs, 1)) {
        OPAL_THREAD_LOCK(&module->lock);
        opal_condition_broadcast (&module->cond);
        OPAL_THREAD_UNLOCK(&module->lock);
    }
}

void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
{
    ompi_osc_pt2pt_sync_t *sync = &module->all_sync;

    OPAL_THREAD_LOCK(&sync->lock);

    /* verify that this proc is part of the current start group */
    if (!ompi_osc_pt2pt_sync_pscw_peer (module, source, NULL)) {
        ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);

        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "received unexpected post message from %d for future PSCW synchronization",
                             source));

        ompi_osc_pt2pt_peer_set_unex (peer, true);
        OPAL_THREAD_UNLOCK(&sync->lock);
    } else {
        OPAL_THREAD_UNLOCK(&sync->lock);

        ompi_osc_pt2pt_sync_expected (sync);

        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "received post message for PSCW synchronization. post messages still needed: %d",
                             sync->sync_expected));
    }
}
