root/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. osc_pt2pt_accumulate_data_constructor
  2. osc_pt2pt_accumulate_data_destructor
  3. osc_pt2pt_pending_acc_constructor
  4. osc_pt2pt_pending_acc_destructor
  5. ompi_osc_pt2pt_ddt_buffer_constructor
  6. ompi_osc_pt2pt_ddt_buffer_destructor
  7. datatype_buffer_length
  8. ompi_osc_pt2pt_control_send
  9. ompi_osc_pt2pt_control_send_unbuffered_cb
  10. ompi_osc_pt2pt_control_send_unbuffered
  11. datatype_create
  12. process_put
  13. process_put_long
  14. osc_pt2pt_incoming_req_complete
  15. osc_pt2pt_get_post_send_cb
  16. osc_pt2pt_get_post_send
  17. process_get
  18. osc_pt2pt_accumulate_buffer
  19. osc_pt2pt_accumulate_allocate
  20. accumulate_cb
  21. ompi_osc_pt2pt_acc_op_queue
  22. replace_cb
  23. ompi_osc_pt2pt_acc_start
  24. ompi_osc_pt2pt_acc_long_start
  25. ompi_osc_pt2pt_gacc_start
  26. ompi_osc_gacc_long_start
  27. ompi_osc_pt2pt_cswap_start
  28. ompi_osc_pt2pt_progress_pending_acc
  29. process_acc
  30. process_acc_long
  31. process_get_acc
  32. process_get_acc_long
  33. process_cswap
  34. process_complete
  35. process_flush
  36. process_unlock
  37. process_large_datatype_request_cb
  38. process_large_datatype_request
  39. process_frag
  40. ompi_osc_pt2pt_callback
  41. ompi_osc_pt2pt_receive_repost
  42. ompi_osc_pt2pt_process_receive
  43. ompi_osc_pt2pt_frag_start_receive
  44. ompi_osc_pt2pt_component_irecv
  45. ompi_osc_pt2pt_isend_w_cb
  46. ompi_osc_pt2pt_irecv_w_cb

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University.
   4  *                         All rights reserved.
   5  * Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
   6  *                         All rights reserved.
   7  * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
   8  *                         University of Stuttgart.  All rights reserved.
   9  * Copyright (c) 2004-2005 The Regents of the University of California.
  10  *                         All rights reserved.
  11  * Copyright (c) 2007-2018 Los Alamos National Security, LLC.  All rights
  12  *                         reserved.
  13  * Copyright (c) 2009-2011 Oracle and/or its affiliates.  All rights reserved.
  14  * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
  15  * Copyright (c) 2014-2015 Research Organization for Information Science
  16  *                         and Technology (RIST). All rights reserved.
  17  * Copyright (c) 2016      FUJITSU LIMITED.  All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 
  25 #include "osc_pt2pt.h"
  26 #include "osc_pt2pt_header.h"
  27 #include "osc_pt2pt_data_move.h"
  28 #include "osc_pt2pt_frag.h"
  29 #include "osc_pt2pt_request.h"
  30 
  31 #include "opal/util/arch.h"
  32 #include "opal/sys/atomic.h"
  33 #include "opal/align.h"
  34 
  35 #include "ompi/mca/pml/pml.h"
  36 #include "ompi/mca/pml/base/pml_base_sendreq.h"
  37 #include "opal/mca/btl/btl.h"
  38 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
  39 #include "ompi/datatype/ompi_datatype.h"
  40 #include "ompi/op/op.h"
  41 #include "ompi/memchecker.h"
  42 
  43 /**
  44  * struct osc_pt2pt_accumulate_data_t:
  45  *
  46  * @brief Data associated with an in-progress accumulation operation.
  47  */
  48 struct osc_pt2pt_accumulate_data_t {
  49     opal_list_item_t super;
  50     ompi_osc_pt2pt_module_t* module;
  51     void *target;
  52     void *source;
  53     size_t source_len;
  54     ompi_proc_t *proc;
  55     int count;
  56     int peer;
  57     ompi_datatype_t *datatype;
  58     ompi_op_t *op;
  59     opal_atomic_int32_t request_count;
  60 };
  61 typedef struct osc_pt2pt_accumulate_data_t osc_pt2pt_accumulate_data_t;
  62 
  63 static void osc_pt2pt_accumulate_data_constructor (osc_pt2pt_accumulate_data_t *acc_data)
  64 {
  65     acc_data->source = NULL;
  66     acc_data->datatype = NULL;
  67     acc_data->op = NULL;
  68 }
  69 
  70 static void osc_pt2pt_accumulate_data_destructor (osc_pt2pt_accumulate_data_t *acc_data)
  71 {
  72     if (acc_data->source) {
  73         /* the source buffer is always alloc'd */
  74         free (acc_data->source);
  75     }
  76 
  77     if (acc_data->datatype) {
  78         OMPI_DATATYPE_RELEASE(acc_data->datatype);
  79     }
  80 }
  81 
  82 OBJ_CLASS_DECLARATION(osc_pt2pt_accumulate_data_t);
  83 OBJ_CLASS_INSTANCE(osc_pt2pt_accumulate_data_t, opal_list_item_t, osc_pt2pt_accumulate_data_constructor,
  84                    osc_pt2pt_accumulate_data_destructor);
  85 
  86 /**
  87  * osc_pt2pt_pending_acc_t:
  88  *
  89  * @brief Keep track of accumulate and cswap operations that are
  90  * waiting on the accumulate lock.
  91  *
  92  *  Since accumulate operations may take several steps to
  93  * complete we need to lock the accumulate lock until the operation
  94  * is complete. While the lock is held it is possible that additional
  95  * accumulate operations will arrive. This structure keep track of
  96  * those operations.
  97  */
  98 struct osc_pt2pt_pending_acc_t {
  99     opal_list_item_t super;
 100     ompi_osc_pt2pt_header_t header;
 101     int source;
 102     void *data;
 103     size_t data_len;
 104     ompi_datatype_t *datatype;
 105     bool active_target;
 106 };
 107 typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t;
 108 
 109 static void osc_pt2pt_pending_acc_constructor (osc_pt2pt_pending_acc_t *pending)
 110 {
 111     pending->data = NULL;
 112     pending->datatype = NULL;
 113 }
 114 
 115 static void osc_pt2pt_pending_acc_destructor (osc_pt2pt_pending_acc_t *pending)
 116 {
 117     if (NULL != pending->data) {
 118         free (pending->data);
 119     }
 120 
 121     if (NULL != pending->datatype) {
 122         OMPI_DATATYPE_RELEASE(pending->datatype);
 123     }
 124 }
 125 
 126 OBJ_CLASS_DECLARATION(osc_pt2pt_pending_acc_t);
 127 OBJ_CLASS_INSTANCE(osc_pt2pt_pending_acc_t, opal_list_item_t,
 128                    osc_pt2pt_pending_acc_constructor, osc_pt2pt_pending_acc_destructor);
 129 /* end ompi_osc_pt2pt_pending_acc_t class */
 130 
 131 /**
 132  * @brief Class for large datatype descriptions
 133  *
 134  * This class is used to keep track of buffers for large datatype desctiotions
 135  * (datatypes that do not fit in an eager fragment). The structure is designed
 136  * to take advantage of the small datatype description code path.
 137  */
 138 struct ompi_osc_pt2pt_ddt_buffer_t {
 139     /** allows this class to be stored in the buffer garbage collection
 140      * list */
 141     opal_list_item_t super;
 142 
 143     /** OSC PT2PT module */
 144     ompi_osc_pt2pt_module_t *module;
 145     /** source of this header */
 146     int source;
 147     /** header + datatype data */
 148     ompi_osc_pt2pt_header_t *header;
 149 };
 150 typedef struct ompi_osc_pt2pt_ddt_buffer_t ompi_osc_pt2pt_ddt_buffer_t;
 151 
 152 static void ompi_osc_pt2pt_ddt_buffer_constructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
 153 {
 154     ddt_buffer->header = NULL;
 155 }
 156 
 157 static void ompi_osc_pt2pt_ddt_buffer_destructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
 158 {
 159     if (ddt_buffer->header) {
 160         free (ddt_buffer->header);
 161         ddt_buffer->header = NULL;
 162     }
 163 }
 164 
 165 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_ddt_buffer_t);
 166 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_ddt_buffer_t, opal_list_item_t, ompi_osc_pt2pt_ddt_buffer_constructor,
 167                    ompi_osc_pt2pt_ddt_buffer_destructor);
 168 /* end ompi_osc_pt2pt_ddt_buffer_t class */
 169 
 170 /**
 171  * datatype_buffer_length:
 172  *
 173  * @brief Determine the buffer size needed to hold count elements of datatype.
 174  *
 175  * @param[in] datatype  - Element type
 176  * @param[in] count     - Element count
 177  *
 178  * @returns buflen Buffer length needed to hold count elements of datatype
 179  */
 180 static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
 181 {
 182     ompi_datatype_t *primitive_datatype = NULL;
 183     uint32_t primitive_count;
 184     size_t buflen;
 185 
 186     ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
 187     primitive_count *= count;
 188 
 189     /* figure out how big a buffer we need */
 190     ompi_datatype_type_size(primitive_datatype, &buflen);
 191 
 192     return buflen * primitive_count;
 193 }
 194 
 195 /**
 196  * ompi_osc_pt2pt_control_send:
 197  *
 198  * @brief send a control message as part of a fragment
 199  *
 200  * @param[in]  module  - OSC PT2PT module
 201  * @param[in]  target  - Target peer's rank
 202  * @param[in]  data    - Data to send
 203  * @param[in]  len     - Length of data
 204  *
 205  * @returns error OMPI error code or OMPI_SUCCESS
 206  *
 207  *  "send" a control messages.  Adds it to the active fragment, so the
 208  * caller will still need to explicitly flush (either to everyone or
 209  * to a target) before this is sent.
 210  */
 211 int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
 212                                  void *data, size_t len)
 213 {
 214     ompi_osc_pt2pt_frag_t *frag;
 215     char *ptr;
 216     int ret;
 217 
 218     ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true);
 219     if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
 220         memcpy (ptr, data, len);
 221 
 222         ret = ompi_osc_pt2pt_frag_finish(module, frag);
 223     }
 224 
 225     return ret;
 226 }
 227 
 228 static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
 229 {
 230     void *ctx = request->req_complete_cb_data;
 231     ompi_osc_pt2pt_module_t *module;
 232 
 233     /* get module pointer and data */
 234     module = *(ompi_osc_pt2pt_module_t **)ctx;
 235 
 236     /* mark this send as complete */
 237     mark_outgoing_completion (module);
 238 
 239     /* free the temporary buffer */
 240     free (ctx);
 241 
 242     ompi_request_free (&request);
 243     return 1;
 244 }
 245 
 246 /**
 247  * ompi_osc_pt2pt_control_send_unbuffered:
 248  *
 249  * @brief Send an unbuffered control message to a peer.
 250  *
 251  * @param[in] module - OSC PT2PT module
 252  * @param[in] target - Target rank
 253  * @param[in] data   - Data to send
 254  * @param[in] len    - Length of data
 255  *
 256  *  Directly send a control message.  This does not allocate a
 257  * fragment, so should only be used when sending other messages would
 258  * be erroneous (such as complete messages, when there may be queued
 259  * transactions from an overlapping post that has already heard back
 260  * from its peer). The buffer specified by data will be available
 261  * when this call returns.
 262  */
 263 int ompi_osc_pt2pt_control_send_unbuffered(ompi_osc_pt2pt_module_t *module,
 264                                           int target, void *data, size_t len)
 265 {
 266     void *ctx, *data_copy;
 267 
 268     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 269                          "osc pt2pt: sending unbuffered fragment to %d", target));
 270 
 271     /* allocate a temporary buffer for this send */
 272     ctx = malloc (sizeof(ompi_osc_pt2pt_module_t*) + len);
 273     if (OPAL_UNLIKELY(NULL == ctx)) {
 274         return OMPI_ERR_OUT_OF_RESOURCE;
 275     }
 276 
 277     /* increment outgoing signal count. this send is not part of a passive epoch
 278      * so there it would be erroneous to increment the epoch counters. */
 279     ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);
 280 
 281     /* store module pointer and data in the buffer */
 282     *(ompi_osc_pt2pt_module_t**)ctx = module;
 283     data_copy = (ompi_osc_pt2pt_module_t**)ctx + 1;
 284     memcpy (data_copy, data, len);
 285 
 286     return ompi_osc_pt2pt_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_PT2PT_FRAG_TAG,
 287                                      module->comm, ompi_osc_pt2pt_control_send_unbuffered_cb, ctx);
 288 }
 289 
 290 /**
 291  * datatype_create:
 292  *
 293  * @brief Utility function that creates a new datatype from a packed
 294  *        description.
 295  *
 296  * @param[in]    module   - OSC PT2PT module
 297  * @param[in]    peer     - Peer rank
 298  * @param[out]   datatype - New datatype. Must be released with OBJ_RELEASE.
 299  * @param[out]   proc     - Optional. Proc for peer.
 300  * @param[inout] data     - Pointer to a pointer where the description is stored. This
 301  *                          pointer will be updated to the location after the packed
 302  *                          description.
 303  */
 304 static inline int datatype_create (ompi_osc_pt2pt_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
 305 {
 306     ompi_datatype_t *new_datatype = NULL;
 307     ompi_proc_t *peer_proc;
 308     int ret = OMPI_SUCCESS;
 309 
 310     do {
 311         peer_proc = ompi_comm_peer_lookup(module->comm, peer);
 312         if (OPAL_UNLIKELY(NULL == peer_proc)) {
 313             OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
 314                                  "%d: datatype_create: could not resolve proc pointer for peer %d",
 315                                  ompi_comm_rank(module->comm),
 316                                  peer));
 317             ret = OMPI_ERROR;
 318             break;
 319         }
 320 
 321         new_datatype = ompi_osc_base_datatype_create(peer_proc, data);
 322         if (OPAL_UNLIKELY(NULL == new_datatype)) {
 323             OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
 324                                  "%d: datatype_create: could not resolve datatype for peer %d",
 325                                  ompi_comm_rank(module->comm), peer));
 326             ret = OMPI_ERROR;
 327         }
 328     } while (0);
 329 
 330     *datatype = new_datatype;
 331     if (proc) *proc = peer_proc;
 332 
 333     return ret;
 334 }
 335 
 336 /**
 337  * process_put:
 338  *
 339  * @shoer Process a put w/ data message
 340  *
 341  * @param[in] module     - OSC PT2PT module
 342  * @param[in] source     - Message source
 343  * @param[in] put_header - Message header + data
 344  *
 345  *  Process a put message and copy the message data to the specified
 346  * memory region. Note, this function does not handle any bounds
 347  * checking at the moment.
 348  */
 349 static inline int process_put(ompi_osc_pt2pt_module_t* module, int source,
 350                               ompi_osc_pt2pt_header_put_t* put_header)
 351 {
 352     char *data = (char*) (put_header + 1);
 353     ompi_proc_t *proc;
 354     struct ompi_datatype_t *datatype;
 355     size_t data_len;
 356     void *target = (unsigned char*) module->baseptr +
 357         ((unsigned long) put_header->displacement * module->disp_unit);
 358     int ret;
 359 
 360     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 361                          "%d: process_put: received message from %d",
 362                          ompi_comm_rank(module->comm),
 363                          source));
 364 
 365     ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
 366     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 367         return ret;
 368     }
 369 
 370     data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header);
 371 
 372     osc_pt2pt_copy_on_recv (target, data, data_len, proc, put_header->count, datatype);
 373 
 374     OMPI_DATATYPE_RELEASE(datatype);
 375 
 376     return put_header->len;
 377 }
 378 
 379 static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source,
 380                                    ompi_osc_pt2pt_header_put_t* put_header)
 381 {
 382     char *data = (char*) (put_header + 1);
 383     struct ompi_datatype_t *datatype;
 384     void *target = (unsigned char*) module->baseptr +
 385         ((unsigned long) put_header->displacement * module->disp_unit);
 386     int ret;
 387 
 388     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 389                          "%d: process_put_long: received message from %d",
 390                          ompi_comm_rank(module->comm),
 391                          source));
 392 
 393     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
 394     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 395         return ret;
 396     }
 397 
 398     ret = ompi_osc_pt2pt_component_irecv (module, target,
 399                                          put_header->count,
 400                                          datatype, source,
 401                                          tag_to_target(put_header->tag),
 402                                          module->comm);
 403     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 404         OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
 405                              "%d: process_put_long: irecv error: %d",
 406                              ompi_comm_rank(module->comm),
 407                              ret));
 408         return OMPI_ERROR;
 409     }
 410 
 411     OMPI_DATATYPE_RELEASE(datatype);
 412 
 413     return put_header->len;
 414 }
 415 
 416 /**
 417  * osc_pt2pt_incoming_req_complete:
 418  *
 419  * @brief Completion callback for a receive associate with an access
 420  *        epoch.
 421  *
 422  * @param[in] request - PML request with an OSC RMDA module as the callback data.
 423  *
 424  *  This function is called when a send or recieve associated with an
 425  *       access epoch completes. When fired this function will increment the
 426  *       passive or active incoming count.
 427  */
 428 static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
 429 {
 430     ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
 431     int rank = MPI_PROC_NULL;
 432 
 433     if (request->req_status.MPI_TAG & 0x01) {
 434         rank = request->req_status.MPI_SOURCE;
 435     }
 436 
 437     mark_incoming_completion (module, rank);
 438 
 439     ompi_request_free (&request);
 440     return 1;
 441 }
 442 
 443 struct osc_pt2pt_get_post_send_cb_data_t {
 444     ompi_osc_pt2pt_module_t *module;
 445     int peer;
 446 };
 447 
 448 static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
 449 {
 450     struct osc_pt2pt_get_post_send_cb_data_t *data =
 451         (struct osc_pt2pt_get_post_send_cb_data_t *) request->req_complete_cb_data;
 452     ompi_osc_pt2pt_module_t *module = data->module;
 453     int rank = data->peer;
 454 
 455     free (data);
 456 
 457     /* mark this as a completed "incoming" request */
 458     mark_incoming_completion (module, rank);
 459 
 460     ompi_request_free (&request);
 461     return 1;
 462 }
 463 
 464 /**
 465  * @brief Post a send to match the remote receive for a get operation.
 466  *
 467  * @param[in] module   - OSC PT2PT module
 468  * @param[in] source   - Source buffer
 469  * @param[in] count    - Number of elements in the source buffer
 470  * @param[in] datatype - Type of source elements.
 471  * @param[in] peer     - Remote process that has the receive posted
 472  * @param[in] tag      - Tag for the send
 473  *
 474  *  This function posts a send to match the receive posted as part
 475  *       of a get operation. When this send is complete the get is considered
 476  *       complete at the target (this process).
 477  */
 478 static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *source, int count,
 479                                    ompi_datatype_t *datatype, int peer, int tag)
 480 {
 481     struct osc_pt2pt_get_post_send_cb_data_t *data;
 482     int ret;
 483 
 484     data = malloc (sizeof (*data));
 485     if (OPAL_UNLIKELY(NULL == data)) {
 486         return OMPI_ERR_OUT_OF_RESOURCE;
 487     }
 488 
 489     data->module = module;
 490     /* for incoming completion we need to know the peer (MPI_PROC_NULL if this is
 491      * in an active target epoch) */
 492     data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL;
 493 
 494     /* data will be freed by the callback */
 495     ret = ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm,
 496                                      osc_pt2pt_get_post_send_cb, (void *) data);
 497     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 498         free (data);
 499     }
 500 
 501     return ret;
 502 }
 503 
 504 /**
 505  * process_get:
 506  *
 507  * @brief Process a get message from a remote peer
 508  *
 509  * @param[in] module     - OSC PT2PT module
 510  * @param[in] target     - Peer process
 511  * @param[in] get_header - Incoming message header
 512  */
 513 static inline int process_get (ompi_osc_pt2pt_module_t* module, int target,
 514                                ompi_osc_pt2pt_header_get_t* get_header)
 515 {
 516     char *data = (char *) (get_header + 1);
 517     struct ompi_datatype_t *datatype;
 518     void *source = (unsigned char*) module->baseptr +
 519         ((unsigned long) get_header->displacement * module->disp_unit);
 520     int ret;
 521 
 522     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 523                          "%d: process_get: received message from %d",
 524                          ompi_comm_rank(module->comm),
 525                          target));
 526 
 527     ret = datatype_create (module, target, NULL, &datatype, (void **) &data);
 528     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 529         return ret;
 530     }
 531 
 532     /* send get data */
 533     ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype,
 534                                   target, tag_to_origin(get_header->tag));
 535 
 536     OMPI_DATATYPE_RELEASE(datatype);
 537 
 538     return OMPI_SUCCESS == ret ? (int) get_header->len : ret;
 539 }
 540 
 541 /**
 542  * osc_pt2pt_accumulate_buffer:
 543  *
 544  * @brief Accumulate data into the target buffer.
 545  *
 546  * @param[in] target     - Target buffer
 547  * @param[in] source     - Source buffer
 548  * @param[in] source_len - Length of source buffer in bytes
 549  * @param[in] proc       - Source proc
 550  * @param[in] count      - Number of elements in target buffer
 551  * @param[in] datatype   - Type of elements in target buffer
 552  * @param[in] op         - Operation to be performed
 553  */
 554 static inline int osc_pt2pt_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
 555                                               int count, ompi_datatype_t *datatype, ompi_op_t *op)
 556 {
 557     int ret;
 558 
 559     assert (NULL != target && NULL != source);
 560 
 561     if (op == &ompi_mpi_op_replace.op) {
 562         osc_pt2pt_copy_on_recv (target, source, source_len, proc, count, datatype);
 563         return OMPI_SUCCESS;
 564     }
 565 
 566 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
 567     if (proc->super.proc_arch != ompi_proc_local()->super.proc_arch) {
 568         ompi_datatype_t *primitive_datatype = NULL;
 569         uint32_t primitive_count;
 570         size_t buflen;
 571         void *buffer;
 572 
 573         ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
 574         primitive_count *= count;
 575 
 576         /* figure out how big a buffer we need */
 577         ompi_datatype_type_size(primitive_datatype, &buflen);
 578         buflen *= primitive_count;
 579 
 580         buffer = malloc (buflen);
 581         if (OPAL_UNLIKELY(NULL == buffer)) {
 582             return OMPI_ERR_OUT_OF_RESOURCE;
 583         }
 584 
 585         osc_pt2pt_copy_on_recv (buffer, source, source_len, proc, primitive_count, primitive_datatype);
 586 
 587         ret = ompi_osc_base_process_op(target, buffer, source_len, datatype,
 588                                        count, op);
 589 
 590         free(buffer);
 591     } else
 592 #endif
 593 
 594     /* copy the data from the temporary buffer into the user window */
 595     ret = ompi_osc_base_process_op(target, source, source_len, datatype,
 596                                    count, op);
 597 
 598     return ret;
 599 }
 600 
 601 /**
 602  * @brief Create an accumulate data object.
 603  *
 604  * @param[in]  module        - PT2PT OSC module
 605  * @param[in]  target        - Target for the accumulation
 606  * @param[in]  source        - Source of accumulate data. Must be allocated with malloc/calloc/etc
 607  * @param[in]  source_len    - Length of the source buffer in bytes
 608  * @param[in]  proc          - Source proc
 609  * @param[in]  count         - Number of elements to accumulate
 610  * @param[in]  datatype      - Datatype to accumulate
 611  * @oaram[in]  op            - Operator
 612  * @param[in]  request_count - Number of prerequisite requests
 613  * @param[out] acc_data_out  - New accumulation data
 614  *
 615  * This function is used to create a copy of the data needed to perform an accumulation.
 616  * This data should be provided to ompi_osc_pt2pt_isend_w_cb or ompi_osc_pt2pt_irecv_w_cb
 617  * as the ctx parameter with accumulate_cb as the cb parameter.
 618  */
 619 static int osc_pt2pt_accumulate_allocate (ompi_osc_pt2pt_module_t *module, int peer, void *target, void *source, size_t source_len,
 620                                          ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
 621                                          int request_count, osc_pt2pt_accumulate_data_t **acc_data_out)
 622 {
 623     osc_pt2pt_accumulate_data_t *acc_data;
 624 
 625     acc_data = OBJ_NEW(osc_pt2pt_accumulate_data_t);
 626     if (OPAL_UNLIKELY(NULL == acc_data)) {
 627         return OMPI_ERR_OUT_OF_RESOURCE;
 628     }
 629 
 630     acc_data->module = module;
 631     acc_data->peer = peer;
 632     acc_data->target = target;
 633     acc_data->source = source;
 634     acc_data->source_len = source_len;
 635     acc_data->proc = proc;
 636     acc_data->count = count;
 637     acc_data->datatype = datatype;
 638     OMPI_DATATYPE_RETAIN(datatype);
 639     acc_data->op = op;
 640     acc_data->request_count = request_count;
 641 
 642     *acc_data_out = acc_data;
 643 
 644     return OMPI_SUCCESS;
 645 }
 646 
 647 /**
 648  * @brief Execute the accumulate once the request counter reaches 0.
 649  *
 650  * @param[in] request      - request
 651  *
 652  * The request should be created with ompi_osc_pt2pt_isend_w_cb or ompi_osc_pt2pt_irecv_w_cb
 653  * with ctx allocated by osc_pt2pt_accumulate_allocate. This callback will free the accumulate
 654  * data once the accumulation operation is complete.
 655  */
 656 static int accumulate_cb (ompi_request_t *request)
 657 {
 658     struct osc_pt2pt_accumulate_data_t *acc_data = (struct osc_pt2pt_accumulate_data_t *) request->req_complete_cb_data;
 659     ompi_osc_pt2pt_module_t *module = acc_data->module;
 660     int rank = MPI_PROC_NULL;
 661     int ret = OMPI_SUCCESS;
 662 
 663     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 664                          "accumulate_cb, request_count = %d", acc_data->request_count));
 665 
 666     if (request->req_status.MPI_TAG & 0x01) {
 667         rank = acc_data->peer;
 668     }
 669 
 670     if (0 == OPAL_THREAD_ADD_FETCH32(&acc_data->request_count, -1)) {
 671         /* no more requests needed before the buffer can be accumulated */
 672 
 673         if (acc_data->source) {
 674             ompi_datatype_t *primitive_datatype = NULL;
 675             uint32_t primitive_count;
 676 
 677             assert (NULL != acc_data->target && NULL != acc_data->source);
 678 
 679             ompi_osc_base_get_primitive_type_info(acc_data->datatype, &primitive_datatype, &primitive_count);
 680             primitive_count *= acc_data->count;
 681 
 682             if (acc_data->op == &ompi_mpi_op_replace.op) {
 683                 ret = ompi_datatype_sndrcv(acc_data->source, primitive_count, primitive_datatype, acc_data->target, acc_data->count, acc_data->datatype);
 684             } else {
 685                 ret = ompi_osc_base_process_op(acc_data->target, acc_data->source, acc_data->source_len, acc_data->datatype, acc_data->count, acc_data->op);
 686             }
 687         }
 688 
 689         /* drop the accumulate lock */
 690         ompi_osc_pt2pt_accumulate_unlock (module);
 691 
 692         osc_pt2pt_gc_add_buffer (module, &acc_data->super);
 693     }
 694 
 695     mark_incoming_completion (module, rank);
 696 
 697     ompi_request_free (&request);
 698     return ret;
 699 }
 700 
 701 
 702 static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
 703                                         char *data, size_t data_len, ompi_datatype_t *datatype, bool active_target)
 704 {
 705     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
 706     osc_pt2pt_pending_acc_t *pending_acc;
 707 
 708     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 709                          "%d: queuing accumulate operation", ompi_comm_size (module->comm)));
 710 
 711     pending_acc = OBJ_NEW(osc_pt2pt_pending_acc_t);
 712     if (OPAL_UNLIKELY(NULL == pending_acc)) {
 713         return OMPI_ERR_OUT_OF_RESOURCE;
 714     }
 715 
 716     /* NTH: ensure we don't leave wait/process_flush/etc until this
 717      * accumulate operation is complete. */
 718     if (active_target) {
 719         OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -1);
 720     } else {
 721         OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
 722     }
 723 
 724     pending_acc->active_target = active_target;
 725     pending_acc->source = source;
 726 
 727     /* save any inline data (eager acc, gacc only) */
 728     pending_acc->data_len = data_len;
 729 
 730     if (data_len) {
 731         pending_acc->data = malloc (data_len);
 732         memcpy (pending_acc->data, data, data_len);
 733     }
 734 
 735     /* save the datatype */
 736     pending_acc->datatype = datatype;
 737     OMPI_DATATYPE_RETAIN(datatype);
 738 
 739     /* save the header */
 740     switch (header->base.type) {
 741     case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
 742     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
 743     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
 744     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
 745         pending_acc->header.acc = header->acc;
 746         break;
 747     case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
 748         pending_acc->header.cswap = header->cswap;
 749         break;
 750     default:
 751         /* it is a coding error if any other header types are queued this way */
 752         assert (0);
 753     }
 754 
 755     /* add to the pending acc queue */
 756     OPAL_THREAD_SCOPED_LOCK(&module->pending_acc_lock, opal_list_append (&module->pending_acc, &pending_acc->super));
 757 
 758     return OMPI_SUCCESS;
 759 }
 760 
 761 static int replace_cb (ompi_request_t *request)
 762 {
 763     ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
 764     int rank = MPI_PROC_NULL;
 765 
 766     if (request->req_status.MPI_TAG & 0x01) {
 767         rank = request->req_status.MPI_SOURCE;
 768     }
 769 
 770     mark_incoming_completion (module, rank);
 771 
 772     /* unlock the accumulate lock */
 773     ompi_osc_pt2pt_accumulate_unlock (module);
 774 
 775     ompi_request_free (&request);
 776     return 1;
 777 }
 778 
 779 /**
 780  * ompi_osc_pt2pt_acc_start:
 781  *
 782  * @brief Start an accumulate with data operation.
 783  *
 784  * @param[in] module     - OSC PT2PT module
 785  * @param[in] source     - Source rank
 786  * @param[in] data       - Accumulate data
 787  * @param[in] data_len   - Length of the accumulate data
 788  * @param[in] datatype   - Accumulation datatype
 789  * @param[in] acc_header - Accumulate header
 790  *
 791  * The module's accumulation lock must be held before calling this
 792  * function. It will release the lock when the operation is complete.
 793  */
 794 static int ompi_osc_pt2pt_acc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
 795                                     ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
 796 {
 797     void *target = (unsigned char*) module->baseptr +
 798         ((unsigned long) acc_header->displacement * module->disp_unit);
 799     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 800     ompi_proc_t *proc;
 801     int ret;
 802 
 803     proc = ompi_comm_peer_lookup(module->comm, source);
 804     assert (NULL != proc);
 805 
 806     ret = osc_pt2pt_accumulate_buffer (target, data, data_len, proc, acc_header->count,
 807                                       datatype, op);
 808 
 809     ompi_osc_pt2pt_accumulate_unlock (module);
 810 
 811     return ret;
 812 }
 813 
 814 /**
 815  * ompi_osc_pt2pt_acc_start:
 816  *
 817  * @brief Start a long accumulate operation.
 818  *
 819  * @param[in] module     - OSC PT2PT module
 820  * @param[in] source     - Source rank
 821  * @param[in] datatype   - Accumulation datatype
 822  * @param[in] acc_header - Accumulate header
 823  *
 824  * The module's accumulation lock must be held before calling this
 825  * function. It will release the lock when the operation is complete.
 826  */
 827 static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
 828                                          ompi_osc_pt2pt_header_acc_t *acc_header) {
 829     struct osc_pt2pt_accumulate_data_t *acc_data;
 830     size_t buflen;
 831     void *buffer;
 832     ompi_proc_t *proc;
 833     void *target = (unsigned char*) module->baseptr +
 834         ((unsigned long) acc_header->displacement * module->disp_unit);
 835     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 836     ompi_datatype_t *primitive_datatype;
 837     uint32_t primitive_count;
 838     int ret;
 839 
 840     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 841                          "ompi_osc_pt2pt_acc_long_start starting..."));
 842 
 843     proc = ompi_comm_peer_lookup(module->comm, source);
 844     assert (NULL != proc);
 845 
 846     do {
 847         if (op == &ompi_mpi_op_replace.op) {
 848             ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype,
 849                                             source, tag_to_target(acc_header->tag), module->comm,
 850                                             NULL, replace_cb, module);
 851             break;
 852         }
 853 
 854         ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
 855         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 856             break;
 857         }
 858 
 859         primitive_count *= acc_header->count;
 860 
 861         buflen = datatype_buffer_length (datatype, acc_header->count);
 862 
 863         /* allocate a temporary buffer to receive the accumulate data */
 864         buffer = malloc (buflen);
 865         if (OPAL_UNLIKELY(NULL == buffer)) {
 866             ret = OMPI_ERR_OUT_OF_RESOURCE;
 867             break;
 868         }
 869 
 870         ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
 871                                              datatype, op, 1, &acc_data);
 872         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 873             free (buffer);
 874             break;
 875         }
 876 
 877         ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype,
 878                                         source, tag_to_target(acc_header->tag), module->comm,
 879                                         NULL, accumulate_cb, acc_data);
 880         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 881             OBJ_RELEASE(acc_data);
 882         }
 883     } while (0);
 884 
 885     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 886         ompi_osc_pt2pt_accumulate_unlock (module);
 887     }
 888 
 889     return ret;
 890 }
 891 
 892 /**
 893  * ompi_osc_pt2pt_gacc_start:
 894  *
 895  * @brief Start a accumulate with data + get operation.
 896  *
 897  * @param[in] module         - OSC PT2PT module
 898  * @param[in] source         - Source rank
 899  * @param[in] data           - Accumulate data. Must be allocated on the heap.
 900  * @param[in] data_len       - Length of the accumulate data
 901  * @param[in] datatype       - Accumulation datatype
 902  * @param[in] get_acc_header - Accumulate header
 903  *
 904  * The module's accumulation lock must be held before calling this
 905  * function. It will release the lock when the operation is complete.
 906  */
 907 static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
 908                                      ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
 909 {
 910     void *target = (unsigned char*) module->baseptr +
 911         ((unsigned long) acc_header->displacement * module->disp_unit);
 912     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 913     struct osc_pt2pt_accumulate_data_t *acc_data;
 914     ompi_proc_t *proc;
 915     int ret;
 916 
 917     proc = ompi_comm_peer_lookup(module->comm, source);
 918     assert (NULL != proc);
 919 
 920     do {
 921         ret = osc_pt2pt_accumulate_allocate (module, source, target, data, data_len, proc, acc_header->count,
 922                                             datatype, op, 1, &acc_data);
 923         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 924             break;
 925         }
 926 
 927         ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype,
 928                                         source, tag_to_origin(acc_header->tag), module->comm,
 929                                         accumulate_cb, acc_data);
 930         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 931             OBJ_RELEASE(acc_data);
 932         }
 933     } while (0);
 934 
 935     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 936         ompi_osc_pt2pt_accumulate_unlock (module);
 937     }
 938 
 939     return ret;
 940 }
 941 
 942 /**
 943  * ompi_osc_pt2pt_gacc_long_start:
 944  *
 945  * @brief Start a long accumulate + get operation.
 946  *
 947  * @param[in] module         - OSC PT2PT module
 948  * @param[in] source         - Source rank
 949  * @param[in] datatype       - Accumulation datatype
 950  * @param[in] acc_header     - Accumulate header
 951  *
 952  * The module's accumulation lock must be held before calling this
 953  * function. It will release the lock when the operation is complete.
 954  */
 955 static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
 956                                      ompi_osc_pt2pt_header_acc_t *acc_header)
 957 {
 958     void *target = (unsigned char*) module->baseptr +
 959         ((unsigned long) acc_header->displacement * module->disp_unit);
 960     struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
 961     struct osc_pt2pt_accumulate_data_t *acc_data;
 962     ompi_datatype_t *primitive_datatype;
 963     ompi_request_t *recv_request;
 964     uint32_t primitive_count;
 965     ompi_proc_t *proc;
 966     size_t buflen;
 967     void *buffer;
 968     int ret;
 969 
 970     proc = ompi_comm_peer_lookup(module->comm, source);
 971     assert (NULL != proc);
 972 
 973     /* allocate a temporary buffer to receive the accumulate data */
 974     buflen = datatype_buffer_length (datatype, acc_header->count);
 975 
 976     do {
 977         ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
 978         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 979             break;
 980         }
 981 
 982         primitive_count *= acc_header->count;
 983 
 984         buffer = malloc (buflen);
 985         if (OPAL_UNLIKELY(NULL == buffer)) {
 986             ret = OMPI_ERR_OUT_OF_RESOURCE;
 987             break;
 988         }
 989 
 990         ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
 991                                              datatype, op, 2, &acc_data);
 992         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 993             free (buffer);
 994             break;
 995         }
 996 
 997         ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype,
 998                                         source, tag_to_target(acc_header->tag), module->comm,
 999                                         &recv_request, accumulate_cb, acc_data);
