root/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ompi_osc_pt2pt_frag_constructor
  2. frag_send_cb
  3. frag_send
  4. ompi_osc_pt2pt_frag_start
  5. ompi_osc_pt2pt_flush_active_frag
  6. ompi_osc_pt2pt_frag_flush_pending
  7. ompi_osc_pt2pt_frag_flush_pending_all
  8. ompi_osc_pt2pt_frag_flush_target
  9. ompi_osc_pt2pt_frag_flush_all

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2012-2013 Sandia National Laboratories.  All rights reserved.
   4  * Copyright (c) 2014-2018 Los Alamos National Security, LLC. All rights
   5  *                         reserved.
   6  * Copyright (c) 2015      Research Organization for Information Science
   7  *                         and Technology (RIST). All rights reserved.
   8  * Copyright (c) 2017-2018 Cisco Systems, Inc.  All rights reserved
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 
  16 #include "osc_pt2pt.h"
  17 #include "osc_pt2pt_frag.h"
  18 #include "osc_pt2pt_data_move.h"
  19 
  20 static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag)
  21 {
  22     frag->buffer = frag->super.ptr;
  23 }
  24 
  25 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, opal_free_list_item_t,
  26                    ompi_osc_pt2pt_frag_constructor, NULL);
  27 
  28 static int frag_send_cb (ompi_request_t *request)
  29 {
  30     ompi_osc_pt2pt_frag_t *frag =
  31         (ompi_osc_pt2pt_frag_t*) request->req_complete_cb_data;
  32     ompi_osc_pt2pt_module_t *module = frag->module;
  33 
  34     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
  35                          "osc pt2pt: frag_send complete to %d, frag = %p, request = %p",
  36                          frag->target, (void *) frag, (void *) request));
  37 
  38     mark_outgoing_completion(module);
  39     opal_free_list_return (&mca_osc_pt2pt_component.frags, &frag->super);
  40 
  41     ompi_request_free (&request);
  42 
  43     return 1;
  44 }
  45 
  46 static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag)
  47 {
  48     int count;
  49 
  50     count = (int)((uintptr_t) frag->top - (uintptr_t) frag->buffer);
  51 
  52     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
  53                          "osc pt2pt: frag_send called to %d, frag = %p, count = %d",
  54                          frag->target, (void *) frag, count));
  55 
  56     OSC_PT2PT_HTON(frag->header, module, frag->target);
  57     return ompi_osc_pt2pt_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_PT2PT_FRAG_TAG,
  58                                      module->comm, frag_send_cb, frag);
  59 }
  60 
  61 
  62 int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module,
  63                                ompi_osc_pt2pt_frag_t *frag)
  64 {
  65     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, frag->target);
  66     int ret;
  67 
  68     assert(0 == frag->pending && peer->active_frag != (intptr_t) frag);
  69 
  70     /* we need to signal now that a frag is outgoing to ensure the count sent
  71      * with the unlock message is correct */
  72     ompi_osc_signal_outgoing (module, frag->target, 1);
  73 
  74     /* if eager sends are not active, can't send yet, so buffer and
  75        get out... */
  76     if (!ompi_osc_pt2pt_peer_sends_active (module, frag->target) || opal_list_get_size (&peer->queued_frags)) {
  77         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "queuing fragment to peer %d",
  78                              frag->target));
  79         OPAL_THREAD_SCOPED_LOCK(&peer->lock,
  80                                 opal_list_append(&peer->queued_frags, (opal_list_item_t *) frag));
  81         return OMPI_SUCCESS;
  82     }
  83 
  84     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "sending fragment to peer %d",
  85                          frag->target));
  86 
  87     ret = frag_send(module, frag);
  88 
  89     opal_condition_broadcast(&module->cond);
  90 
  91     return ret;
  92 }
  93 
  94 static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_peer_t *peer)
  95 {
  96     ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
  97     int ret = OMPI_SUCCESS;
  98 
  99     if (NULL == active_frag) {
 100         /* nothing to do */
 101         return OMPI_SUCCESS;
 102     }
 103 
 104     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 105                          "osc pt2pt: flushing active fragment to target %d. pending: %d",
 106                          active_frag->target, active_frag->pending));
 107 
 108     if (opal_atomic_compare_exchange_strong_ptr (&peer->active_frag, (intptr_t *) &active_frag, 0)) {
 109         if (0 != OPAL_THREAD_ADD_FETCH32(&active_frag->pending, -1)) {
 110             /* communication going on while synchronizing; this is an rma usage bug */
 111             return OMPI_ERR_RMA_SYNC;
 112         }
 113 
 114         ompi_osc_signal_outgoing (module, active_frag->target, 1);
 115         ret = frag_send (module, active_frag);
 116     }
 117 
 118     return ret;
 119 }
 120 
 121 int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target)
 122 {
 123     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
 124     ompi_osc_pt2pt_frag_t *frag;
 125     int ret = OMPI_SUCCESS;
 126 
 127     /* walk through the pending list and send */
 128     OPAL_THREAD_LOCK(&peer->lock);
 129     while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
 130         ret = frag_send(module, frag);
 131         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 132             break;
 133         }
 134     }
 135     OPAL_THREAD_UNLOCK(&peer->lock);
 136 
 137     return ret;
 138 }
 139 
 140 int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module)
 141 {
 142     int ret = OPAL_SUCCESS;
 143 
 144     for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
 145         ret = ompi_osc_pt2pt_frag_flush_pending (module, i);
 146         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
 147             return ret;
 148         }
 149     }
 150 
 151     return ret;
 152 }
 153 
 154 int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
 155 {
 156     ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
 157     int ret = OMPI_SUCCESS;
 158 
 159     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 160                          "osc pt2pt: frag flush to target target %d. queue fragments: %lu",
 161                          target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
 162 
 163     ret = ompi_osc_pt2pt_frag_flush_pending (module, target);
 164     if (OMPI_SUCCESS != ret) {
 165         /* XXX -- TODO -- better error handling */
 166         return ret;
 167     }
 168 
 169 
 170     /* flush the active frag */
 171     ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
 172 
 173     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 174                          "osc pt2pt: frag flush target %d finished", target));
 175 
 176     return ret;
 177 }
 178 
 179 int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
 180 {
 181     int ret = OMPI_SUCCESS;
 182 
 183     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 184                          "osc pt2pt: frag flush all begin"));
 185 
 186     /* try to start frags queued to all peers */
 187     for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
 188         ret = ompi_osc_pt2pt_frag_flush_target (module, i);
 189         if (OMPI_SUCCESS != ret) {
 190             break;
 191         }
 192     }
 193 
 194     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
 195                          "osc pt2pt: frag flush all done. ret: %d", ret));
 196 
 197     return ret;
 198 }

/* [<][>][^][v][top][bottom][index][help] */