This source file includes the following definitions:
- osc_pt2pt_accumulate_data_constructor
- osc_pt2pt_accumulate_data_destructor
- osc_pt2pt_pending_acc_constructor
- osc_pt2pt_pending_acc_destructor
- ompi_osc_pt2pt_ddt_buffer_constructor
- ompi_osc_pt2pt_ddt_buffer_destructor
- datatype_buffer_length
- ompi_osc_pt2pt_control_send
- ompi_osc_pt2pt_control_send_unbuffered_cb
- ompi_osc_pt2pt_control_send_unbuffered
- datatype_create
- process_put
- process_put_long
- osc_pt2pt_incoming_req_complete
- osc_pt2pt_get_post_send_cb
- osc_pt2pt_get_post_send
- process_get
- osc_pt2pt_accumulate_buffer
- osc_pt2pt_accumulate_allocate
- accumulate_cb
- ompi_osc_pt2pt_acc_op_queue
- replace_cb
- ompi_osc_pt2pt_acc_start
- ompi_osc_pt2pt_acc_long_start
- ompi_osc_pt2pt_gacc_start
- ompi_osc_gacc_long_start
- ompi_osc_pt2pt_cswap_start
- ompi_osc_pt2pt_progress_pending_acc
- process_acc
- process_acc_long
- process_get_acc
- process_get_acc_long
- process_cswap
- process_complete
- process_flush
- process_unlock
- process_large_datatype_request_cb
- process_large_datatype_request
- process_frag
- ompi_osc_pt2pt_callback
- ompi_osc_pt2pt_receive_repost
- ompi_osc_pt2pt_process_receive
- ompi_osc_pt2pt_frag_start_receive
- ompi_osc_pt2pt_component_irecv
- ompi_osc_pt2pt_isend_w_cb
- ompi_osc_pt2pt_irecv_w_cb
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 #include "osc_pt2pt.h"
  26 #include "osc_pt2pt_header.h"
  27 #include "osc_pt2pt_data_move.h"
  28 #include "osc_pt2pt_frag.h"
  29 #include "osc_pt2pt_request.h"
  30 
  31 #include "opal/util/arch.h"
  32 #include "opal/sys/atomic.h"
  33 #include "opal/align.h"
  34 
  35 #include "ompi/mca/pml/pml.h"
  36 #include "ompi/mca/pml/base/pml_base_sendreq.h"
  37 #include "opal/mca/btl/btl.h"
  38 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
  39 #include "ompi/datatype/ompi_datatype.h"
  40 #include "ompi/op/op.h"
  41 #include "ompi/memchecker.h"
  42 
  43 
  44 
  45 
  46 
  47 
  48 struct osc_pt2pt_accumulate_data_t {
  49     opal_list_item_t super;
  50     ompi_osc_pt2pt_module_t* module;
  51     void *target;
  52     void *source;
  53     size_t source_len;
  54     ompi_proc_t *proc;
  55     int count;
  56     int peer;
  57     ompi_datatype_t *datatype;
  58     ompi_op_t *op;
  59     opal_atomic_int32_t request_count;
  60 };
  61 typedef struct osc_pt2pt_accumulate_data_t osc_pt2pt_accumulate_data_t;
  62 
  63 static void osc_pt2pt_accumulate_data_constructor (osc_pt2pt_accumulate_data_t *acc_data)
  64 {
  65     acc_data->source = NULL;
  66     acc_data->datatype = NULL;
  67     acc_data->op = NULL;
  68 }
  69 
  70 static void osc_pt2pt_accumulate_data_destructor (osc_pt2pt_accumulate_data_t *acc_data)
  71 {
  72     if (acc_data->source) {
  73         
  74         free (acc_data->source);
  75     }
  76 
  77     if (acc_data->datatype) {
  78         OMPI_DATATYPE_RELEASE(acc_data->datatype);
  79     }
  80 }
  81 
  82 OBJ_CLASS_DECLARATION(osc_pt2pt_accumulate_data_t);
  83 OBJ_CLASS_INSTANCE(osc_pt2pt_accumulate_data_t, opal_list_item_t, osc_pt2pt_accumulate_data_constructor,
  84                    osc_pt2pt_accumulate_data_destructor);
  85 
  86 
  87 
  88 
  89 
  90 
  91 
  92 
  93 
  94 
  95 
  96 
  97 
  98 struct osc_pt2pt_pending_acc_t {
  99     opal_list_item_t super;
 100     ompi_osc_pt2pt_header_t header;
 101     int source;
 102     void *data;
 103     size_t data_len;
 104     ompi_datatype_t *datatype;
 105     bool active_target;
 106 };
 107 typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t;
 108 
 109 static void osc_pt2pt_pending_acc_constructor (osc_pt2pt_pending_acc_t *pending)
 110 {
 111     pending->data = NULL;
 112     pending->datatype = NULL;
 113 }
 114 
 115 static void osc_pt2pt_pending_acc_destructor (osc_pt2pt_pending_acc_t *pending)
 116 {
 117     if (NULL != pending->data) {
 118         free (pending->data);
 119     }
 120 
 121     if (NULL != pending->datatype) {
 122         OMPI_DATATYPE_RELEASE(pending->datatype);
 123     }
 124 }
 125 
 126 OBJ_CLASS_DECLARATION(osc_pt2pt_pending_acc_t);
 127 OBJ_CLASS_INSTANCE(osc_pt2pt_pending_acc_t, opal_list_item_t,
 128                    osc_pt2pt_pending_acc_constructor, osc_pt2pt_pending_acc_destructor);
 129 
 130 
 131 
 132 
 133 
 134 
 135 
 136 
 137 
 138 struct ompi_osc_pt2pt_ddt_buffer_t {
 139     
 140 
 141     opal_list_item_t super;
 142 
 143     
 144     ompi_osc_pt2pt_module_t *module;
 145     
 146     int source;
 147     
 148     ompi_osc_pt2pt_header_t *header;
 149 };
 150 typedef struct ompi_osc_pt2pt_ddt_buffer_t ompi_osc_pt2pt_ddt_buffer_t;
 151 
 152 static void ompi_osc_pt2pt_ddt_buffer_constructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
 153 {
 154     ddt_buffer->header = NULL;
 155 }
 156 
 157 static void ompi_osc_pt2pt_ddt_buffer_destructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
 158 {
 159     if (ddt_buffer->header) {
 160         free (ddt_buffer->header);
 161         ddt_buffer->header = NULL;
 162     }
 163 }
 164 
 165 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_ddt_buffer_t);
 166 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_ddt_buffer_t, opal_list_item_t, ompi_osc_pt2pt_ddt_buffer_constructor,
 167                    ompi_osc_pt2pt_ddt_buffer_destructor);
 168 
 169 
 170 
 171 
 172 
 173 
 174 
 175 
 176 
 177 
 178 
 179 
 180 static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
 181 {
 182     ompi_datatype_t *primitive_datatype = NULL;
 183     uint32_t primitive_count;
 184     size_t buflen;
 185 
 186     ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
 187     primitive_count *= count;
 188 
 189     
 190     ompi_datatype_type_size(primitive_datatype, &buflen);
 191 
 192     return buflen * primitive_count;
 193 }
 194 
 195 
 196 
 197 
 198 
 199 
 200 
 201 
 202 
 203 
 204 
 205 
 206 
 207 
 208 
 209 
 210 
 211 int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
 212                                  void *data, size_t len)
 213 {
 214     ompi_osc_pt2pt_frag_t *frag;
 215     char *ptr;
 216     int ret;
 217 
 218     ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true);
 219     if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
 220         memcpy (ptr, data, len);
 221 
 222         ret = ompi_osc_pt2pt_frag_finish(module, frag);
 223     }
 224 
 225     return ret;
 226 }
 227 
 228 static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
 229 {
 230     void *ctx = request->req_complete_cb_data;
 231     ompi_osc_pt2pt_module_t *module;
 232 
 233     
 234     module = *(ompi_osc_pt2pt_module_t **)ctx;
 235 
 236     
 237     mark_outgoing_completion (module);
 238 
 239     
 240     free (ctx);
 241 
 242     ompi_request_free (&request);
 243     return 1;
 244 }
 245 
 246 
 247 
 248 
 249 
 250 
 251 
 252 
 253 
 254 
 255 
 256 
 257 
 258 
 259 
 260 
 261 
 262 
 263 int ompi_osc_pt2pt_control_send_unbuffered(ompi_osc_pt2pt_module_t *module,
 264                                           int target, void *data, size_t len)
 265 {
 266     void *ctx, *data_copy;
 267 
 268     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 269                          "osc pt2pt: sending unbuffered fragment to %d", target));
 270 
 271     
 272     ctx = malloc (sizeof(ompi_osc_pt2pt_module_t*) + len);
 273     if (OPAL_UNLIKELY(NULL == ctx)) {
 274         return OMPI_ERR_OUT_OF_RESOURCE;
 275     }
 276 
 277     
 278 
 279     ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);
 280 
 281     
 282     *(ompi_osc_pt2pt_module_t**)ctx = module;
 283     data_copy = (ompi_osc_pt2pt_module_t**)ctx + 1;
 284     memcpy (data_copy, data, len);
 285 
 286     return ompi_osc_pt2pt_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_PT2PT_FRAG_TAG,
 287                                      module->comm, ompi_osc_pt2pt_control_send_unbuffered_cb, ctx);
 288 }
 289 
 290 
 291 
 292 
 293 
 294 
 295 
 296 
 297 
 298 
 299 
 300 
 301 
 302 
 303 
 304 static inline int datatype_create (ompi_osc_pt2pt_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
 305 {
 306     ompi_datatype_t *new_datatype = NULL;
 307     ompi_proc_t *peer_proc;
 308     int ret = OMPI_SUCCESS;
 309 
 310     do {
 311         peer_proc = ompi_comm_peer_lookup(module->comm, peer);
 312         if (OPAL_UNLIKELY(NULL == peer_proc)) {
 313             OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
 314                                  "%d: datatype_create: could not resolve proc pointer for peer %d",
 315                                  ompi_comm_rank(module->comm),
 316                                  peer));
 317             ret = OMPI_ERROR;
 318             break;
 319         }
 320 
 321         new_datatype = ompi_osc_base_datatype_create(peer_proc, data);
 322         if (OPAL_UNLIKELY(NULL == new_datatype)) {
 323             OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
 324                                  "%d: datatype_create: could not resolve datatype for peer %d",
 325                                  ompi_comm_rank(module->comm), peer));
 326             ret = OMPI_ERROR;
 327         }
 328     } while (0);
 329 
 330     *datatype = new_datatype;
 331     if (proc) *proc = peer_proc;
 332 
 333     return ret;
 334 }
 335 
 336 
 337 
 338 
 339 
 340 
 341 
 342 
 343 
 344 
 345 
 346 
 347 
 348 
 349 static inline int process_put(ompi_osc_pt2pt_module_t* module, int source,
 350                               ompi_osc_pt2pt_header_put_t* put_header)
 351 {
 352     char *data = (char*) (put_header + 1);
 353     ompi_proc_t *proc;
 354     struct ompi_datatype_t *datatype;
 355     size_t data_len;
 356     void *target = (unsigned char*) module->baseptr +
 357         ((unsigned long) put_header->displacement * module->disp_unit);
 358     int ret;
 359 
 360     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 361                          "%d: process_put: received message from %d",
 362                          ompi_comm_rank(module->comm),
 363                          source));
 364 
 365     ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
 366     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 367         return ret;
 368     }
 369 
 370     data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header);
 371 
 372     osc_pt2pt_copy_on_recv (target, data, data_len, proc, put_header->count, datatype);
 373 
 374     OMPI_DATATYPE_RELEASE(datatype);
 375 
 376     return put_header->len;
 377 }
 378 
 379 static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source,
 380                                    ompi_osc_pt2pt_header_put_t* put_header)
 381 {
 382     char *data = (char*) (put_header + 1);
 383     struct ompi_datatype_t *datatype;
 384     void *target = (unsigned char*) module->baseptr +
 385         ((unsigned long) put_header->displacement * module->disp_unit);
 386     int ret;
 387 
 388     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 389                          "%d: process_put_long: received message from %d",
 390                          ompi_comm_rank(module->comm),
 391                          source));
 392 
 393     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
 394     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 395         return ret;
 396     }
 397 
 398     ret = ompi_osc_pt2pt_component_irecv (module, target,
 399                                          put_header->count,
 400                                          datatype, source,
 401                                          tag_to_target(put_header->tag),
 402                                          module->comm);
 403     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 404         OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
 405                              "%d: process_put_long: irecv error: %d",
 406                              ompi_comm_rank(module->comm),
 407                              ret));
 408         return OMPI_ERROR;
 409     }
 410 
 411     OMPI_DATATYPE_RELEASE(datatype);
 412 
 413     return put_header->len;
 414 }
 415 
 416 
 417 
 418 
 419 
 420 
 421 
 422 
 423 
 424 
 425 
 426 
 427 
 428 static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
 429 {
 430     ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
 431     int rank = MPI_PROC_NULL;
 432 
 433     if (request->req_status.MPI_TAG & 0x01) {
 434         rank = request->req_status.MPI_SOURCE;
 435     }
 436 
 437     mark_incoming_completion (module, rank);
 438 
 439     ompi_request_free (&request);
 440     return 1;
 441 }
 442 
 443 struct osc_pt2pt_get_post_send_cb_data_t {
 444     ompi_osc_pt2pt_module_t *module;
 445     int peer;
 446 };
 447 
 448 static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
 449 {
 450     struct osc_pt2pt_get_post_send_cb_data_t *data =
 451         (struct osc_pt2pt_get_post_send_cb_data_t *) request->req_complete_cb_data;
 452     ompi_osc_pt2pt_module_t *module = data->module;
 453     int rank = data->peer;
 454 
 455     free (data);
 456 
 457     
 458     mark_incoming_completion (module, rank);
 459 
 460     ompi_request_free (&request);
 461     return 1;
 462 }
 463 
 464 
 465 
 466 
 467 
 468 
 469 
 470 
 471 
 472 
 473 
 474 
 475 
 476 
 477 
 478 static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *source, int count,
 479                                    ompi_datatype_t *datatype, int peer, int tag)
 480 {
 481     struct osc_pt2pt_get_post_send_cb_data_t *data;
 482     int ret;
 483 
 484     data = malloc (sizeof (*data));
 485     if (OPAL_UNLIKELY(NULL == data)) {
 486         return OMPI_ERR_OUT_OF_RESOURCE;
 487     }
 488 
 489     data->module = module;
 490     
 491 
 492     data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL;
 493 
 494     
 495     ret = ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm,
 496                                      osc_pt2pt_get_post_send_cb, (void *) data);
 497     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 498         free (data);
 499     }
 500 
 501     return ret;
 502 }
 503 
 504 
 505 
 506 
 507 
 508 
 509 
 510 
 511 
 512 
 513 static inline int process_get (ompi_osc_pt2pt_module_t* module, int target,
 514                                ompi_osc_pt2pt_header_get_t* get_header)
 515 {
 516     char *data = (char *) (get_header + 1);
 517     struct ompi_datatype_t *datatype;
 518     void *source = (unsigned char*) module->baseptr +
 519         ((unsigned long) get_header->displacement * module->disp_unit);
 520     int ret;
 521 
 522     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 523                          "%d: process_get: received message from %d",
 524                          ompi_comm_rank(module->comm),
 525                          target));
 526 
 527     ret = datatype_create (module, target, NULL, &datatype, (void **) &data);
 528     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 529         return ret;
 530     }
 531 
 532     
 533     ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype,
 534                                   target, tag_to_origin(get_header->tag));
 535 
 536     OMPI_DATATYPE_RELEASE(datatype);
 537 
 538     return OMPI_SUCCESS == ret ? (int) get_header->len : ret;
 539 }
 540 
 541 
 542 
 543 
 544 
 545 
 546 
 547 
 548 
 549 
 550 
 551 
 552 
 553 
 554 static inline int osc_pt2pt_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
 555                                               int count, ompi_datatype_t *datatype, ompi_op_t *op)
 556 {
 557     int ret;
 558 
 559     assert (NULL != target && NULL != source);
 560 
 561     if (op == &ompi_mpi_op_replace.op) {
 562         osc_pt2pt_copy_on_recv (target, source, source_len, proc, count, datatype);
 563         return OMPI_SUCCESS;
 564     }
 565 
 566 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
 567     if (proc->super.proc_arch != ompi_proc_local()->super.proc_arch) {
 568         ompi_datatype_t *primitive_datatype = NULL;
 569         uint32_t primitive_count;
 570         size_t buflen;
 571         void *buffer;
 572 
 573         ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
 574         primitive_count *= count;
 575 
 576         
 577         ompi_datatype_type_size(primitive_datatype, &buflen);
 578         buflen *= primitive_count;
 579 
 580         buffer = malloc (buflen);
 581         if (OPAL_UNLIKELY(NULL == buffer)) {
 582             return OMPI_ERR_OUT_OF_RESOURCE;
 583         }
 584 
 585         osc_pt2pt_copy_on_recv (buffer, source, source_len, proc, primitive_count, primitive_datatype);
 586 
 587         ret = ompi_osc_base_process_op(target, buffer, source_len, datatype,
 588                                        count, op);
 589 
 590         free(buffer);
 591     } else
 592 #endif
 593 
 594     
 595     ret = ompi_osc_base_process_op(target, source, source_len, datatype,
 596                                    count, op);
 597 
 598     return ret;
 599 }
 600 
 601 
 602 
 603 
 604 
 605 
 606 
 607 
 608 
 609 
 610 
 611 
 612 
 613 
 614 
 615 
 616 
 617 
 618 
 619 static int osc_pt2pt_accumulate_allocate (ompi_osc_pt2pt_module_t *module, int peer, void *target, void *source, size_t source_len,
 620                                          ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
 621                                          int request_count, osc_pt2pt_accumulate_data_t **acc_data_out)
 622 {
 623     osc_pt2pt_accumulate_data_t *acc_data;
 624 
 625     acc_data = OBJ_NEW(osc_pt2pt_accumulate_data_t);
 626     if (OPAL_UNLIKELY(NULL == acc_data)) {
 627         return OMPI_ERR_OUT_OF_RESOURCE;
 628     }
 629 
 630     acc_data->module = module;
 631     acc_data->peer = peer;
 632     acc_data->target = target;
 633     acc_data->source = source;
 634     acc_data->source_len = source_len;
 635     acc_data->proc = proc;
 636     acc_data->count = count;
 637     acc_data->datatype = datatype;
 638     OMPI_DATATYPE_RETAIN(datatype);
 639     acc_data->op = op;
 640     acc_data->request_count = request_count;
 641 
 642     *acc_data_out = acc_data;
 643 
 644     return OMPI_SUCCESS;
 645 }
 646 
 647 
 648 
 649 
 650 
 651 
 652 
 653 
 654 
 655 
 656 static int accumulate_cb (ompi_request_t *request)
 657 {
 658     struct osc_pt2pt_accumulate_data_t *acc_data = (struct osc_pt2pt_accumulate_data_t *) request->req_complete_cb_data;
 659     ompi_osc_pt2pt_module_t *module = acc_data->module;
 660     int rank = MPI_PROC_NULL;
 661     int ret = OMPI_SUCCESS;
 662 
 663     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 664                          "accumulate_cb, request_count = %d", acc_data->request_count));
 665 
 666     if (request->req_status.MPI_TAG & 0x01) {
 667         rank = acc_data->peer;
 668     }
 669 
 670     if (0 == OPAL_THREAD_ADD_FETCH32(&acc_data->request_count, -1)) {
 671         
 672 
 673         if (acc_data->source) {
 674             ompi_datatype_t *primitive_datatype = NULL;
 675             uint32_t primitive_count;
 676 
 677             assert (NULL != acc_data->target && NULL != acc_data->source);
 678 
 679             ompi_osc_base_get_primitive_type_info(acc_data->datatype, &primitive_datatype, &primitive_count);
 680             primitive_count *= acc_data->count;
 681 
 682             if (acc_data->op == &ompi_mpi_op_replace.op) {
 683                 ret = ompi_datatype_sndrcv(acc_data->source, primitive_count, primitive_datatype, acc_data->target, acc_data->count, acc_data->datatype);
 684             } else {
 685                 ret = ompi_osc_base_process_op(acc_data->target, acc_data->source, acc_data->source_len, acc_data->datatype, acc_data->count, acc_data->op);
 686             }
 687         }
 688 
 689         
 690         ompi_osc_pt2pt_accumulate_unlock (module);
 691 
 692         osc_pt2pt_gc_add_buffer (module, &acc_data->super);
 693     }
 694 
 695     mark_incoming_completion (module, rank);
 696 
 697     ompi_request_free (&request);
 698     return ret;
 699 }
 700 
 701 
 702 static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
 703                                         char *data, size_t data_len, ompi_datatype_t *datatype, bool active_target)
 704 {
 705     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
 706     osc_pt2pt_pending_acc_t *pending_acc;
 707 
 708     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 709                          "%d: queuing accumulate operation", ompi_comm_size (module->comm)));
 710 
 711     pending_acc = OBJ_NEW(osc_pt2pt_pending_acc_t);
 712     if (OPAL_UNLIKELY(NULL == pending_acc)) {
 713         return OMPI_ERR_OUT_OF_RESOURCE;
 714     }
 715 
 716     
 717 
 718     if (active_target) {
 719         OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -1);
 720     } else {
 721         OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
 722     }
 723 
 724     pending_acc->active_target = active_target;
 725     pending_acc->source = source;
 726 
 727     
 728     pending_acc->data_len = data_len;
 729 
 730     if (data_len) {
 731         pending_acc->data = malloc (data_len);
 732         memcpy (pending_acc->data, data, data_len);
 733     }
 734 
 735     
 736     pending_acc->datatype = datatype;
 737     OMPI_DATATYPE_RETAIN(datatype);
 738 
 739     
 740     switch (header->base.type) {
 741     case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
 742     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
 743     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
 744     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
 745         pending_acc->header.acc = header->acc;
 746         break;
 747     case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
 748         pending_acc->header.cswap = header->cswap;
 749         break;
 750     default:
 751         
 752         assert (0);
 753     }
 754 
 755     
 756     OPAL_THREAD_SCOPED_LOCK(&module->pending_acc_lock, opal_list_append (&module->pending_acc, &pending_acc->super));
 757 
 758     return OMPI_SUCCESS;
 759 }
 760 
 761 static int replace_cb (ompi_request_t *request)
 762 {
 763     ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
 764     int rank = MPI_PROC_NULL;
 765 
 766     if (request->req_status.MPI_TAG & 0x01) {
 767         rank = request->req_status.MPI_SOURCE;
 768     }
 769 
 770     mark_incoming_completion (module, rank);
 771 
 772     
 773     ompi_osc_pt2pt_accumulate_unlock (module);
 774 
 775     ompi_request_free (&request);
 776     return 1;
 777 }
 778 
 779 
 780 
 781 
 782 
 783 
 784 
 785 
 786 
 787 
 788 
 789 
 790 
 791 
 792 
 793 
 794 static int ompi_osc_pt2pt_acc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
 795                                     ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
 796 {
 797     void *target = (unsigned char*) module->baseptr +
 798         ((unsigned long) acc_header->displacement * module->disp_unit);
 799     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 800     ompi_proc_t *proc;
 801     int ret;
 802 
 803     proc = ompi_comm_peer_lookup(module->comm, source);
 804     assert (NULL != proc);
 805 
 806     ret = osc_pt2pt_accumulate_buffer (target, data, data_len, proc, acc_header->count,
 807                                       datatype, op);
 808 
 809     ompi_osc_pt2pt_accumulate_unlock (module);
 810 
 811     return ret;
 812 }
 813 
 814 
 815 
 816 
 817 
 818 
 819 
 820 
 821 
 822 
 823 
 824 
 825 
 826 
 827 static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
 828                                          ompi_osc_pt2pt_header_acc_t *acc_header) {
 829     struct osc_pt2pt_accumulate_data_t *acc_data;
 830     size_t buflen;
 831     void *buffer;
 832     ompi_proc_t *proc;
 833     void *target = (unsigned char*) module->baseptr +
 834         ((unsigned long) acc_header->displacement * module->disp_unit);
 835     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 836     ompi_datatype_t *primitive_datatype;
 837     uint32_t primitive_count;
 838     int ret;
 839 
 840     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 841                          "ompi_osc_pt2pt_acc_long_start starting..."));
 842 
 843     proc = ompi_comm_peer_lookup(module->comm, source);
 844     assert (NULL != proc);
 845 
 846     do {
 847         if (op == &ompi_mpi_op_replace.op) {
 848             ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype,
 849                                             source, tag_to_target(acc_header->tag), module->comm,
 850                                             NULL, replace_cb, module);
 851             break;
 852         }
 853 
 854         ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
 855         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 856             break;
 857         }
 858 
 859         primitive_count *= acc_header->count;
 860 
 861         buflen = datatype_buffer_length (datatype, acc_header->count);
 862 
 863         
 864         buffer = malloc (buflen);
 865         if (OPAL_UNLIKELY(NULL == buffer)) {
 866             ret = OMPI_ERR_OUT_OF_RESOURCE;
 867             break;
 868         }
 869 
 870         ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
 871                                              datatype, op, 1, &acc_data);
 872         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 873             free (buffer);
 874             break;
 875         }
 876 
 877         ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype,
 878                                         source, tag_to_target(acc_header->tag), module->comm,
 879                                         NULL, accumulate_cb, acc_data);
 880         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 881             OBJ_RELEASE(acc_data);
 882         }
 883     } while (0);
 884 
 885     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 886         ompi_osc_pt2pt_accumulate_unlock (module);
 887     }
 888 
 889     return ret;
 890 }
 891 
 892 
 893 
 894 
 895 
 896 
 897 
 898 
 899 
 900 
 901 
 902 
 903 
 904 
 905 
 906 
 907 static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
 908                                      ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
 909 {
 910     void *target = (unsigned char*) module->baseptr +
 911         ((unsigned long) acc_header->displacement * module->disp_unit);
 912     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 913     struct osc_pt2pt_accumulate_data_t *acc_data;
 914     ompi_proc_t *proc;
 915     int ret;
 916 
 917     proc = ompi_comm_peer_lookup(module->comm, source);
 918     assert (NULL != proc);
 919 
 920     do {
 921         ret = osc_pt2pt_accumulate_allocate (module, source, target, data, data_len, proc, acc_header->count,
 922                                             datatype, op, 1, &acc_data);
 923         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 924             break;
 925         }
 926 
 927         ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype,
 928                                         source, tag_to_origin(acc_header->tag), module->comm,
 929                                         accumulate_cb, acc_data);
 930         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 931             OBJ_RELEASE(acc_data);
 932         }
 933     } while (0);
 934 
 935     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 936         ompi_osc_pt2pt_accumulate_unlock (module);
 937     }
 938 
 939     return ret;
 940 }
 941 
 942 
 943 
 944 
 945 
 946 
 947 
 948 
 949 
 950 
 951 
 952 
 953 
 954 
 955 static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
 956                                      ompi_osc_pt2pt_header_acc_t *acc_header)
 957 {
 958     void *target = (unsigned char*) module->baseptr +
 959         ((unsigned long) acc_header->displacement * module->disp_unit);
 960     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 961     struct osc_pt2pt_accumulate_data_t *acc_data;
 962     ompi_datatype_t *primitive_datatype;
 963     ompi_request_t *recv_request;
 964     uint32_t primitive_count;
 965     ompi_proc_t *proc;
 966     size_t buflen;
 967     void *buffer;
 968     int ret;
 969 
 970     proc = ompi_comm_peer_lookup(module->comm, source);
 971     assert (NULL != proc);
 972 
 973     
 974     buflen = datatype_buffer_length (datatype, acc_header->count);
 975 
 976     do {
 977         ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
 978         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 979             break;
 980         }
 981 
 982         primitive_count *= acc_header->count;
 983 
 984         buffer = malloc (buflen);
 985         if (OPAL_UNLIKELY(NULL == buffer)) {
 986             ret = OMPI_ERR_OUT_OF_RESOURCE;
 987             break;
 988         }
 989 
 990         ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
 991                                              datatype, op, 2, &acc_data);
 992         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 993             free (buffer);
 994             break;
 995         }
 996 
 997         ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype,
 998                                         source, tag_to_target(acc_header->tag), module->comm,
 999                                         &recv_request, accumulate_cb, acc_data);
