This source file includes the following definitions:
- compare_ranks
- ompi_osc_pt2pt_get_peers
- ompi_osc_pt2pt_release_peers
- ompi_osc_pt2pt_fence
- ompi_osc_pt2pt_start
- ompi_osc_pt2pt_complete
- ompi_osc_pt2pt_post
- ompi_osc_pt2pt_wait
- ompi_osc_pt2pt_test
- osc_pt2pt_incoming_complete
- osc_pt2pt_incoming_post
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 #include "ompi_config.h"
  28 
  29 #include "osc_pt2pt.h"
  30 #include "osc_pt2pt_header.h"
  31 #include "osc_pt2pt_data_move.h"
  32 #include "osc_pt2pt_frag.h"
  33 
  34 #include "mpi.h"
  35 #include "opal/runtime/opal_progress.h"
  36 #include "opal/threads/mutex.h"
  37 #include "ompi/communicator/communicator.h"
  38 #include "ompi/mca/osc/base/base.h"
  39 
  40 
  41 
  42 
  43 
  44 
  45 
  46 
  47 
  48 
  49 
  50 
  51 
  52 
  53 static int compare_ranks (const void *ptra, const void *ptrb)
  54 {
  55     int a = *((int *) ptra);
  56     int b = *((int *) ptrb);
  57 
  58     if (a < b) {
  59         return -1;
  60     } else if (a > b) {
  61         return 1;
  62     }
  63 
  64     return 0;
  65 }
  66 
  67 
  68 
  69 
  70 
  71 
  72 
  73 
  74 
  75 
  76 
  77 
  78 static ompi_osc_pt2pt_peer_t **ompi_osc_pt2pt_get_peers (ompi_osc_pt2pt_module_t *module, ompi_group_t *sub_group)
  79 {
  80     int size = ompi_group_size(sub_group);
  81     ompi_osc_pt2pt_peer_t **peers;
  82     int *ranks1, *ranks2;
  83     int ret;
  84 
  85     ranks1 = calloc (size, sizeof(int));
  86     ranks2 = calloc (size, sizeof(int));
  87     peers = calloc (size, sizeof (ompi_osc_pt2pt_peer_t *));
  88     if (NULL == ranks1 || NULL == ranks2 || NULL == peers) {
  89         free (ranks1);
  90         free (ranks2);
  91         free (peers);
  92         return NULL;
  93     }
  94 
  95     for (int i = 0 ; i < size ; ++i) {
  96         ranks1[i] = i;
  97     }
  98 
  99     ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group,
 100                                       ranks2);
 101     free (ranks1);
 102     if (OMPI_SUCCESS != ret) {
 103         free (ranks2);
 104         free (peers);
 105         return NULL;
 106     }
 107 
 108     qsort (ranks2, size, sizeof (int), compare_ranks);
 109     for (int i = 0 ; i < size ; ++i) {
 110         peers[i] = ompi_osc_pt2pt_peer_lookup (module, ranks2[i]);
 111         OBJ_RETAIN(peers[i]);
 112     }
 113     free (ranks2);
 114 
 115     return peers;
 116 }
 117 
 118 static void ompi_osc_pt2pt_release_peers (ompi_osc_pt2pt_peer_t **peers, int npeers)
 119 {
 120     if (peers) {
 121         for (int i = 0 ; i < npeers ; ++i) {
 122             OBJ_RELEASE(peers[i]);
 123         }
 124 
 125         free (peers);
 126     }
 127 }
 128 
 129 int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
 130 {
 131     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 132     uint32_t incoming_reqs;
 133     int ret = OMPI_SUCCESS;
 134 
 135     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 136                          "osc pt2pt: fence start"));
 137 
 138     
 139     if (ompi_osc_pt2pt_in_passive_epoch (module)) {
 140         OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 141                              "osc pt2pt: could not enter fence. already in an access epoch"));
 142         return OMPI_ERR_RMA_SYNC;
 143     }
 144 
 145     
 146     if (0 == (assert & MPI_MODE_NOSUCCEED)) {
 147         module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
 148         module->all_sync.eager_send_active = true;
 149     }
 150 
 151     
 152     if (0 != (assert & MPI_MODE_NOPRECEDE)) {
 153         module->comm->c_coll->coll_barrier (module->comm,  module->comm->c_coll->coll_barrier_module);
 154         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 155                              "osc pt2pt: fence end (short circuit)"));
 156         return ret;
 157     }
 158 
 159     
 160     ret = ompi_osc_pt2pt_frag_flush_all(module);
 161     if (OMPI_SUCCESS != ret) {
 162         return ret;
 163     }
 164 
 165     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 166                          "osc pt2pt: fence done sending"));
 167 
 168     
 169     ret = module->comm->c_coll->coll_reduce_scatter_block ((void *) module->epoch_outgoing_frag_count,
 170                                                            &incoming_reqs, 1, MPI_UINT32_T,
 171                                                            MPI_SUM, module->comm,
 172                                                            module->comm->c_coll->coll_reduce_scatter_block_module);
 173     if (OMPI_SUCCESS != ret) {
 174         return ret;
 175     }
 176 
 177     OPAL_THREAD_LOCK(&module->lock);
 178     bzero ((void *) module->epoch_outgoing_frag_count, sizeof(uint32_t) * ompi_comm_size(module->comm));
 179 
 180     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 181                          "osc pt2pt: fence expects %d requests",
 182                          incoming_reqs));
 183 
 184     
 185     OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -incoming_reqs);
 186 
 187     
 188     while (module->outgoing_frag_count < 0 || module->active_incoming_frag_count < 0) {
 189         opal_condition_wait(&module->cond, &module->lock);
 190     }
 191 
 192     if (assert & MPI_MODE_NOSUCCEED) {
 193         
 194 
 195         ompi_osc_pt2pt_sync_reset (&module->all_sync);
 196     }
 197 
 198     module->all_sync.epoch_active = false;
 199     OPAL_THREAD_UNLOCK(&module->lock);
 200 
 201     module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);
 202 
 203     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 204                          "osc pt2pt: fence end: %d", ret));
 205 
 206     return OMPI_SUCCESS;
 207 }
 208 
 209 
 210 int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
 211 {
 212     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 213     ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
 214 
 215     OPAL_THREAD_LOCK(&sync->lock);
 216 
 217     
 218     if (ompi_osc_pt2pt_access_epoch_active (module)) {
 219         OPAL_THREAD_UNLOCK(&sync->lock);
 220         return OMPI_ERR_RMA_SYNC;
 221     }
 222 
 223     
 224     sync->num_peers = ompi_group_size (group);
 225     sync->sync.pscw.group = group;
 226 
 227     
 228     sync->sync_expected = sync->num_peers;
 229 
 230     
 231 
 232 
 233 
 234     sync->eager_send_active = false;
 235 
 236     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 237                          "ompi_osc_pt2pt_start entering with group size %d...",
 238                          sync->num_peers));
 239 
 240     sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_PSCW;
 241 
 242     
 243 
 244     sync->epoch_active = true;
 245 
 246     
 247     OBJ_RETAIN(group);
 248 
 249     if (0 == ompi_group_size (group)) {
 250         
 251         sync->eager_send_active = true;
 252         OPAL_THREAD_UNLOCK(&sync->lock);
 253         return OMPI_SUCCESS;
 254     }
 255 
 256     opal_atomic_wmb ();
 257 
 258     
 259     sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
 260     if (NULL == sync->peer_list.peers) {
 261         OPAL_THREAD_UNLOCK(&sync->lock);
 262         return OMPI_ERR_OUT_OF_RESOURCE;
 263     }
 264 
 265     if (!(assert & MPI_MODE_NOCHECK)) {
 266         for (int i = 0 ; i < sync->num_peers ; ++i) {
 267             ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
 268 
 269             if (ompi_osc_pt2pt_peer_unex (peer)) {
 270                 
 271                 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 272                                      "found unexpected post from %d",
 273                                      peer->rank));
 274                 OPAL_THREAD_ADD_FETCH32 (&sync->sync_expected, -1);
 275                 ompi_osc_pt2pt_peer_set_unex (peer, false);
 276             }
 277         }
 278     } else {
 279         sync->sync_expected = 0;
 280     }
 281 
 282     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 283                          "post messages still needed: %d", sync->sync_expected));
 284 
 285     
 286 
 287 
 288     if (0 == sync->sync_expected) {
 289         sync->eager_send_active = true;
 290     }
 291 
 292     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 293                          "ompi_osc_pt2pt_start complete. eager sends active: %d",
 294                          sync->eager_send_active));
 295 
 296     OPAL_THREAD_UNLOCK(&sync->lock);
 297     return OMPI_SUCCESS;
 298 }
 299 
 300 
 301 int ompi_osc_pt2pt_complete (ompi_win_t *win)
 302 {
 303     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 304     ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
 305     int my_rank = ompi_comm_rank (module->comm);
 306     ompi_osc_pt2pt_peer_t **peers;
 307     int ret = OMPI_SUCCESS;
 308     ompi_group_t *group;
 309     size_t group_size;
 310 
 311     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 312                          "ompi_osc_pt2pt_complete entering..."));
 313 
 314     OPAL_THREAD_LOCK(&sync->lock);
 315     if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) {
 316         OPAL_THREAD_UNLOCK(&sync->lock);
 317         return OMPI_ERR_RMA_SYNC;
 318     }
 319 
 320     
 321     ompi_osc_pt2pt_sync_wait_nolock (sync);
 322 
 323     
 324     group = sync->sync.pscw.group;
 325     group_size = sync->num_peers;
 326 
 327     peers = sync->peer_list.peers;
 328 
 329     
 330     ompi_osc_pt2pt_sync_reset (sync);
 331     OPAL_THREAD_UNLOCK(&sync->lock);
 332 
 333     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 334                          "ompi_osc_pt2pt_complete all posts received. sending complete messages..."));
 335 
 336     
 337 
 338 
 339 
 340 
 341 
 342 
 343     for (size_t i = 0 ; i < group_size ; ++i) {
 344         ompi_osc_pt2pt_header_complete_t complete_req;
 345         int rank = peers[i]->rank;
 346 
 347         if (my_rank == rank) {
 348             
 349             osc_pt2pt_incoming_complete (module, rank, 0);
 350             continue;
 351         }
 352 
 353         complete_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE;
 354         complete_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
 355         complete_req.frag_count = module->epoch_outgoing_frag_count[rank];
 356 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
 357 #if OPAL_ENABLE_DEBUG
 358         complete_req.padding[0] = 0;
 359         complete_req.padding[1] = 0;
 360 #endif
 361         osc_pt2pt_hton(&complete_req, ompi_comm_peer_lookup (module->comm, rank));
 362 #endif
 363 
 364         ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, rank);
 365 
 366         
 367 
 368         if (peer->active_frag) {
 369             ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
 370             if (active_frag->remain_len < sizeof (complete_req)) {
 371                 ++complete_req.frag_count;
 372             }
 373         }
 374 
 375         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 376                              "ompi_osc_pt2pt_complete sending complete message to %d. frag_count: %u",
 377                              rank, complete_req.frag_count));
 378 
 379         ret = ompi_osc_pt2pt_control_send (module, rank, &complete_req,
 380                                            sizeof(ompi_osc_pt2pt_header_complete_t));
 381         if (OMPI_SUCCESS != ret) {
 382             break;
 383         }
 384 
 385         ret = ompi_osc_pt2pt_frag_flush_target (module, rank);
 386         if (OMPI_SUCCESS != ret) {
 387             break;
 388         }
 389 
 390         
 391         module->epoch_outgoing_frag_count[rank] = 0;
 392     }
 393 
 394     if (peers) {
 395         
 396         ompi_osc_pt2pt_release_peers (peers, group_size);
 397     }
 398 
 399     if (OMPI_SUCCESS != ret) {
 400         return ret;
 401     }
 402 
 403     OPAL_THREAD_LOCK(&module->lock);
 404     
 405 
 406     while (module->outgoing_frag_count < 0) {
 407         opal_condition_wait(&module->cond, &module->lock);
 408     }
 409 
 410     
 411     OPAL_THREAD_UNLOCK(&module->lock);
 412 
 413     
 414     OBJ_RELEASE(group);
 415 
 416     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 417                          "ompi_osc_pt2pt_complete complete"));
 418 
 419     return OMPI_SUCCESS;
 420 }
 421 
 422 
 423 int ompi_osc_pt2pt_post (ompi_group_t *group, int assert, ompi_win_t *win)
 424 {
 425     int ret = OMPI_SUCCESS;
 426     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 427     ompi_osc_pt2pt_header_post_t post_req;
 428     ompi_osc_pt2pt_peer_t **peers;
 429 
 430     
 431     if (module->pw_group) {
 432         return OMPI_ERR_RMA_SYNC;
 433     }
 434 
 435     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 436                          "ompi_osc_pt2pt_post entering with group size %d...",
 437                          ompi_group_size (group)));
 438 
 439     OPAL_THREAD_LOCK(&module->lock);
 440 
 441     
 442     if (NULL != module->pw_group) {
 443         OPAL_THREAD_UNLOCK(&(module->lock));
 444         return OMPI_ERR_RMA_SYNC;
 445     }
 446 
 447     
 448     OBJ_RETAIN(group);
 449 
 450     module->pw_group = group;
 451 
 452     
 453 
 454 
 455     module->num_complete_msgs = -ompi_group_size(module->pw_group);
 456 
 457     OPAL_THREAD_UNLOCK(&(module->lock));
 458 
 459     if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) {
 460         return OMPI_SUCCESS;
 461     }
 462 
 463     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 464                          "sending post messages"));
 465 
 466     
 467     peers = ompi_osc_pt2pt_get_peers (module, module->pw_group);
 468     if (OPAL_UNLIKELY(NULL == peers)) {
 469         return OMPI_ERR_OUT_OF_RESOURCE;
 470     }
 471 
 472     
 473     for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
 474         ompi_osc_pt2pt_peer_t *peer = peers[i];
 475         int rank = peer->rank;
 476 
 477         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", rank));
 478         ompi_proc_t *proc = ompi_comm_peer_lookup (module->comm, rank);
 479 
 480         
 481         if (ompi_proc_local() == proc) {
 482             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete self post"));
 483             osc_pt2pt_incoming_post (module, ompi_comm_rank(module->comm));
 484             continue;
 485         }
 486 
 487         post_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_POST;
 488         post_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
 489         osc_pt2pt_hton(&post_req, proc);
 490 
 491         
 492 
 493         ret = ompi_osc_pt2pt_control_send_unbuffered(module, rank, &post_req,
 494                                                      sizeof(ompi_osc_pt2pt_header_post_t));
 495         if (OMPI_SUCCESS != ret) {
 496             break;
 497         }
 498     }
 499 
 500     ompi_osc_pt2pt_release_peers (peers, ompi_group_size(module->pw_group));
 501 
 502     return ret;
 503 }
 504 
 505 
 506 int ompi_osc_pt2pt_wait (ompi_win_t *win)
 507 {
 508     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 509     ompi_group_t *group;
 510 
 511     if (NULL == module->pw_group) {
 512         return OMPI_ERR_RMA_SYNC;
 513     }
 514 
 515     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 516                          "ompi_osc_pt2pt_wait entering... module %p", (void *) module));
 517 
 518     OPAL_THREAD_LOCK(&module->lock);
 519     while (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
 520         OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "module %p, num_complete_msgs = %d, "
 521                              "active_incoming_frag_count = %d", (void *) module, module->num_complete_msgs,
 522                              module->active_incoming_frag_count));
 523         opal_condition_wait(&module->cond, &module->lock);
 524     }
 525 
 526     group = module->pw_group;
 527     module->pw_group = NULL;
 528     OPAL_THREAD_UNLOCK(&module->lock);
 529 
 530     OBJ_RELEASE(group);
 531 
 532     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
 533                          "ompi_osc_pt2pt_wait complete"));
 534 
 535     return OMPI_SUCCESS;
 536 }
 537 
 538 
 539 int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag)
 540 {
 541     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
 542     ompi_group_t *group;
 543     int ret = OMPI_SUCCESS;
 544 
 545 #if !OPAL_ENABLE_PROGRESS_THREADS
 546     opal_progress();
 547 #endif
 548 
 549     if (NULL == module->pw_group) {
 550         return OMPI_ERR_RMA_SYNC;
 551     }
 552 
 553     OPAL_THREAD_LOCK(&(module->lock));
 554 
 555     if (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
 556         *flag = 0;
 557     } else {
 558         *flag = 1;
 559 
 560         group = module->pw_group;
 561         module->pw_group = NULL;
 562 
 563         OBJ_RELEASE(group);
 564     }
 565 
 566     OPAL_THREAD_UNLOCK(&(module->lock));
 567 
 568     return ret;
 569 }
 570 
 571 void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count)
 572 {
 573     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 574                          "osc pt2pt:  process_complete got complete message from %d. expected fragment count %d. "
 575                          "current incomming count: %d. expected complete msgs: %d", source,
 576                          frag_count, module->active_incoming_frag_count, module->num_complete_msgs));
 577 
 578     
 579     OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -frag_count);
 580 
 581     
 582     opal_atomic_wmb ();
 583 
 584     if (0 == OPAL_THREAD_ADD_FETCH32(&module->num_complete_msgs, 1)) {
 585         OPAL_THREAD_LOCK(&module->lock);
 586         opal_condition_broadcast (&module->cond);
 587         OPAL_THREAD_UNLOCK(&module->lock);
 588     }
 589 }
 590 
 591 void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
 592 {
 593     ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
 594 
 595     OPAL_THREAD_LOCK(&sync->lock);
 596 
 597     
 598     if (!ompi_osc_pt2pt_sync_pscw_peer (module, source, NULL)) {
 599         ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
 600 
 601         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 602                              "received unexpected post message from %d for future PSCW synchronization",
 603                              source));
 604 
 605         ompi_osc_pt2pt_peer_set_unex (peer, true);
 606         OPAL_THREAD_UNLOCK(&sync->lock);
 607     } else {
 608         OPAL_THREAD_UNLOCK(&sync->lock);
 609 
 610         ompi_osc_pt2pt_sync_expected (sync);
 611 
 612         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 613                              "received post message for PSCW synchronization. post messages still needed: %d",
 614                              sync->sync_expected));
 615     }
 616 }