root/ompi/mca/pml/base/pml_base_bsend.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_pml_bsend_alloc_segment
  2. mca_pml_base_bsend_init
  3. mca_pml_base_bsend_fini
  4. mca_pml_base_bsend_attach
  5. mca_pml_base_bsend_detach
  6. mca_pml_base_bsend_request_start
  7. mca_pml_base_bsend_request_alloc
  8. mca_pml_base_bsend_request_alloc_buf
  9. mca_pml_base_bsend_request_free
  10. mca_pml_base_bsend_request_fini

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2007 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2007      Sun Microsystems, Inc.  All rights reserved.
  14  * Copyright (c) 2015-2017 Research Organization for Information Science
  15  *                         and Technology (RIST). All rights reserved.
  16  * Copyright (c) 2015      Los Alamos National Security, LLC.  All rights
  17  *                         reserved.
  18  * Copyright (c) 2017      IBM Corporation.  All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  */
  25 
  26 #include "ompi_config.h"
  27 #include "opal/threads/mutex.h"
  28 #include "opal/threads/condition.h"
  29 #include "ompi/datatype/ompi_datatype.h"
  30 #include "opal/mca/allocator/base/base.h"
  31 #include "opal/mca/allocator/allocator.h"
  32 #include "ompi/mca/pml/pml.h"
  33 #include "ompi/mca/pml/base/pml_base_request.h"
  34 #include "ompi/mca/pml/base/pml_base_sendreq.h"
  35 #include "ompi/mca/pml/base/pml_base_bsend.h"
  36 #include "opal/mca/mpool/mpool.h"
  37 
  38 #ifdef HAVE_UNISTD_H
  39 #include <unistd.h>
  40 #endif  /* HAVE_UNISTD_H */
  41 
  42 static opal_mutex_t     mca_pml_bsend_mutex;      /* lock for thread safety */
  43 static opal_condition_t mca_pml_bsend_condition;  /* condition variable to block on detach */
  44 static mca_allocator_base_component_t* mca_pml_bsend_allocator_component;
  45 static mca_allocator_base_module_t* mca_pml_bsend_allocator;  /* sub-allocator to manage users buffer */
  46 static size_t           mca_pml_bsend_usersize;   /* user provided buffer size */
  47 unsigned char          *mca_pml_bsend_userbase=NULL;/* user provided buffer base */
  48 unsigned char          *mca_pml_bsend_base = NULL;/* adjusted base of user buffer */
  49 unsigned char          *mca_pml_bsend_addr = NULL;/* current offset into user buffer */
  50 static size_t           mca_pml_bsend_size;       /* adjusted size of user buffer */
  51 static size_t           mca_pml_bsend_count;      /* number of outstanding requests */
  52 static size_t           mca_pml_bsend_pagesz;     /* mmap page size */
  53 static int              mca_pml_bsend_pagebits;   /* number of bits in pagesz */
  54 static opal_atomic_int32_t          mca_pml_bsend_init = 0;
  55 
  56 /* defined in pml_base_open.c */
  57 extern char *ompi_pml_base_bsend_allocator_name;
  58 
  59 /*
  60  * Routine to return pages to sub-allocator as needed
  61  */
  62 static void* mca_pml_bsend_alloc_segment(void *ctx, size_t *size_inout)
  63 {
  64     void *addr;
  65     size_t size = *size_inout;
  66     if(mca_pml_bsend_addr + size > mca_pml_bsend_base + mca_pml_bsend_size) {
  67         return NULL;
  68     }
  69     /* allocate all that is left */
  70     size = mca_pml_bsend_size - (mca_pml_bsend_addr - mca_pml_bsend_base);
  71     addr = mca_pml_bsend_addr;
  72     mca_pml_bsend_addr += size;
  73     *size_inout = size;
  74     return addr;
  75 }
  76 
  77 /*
  78  * One time initialization at startup
  79  */
  80 int mca_pml_base_bsend_init(bool thread_safe)
  81 {
  82     size_t tmp;
  83 
  84     if(OPAL_THREAD_ADD_FETCH32(&mca_pml_bsend_init, 1) > 1)
  85         return OMPI_SUCCESS;
  86 
  87     /* initialize static objects */
  88     OBJ_CONSTRUCT(&mca_pml_bsend_mutex, opal_mutex_t);
  89     OBJ_CONSTRUCT(&mca_pml_bsend_condition, opal_condition_t);
  90 
  91     /* lookup name of the allocator to use for buffered sends */
  92     if(NULL == (mca_pml_bsend_allocator_component = mca_allocator_component_lookup(ompi_pml_base_bsend_allocator_name))) {
  93         return OMPI_ERR_BUFFER;
  94     }
  95 
  96     /* determine page size */
  97     tmp = mca_pml_bsend_pagesz = sysconf(_SC_PAGESIZE);
  98     mca_pml_bsend_pagebits = 0;
  99     while( tmp != 0 ) {
 100         tmp >>= 1;
 101         mca_pml_bsend_pagebits++;
 102     }
 103     return OMPI_SUCCESS;
 104 }
 105 
 106 
 107 /*
 108  * One-time cleanup at shutdown - release any resources.
 109  */
 110 int mca_pml_base_bsend_fini(void)
 111 {
 112     if(OPAL_THREAD_ADD_FETCH32(&mca_pml_bsend_init,-1) > 0)
 113         return OMPI_SUCCESS;
 114 
 115     if(NULL != mca_pml_bsend_allocator)
 116         mca_pml_bsend_allocator->alc_finalize(mca_pml_bsend_allocator);
 117     mca_pml_bsend_allocator = NULL;
 118 
 119     OBJ_DESTRUCT(&mca_pml_bsend_condition);
 120     OBJ_DESTRUCT(&mca_pml_bsend_mutex);
 121     return OMPI_SUCCESS;
 122 }
 123 
 124 
 125 /*
 126  * User-level call to attach buffer.
 127  */
 128 int mca_pml_base_bsend_attach(void* addr, int size)
 129 {
 130     int align;
 131 
 132     bool thread_safe = ompi_mpi_thread_multiple;
 133     if(NULL == addr || size <= 0) {
 134         return OMPI_ERR_BUFFER;
 135     }
 136 
 137     /* check for buffer already attached */
 138     OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 139     if(NULL != mca_pml_bsend_allocator) {
 140         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 141         return OMPI_ERR_BUFFER;
 142     }
 143 
 144     /* try to create an instance of the allocator - to determine thread safety level */
 145     mca_pml_bsend_allocator = mca_pml_bsend_allocator_component->allocator_init(thread_safe, mca_pml_bsend_alloc_segment, NULL, NULL);
 146     if(NULL == mca_pml_bsend_allocator) {
 147         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 148         return OMPI_ERR_BUFFER;
 149     }
 150 
 151     /*
 152      * Save away what the user handed in.  This is done in case the
 153      * base and size are modified for alignment issues.
 154      */
 155     mca_pml_bsend_userbase = (unsigned char*)addr;
 156     mca_pml_bsend_usersize = size;
 157     /*
 158      * Align to pointer boundaries. The bsend overhead is large enough
 159      * to account for this.  Compute any alignment that needs to be done.
 160      */
 161     align = sizeof(void *) - ((size_t)addr & (sizeof(void *) - 1));
 162 
 163     /* setup local variables */
 164     mca_pml_bsend_base = (unsigned char *)addr + align;
 165     mca_pml_bsend_addr = (unsigned char *)addr + align;
 166     mca_pml_bsend_size = size - align;
 167     mca_pml_bsend_count = 0;
 168     OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 169     return OMPI_SUCCESS;
 170 }
 171 
 172 /*
 173  * User-level call to detach buffer
 174  */
 175 int mca_pml_base_bsend_detach(void* addr, int* size)
 176 {
 177     OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 178 
 179     /* is buffer attached */
 180     if(NULL == mca_pml_bsend_allocator) {
 181         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 182         return OMPI_ERR_BUFFER;
 183     }
 184 
 185     /* wait on any pending requests */
 186     while(mca_pml_bsend_count != 0)
 187         opal_condition_wait(&mca_pml_bsend_condition, &mca_pml_bsend_mutex);
 188 
 189     /* free resources associated with the allocator */
 190     mca_pml_bsend_allocator->alc_finalize(mca_pml_bsend_allocator);
 191     mca_pml_bsend_allocator = NULL;
 192 
 193     /* return current settings */
 194     if(NULL != addr)
 195         *((void**)addr) = mca_pml_bsend_userbase;
 196     if(NULL != size)
 197         *size = (int)mca_pml_bsend_usersize;
 198 
 199     /* reset local variables */
 200     mca_pml_bsend_userbase = NULL;
 201     mca_pml_bsend_usersize = 0;
 202     mca_pml_bsend_base = NULL;
 203     mca_pml_bsend_addr = NULL;
 204     mca_pml_bsend_size = 0;
 205     mca_pml_bsend_count = 0;
 206     OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 207     return OMPI_SUCCESS;
 208 }
 209 
 210 
 211 /*
 212  * pack send buffer into buffer
 213  */
 214 
 215 int mca_pml_base_bsend_request_start(ompi_request_t* request)
 216 {
 217     mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
 218     struct iovec iov;
 219     unsigned int iov_count;
 220     size_t max_data;
 221     int rc;
 222 
 223     if(sendreq->req_bytes_packed > 0) {
 224 
 225         /* has a buffer been provided */
 226         OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 227         if(NULL == mca_pml_bsend_addr) {
 228             sendreq->req_addr = NULL;
 229             OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 230             return OMPI_ERR_BUFFER;
 231         }
 232 
 233         /* allocate a buffer to hold packed message */
 234         sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
 235             mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0);
 236         if(NULL == sendreq->req_addr) {
 237             /* release resources when request is freed */
 238             sendreq->req_base.req_pml_complete = true;
 239             OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 240             return OMPI_ERR_BUFFER;
 241         }
 242 
 243         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 244 
 245         /* The convertor is already initialized in the begining so we just have to
 246          * pack the data in the newly allocated buffer.
 247          */
 248         iov.iov_base = (IOVBASE_TYPE*)sendreq->req_addr;
 249         iov.iov_len = sendreq->req_bytes_packed;
 250         iov_count = 1;
 251         max_data = iov.iov_len;
 252         if((rc = opal_convertor_pack( &sendreq->req_base.req_convertor,
 253                                       &iov,
 254                                       &iov_count,
 255                                       &max_data )) < 0) {
 256             return OMPI_ERROR;
 257         }
 258 
 259         /* setup convertor to point to packed buffer (at position zero) */
 260         opal_convertor_prepare_for_send( &sendreq->req_base.req_convertor, &(ompi_mpi_packed.dt.super),
 261                                          max_data, sendreq->req_addr );
 262         /* increment count of pending requests */
 263         mca_pml_bsend_count++;
 264     }
 265 
 266     return OMPI_SUCCESS;
 267 }
 268 
 269 
 270 /*
 271  * allocate buffer
 272  */
 273 
 274 int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
 275 {
 276     mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
 277 
 278     assert( sendreq->req_bytes_packed > 0 );
 279 
 280     /* has a buffer been provided */
 281     OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 282     if(NULL == mca_pml_bsend_addr) {
 283         sendreq->req_addr = NULL;
 284         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 285         return OMPI_ERR_BUFFER;
 286     }
 287 
 288     /* allocate a buffer to hold packed message */
 289     sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
 290         mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0);
 291     if(NULL == sendreq->req_addr) {
 292         /* release resources when request is freed */
 293         sendreq->req_base.req_pml_complete = true;
 294         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 295         /* progress communications, with the hope that more resources
 296          *   will be freed */
 297         opal_progress();
 298         return OMPI_ERR_BUFFER;
 299     }
 300 
 301     /* increment count of pending requests */
 302     mca_pml_bsend_count++;
 303     OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 304 
 305     return OMPI_SUCCESS;
 306 }
 307 
 308 /*
 309  * allocate buffer
 310  */
 311 
 312 void*  mca_pml_base_bsend_request_alloc_buf( size_t length )
 313 {
 314     void* buf = NULL;
 315     /* has a buffer been provided */
 316     OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 317     if(NULL == mca_pml_bsend_addr) {
 318         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 319         return NULL;
 320     }
 321 
 322     /* allocate a buffer to hold packed message */
 323     buf = mca_pml_bsend_allocator->alc_alloc(
 324         mca_pml_bsend_allocator, length, 0);
 325     if(NULL == buf) {
 326         /* release resources when request is freed */
 327         OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 328         /* progress communications, with the hope that more resources
 329          *   will be freed */
 330         opal_progress();
 331         return NULL;
 332     }
 333 
 334     /* increment count of pending requests */
 335     mca_pml_bsend_count++;
 336     OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 337 
 338     return buf;
 339 }
 340 
 341 
 342 /*
 343  *  Request completed - free buffer and decrement pending count
 344  */
 345 int mca_pml_base_bsend_request_free(void* addr)
 346 {
 347     /* remove from list of pending requests */
 348     OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 349 
 350     /* free buffer */
 351     mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, addr);
 352 
 353     /* decrement count of buffered requests */
 354     if(--mca_pml_bsend_count == 0)
 355         opal_condition_signal(&mca_pml_bsend_condition);
 356 
 357     OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 358     return OMPI_SUCCESS;
 359 }
 360 
 361 
 362 
 363 /*
 364  *  Request completed - free buffer and decrement pending count
 365  */
 366 int mca_pml_base_bsend_request_fini(ompi_request_t* request)
 367 {
 368     mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
 369     if(sendreq->req_bytes_packed == 0 ||
 370        sendreq->req_addr == NULL ||
 371        sendreq->req_addr == sendreq->req_base.req_addr)
 372         return OMPI_SUCCESS;
 373 
 374     /* remove from list of pending requests */
 375     OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
 376 
 377     /* free buffer */
 378     mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, (void *)sendreq->req_addr);
 379     sendreq->req_addr = sendreq->req_base.req_addr;
 380 
 381     /* decrement count of buffered requests */
 382     if(--mca_pml_bsend_count == 0)
 383         opal_condition_signal(&mca_pml_bsend_condition);
 384 
 385     OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
 386     return OMPI_SUCCESS;
 387 }
 388 
 389 

/* [<][>][^][v][top][bottom][index][help] */