1000         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1001             OBJ_RELEASE(acc_data);
1002             break;
1003         }
1004 
1005         ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype,
1006                                         source, tag_to_origin(acc_header->tag), module->comm,
1007                                         accumulate_cb, acc_data);
1008         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1009             
1010             ompi_request_cancel (recv_request);
1011             OBJ_RELEASE(acc_data);
1012             break;
1013         }
1014     } while (0);
1015 
1016     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1017         ompi_osc_pt2pt_accumulate_unlock (module);
1018     }
1019 
1020     return ret;
1021 }
1022 
1023 
1024 
1025 
1026 
1027 
1028 
1029 
1030 
1031 
1032 
1033 
1034 
1035 
1036 
1037 
1038 
1039 static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int source, void *data, ompi_datatype_t *datatype,
1040                                       ompi_osc_pt2pt_header_cswap_t *cswap_header)
1041 {
1042     void *target = (unsigned char*) module->baseptr +
1043         ((unsigned long) cswap_header->displacement * module->disp_unit);
1044     void *compare_addr, *origin_addr;
1045     size_t datatype_size;
1046     ompi_proc_t *proc;
1047     int ret;
1048 
1049     proc = ompi_comm_peer_lookup(module->comm, source);
1050     assert (NULL != proc);
1051 
1052     datatype_size = datatype->super.size;
1053 
1054     origin_addr  = data;
1055     compare_addr = (void *)((uintptr_t) data + datatype_size);
1056 
1057     do {
1058         
1059         ret = MCA_PML_CALL(send(target, 1, datatype, source, tag_to_origin(cswap_header->tag),
1060                                 MCA_PML_BASE_SEND_STANDARD, module->comm));
1061         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1062             break;
1063         }
1064 
1065         
1066         mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);
1067 
1068         if (0 == memcmp (target, compare_addr, datatype_size)) {
1069             osc_pt2pt_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype);
1070         }
1071     } while (0);
1072 
1073     ompi_osc_pt2pt_accumulate_unlock (module);
1074 
1075     return ret;
1076 }
1077 
1078 
1079 
1080 
1081 
1082 
1083 
1084 
1085 
1086 
1087 
1088 int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
1089 {
1090     osc_pt2pt_pending_acc_t *pending_acc;
1091     int ret;
1092 
1093     
1094 
1095     if (ompi_osc_pt2pt_accumulate_trylock (module)) {
1096         return OMPI_SUCCESS;
1097     }
1098 
1099     OPAL_THREAD_LOCK(&module->pending_acc_lock);
1100     pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
1101     OPAL_THREAD_UNLOCK(&module->pending_acc_lock);
1102     if (OPAL_UNLIKELY(NULL == pending_acc)) {
1103         
1104         ompi_osc_pt2pt_accumulate_unlock (module);
1105         return OMPI_SUCCESS;
1106     }
1107 
1108     switch (pending_acc->header.base.type) {
1109     case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
1110         ret = ompi_osc_pt2pt_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
1111                                        pending_acc->datatype, &pending_acc->header.acc);
1112         free (pending_acc->data);
1113         break;
1114     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1115         ret = ompi_osc_pt2pt_acc_long_start (module, pending_acc->source, pending_acc->datatype,
1116                                             &pending_acc->header.acc);
1117         break;
1118     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
1119         ret = ompi_osc_pt2pt_gacc_start (module, pending_acc->source, pending_acc->data,
1120                                         pending_acc->data_len, pending_acc->datatype,
1121                                         &pending_acc->header.acc);
1122         break;
1123     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1124         ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
1125                                         &pending_acc->header.acc);
1126         break;
1127     case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
1128         ret = ompi_osc_pt2pt_cswap_start (module, pending_acc->source, pending_acc->data,
1129                                          pending_acc->datatype, &pending_acc->header.cswap);
1130         break;
1131     default:
1132         ret = OMPI_ERROR;
1133         
1134         assert (0);
1135     }
1136 
1137     
1138     mark_incoming_completion (module, pending_acc->active_target ? MPI_PROC_NULL : pending_acc->source);
1139 
1140     pending_acc->data = NULL;
1141     OBJ_RELEASE(pending_acc);
1142 
1143     return ret;
1144 }
1145 
1146 static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
1147                                ompi_osc_pt2pt_header_acc_t *acc_header)
1148 {
1149     bool active_target = !(acc_header->tag & 0x1);
1150     char *data = (char *) (acc_header + 1);
1151     struct ompi_datatype_t *datatype;
1152     uint64_t data_len;
1153     int ret;
1154 
1155     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1156                          "%d: process_acc: received message from %d",
1157                          ompi_comm_rank(module->comm),
1158                          source));
1159 
1160     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1161     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1162         return ret;
1163     }
1164 
1165     data_len = acc_header->len - ((char*) data - (char*) acc_header);
1166 
1167     
1168     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1169         ret = ompi_osc_pt2pt_acc_start (module, source, data, data_len, datatype,
1170                                        acc_header);
1171     } else {
1172         
1173         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1174                                            source, data, data_len, datatype, active_target);
1175     }
1176 
1177     
1178     OMPI_DATATYPE_RELEASE(datatype);
1179 
1180     return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1181 }
1182 
1183 static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
1184                                     ompi_osc_pt2pt_header_acc_t* acc_header)
1185 {
1186     bool active_target = !(acc_header->tag & 0x1);
1187     char *data = (char *) (acc_header + 1);
1188     struct ompi_datatype_t *datatype;
1189     int ret;
1190 
1191     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1192                          "%d: process_acc_long: received message from %d",
1193                          ompi_comm_rank(module->comm),
1194                          source));
1195 
1196     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1197     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1198         return ret;
1199     }
1200 
1201     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1202         ret = ompi_osc_pt2pt_acc_long_start (module, source, datatype, acc_header);
1203     } else {
1204         
1205         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source,
1206                                            NULL, 0, datatype, active_target);
1207     }
1208 
1209     
1210     OMPI_DATATYPE_RELEASE(datatype);
1211 
1212     return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1213 }
1214 
1215 static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source,
1216                                   ompi_osc_pt2pt_header_acc_t *acc_header)
1217 {
1218     bool active_target = !(acc_header->tag & 0x1);
1219     char *data = (char *) (acc_header + 1);
1220     struct ompi_datatype_t *datatype;
1221     void *buffer = NULL;
1222     uint64_t data_len;
1223     ompi_proc_t * proc;
1224     int ret;
1225 
1226     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1227                          "%d: process_get_acc: received message from %d",
1228                          ompi_comm_rank(module->comm),
1229                          source));
1230 
1231     ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
1232     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1233         return ret;
1234     }
1235 
1236     data_len = acc_header->len - ((char*) data - (char*) acc_header);
1237 
1238     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1239         
1240         if (data_len) {
1241             ompi_datatype_t *primitive_datatype = NULL;
1242             uint32_t primitive_count;
1243             buffer = malloc (data_len);
1244             if (OPAL_UNLIKELY(NULL == buffer)) {
1245                 OMPI_DATATYPE_RELEASE(datatype);
1246                 return OMPI_ERR_OUT_OF_RESOURCE;
1247             }
1248 
1249             ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
1250             primitive_count *= acc_header->count;
1251 
1252             osc_pt2pt_copy_on_recv (buffer, data, data_len, proc, primitive_count, primitive_datatype);
1253         }
1254 
1255         ret = ompi_osc_pt2pt_gacc_start (module, source, buffer, data_len, datatype,
1256                                         acc_header);
1257     } else {
1258         
1259         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1260                                            source, data, data_len, datatype, active_target);
1261     }
1262 
1263     
1264     OMPI_DATATYPE_RELEASE(datatype);
1265 
1266     return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1267 }
1268 
1269 static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source,
1270                                        ompi_osc_pt2pt_header_acc_t *acc_header)
1271 {
1272     bool active_target = !(acc_header->tag & 0x1);
1273     char *data = (char *) (acc_header + 1);
1274     struct ompi_datatype_t *datatype;
1275     int ret;
1276 
1277     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1278                          "%d: process_acc: received message from %d",
1279                          ompi_comm_rank(module->comm),
1280                          source));
1281 
1282     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1283     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1284         return ret;
1285     }
1286 
1287     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1288         ret = ompi_osc_gacc_long_start (module, source, datatype, acc_header);
1289     } else {
1290         
1291         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1292                                            source, NULL, 0, datatype, active_target);
1293     }
1294 
1295     
1296     OMPI_DATATYPE_RELEASE(datatype);
1297 
1298     return OMPI_SUCCESS == ret ? (int) acc_header->len : ret;
1299 }
1300 
1301 
1302 static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
1303                                  ompi_osc_pt2pt_header_cswap_t *cswap_header)
1304 {
1305     bool active_target = !(cswap_header->tag & 0x1);
1306     char *data = (char*) (cswap_header + 1);
1307     struct ompi_datatype_t *datatype;
1308     int ret;
1309 
1310     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1311                          "%d: process_cswap: received message from %d",
1312                          ompi_comm_rank(module->comm),
1313                          source));
1314 
1315     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1316     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1317         return ret;
1318     }
1319 
1320     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1321         ret = ompi_osc_pt2pt_cswap_start (module, source, data, datatype, cswap_header);
1322     } else {
1323         
1324         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source,
1325                                            data, 2 * datatype->super.size, datatype, active_target);
1326     }
1327 
1328     
1329     OMPI_DATATYPE_RELEASE(datatype);
1330 
1331     return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret;
1332 }
1333 
1334 static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
1335                                     ompi_osc_pt2pt_header_complete_t *complete_header)
1336 {
1337     
1338     osc_pt2pt_incoming_complete (module, source, complete_header->frag_count + 1);
1339 
1340     return sizeof (*complete_header);
1341 }
1342 
1343 
1344 
1345 
1346 static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
1347                                  ompi_osc_pt2pt_header_flush_t *flush_header)
1348 {
1349     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
1350     int ret;
1351 
1352     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1353                          "process_flush header = {.frag_count = %d}", flush_header->frag_count));
1354 
1355     
1356     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
1357 
1358     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1359                          "%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
1360                          ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
1361 
1362     ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
1363     if (OMPI_SUCCESS != ret) {
1364         ompi_osc_pt2pt_pending_t *pending;
1365 
1366         pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
1367         pending->module = module;
1368         pending->source = source;
1369         pending->header.flush = *flush_header;
1370 
1371         osc_pt2pt_add_pending (pending);
1372     }
1373 
1374     
1375     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
1376 
1377     return sizeof (*flush_header);
1378 }
1379 
1380 static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
1381                                   ompi_osc_pt2pt_header_unlock_t *unlock_header)
1382 {
1383     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
1384     int ret;
1385 
1386     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1387                          "process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
1388 
1389     
1390     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
1391 
1392     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
1393                          "osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
1394                          source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
1395 
1396     ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
1397     if (OMPI_SUCCESS != ret) {
1398         ompi_osc_pt2pt_pending_t *pending;
1399 
1400         pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
1401         pending->module = module;
1402         pending->source = source;
1403         pending->header.unlock = *unlock_header;
1404 
1405         osc_pt2pt_add_pending (pending);
1406     }
1407 
1408     
1409     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
1410 
1411     return sizeof (*unlock_header);
1412 }
1413 
1414 static int process_large_datatype_request_cb (ompi_request_t *request)
1415 {
1416     ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer = (ompi_osc_pt2pt_ddt_buffer_t *) request->req_complete_cb_data;
1417     ompi_osc_pt2pt_module_t *module = ddt_buffer->module;
1418     ompi_osc_pt2pt_header_t *header = ddt_buffer->header;
1419     int source = ddt_buffer->source;
1420 
1421     
1422     switch (header->base.type) {
1423     case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1424         (void) process_put_long (module, source, &header->put);
1425         break;
1426     case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1427         (void) process_get (module, source, &header->get);
1428         break;
1429     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1430         (void) process_acc_long (module, source, &header->acc);
1431         break;
1432     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1433         (void) process_get_acc_long (module, source, &header->acc);
1434         break;
1435     default:
1436         
1437         assert (0);
1438         return OMPI_ERROR;
1439     }
1440 
1441     
1442     osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
1443 
1444     ompi_request_free (&request);
1445     return 1;
1446 }
1447 
1448 
1449 
1450 
1451 
1452 
1453 
1454 
1455 
1456 
1457 
1458 
1459 
1460 static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_t *header)
1461 {
1462     ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer;
1463     int header_len, tag, ret;
1464     uint64_t ddt_len;
1465 
1466     
1467     switch (header->base.type) {
1468     case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1469         header_len = sizeof (header->put);
1470         tag = header->put.tag;
1471         break;
1472     case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1473         header_len = sizeof (header->get);
1474         tag = header->get.tag;
1475         break;
1476     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1477         header_len = sizeof (header->acc);
1478         tag = header->acc.tag;
1479         break;
1480     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1481         header_len = sizeof (header->acc);
1482         tag = header->acc.tag;
1483         break;
1484     default:
1485         
1486         opal_output (0, "Unsupported header/flag combination");
1487         return OMPI_ERROR;
1488     }
1489 
1490     ddt_len = *((uint64_t *)((uintptr_t) header + header_len));
1491 
1492     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
1493                          "process_large_datatype_request: processing fragment with type %d. ddt_len %lu",
1494                          header->base.type, (unsigned long) ddt_len));
1495 
1496     ddt_buffer = OBJ_NEW(ompi_osc_pt2pt_ddt_buffer_t);
1497     if (OPAL_UNLIKELY(NULL == ddt_buffer)) {
1498         return OMPI_ERR_OUT_OF_RESOURCE;
1499     }
1500 
1501     ddt_buffer->module = module;
1502     ddt_buffer->source = source;
1503 
1504     ddt_buffer->header = malloc (ddt_len + header_len);
1505     if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) {
1506         OBJ_RELEASE(ddt_buffer);
1507         return OMPI_ERR_OUT_OF_RESOURCE;
1508     }
1509 
1510     memcpy (ddt_buffer->header, header, header_len);
1511 
1512     ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
1513                                     ddt_len, MPI_BYTE,
1514                                     source, tag_to_target(tag), module->comm,
1515                                     NULL, process_large_datatype_request_cb, ddt_buffer);
1516     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1517         OBJ_RELEASE(ddt_buffer);
1518         return ret;
1519     }
1520 
1521     return header_len + 8;
1522 }
1523 
1524 
1525 
1526 
1527 static inline int process_frag (ompi_osc_pt2pt_module_t *module,
1528                                 ompi_osc_pt2pt_frag_header_t *frag)
1529 {
1530     ompi_osc_pt2pt_header_t *header;
1531     int ret;
1532 
1533     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1534                          "osc pt2pt: process_frag: from %d, ops %d",
1535                          (int) frag->source, (int) frag->num_ops));
1536 
1537     header = (ompi_osc_pt2pt_header_t *) (frag + 1);
1538 
1539     for (int i = 0 ; i < frag->num_ops ; ++i) {
1540         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1541                              "osc pt2pt: process_frag: type 0x%x. flag 0x%x. offset %u",
1542                              header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag),
1543                              header->base.flags));
1544 
1545         if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE))) {
1546             osc_pt2pt_ntoh(header);
1547             switch (header->base.type) {
1548             case OMPI_OSC_PT2PT_HDR_TYPE_PUT:
1549                 ret = process_put(module, frag->source, &header->put);
1550                 break;
1551             case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1552                 ret = process_put_long(module, frag->source, &header->put);
1553                 break;
1554 
1555             case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
1556                 ret = process_acc(module, frag->source, &header->acc);
1557                 break;
1558             case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1559                 ret = process_acc_long (module, frag->source, &header->acc);
1560                 break;
1561 
1562             case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
1563                 ret = process_unlock(module, frag->source, &header->unlock);
1564                 break;
1565 
1566             case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1567                 ret = process_get (module, frag->source, &header->get);
1568                 break;
1569 
1570             case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
1571                 ret = process_cswap (module, frag->source, &header->cswap);
1572                 break;
1573 
1574             case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
1575                 ret = process_get_acc (module, frag->source, &header->acc);
1576                 break;
1577 
1578             case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1579                 ret = process_get_acc_long (module, frag->source, &header->acc);
1580                 break;
1581 
1582             case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
1583                 ret = process_flush (module, frag->source, &header->flush);
1584                 break;
1585 
1586             case OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE:
1587                 ret = process_complete (module, frag->source, &header->complete);
1588                 break;
1589 
1590             default:
1591                 opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
1592                 abort(); 
1593             }
1594         } else {
1595             ret = process_large_datatype_request (module, frag->source, header);
1596         }
1597 
1598         if (ret <= 0) {
1599             opal_output(0, "Error processing fragment: %d", ret);
1600             abort(); 
1601         }
1602 
1603         
1604 
1605         header = (ompi_osc_pt2pt_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t);
1606     }
1607 
1608     return OMPI_SUCCESS;
1609 }
1610 
1611 
1612 static int ompi_osc_pt2pt_callback (ompi_request_t *request)
1613 {
1614     ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) request->req_complete_cb_data;
1615 
1616     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "received pt2pt fragment"));
1617 
1618     
1619 
1620     OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1621     opal_list_append (&mca_osc_pt2pt_component.pending_receives, &recv->super);
1622     OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1623 
1624     return OMPI_SUCCESS;
1625 }
1626 
1627 static int ompi_osc_pt2pt_receive_repost (ompi_osc_pt2pt_receive_t *recv)
1628 {
1629     
1630     ompi_request_wait_completion (recv->pml_request);
1631 
1632     
1633     recv->pml_request->req_complete_cb = ompi_osc_pt2pt_callback;
1634     recv->pml_request->req_complete_cb_data = (void *) recv;
1635 
1636     return MCA_PML_CALL(start(1, &recv->pml_request));
1637 }
1638 
1639 int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv)
1640 {
1641     ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) recv->module;
1642     ompi_osc_pt2pt_header_t *base_header = (ompi_osc_pt2pt_header_t *) recv->buffer;
1643     size_t incoming_length = recv->pml_request->req_status._ucount;
1644     int source = recv->pml_request->req_status.MPI_SOURCE;
1645     int rc __opal_attribute_unused__;
1646 
1647     assert(incoming_length >= sizeof(ompi_osc_pt2pt_header_base_t));
1648     (void)incoming_length;  
1649 
1650     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1651                          "received pt2pt callback for fragment. source = %d, count = %u, type = 0x%x",
1652                          source, (unsigned) incoming_length, base_header->base.type));
1653 
1654     osc_pt2pt_ntoh(base_header);
1655     switch (base_header->base.type) {
1656     case OMPI_OSC_PT2PT_HDR_TYPE_FRAG:
1657         process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
1658 
1659         
1660         mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ?
1661                                   source : MPI_PROC_NULL);
1662         break;
1663     case OMPI_OSC_PT2PT_HDR_TYPE_POST:
1664         osc_pt2pt_incoming_post (module, source);
1665         break;
1666     case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ:
1667         ompi_osc_pt2pt_process_lock(module, source, (ompi_osc_pt2pt_header_lock_t *) base_header);
1668         break;
1669     case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK:
1670         ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header);
1671         break;
1672     case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK:
1673         ompi_osc_pt2pt_process_flush_ack (module, source, (ompi_osc_pt2pt_header_flush_ack_t *) base_header);
1674         break;
1675     case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK:
1676         ompi_osc_pt2pt_process_unlock_ack (module, source, (ompi_osc_pt2pt_header_unlock_ack_t *) base_header);
1677         break;
1678     default:
1679         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1680                              "received unexpected message of type %x",
1681                              (int) base_header->base.type));
1682     }
1683 
1684     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1685                          "finished processing incoming messages"));
1686 
1687     osc_pt2pt_gc_clean (module);
1688 
1689     rc = ompi_osc_pt2pt_receive_repost (recv);
1690 
1691     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1692                          "finished posting receive request. rc: %d", rc));
1693 
1694     return OMPI_SUCCESS;
1695 }
1696 
1697 int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module)
1698 {
1699     int rc;
1700 
1701     module->recv_frag_count = mca_osc_pt2pt_component.receive_count;
1702     if (0 == module->recv_frag_count) {
1703         module->recv_frag_count = 1;
1704     }
1705 
1706     module->recv_frags = malloc (sizeof (module->recv_frags[0]) * module->recv_frag_count);
1707     if (NULL == module->recv_frags) {
1708         return OMPI_ERR_OUT_OF_RESOURCE;
1709     }
1710 
1711     for (unsigned int i = 0 ; i < module->recv_frag_count ; ++i) {
1712         OBJ_CONSTRUCT(module->recv_frags + i, ompi_osc_pt2pt_receive_t);
1713         module->recv_frags[i].module = module;
1714         module->recv_frags[i].buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
1715         if (NULL == module->recv_frags[i].buffer) {
1716             return OMPI_ERR_OUT_OF_RESOURCE;
1717         }
1718 
1719         rc = ompi_osc_pt2pt_irecv_w_cb (module->recv_frags[i].buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t),
1720                                         MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->recv_frags[i].pml_request,
1721                                         ompi_osc_pt2pt_callback, module->recv_frags + i);
1722         if (OMPI_SUCCESS != rc) {
1723             return rc;
1724         }
1725     }
1726 
1727     return OMPI_SUCCESS;
1728 }
1729 
1730 int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf,
1731                                    size_t count, struct ompi_datatype_t *datatype,
1732                                    int src, int tag, struct ompi_communicator_t *comm)
1733 {
1734     return ompi_osc_pt2pt_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL,
1735                                      osc_pt2pt_incoming_req_complete, module);
1736 }
1737 
1738 int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
1739                               ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
1740 {
1741     ompi_request_t *request;
1742     int ret;
1743 
1744     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1745                          "osc pt2pt: ompi_osc_pt2pt_isend_w_cb sending %d bytes to %d with tag %d",
1746                          count, target, tag));
1747 
1748     ret = MCA_PML_CALL(isend_init((void *)ptr, count, datatype, target, tag,
1749                                   MCA_PML_BASE_SEND_STANDARD, comm, &request));
1750     if (OMPI_SUCCESS != ret) {
1751         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1752                              "error sending fragment. ret = %d", ret));
1753         return ret;
1754     }
1755 
1756     request->req_complete_cb = cb;
1757     request->req_complete_cb_data = ctx;
1758 
1759     ret = MCA_PML_CALL(start(1, &request));
1760 
1761     return ret;
1762 }
1763 
1764 int ompi_osc_pt2pt_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
1765                               ompi_communicator_t *comm, ompi_request_t **request_out,
1766                               ompi_request_complete_fn_t cb, void *ctx)
1767 {
1768     ompi_request_t *dummy;
1769     int ret;
1770 
1771     if (NULL == request_out) {
1772         request_out = &dummy;
1773     }
1774 
1775     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1776                          "osc pt2pt: ompi_osc_pt2pt_irecv_w_cb receiving %d bytes from %d with tag %d",
1777                          count, target, tag));
1778 
1779     ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, request_out));
1780     if (OMPI_SUCCESS != ret) {
1781         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1782                              "error posting receive. ret = %d", ret));
1783         return ret;
1784     }
1785 
1786     (*request_out)->req_complete_cb = cb;
1787     (*request_out)->req_complete_cb_data = ctx;
1788 
1789     ret = MCA_PML_CALL(start(1, request_out));
1790 
1791     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1792                          "osc pt2pt: pml start returned %d", ret));
1793 
1794     return ret;
1795 }