1000         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1001             OBJ_RELEASE(acc_data);
1002             break;
1003         }
1004 
1005         ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype,
1006                                         source, tag_to_origin(acc_header->tag), module->comm,
1007                                         accumulate_cb, acc_data);
1008         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1009             /* cancel the receive and free the accumulate data */
1010             ompi_request_cancel (recv_request);
1011             OBJ_RELEASE(acc_data);
1012             break;
1013         }
1014     } while (0);
1015 
1016     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1017         ompi_osc_pt2pt_accumulate_unlock (module);
1018     }
1019 
1020     return ret;
1021 }
1022 
1023 /**
1024  * ompi_osc_pt2pt_cswap_start:
1025  *
1026  * @brief Start a compare and swap operation
1027  *
1028  * @param[in] module       - OSC PT2PT module
1029  * @param[in] source       - Source rank
1030  * @param[in] data         - Compare and swap data
1031  * @param[in] data_len     - Length of the compare and swap data. Must be exactly
1032  *                           twice the size of the datatype.
1033  * @param[in] datatype     - Compare and swap datatype
1034  * @param[in] cswap_header - Compare and swap header
1035  *
1036  * The module's accumulation lock must be held before calling this
1037  * function. It will release the lock when the operation is complete.
1038  */
1039 static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int source, void *data, ompi_datatype_t *datatype,
1040                                       ompi_osc_pt2pt_header_cswap_t *cswap_header)
1041 {
1042     void *target = (unsigned char*) module->baseptr +
1043         ((unsigned long) cswap_header->displacement * module->disp_unit);
1044     void *compare_addr, *origin_addr;
1045     size_t datatype_size;
1046     ompi_proc_t *proc;
1047     int ret;
1048 
1049     proc = ompi_comm_peer_lookup(module->comm, source);
1050     assert (NULL != proc);
1051 
1052     datatype_size = datatype->super.size;
1053 
1054     origin_addr  = data;
1055     compare_addr = (void *)((uintptr_t) data + datatype_size);
1056 
1057     do {
1058         /* no reason to do a non-blocking send here */
1059         ret = MCA_PML_CALL(send(target, 1, datatype, source, tag_to_origin(cswap_header->tag),
1060                                 MCA_PML_BASE_SEND_STANDARD, module->comm));
1061         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1062             break;
1063         }
1064 
1065         /* increment the incoming fragment count so it matches what is expected */
1066         mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);
1067 
1068         if (0 == memcmp (target, compare_addr, datatype_size)) {
1069             osc_pt2pt_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype);
1070         }
1071     } while (0);
1072 
1073     ompi_osc_pt2pt_accumulate_unlock (module);
1074 
1075     return ret;
1076 }
1077 
1078 /**
1079  * ompi_osc_pt2pt_progress_pending_acc:
1080  *
1081  * @brief Progress one pending accumulation or compare and swap operation.
1082  *
1083  * @param[in] module   - OSC PT2PT module
1084  *
1085  * If the accumulation lock can be aquired progress one pending
1086  * accumulate or compare and swap operation.
1087  */
1088 int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
1089 {
1090     osc_pt2pt_pending_acc_t *pending_acc;
1091     int ret;
1092 
1093     /* try to aquire the lock. it will be unlocked when the accumulate or cswap
1094      * operation completes */
1095     if (ompi_osc_pt2pt_accumulate_trylock (module)) {
1096         return OMPI_SUCCESS;
1097     }
1098 
1099     OPAL_THREAD_LOCK(&module->pending_acc_lock);
1100     pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
1101     OPAL_THREAD_UNLOCK(&module->pending_acc_lock);
1102     if (OPAL_UNLIKELY(NULL == pending_acc)) {
1103         /* called without any pending accumulation operations */
1104         ompi_osc_pt2pt_accumulate_unlock (module);
1105         return OMPI_SUCCESS;
1106     }
1107 
1108     switch (pending_acc->header.base.type) {
1109     case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
1110         ret = ompi_osc_pt2pt_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
1111                                        pending_acc->datatype, &pending_acc->header.acc);
1112         free (pending_acc->data);
1113         break;
1114     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1115         ret = ompi_osc_pt2pt_acc_long_start (module, pending_acc->source, pending_acc->datatype,
1116                                             &pending_acc->header.acc);
1117         break;
1118     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
1119         ret = ompi_osc_pt2pt_gacc_start (module, pending_acc->source, pending_acc->data,
1120                                         pending_acc->data_len, pending_acc->datatype,
1121                                         &pending_acc->header.acc);
1122         break;
1123     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1124         ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
1125                                         &pending_acc->header.acc);
1126         break;
1127     case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
1128         ret = ompi_osc_pt2pt_cswap_start (module, pending_acc->source, pending_acc->data,
1129                                          pending_acc->datatype, &pending_acc->header.cswap);
1130         break;
1131     default:
1132         ret = OMPI_ERROR;
1133         /* it is a coding error if this point is reached */
1134         assert (0);
1135     }
1136 
1137     /* signal that an operation is complete */
1138     mark_incoming_completion (module, pending_acc->active_target ? MPI_PROC_NULL : pending_acc->source);
1139 
1140     pending_acc->data = NULL;
1141     OBJ_RELEASE(pending_acc);
1142 
1143     return ret;
1144 }
1145 
1146 static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
1147                                ompi_osc_pt2pt_header_acc_t *acc_header)
1148 {
1149     bool active_target = !(acc_header->tag & 0x1);
1150     char *data = (char *) (acc_header + 1);
1151     struct ompi_datatype_t *datatype;
1152     uint64_t data_len;
1153     int ret;
1154 
1155     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1156                          "%d: process_acc: received message from %d",
1157                          ompi_comm_rank(module->comm),
1158                          source));
1159 
1160     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1161     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1162         return ret;
1163     }
1164 
1165     data_len = acc_header->len - ((char*) data - (char*) acc_header);
1166 
1167     /* try to aquire the accumulate lock */
1168     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1169         ret = ompi_osc_pt2pt_acc_start (module, source, data, data_len, datatype,
1170                                        acc_header);
1171     } else {
1172         /* couldn't aquire the accumulate lock so queue up the accumulate operation */
1173         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1174                                            source, data, data_len, datatype, active_target);
1175     }
1176 
1177     /* Release datatype & op */
1178     OMPI_DATATYPE_RELEASE(datatype);
1179 
1180     return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1181 }
1182 
1183 static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
1184                                     ompi_osc_pt2pt_header_acc_t* acc_header)
1185 {
1186     bool active_target = !(acc_header->tag & 0x1);
1187     char *data = (char *) (acc_header + 1);
1188     struct ompi_datatype_t *datatype;
1189     int ret;
1190 
1191     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1192                          "%d: process_acc_long: received message from %d",
1193                          ompi_comm_rank(module->comm),
1194                          source));
1195 
1196     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1197     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1198         return ret;
1199     }
1200 
1201     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1202         ret = ompi_osc_pt2pt_acc_long_start (module, source, datatype, acc_header);
1203     } else {
1204         /* queue the operation */
1205         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source,
1206                                            NULL, 0, datatype, active_target);
1207     }
1208 
1209     /* Release datatype & op */
1210     OMPI_DATATYPE_RELEASE(datatype);
1211 
1212     return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1213 }
1214 
1215 static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source,
1216                                   ompi_osc_pt2pt_header_acc_t *acc_header)
1217 {
1218     bool active_target = !(acc_header->tag & 0x1);
1219     char *data = (char *) (acc_header + 1);
1220     struct ompi_datatype_t *datatype;
1221     void *buffer = NULL;
1222     uint64_t data_len;
1223     ompi_proc_t * proc;
1224     int ret;
1225 
1226     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1227                          "%d: process_get_acc: received message from %d",
1228                          ompi_comm_rank(module->comm),
1229                          source));
1230 
1231     ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
1232     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1233         return ret;
1234     }
1235 
1236     data_len = acc_header->len - ((char*) data - (char*) acc_header);
1237 
1238     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1239         /* make a copy of the data since the buffer needs to be returned */
1240         if (data_len) {
1241             ompi_datatype_t *primitive_datatype = NULL;
1242             uint32_t primitive_count;
1243             buffer = malloc (data_len);
1244             if (OPAL_UNLIKELY(NULL == buffer)) {
1245                 OMPI_DATATYPE_RELEASE(datatype);
1246                 return OMPI_ERR_OUT_OF_RESOURCE;
1247             }
1248 
1249             ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
1250             primitive_count *= acc_header->count;
1251 
1252             osc_pt2pt_copy_on_recv (buffer, data, data_len, proc, primitive_count, primitive_datatype);
1253         }
1254 
1255         ret = ompi_osc_pt2pt_gacc_start (module, source, buffer, data_len, datatype,
1256                                         acc_header);
1257     } else {
1258         /* queue the operation */
1259         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1260                                            source, data, data_len, datatype, active_target);
1261     }
1262 
1263     /* Release datatype & op */
1264     OMPI_DATATYPE_RELEASE(datatype);
1265 
1266     return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1267 }
1268 
1269 static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source,
1270                                        ompi_osc_pt2pt_header_acc_t *acc_header)
1271 {
1272     bool active_target = !(acc_header->tag & 0x1);
1273     char *data = (char *) (acc_header + 1);
1274     struct ompi_datatype_t *datatype;
1275     int ret;
1276 
1277     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1278                          "%d: process_acc: received message from %d",
1279                          ompi_comm_rank(module->comm),
1280                          source));
1281 
1282     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1283     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1284         return ret;
1285     }
1286 
1287     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1288         ret = ompi_osc_gacc_long_start (module, source, datatype, acc_header);
1289     } else {
1290         /* queue the operation */
1291         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1292                                            source, NULL, 0, datatype, active_target);
1293     }
1294 
1295     /* Release datatype & op */
1296     OMPI_DATATYPE_RELEASE(datatype);
1297 
1298     return OMPI_SUCCESS == ret ? (int) acc_header->len : ret;
1299 }
1300 
1301 
1302 static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
1303                                  ompi_osc_pt2pt_header_cswap_t *cswap_header)
1304 {
1305     bool active_target = !(cswap_header->tag & 0x1);
1306     char *data = (char*) (cswap_header + 1);
1307     struct ompi_datatype_t *datatype;
1308     int ret;
1309 
1310     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1311                          "%d: process_cswap: received message from %d",
1312                          ompi_comm_rank(module->comm),
1313                          source));
1314 
1315     ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1316     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1317         return ret;
1318     }
1319 
1320     if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1321         ret = ompi_osc_pt2pt_cswap_start (module, source, data, datatype, cswap_header);
1322     } else {
1323         /* queue the operation */
1324         ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source,
1325                                            data, 2 * datatype->super.size, datatype, active_target);
1326     }
1327 
1328     /* Release datatype */
1329     OMPI_DATATYPE_RELEASE(datatype);
1330 
1331     return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret;
1332 }
1333 
1334 static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
1335                                     ompi_osc_pt2pt_header_complete_t *complete_header)
1336 {
1337     /* the current fragment is not part of the frag_count so we need to add it here */
1338     osc_pt2pt_incoming_complete (module, source, complete_header->frag_count + 1);
1339 
1340     return sizeof (*complete_header);
1341 }
1342 
1343 /* flush and unlock headers cannot be processed from the request callback
1344  * because some btls do not provide re-entrant progress functions. these
1345  * fragment will be progressed by the pt2pt component's progress function */
1346 static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
1347                                  ompi_osc_pt2pt_header_flush_t *flush_header)
1348 {
1349     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
1350     int ret;
1351 
1352     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1353                          "process_flush header = {.frag_count = %d}", flush_header->frag_count));
1354 
1355     /* increase signal count by incoming frags */
1356     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
1357 
1358     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1359                          "%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
1360                          ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
1361 
1362     ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
1363     if (OMPI_SUCCESS != ret) {
1364         ompi_osc_pt2pt_pending_t *pending;
1365 
1366         pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
1367         pending->module = module;
1368         pending->source = source;
1369         pending->header.flush = *flush_header;
1370 
1371         osc_pt2pt_add_pending (pending);
1372     }
1373 
1374     /* signal incomming will increment this counter */
1375     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
1376 
1377     return sizeof (*flush_header);
1378 }
1379 
1380 static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
1381                                   ompi_osc_pt2pt_header_unlock_t *unlock_header)
1382 {
1383     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
1384     int ret;
1385 
1386     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1387                          "process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
1388 
1389     /* increase signal count by incoming frags */
1390     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
1391 
1392     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
1393                          "osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
1394                          source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
1395 
1396     ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
1397     if (OMPI_SUCCESS != ret) {
1398         ompi_osc_pt2pt_pending_t *pending;
1399 
1400         pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
1401         pending->module = module;
1402         pending->source = source;
1403         pending->header.unlock = *unlock_header;
1404 
1405         osc_pt2pt_add_pending (pending);
1406     }
1407 
1408     /* signal incoming will increment this counter */
1409     OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
1410 
1411     return sizeof (*unlock_header);
1412 }
1413 
1414 static int process_large_datatype_request_cb (ompi_request_t *request)
1415 {
1416     ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer = (ompi_osc_pt2pt_ddt_buffer_t *) request->req_complete_cb_data;
1417     ompi_osc_pt2pt_module_t *module = ddt_buffer->module;
1418     ompi_osc_pt2pt_header_t *header = ddt_buffer->header;
1419     int source = ddt_buffer->source;
1420 
1421     /* process the request */
1422     switch (header->base.type) {
1423     case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1424         (void) process_put_long (module, source, &header->put);
1425         break;
1426     case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1427         (void) process_get (module, source, &header->get);
1428         break;
1429     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1430         (void) process_acc_long (module, source, &header->acc);
1431         break;
1432     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1433         (void) process_get_acc_long (module, source, &header->acc);
1434         break;
1435     default:
1436         /* developer error */
1437         assert (0);
1438         return OMPI_ERROR;
1439     }
1440 
1441     /* free the datatype buffer */
1442     osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
1443 
1444     ompi_request_free (&request);
1445     return 1;
1446 }
1447 
1448 /**
1449  * @short process a request with a large datatype
1450  *
1451  * @param[in] module - OSC PT2PT module
1452  * @param[in] source - header source
1453  * @param[in] header - header to process
1454  *
1455  * It is possible to construct datatypes whos description is too large
1456  * to fit in an OSC PT2PT fragment. In this case the remote side posts
1457  * a send of the datatype description. This function posts the matching
1458  * receive and processes the header on completion.
1459  */
1460 static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_t *header)
1461 {
1462     ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer;
1463     int header_len, tag, ret;
1464     uint64_t ddt_len;
1465 
1466     /* determine the header size and receive tag */
1467     switch (header->base.type) {
1468     case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1469         header_len = sizeof (header->put);
1470         tag = header->put.tag;
1471         break;
1472     case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1473         header_len = sizeof (header->get);
1474         tag = header->get.tag;
1475         break;
1476     case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1477         header_len = sizeof (header->acc);
1478         tag = header->acc.tag;
1479         break;
1480     case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1481         header_len = sizeof (header->acc);
1482         tag = header->acc.tag;
1483         break;
1484     default:
1485         /* developer error */
1486         opal_output (0, "Unsupported header/flag combination");
1487         return OMPI_ERROR;
1488     }
1489 
1490     ddt_len = *((uint64_t *)((uintptr_t) header + header_len));
1491 
1492     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
1493                          "process_large_datatype_request: processing fragment with type %d. ddt_len %lu",
1494                          header->base.type, (unsigned long) ddt_len));
1495 
1496     ddt_buffer = OBJ_NEW(ompi_osc_pt2pt_ddt_buffer_t);
1497     if (OPAL_UNLIKELY(NULL == ddt_buffer)) {
1498         return OMPI_ERR_OUT_OF_RESOURCE;
1499     }
1500 
1501     ddt_buffer->module = module;
1502     ddt_buffer->source = source;
1503 
1504     ddt_buffer->header = malloc (ddt_len + header_len);
1505     if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) {
1506         OBJ_RELEASE(ddt_buffer);
1507         return OMPI_ERR_OUT_OF_RESOURCE;
1508     }
1509 
1510     memcpy (ddt_buffer->header, header, header_len);
1511 
1512     ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
1513                                     ddt_len, MPI_BYTE,
1514                                     source, tag_to_target(tag), module->comm,
1515                                     NULL, process_large_datatype_request_cb, ddt_buffer);
1516     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1517         OBJ_RELEASE(ddt_buffer);
1518         return ret;
1519     }
1520 
1521     return header_len + 8;
1522 }
1523 
1524 /*
1525  * Do all the data movement associated with a fragment
1526  */
1527 static inline int process_frag (ompi_osc_pt2pt_module_t *module,
1528                                 ompi_osc_pt2pt_frag_header_t *frag)
1529 {
1530     ompi_osc_pt2pt_header_t *header;
1531     int ret;
1532 
1533     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1534                          "osc pt2pt: process_frag: from %d, ops %d",
1535                          (int) frag->source, (int) frag->num_ops));
1536 
1537     header = (ompi_osc_pt2pt_header_t *) (frag + 1);
1538 
1539     for (int i = 0 ; i < frag->num_ops ; ++i) {
1540         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1541                              "osc pt2pt: process_frag: type 0x%x. flag 0x%x. offset %u",
1542                              header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag),
1543                              header->base.flags));
1544 
1545         if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE))) {
1546             osc_pt2pt_ntoh(header);
1547             switch (header->base.type) {
1548             case OMPI_OSC_PT2PT_HDR_TYPE_PUT:
1549                 ret = process_put(module, frag->source, &header->put);
1550                 break;
1551             case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1552                 ret = process_put_long(module, frag->source, &header->put);
1553                 break;
1554 
1555             case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
1556                 ret = process_acc(module, frag->source, &header->acc);
1557                 break;
1558             case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1559                 ret = process_acc_long (module, frag->source, &header->acc);
1560                 break;
1561 
1562             case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
1563                 ret = process_unlock(module, frag->source, &header->unlock);
1564                 break;
1565 
1566             case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1567                 ret = process_get (module, frag->source, &header->get);
1568                 break;
1569 
1570             case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
1571                 ret = process_cswap (module, frag->source, &header->cswap);
1572                 break;
1573 
1574             case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
1575                 ret = process_get_acc (module, frag->source, &header->acc);
1576                 break;
1577 
1578             case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1579                 ret = process_get_acc_long (module, frag->source, &header->acc);
1580                 break;
1581 
1582             case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
1583                 ret = process_flush (module, frag->source, &header->flush);
1584                 break;
1585 
1586             case OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE:
1587                 ret = process_complete (module, frag->source, &header->complete);
1588                 break;
1589 
1590             default:
1591                 opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
1592                 abort(); /* FIX ME */
1593             }
1594         } else {
1595             ret = process_large_datatype_request (module, frag->source, header);
1596         }
1597 
1598         if (ret <= 0) {
1599             opal_output(0, "Error processing fragment: %d", ret);
1600             abort(); /* FIX ME */
1601         }
1602 
1603         /* the next header will start on an 8-byte boundary. this is done to ensure
1604          * that the next header and the packed datatype is properly aligned */
1605         header = (ompi_osc_pt2pt_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t);
1606     }
1607 
1608     return OMPI_SUCCESS;
1609 }
1610 
1611 /* dispatch for callback on message completion */
1612 static int ompi_osc_pt2pt_callback (ompi_request_t *request)
1613 {
1614     ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) request->req_complete_cb_data;
1615 
1616     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "received pt2pt fragment"));
1617 
1618     /* to avoid deep recursion from complet -> start -> complete -> ... we simply put this
1619      * request on a list and let it be processed by opal_progress(). */
1620     OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1621     opal_list_append (&mca_osc_pt2pt_component.pending_receives, &recv->super);
1622     OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1623 
1624     return OMPI_SUCCESS;
1625 }
1626 
1627 static int ompi_osc_pt2pt_receive_repost (ompi_osc_pt2pt_receive_t *recv)
1628 {
1629     /* wait until the request has been marked as complete */
1630     ompi_request_wait_completion (recv->pml_request);
1631 
1632     /* ompi_request_complete clears the callback */
1633     recv->pml_request->req_complete_cb = ompi_osc_pt2pt_callback;
1634     recv->pml_request->req_complete_cb_data = (void *) recv;
1635 
1636     return MCA_PML_CALL(start(1, &recv->pml_request));
1637 }
1638 
1639 int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv)
1640 {
1641     ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) recv->module;
1642     ompi_osc_pt2pt_header_t *base_header = (ompi_osc_pt2pt_header_t *) recv->buffer;
1643     size_t incoming_length = recv->pml_request->req_status._ucount;
1644     int source = recv->pml_request->req_status.MPI_SOURCE;
1645     int rc __opal_attribute_unused__;
1646 
1647     assert(incoming_length >= sizeof(ompi_osc_pt2pt_header_base_t));
1648     (void)incoming_length;  // silence compiler warning
1649 
1650     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1651                          "received pt2pt callback for fragment. source = %d, count = %u, type = 0x%x",
1652                          source, (unsigned) incoming_length, base_header->base.type));
1653 
1654     osc_pt2pt_ntoh(base_header);
1655     switch (base_header->base.type) {
1656     case OMPI_OSC_PT2PT_HDR_TYPE_FRAG:
1657         process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
1658 
1659         /* only data fragments should be included in the completion counters */
1660         mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ?
1661                                   source : MPI_PROC_NULL);
1662         break;
1663     case OMPI_OSC_PT2PT_HDR_TYPE_POST:
1664         osc_pt2pt_incoming_post (module, source);
1665         break;
1666     case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ:
1667         ompi_osc_pt2pt_process_lock(module, source, (ompi_osc_pt2pt_header_lock_t *) base_header);
1668         break;
1669     case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK:
1670         ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header);
1671         break;
1672     case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK:
1673         ompi_osc_pt2pt_process_flush_ack (module, source, (ompi_osc_pt2pt_header_flush_ack_t *) base_header);
1674         break;
1675     case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK:
1676         ompi_osc_pt2pt_process_unlock_ack (module, source, (ompi_osc_pt2pt_header_unlock_ack_t *) base_header);
1677         break;
1678     default:
1679         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1680                              "received unexpected message of type %x",
1681                              (int) base_header->base.type));
1682     }
1683 
1684     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1685                          "finished processing incoming messages"));
1686 
1687     osc_pt2pt_gc_clean (module);
1688 
1689     rc = ompi_osc_pt2pt_receive_repost (recv);
1690 
1691     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1692                          "finished posting receive request. rc: %d", rc));
1693 
1694     return OMPI_SUCCESS;
1695 }
1696 
1697 int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module)
1698 {
1699     int rc;
1700 
1701     module->recv_frag_count = mca_osc_pt2pt_component.receive_count;
1702     if (0 == module->recv_frag_count) {
1703         module->recv_frag_count = 1;
1704     }
1705 
1706     module->recv_frags = malloc (sizeof (module->recv_frags[0]) * module->recv_frag_count);
1707     if (NULL == module->recv_frags) {
1708         return OMPI_ERR_OUT_OF_RESOURCE;
1709     }
1710 
1711     for (unsigned int i = 0 ; i < module->recv_frag_count ; ++i) {
1712         OBJ_CONSTRUCT(module->recv_frags + i, ompi_osc_pt2pt_receive_t);
1713         module->recv_frags[i].module = module;
1714         module->recv_frags[i].buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
1715         if (NULL == module->recv_frags[i].buffer) {
1716             return OMPI_ERR_OUT_OF_RESOURCE;
1717         }
1718 
1719         rc = ompi_osc_pt2pt_irecv_w_cb (module->recv_frags[i].buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t),
1720                                         MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->recv_frags[i].pml_request,
1721                                         ompi_osc_pt2pt_callback, module->recv_frags + i);
1722         if (OMPI_SUCCESS != rc) {
1723             return rc;
1724         }
1725     }
1726 
1727     return OMPI_SUCCESS;
1728 }
1729 
1730 int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf,
1731                                    size_t count, struct ompi_datatype_t *datatype,
1732                                    int src, int tag, struct ompi_communicator_t *comm)
1733 {
1734     return ompi_osc_pt2pt_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL,
1735                                      osc_pt2pt_incoming_req_complete, module);
1736 }
1737 
1738 int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
1739                               ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
1740 {
1741     ompi_request_t *request;
1742     int ret;
1743 
1744     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1745                          "osc pt2pt: ompi_osc_pt2pt_isend_w_cb sending %d bytes to %d with tag %d",
1746                          count, target, tag));
1747 
1748     ret = MCA_PML_CALL(isend_init((void *)ptr, count, datatype, target, tag,
1749                                   MCA_PML_BASE_SEND_STANDARD, comm, &request));
1750     if (OMPI_SUCCESS != ret) {
1751         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1752                              "error sending fragment. ret = %d", ret));
1753         return ret;
1754     }
1755 
1756     request->req_complete_cb = cb;
1757     request->req_complete_cb_data = ctx;
1758 
1759     ret = MCA_PML_CALL(start(1, &request));
1760 
1761     return ret;
1762 }
1763 
1764 int ompi_osc_pt2pt_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
1765                               ompi_communicator_t *comm, ompi_request_t **request_out,
1766                               ompi_request_complete_fn_t cb, void *ctx)
1767 {
1768     ompi_request_t *dummy;
1769     int ret;
1770 
1771     if (NULL == request_out) {
1772         request_out = &dummy;
1773     }
1774 
1775     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1776                          "osc pt2pt: ompi_osc_pt2pt_irecv_w_cb receiving %d bytes from %d with tag %d",
1777                          count, target, tag));
1778 
1779     ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, request_out));
1780     if (OMPI_SUCCESS != ret) {
1781         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1782                              "error posting receive. ret = %d", ret));
1783         return ret;
1784     }
1785 
1786     (*request_out)->req_complete_cb = cb;
1787     (*request_out)->req_complete_cb_data = ctx;
1788 
1789     ret = MCA_PML_CALL(start(1, request_out));
1790 
1791     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1792                          "osc pt2pt: pml start returned %d", ret));
1793 
1794     return ret;
1795 }

/* [<][>][^][v][top][bottom][index][help] */