root/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_sender_based.h

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes the following definitions.
  1. __SENDER_BASED_METHOD_COPY
  2. __SENDER_BASED_METHOD_COPY
  3. vprotocol_pessimist_sb_progress_req
  4. vprotocol_pessimist_sb_progress_all_reqs
  5. __SENDER_BASED_METHOD_FLUSH
  6. vprotocol_pessimist_sender_based_copy_start

   1 /*
   2  * Copyright (c) 2004-2007 The Trustees of the University of Tennessee.
   3  *                         All rights reserved.
   4  * $COPYRIGHT$
   5  *
   6  * Additional copyrights may follow
   7  *
   8  * $HEADER$
   9  */
  10 
  11 #ifndef __VPROTOCOL_PESSIMIST_SENDERBASED_H__
  12 #define __VPROTOCOL_PESSIMIST_SENDERBASED_H__
  13 
  14 #include "ompi_config.h"
  15 #include "ompi/mca/pml/base/pml_base_sendreq.h"
  16 #include "ompi/mca/pml/v/pml_v_output.h"
  17 #include "vprotocol_pessimist_sender_based_types.h"
  18 #include "vprotocol_pessimist_request.h"
  19 #include "vprotocol_pessimist.h"
  20 
  21 BEGIN_C_DECLS
  22 
/** Prepare for using the sender based storage.
  *
  * @param mmapfile presumably the path of the file that backs the sender-based
  *                 mmap -- TODO confirm against the implementation
  * @param size     presumably the size of the sender-based storage -- TODO
  *                 confirm against the implementation
  * @return an int status; the convention used is not visible from this header
  */
int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size);
  26 
/** Clean up the sender-based storage (mmap etc.).
  * Takes no argument and returns nothing.
  */
void vprotocol_pessimist_sender_based_finalize(void);
  30 
/** Manage the mmap floating window, allocating enough memory for the message
  * to be asynchronously copied to disk.
  *
  * @param len size to make room for. The caller in this header
  *            (vprotocol_pessimist_sender_based_copy_start) passes the packed
  *            payload size only, without the sender-based header size.
  */
void vprotocol_pessimist_sender_based_alloc(size_t len);
  35 
  36 
  37 /*******************************************************************************
  38  * Convertor pack (blocking) method (good latency, bad bandwidth)
  39  */
  40 #if defined(SB_USE_PACK_METHOD)
  41 static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *pmlreq)
  42 {
  43     if(0 != pmlreq->req_bytes_packed)
  44     {
  45         opal_convertor_t conv;
  46         size_t max_data;
  47         size_t zero = 0;
  48         unsigned int iov_count = 1;
  49         struct iovec iov;
  50 
  51         max_data = iov.iov_len = pmlreq->req_bytes_packed;
  52         iov.iov_base = (IOVBASE_TYPE *) VPESSIMIST_SEND_FTREQ(pmlreq)->sb.cursor;
  53         opal_convertor_clone_with_position( &pmlreq->req_base.req_convertor,
  54                                             &conv, 0, &zero );
  55         opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
  56     }
  57 }
  58 
/* Pack method: flush expands to nothing; REQ is not evaluated. */
#define __SENDER_BASED_METHOD_FLUSH(REQ)
  60 
  61 
  62 /*******************************************************************************
  63  * Convertor replacement (non blocking) method (under testing)
  64  */
  65 #elif defined(SB_USE_CONVERTOR_METHOD)
/* Advance function installed as the fAdvance member of a send request's
 * convertor by __SENDER_BASED_METHOD_COPY (convertor method).
 * Parameters are unnamed here; presumably they mirror the convertor advance
 * callback signature (convertor, iovec array, iovec count, max data) --
 * TODO confirm against the implementation. */
int32_t vprotocol_pessimist_sender_based_convertor_advance(opal_convertor_t*,
                                                            struct iovec*,
                                                            uint32_t*,
                                                            size_t*);
  70 
  71 #define __SENDER_BASED_METHOD_COPY(REQ) do {                                  \
  72     opal_convertor_t *pConv;                                                  \
  73     mca_vprotocol_pessimist_send_request_t *ftreq;                            \
  74                                                                               \
  75     pConv = & (REQ)->req_base.req_convertor;                                  \
  76     ftreq = VPESSIMIST_SEND_FTREQ(REQ);                                       \
  77     ftreq->sb.conv_flags = pConv->flags;                                      \
  78     ftreq->sb.conv_advance = pConv->fAdvance;                                 \
  79                                                                               \
  80     pConv->flags &= ~CONVERTOR_NO_OP;                                         \
  81     pConv->fAdvance = vprotocol_pessimist_sender_based_convertor_advance;     \
  82 } while(0)
  83 
/* Convertor method: flush expands to nothing; REQ is not evaluated. */
#define __SENDER_BASED_METHOD_FLUSH(REQ)
  85 
/* Yields a mca_vprotocol_pessimist_send_request_t pointer computed by adding
 * sender_based.sb_conv_to_pessimist_offset to the address stored in the
 * clone_of member of CONV.
 * NOTE(review): presumably clone_of points at the request's own convertor and
 * the offset is set up during initialization -- confirm. */
