root/opal/mca/pmix/pmix4x/pmix/src/mca/gds/ds21/gds_ds21_lock_pthread.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ncon
  2. ldes
  3. pmix_gds_ds21_lock_init
  4. pmix_ds21_lock_finalize
  5. pmix_ds21_lock_wr_get
  6. pmix_ds21_lock_wr_rel
  7. pmix_ds21_lock_rd_get
  8. pmix_ds21_lock_rd_rel

   1 /*
   2  * Copyright (c) 2018      Mellanox Technologies, Inc.
   3  *                         All rights reserved.
   4  *
   5  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
   6  * $COPYRIGHT$
   7  *
   8  * Additional copyrights may follow
   9  *
  10  * $HEADER$
  11  */
  12 
  13 #include <src/include/pmix_config.h>
  14 #include <pmix_common.h>
  15 
  16 #include <stdio.h>
  17 #ifdef HAVE_UNISTD_H
  18 #include <unistd.h>
  19 #endif
  20 #ifdef HAVE_SYS_TYPES_H
  21 #include <sys/types.h>
  22 #endif
  23 #ifdef HAVE_SYS_STAT_H
  24 #include <sys/stat.h>
  25 #endif
  26 
  27 #include "src/mca/common/dstore/dstore_common.h"
  28 #include "src/mca/gds/base/base.h"
  29 #include "src/mca/pshmem/pshmem.h"
  30 #include "src/class/pmix_list.h"
  31 
  32 #include "src/util/error.h"
  33 #include "src/util/output.h"
  34 
  35 #include "gds_ds21_lock.h"
  36 #include "src/mca/common/dstore/dstore_segment.h"
  37 
  38 typedef struct {
  39     pmix_list_item_t super;
  40 
  41     char *lockfile;
  42     pmix_dstore_seg_desc_t *seg_desc;
  43     pthread_mutex_t *mutex;
  44     uint32_t num_locks;
  45     uint32_t lock_idx;
  46 } lock_item_t;
  47 
  48 typedef struct {
  49     pmix_list_t lock_traker;
  50 } lock_ctx_t;
  51 
  52 typedef pmix_list_t ds21_lock_pthread_ctx_t;
  53 
  54 /*
  55  * Lock segment format:
  56  * 1. Segment size             sizeof(size_t)
  57  * 2. local_size:              sizeof(uint32_t)
  58  * 3. Align size               sizeof(size_t)
  59  * 4. Offset of mutexes        sizeof(size_t)
  60  * 5. Array of in use indexes: sizeof(int32_t)*local_size
  61  * 6. Double array of locks:   sizeof(pthread_mutex_t)*local_size*2
  62  */
  63 typedef struct {
  64    size_t   seg_size;
  65    uint32_t num_locks;
  66    size_t   align_size;
  67    size_t   mutex_offs;
  68 } segment_hdr_t;
  69 
  70 #define _GET_IDX_ARR_PTR(seg_ptr) \
  71     ((pmix_atomic_int32_t*)((char*)seg_ptr + sizeof(segment_hdr_t)))
  72 
  73 #define _GET_MUTEX_ARR_PTR(seg_hdr) \
  74     ((pthread_mutex_t*)((char*)seg_hdr + seg_hdr->mutex_offs))
  75 
  76 #define _GET_MUTEX_PTR(seg_hdr, idx) \
  77     ((pthread_mutex_t*)((char*)seg_hdr + seg_hdr->mutex_offs + seg_hdr->align_size * (idx)))
  78 
  79 
  80 static void ncon(lock_item_t *p) {
  81     p->lockfile = NULL;
  82     p->lock_idx = 0;
  83     p->mutex = NULL;
  84     p->num_locks = 0;
  85     p->seg_desc = NULL;
  86 }
  87 
  88 static void ldes(lock_item_t *p) {
  89     uint32_t i;
  90 
  91     if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
  92         segment_hdr_t *seg_hdr = (segment_hdr_t *)p->seg_desc->seg_info.seg_base_addr;
  93         if (p->lockfile) {
  94             unlink(p->lockfile);
  95         }
  96         for(i = 0; i < p->num_locks * 2; i++) {
  97             pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, i);
  98             if (0 != pthread_mutex_destroy(mutex)) {
  99                 PMIX_ERROR_LOG(PMIX_ERROR);
 100             }
 101         }
 102     }
 103     if (p->lockfile) {
 104         free(p->lockfile);
 105     }
 106     if (p->seg_desc) {
 107         pmix_common_dstor_delete_sm_desc(p->seg_desc);
 108     }
 109 }
 110 
 111 PMIX_CLASS_INSTANCE(lock_item_t,
 112                     pmix_list_item_t,
 113                     ncon, ldes);
 114 
 115 pmix_status_t pmix_gds_ds21_lock_init(pmix_common_dstor_lock_ctx_t *ctx, const char *base_path, const char * name,
 116                                       uint32_t local_size, uid_t uid, bool setuid)
 117 {
 118     pthread_mutexattr_t attr;
 119     size_t size;
 120     uint32_t i;
 121     int page_size = pmix_common_dstor_getpagesize();
 122     segment_hdr_t *seg_hdr;
 123     lock_item_t *lock_item = NULL;
 124     lock_ctx_t *lock_ctx = (lock_ctx_t*)*ctx;
 125     pmix_list_t *lock_tracker;
 126     pmix_status_t rc = PMIX_SUCCESS;
 127 
 128     if (NULL == *ctx) {
 129         lock_ctx = (lock_ctx_t*)malloc(sizeof(lock_ctx_t));
 130         if (NULL == lock_ctx) {
 131             rc = PMIX_ERR_INIT;
 132             PMIX_ERROR_LOG(rc);
 133             goto error;
 134         }
 135         memset(lock_ctx, 0, sizeof(lock_ctx_t));
 136         PMIX_CONSTRUCT(&lock_ctx->lock_traker, pmix_list_t);
 137         *ctx = lock_ctx;
 138     }
 139 
 140     lock_tracker = &lock_ctx->lock_traker;
 141     lock_item = PMIX_NEW(lock_item_t);
 142 
 143     if (NULL == lock_item) {
 144         rc = PMIX_ERR_INIT;
 145         PMIX_ERROR_LOG(rc);
 146         goto error;
 147     }
 148     pmix_list_append(lock_tracker, &lock_item->super);
 149 
 150     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
 151         "%s:%d:%s local_size %d", __FILE__, __LINE__, __func__, local_size));
 152 
 153     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 154         size_t seg_align_size;
 155         size_t seg_hdr_size;
 156 
 157         if (0 != (seg_align_size = pmix_common_dstor_getcacheblocksize())) {
 158             seg_align_size = (sizeof(pthread_mutex_t) / seg_align_size + 1)
 159                     * seg_align_size;
 160         } else {
 161             seg_align_size = sizeof(pthread_mutex_t);
 162         }
 163 
 164         seg_hdr_size = ((sizeof(segment_hdr_t)
 165                         + sizeof(int32_t) * local_size)
 166                         / seg_align_size + 1) * seg_align_size;
 167 
 168         size = ((seg_hdr_size
 169                 + 2 * local_size * seg_align_size) /* array of mutexes */
 170                 / page_size + 1) * page_size;
 171 
 172         lock_item->seg_desc = pmix_common_dstor_create_new_lock_seg(base_path,
 173                                     size, name, 0, uid, setuid);
 174         if (NULL == lock_item->seg_desc) {
 175             rc = PMIX_ERR_OUT_OF_RESOURCE;
 176             PMIX_ERROR_LOG(rc);
 177             goto error;
 178         }
 179 
 180         if (0 != pthread_mutexattr_init(&attr)) {
 181             rc = PMIX_ERR_INIT;
 182             PMIX_ERROR_LOG(rc);
 183             goto error;
 184         }
 185         if (0 != pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
 186             pthread_mutexattr_destroy(&attr);
 187             rc = PMIX_ERR_INIT;
 188             PMIX_ERROR_LOG(rc);
 189             goto error;
 190         }
 191 
 192         segment_hdr_t *seg_hdr = (segment_hdr_t*)lock_item->seg_desc->seg_info.seg_base_addr;
 193         seg_hdr->num_locks = local_size;
 194         seg_hdr->seg_size = size;
 195         seg_hdr->align_size = seg_align_size;
 196         seg_hdr->mutex_offs = seg_hdr_size;
 197 
 198         lock_item->lockfile = strdup(lock_item->seg_desc->seg_info.seg_name);
 199         lock_item->num_locks = local_size;
 200         lock_item->mutex = _GET_MUTEX_ARR_PTR(seg_hdr);
 201 
 202         for(i = 0; i < local_size * 2; i++) {
 203             pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, i);
 204             if (0 != pthread_mutex_init(mutex, &attr)) {
 205                 pthread_mutexattr_destroy(&attr);
 206                 rc = PMIX_ERR_INIT;
 207                 PMIX_ERROR_LOG(rc);
 208                 goto error;
 209             }
 210         }
 211         if (0 != pthread_mutexattr_destroy(&attr)) {
 212             rc = PMIX_ERR_INIT;
 213             PMIX_ERROR_LOG(PMIX_ERR_INIT);
 214             goto error;
 215         }
 216     }
 217     else {
 218         pmix_atomic_int32_t *lock_idx_ptr;
 219         bool idx_found = false;
 220 
 221         size = pmix_common_dstor_getpagesize();
 222         lock_item->seg_desc = pmix_common_dstor_attach_new_lock_seg(base_path, size, name, 0);
 223         if (NULL == lock_item->seg_desc) {
 224             rc = PMIX_ERR_NOT_FOUND;
 225             goto error;
 226         }
 227         seg_hdr = (segment_hdr_t*)lock_item->seg_desc->seg_info.seg_base_addr;
 228 
 229         if (seg_hdr->seg_size > size) {
 230             size = seg_hdr->seg_size;
 231             pmix_common_dstor_delete_sm_desc(lock_item->seg_desc);
 232             lock_item->seg_desc = pmix_common_dstor_attach_new_lock_seg(base_path, size, name, 0);
 233             if (NULL == lock_item->seg_desc) {
 234                 rc = PMIX_ERR_NOT_FOUND;
 235                 goto error;
 236             }
 237             seg_hdr = (segment_hdr_t*)lock_item->seg_desc->seg_info.seg_base_addr;
 238         }
 239 
 240         lock_item->num_locks = seg_hdr->num_locks;
 241         lock_idx_ptr = _GET_IDX_ARR_PTR(seg_hdr);
 242         lock_item->mutex = _GET_MUTEX_ARR_PTR(seg_hdr);
 243 
 244         for (i = 0; i < lock_item->num_locks; i++) {
 245             int32_t expected = 0;
 246             if (pmix_atomic_compare_exchange_strong_32(&lock_idx_ptr[i], &expected, 1)) {
 247                 lock_item->lock_idx = i;
 248                 lock_item->lockfile = strdup(lock_item->seg_desc->seg_info.seg_name);
 249                 idx_found = true;
 250                 break;
 251             }
 252         }
 253 
 254         if (false == idx_found) {
 255             rc = PMIX_ERR_NOT_FOUND;
 256             goto error;
 257         }
 258     }
 259 
 260     return rc;
 261 
 262 error:
 263     if (NULL != lock_item) {
 264         pmix_list_remove_item(lock_tracker, &lock_item->super);
 265         PMIX_RELEASE(lock_item);
 266         lock_item = NULL;
 267     }
 268     *ctx = NULL;
 269 
 270     return rc;
 271 }
 272 
 273 void pmix_ds21_lock_finalize(pmix_common_dstor_lock_ctx_t *lock_ctx)
 274 {
 275     lock_item_t *lock_item, *item_next;
 276     pmix_list_t *lock_tracker = &((lock_ctx_t*)*lock_ctx)->lock_traker;
 277 
 278     if (NULL == lock_tracker) {
 279         return;
 280     }
 281 
 282     PMIX_LIST_FOREACH_SAFE(lock_item, item_next, lock_tracker, lock_item_t) {
 283         pmix_list_remove_item(lock_tracker, &lock_item->super);
 284         PMIX_RELEASE(lock_item);
 285     }
 286     if (pmix_list_is_empty(lock_tracker)) {
 287         PMIX_LIST_DESTRUCT(lock_tracker);
 288         free(lock_tracker);
 289         lock_tracker = NULL;
 290     }
 291     *lock_ctx = NULL;
 292 }
 293 
 294 pmix_status_t pmix_ds21_lock_wr_get(pmix_common_dstor_lock_ctx_t lock_ctx)
 295 {
 296     lock_item_t *lock_item;
 297     pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
 298     uint32_t num_locks;
 299     uint32_t i;
 300     pmix_status_t rc;
 301     segment_hdr_t *seg_hdr;
 302 
 303     if (NULL == lock_tracker) {
 304         rc = PMIX_ERR_NOT_FOUND;
 305         PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
 306         return rc;
 307     }
 308 
 309     PMIX_LIST_FOREACH(lock_item, lock_tracker, lock_item_t) {
 310         num_locks = lock_item->num_locks;
 311         seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
 312 
 313          /* Lock the "signalling" lock first to let clients know that
 314          * server is going to get a write lock.
 315          * Clients do not hold this lock for a long time,
 316          * so this loop should be relatively dast.
 317          */
 318         for (i = 0; i < num_locks; i++) {
 319             pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, 2*i);
 320             if (0 != pthread_mutex_lock(mutex)) {
 321                 return PMIX_ERROR;
 322             }
 323         }
 324 
 325         /* Now we can go and grab the main locks
 326          * New clients will be stopped at the previous
 327          * "barrier" locks.
 328          * We will wait here while all clients currently holding
 329          * locks will be done
 330          */
 331         for(i = 0; i < num_locks; i++) {
 332             pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, 2*i + 1);
 333             if (0 != pthread_mutex_lock(mutex)) {
 334                 return PMIX_ERROR;
 335             }
 336         }
 337     }
 338     return PMIX_SUCCESS;
 339 }
 340 
 341 pmix_status_t pmix_ds21_lock_wr_rel(pmix_common_dstor_lock_ctx_t lock_ctx)
 342 {
 343     lock_item_t *lock_item;
 344     pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
 345     uint32_t num_locks;
 346     uint32_t i;
 347     pmix_status_t rc;
 348     segment_hdr_t *seg_hdr;
 349 
 350     if (NULL == lock_tracker) {
 351         rc = PMIX_ERR_NOT_FOUND;
 352         PMIX_ERROR_LOG(rc);
 353         return rc;
 354     }
 355 
 356     PMIX_LIST_FOREACH(lock_item, lock_tracker, lock_item_t) {
 357         seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
 358         num_locks = lock_item->num_locks;
 359 
 360         /* Lock the second lock first to ensure that all procs will see
 361          * that we are trying to grab the main one */
 362         for(i=0; i<num_locks; i++) {
 363             if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*i))) {
 364                 return PMIX_ERROR;
 365             }
 366             if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*i + 1))) {
 367                 return PMIX_ERROR;
 368             }
 369         }
 370     }
 371 
 372     return PMIX_SUCCESS;
 373 }
 374 
 375 pmix_status_t pmix_ds21_lock_rd_get(pmix_common_dstor_lock_ctx_t lock_ctx)
 376 {
 377     lock_item_t *lock_item;
 378     pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
 379     uint32_t idx;
 380     pmix_status_t rc;
 381     segment_hdr_t *seg_hdr;
 382 
 383     if (NULL == lock_tracker) {
 384         rc = PMIX_ERR_NOT_FOUND;
 385         PMIX_ERROR_LOG(rc);
 386         return rc;
 387     }
 388 
 389     lock_item = (lock_item_t*)pmix_list_get_first(lock_tracker);
 390     idx = lock_item->lock_idx;
 391     seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
 392 
 393     /* This mutex is only used to acquire the next one,
 394      * this is a barrier that server is using to let clients
 395      * know that it is going to grab the write lock
 396      */
 397 
 398     if (0 != pthread_mutex_lock(_GET_MUTEX_PTR(seg_hdr, 2*idx))) {
 399         return PMIX_ERROR;
 400     }
 401 
 402     /* Now grab the main lock */
 403     if (0 != pthread_mutex_lock(_GET_MUTEX_PTR(seg_hdr, 2*idx + 1))) {
 404         return PMIX_ERROR;
 405     }
 406 
 407     /* Once done - release signalling lock */
 408     if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*idx))) {
 409         return PMIX_ERROR;
 410     }
 411 
 412     return PMIX_SUCCESS;
 413 }
 414 
 415 pmix_status_t pmix_ds21_lock_rd_rel(pmix_common_dstor_lock_ctx_t lock_ctx)
 416 {
 417     lock_item_t *lock_item;
 418     pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
 419     pmix_status_t rc;
 420     uint32_t idx;
 421     segment_hdr_t *seg_hdr;
 422 
 423     if (NULL == lock_tracker) {
 424         rc = PMIX_ERR_NOT_FOUND;
 425         PMIX_ERROR_LOG(rc);
 426         return rc;
 427     }
 428 
 429     lock_item = (lock_item_t*)pmix_list_get_first(lock_tracker);
 430     seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
 431     idx = lock_item->lock_idx;
 432 
 433     /* Release the main lock */
 434     if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*idx + 1))) {
 435         return PMIX_SUCCESS;
 436     }
 437 
 438     return PMIX_SUCCESS;
 439 }

/* [<][>][^][v][top][bottom][index][help] */