root/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_sender_based.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. sb_mmap_file_open
  2. sb_mmap_file_close
  3. sb_mmap_alloc
  4. sb_mmap_free
  5. vprotocol_pessimist_sender_based_init
  6. vprotocol_pessimist_sender_based_finalize
  7. vprotocol_pessimist_sender_based_alloc
  8. vprotocol_pessimist_sender_based_convertor_advance

   1 /*
   2  * Copyright (c) 2004-2014 The Trustees of the University of Tennessee.
   3  *                         All rights reserved.
   4  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
   5  * $COPYRIGHT$
   6  *
   7  * Additional copyrights may follow
   8  *
   9  * $HEADER$
  10  */
  11 
  12 #include "ompi_config.h"
  13 #include "vprotocol_pessimist_sender_based.h"
  14 #include <sys/types.h>
  15 #if defined(HAVE_SYS_MMAN_H)
  16 #include <sys/mman.h>
  17 #endif  /* defined(HAVE_SYS_MMAN_H) */
  18 #if defined(HAVE_UNISTD_H)
  19 #include <unistd.h>
  20 #endif
  21 #include <string.h>
  22 #include <errno.h>
  23 #include "opal/datatype/opal_datatype_memcpy.h"
  24 #include "opal/util/printf.h"
  25 #include <fcntl.h>
  26 
  27 #define sb mca_vprotocol_pessimist.sender_based
  28 
  29 static int sb_mmap_file_open(const char *path)
  30 {
  31     sb.sb_fd = open(path, O_CREAT | O_TRUNC | O_RDWR, 0600);
  32     if(-1 == sb.sb_fd)
  33     {
  34         V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_init: open (%s): %s",
  35                      path, strerror(errno));
  36         return OPAL_ERR_FILE_OPEN_FAILURE;
  37     }
  38     return OPAL_SUCCESS;
  39 }
  40 
  41 static void sb_mmap_file_close(void)
  42 {
  43     int ret = close(sb.sb_fd);
  44     if(-1 == ret)
  45         V_OUTPUT_ERR("pml_v: protocol_pessimist: sender_based_finalize: close (%d): %s",
  46                      sb.sb_fd, strerror(errno));
  47 }
  48 
  49 static void sb_mmap_alloc(void)
  50 {
  51 #ifndef MAP_NOCACHE
  52 #   define MAP_NOCACHE 0
  53 #endif
  54     if(-1 == ftruncate(sb.sb_fd, sb.sb_offset + sb.sb_length))
  55     {
  56         V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: ftruncate: %s",
  57                      strerror(errno));
  58         close(sb.sb_fd);
  59         ompi_mpi_abort(MPI_COMM_NULL, MPI_ERR_NO_SPACE);
  60     }
  61     sb.sb_addr = (uintptr_t) mmap((void *) sb.sb_addr, sb.sb_length,
  62                                   PROT_WRITE | PROT_READ,
  63                                   MAP_PRIVATE | MAP_NOCACHE, sb.sb_fd,
  64                                   sb.sb_offset);
  65     if(((uintptr_t) -1) == sb.sb_addr)
  66     {
  67         V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: mmap: %s",
  68                      strerror(errno));
  69         close(sb.sb_fd);
  70         ompi_mpi_abort(MPI_COMM_NULL, MPI_ERR_NO_SPACE);
  71     }
  72 }
  73 
  74 static void sb_mmap_free(void)
  75 {
  76     int ret = munmap((void *) sb.sb_addr, sb.sb_length);
  77     if(-1 == ret)
  78         V_OUTPUT_ERR("pml_v: protocol_pessimsit: sender_based_finalize: munmap (%p): %s",
  79                      (void *) sb.sb_addr, strerror(errno));
  80 }
  81 
  82 int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size)
  83 {
  84     char *path;
  85 #ifdef SB_USE_CONVERTOR_METHOD
  86     mca_pml_base_send_request_t pml_req;
  87     sb.sb_conv_to_pessimist_offset = (uintptr_t) VPROTOCOL_SEND_REQ(NULL) -
  88             ((uintptr_t) &pml_req.req_base.req_convertor -
  89              (uintptr_t) &pml_req);
  90     V_OUTPUT_VERBOSE(500, "pessimist: conv_to_pessimist_offset: %p", (void *) sb.sb_conv_to_pessimist_offset);
  91 #endif
  92     sb.sb_offset = 0;
  93     sb.sb_length = size;
  94     sb.sb_pagesize = getpagesize();
  95     sb.sb_cursor = sb.sb_addr = (uintptr_t) NULL;
  96     sb.sb_available = 0;
  97 #ifdef SB_USE_PROGRESS_METHOD
  98     OBJ_CONSTRUCT(&sb.sb_sendreq, opal_list_t);
  99 #endif
 100 
 101     opal_asprintf(&path, "%s"OPAL_PATH_SEP"%s", ompi_process_info.proc_session_dir,
 102                 mmapfile);
 103     if(OPAL_SUCCESS != sb_mmap_file_open(path))
 104         return OPAL_ERR_FILE_OPEN_FAILURE;
 105     free(path);
 106     return OMPI_SUCCESS;
 107 }
 108 
 109 void vprotocol_pessimist_sender_based_finalize(void)
 110 {
 111     if(((uintptr_t) NULL) != sb.sb_addr)
 112         sb_mmap_free();
 113     sb_mmap_file_close();
 114 }
 115 
 116 
 117 /** Manage mmap floating window, allocating enough memory for the message to be
 118   * asynchronously copied to disk.
 119   */
 120 void vprotocol_pessimist_sender_based_alloc(size_t len)
 121 {
 122     if(((uintptr_t) NULL) != sb.sb_addr)
 123         sb_mmap_free();
 124 #ifdef SB_USE_SELFCOMM_METHOD
 125     else
 126         ompi_comm_dup(MPI_COMM_SELF, &sb.sb_comm, 1);
 127 #endif
 128 
 129     /* Take care of alignement of sb_offset                             */
 130     sb.sb_offset += sb.sb_cursor - sb.sb_addr;
 131     sb.sb_cursor = sb.sb_offset % sb.sb_pagesize;
 132     sb.sb_offset -= sb.sb_cursor;
 133 
 134     /* Adjusting sb_length for the largest application message to fit   */
 135     len += sb.sb_cursor + sizeof(vprotocol_pessimist_sender_based_header_t);
 136     if(sb.sb_length < len)
 137         sb.sb_length = len;
 138     /* How much space left for application data */
 139     sb.sb_available = sb.sb_length - sb.sb_cursor;
 140 
 141     sb_mmap_alloc();
 142 
 143     sb.sb_cursor += sb.sb_addr; /* set absolute addr of sender_based buffer */
 144     V_OUTPUT_VERBOSE(30, "pessimist:\tsb\tgrow\toffset %llu\tlength %llu\tbase %p\tcursor %p", (unsigned long long) sb.sb_offset, (unsigned long long) sb.sb_length, (void *) sb.sb_addr, (void *) sb.sb_cursor);
 145 }
 146 
 147 #undef sb
 148 
 149 #ifdef SB_USE_CONVERTOR_METHOD
 150 int32_t vprotocol_pessimist_sender_based_convertor_advance(opal_convertor_t* pConvertor,
 151                                                             struct iovec* iov,
 152                                                             uint32_t* out_size,
 153                                                             size_t* max_data) {
 154     int ret;
 155     unsigned int i;
 156     size_t pending_length;
 157     mca_vprotocol_pessimist_send_request_t *ftreq;
 158 
 159     ftreq = VPESSIMIST_CONV_REQ(pConvertor);
 160     pConvertor->flags = ftreq->sb.conv_flags;
 161     pConvertor->fAdvance = ftreq->sb.conv_advance;
 162     ret = opal_convertor_pack(pConvertor, iov, out_size, max_data);
 163     V_OUTPUT_VERBOSE(39, "pessimist:\tsb\tpack\t%"PRIsize_t, *max_data);
 164 
 165     for(i = 0, pending_length = *max_data; pending_length > 0; i++) {
 166         assert(i < *out_size);
 167         MEMCPY((void *) ftreq->sb.cursor, iov[i].iov_base, iov[i].iov_len);
 168         pending_length -= iov[i].iov_len;
 169         ftreq->sb.cursor += iov[i].iov_len;
 170     }
 171     assert(pending_length == 0);
 172 
 173     pConvertor->flags &= ~CONVERTOR_NO_OP;
 174     pConvertor->fAdvance = &vprotocol_pessimist_sender_based_convertor_advance;
 175     return ret;
 176 }
 177 #endif
 178 

/* [<][>][^][v][top][bottom][index][help] */