#define VPESSIMIST_CONV_REQ(CONV) ((mca_vprotocol_pessimist_send_request_t *) \
    (mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset +       \
     (uintptr_t) ((CONV)->clone_of)))
  89 
  90 
  91 /*******************************************************************************
  92  * progress method
  93  */
  94 #elif defined(SB_USE_PROGRESS_METHOD)
  95 static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *req)
  96 {
  97     if(req->req_bytes_packed)
  98     {
  99         mca_vprotocol_pessimist_send_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
 100         ftreq->sb.bytes_progressed = 0;
 101         opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
 102                          &ftreq->list_item);
 103     }
 104 }
 105 
 106 static inline int vprotocol_pessimist_sb_progress_req(mca_pml_base_send_request_t *req)
 107 {
 108     mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
 109     size_t max_data = 0;
 110 
 111     if(ftreq->sb.bytes_progressed < req->req_bytes_packed)
 112     {
 113         opal_convertor_t conv;
 114         unsigned int iov_count = 1;
 115         struct iovec iov;
 116         uintptr_t position = ftreq->sb.bytes_progressed;
 117         max_data = req->req_bytes_packed - ftreq->sb.bytes_progressed;
 118         iov.iov_len = max_data;
 119         iov.iov_base = (IOVBASE_TYPE *) (ftreq->sb.cursor + position);
 120 
 121         V_OUTPUT_VERBOSE(80, "pessimist:\tsb\tprgress\t%"PRIpclock"\tsize %lu from position %lu", ftreq->reqid, max_data, position);
 122         opal_convertor_clone_with_position(&req->req_base.req_convertor,
 123                                            &conv, 0, &position );
 124         opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
 125         ftreq->sb.bytes_progressed += max_data;
 126     }
 127     return max_data;
 128 }
 129 
 130 static inline int vprotocol_pessimist_sb_progress_all_reqs(void)
 131 {
 132     int ret = 0;
 133 
 134     /* progress any waiting Sender Based copy */
 135     if(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq))
 136     {
 137         mca_vprotocol_pessimist_request_t *ftreq = (mca_vprotocol_pessimist_request_t *)
 138             opal_list_remove_first(&mca_vprotocol_pessimist.sender_based.sb_sendreq);
 139         if(vprotocol_pessimist_sb_progress_req(VPROTOCOL_SEND_REQ(ftreq)))
 140             ret = 1;
 141         opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
 142                          &ftreq->list_item);
 143     }
 144     return ret;
 145 }
 146 
 147 static inline void __SENDER_BASED_METHOD_FLUSH(ompi_request_t *req)
 148 {
 149     mca_pml_base_send_request_t *pmlreq = (mca_pml_base_send_request_t *) req;
 150 
 151     if((pmlreq->req_base.req_type == MCA_PML_REQUEST_SEND) &&
 152        pmlreq->req_bytes_packed)
 153     {
 154         mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
 155         assert(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq));
 156         opal_list_remove_item(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
 157                               (opal_list_item_t *) ftreq);
 158         vprotocol_pessimist_sb_progress_req(pmlreq);
 159         assert(pmlreq->req_bytes_packed == ftreq->sb.bytes_progressed);
 160     }
 161 }
 162 
 163 #endif /* SB_USE_*_METHOD */
 164 
 165 
 166 /** Copy data associated to a pml_base_send_request_t to the sender based
 167  * message payload buffer
 168  */
 169 static inline void vprotocol_pessimist_sender_based_copy_start(ompi_request_t *req)
 170 {
 171     vprotocol_pessimist_sender_based_header_t *sbhdr;
 172     mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
 173     mca_pml_base_send_request_t *pmlreq = (mca_pml_base_send_request_t *) req;
 174 
 175     /* Allocate enough sender-based space to hold the message */
 176     if(mca_vprotocol_pessimist.sender_based.sb_available <
 177             pmlreq->req_bytes_packed +
 178             sizeof(vprotocol_pessimist_sender_based_header_t))
 179     {
 180         vprotocol_pessimist_sender_based_alloc(pmlreq->req_bytes_packed);
 181     }
 182 
 183     /* Copy message header to the sender-based space */
 184     /* /!\ This is NOT thread safe */
 185     ftreq->sb.cursor = mca_vprotocol_pessimist.sender_based.sb_cursor;
 186 #if 1
 187     mca_vprotocol_pessimist.sender_based.sb_cursor +=
 188         sizeof(vprotocol_pessimist_sender_based_header_t) +
 189         pmlreq->req_bytes_packed;
 190     mca_vprotocol_pessimist.sender_based.sb_available -=
 191         sizeof(vprotocol_pessimist_sender_based_header_t) +
 192         pmlreq->req_bytes_packed;
 193 #endif
 194     sbhdr = (vprotocol_pessimist_sender_based_header_t *) ftreq->sb.cursor;
 195     sbhdr->size = pmlreq->req_bytes_packed;
 196     sbhdr->dst = pmlreq->req_base.req_peer;
 197     sbhdr->tag = pmlreq->req_base.req_tag;
 198     sbhdr->contextid = pmlreq->req_base.req_comm->c_contextid;
 199     sbhdr->sequence = pmlreq->req_base.req_sequence;
 200     ftreq->sb.cursor += sizeof(vprotocol_pessimist_sender_based_header_t);
 201     V_OUTPUT_VERBOSE(70, "pessimist:\tsb\tsend\t%"PRIpclock"\tsize %lu (+%lu header)", VPESSIMIST_FTREQ(req)->reqid, (long unsigned)pmlreq->req_bytes_packed, (long unsigned)sizeof(vprotocol_pessimist_sender_based_header_t));
 202 
 203     /* Use one of the previous data copy method */
 204     __SENDER_BASED_METHOD_COPY(pmlreq);
 205 }
 206 
/** Ensure the sender-based copy is finished before allowing the user to touch
 * the send buffer.
 * Forwards REQ to the __SENDER_BASED_METHOD_FLUSH of the copy method selected
 * at compile time; for the pack and convertor methods that macro is empty, so
 * this expands to nothing.
 */
#define vprotocol_pessimist_sender_based_flush(REQ) __SENDER_BASED_METHOD_FLUSH(REQ)
 210 
 211 END_C_DECLS
 212 
 213 #endif
 214 

/* [<][>][^][v][top][bottom][index][help] */