This source file includes following definitions.
- ncon
- ldes
- pmix_gds_ds21_lock_init
- pmix_ds21_lock_finalize
- pmix_ds21_lock_wr_get
- pmix_ds21_lock_wr_rel
- pmix_ds21_lock_rd_get
- pmix_ds21_lock_rd_rel
1
2
3
4
5
6
7
8
9
10
11
12
13 #include <src/include/pmix_config.h>
14 #include <pmix_common.h>
15
16 #include <stdio.h>
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif
20 #ifdef HAVE_SYS_TYPES_H
21 #include <sys/types.h>
22 #endif
23 #ifdef HAVE_SYS_STAT_H
24 #include <sys/stat.h>
25 #endif
26
27 #include "src/mca/common/dstore/dstore_common.h"
28 #include "src/mca/gds/base/base.h"
29 #include "src/mca/pshmem/pshmem.h"
30 #include "src/class/pmix_list.h"
31
32 #include "src/util/error.h"
33 #include "src/util/output.h"
34
35 #include "gds_ds21_lock.h"
36 #include "src/mca/common/dstore/dstore_segment.h"
37
38 typedef struct {
39 pmix_list_item_t super;
40
41 char *lockfile;
42 pmix_dstore_seg_desc_t *seg_desc;
43 pthread_mutex_t *mutex;
44 uint32_t num_locks;
45 uint32_t lock_idx;
46 } lock_item_t;
47
48 typedef struct {
49 pmix_list_t lock_traker;
50 } lock_ctx_t;
51
52 typedef pmix_list_t ds21_lock_pthread_ctx_t;
53
54
55
56
57
58
59
60
61
62
63 typedef struct {
64 size_t seg_size;
65 uint32_t num_locks;
66 size_t align_size;
67 size_t mutex_offs;
68 } segment_hdr_t;
69
70 #define _GET_IDX_ARR_PTR(seg_ptr) \
71 ((pmix_atomic_int32_t*)((char*)seg_ptr + sizeof(segment_hdr_t)))
72
73 #define _GET_MUTEX_ARR_PTR(seg_hdr) \
74 ((pthread_mutex_t*)((char*)seg_hdr + seg_hdr->mutex_offs))
75
76 #define _GET_MUTEX_PTR(seg_hdr, idx) \
77 ((pthread_mutex_t*)((char*)seg_hdr + seg_hdr->mutex_offs + seg_hdr->align_size * (idx)))
78
79
80 static void ncon(lock_item_t *p) {
81 p->lockfile = NULL;
82 p->lock_idx = 0;
83 p->mutex = NULL;
84 p->num_locks = 0;
85 p->seg_desc = NULL;
86 }
87
88 static void ldes(lock_item_t *p) {
89 uint32_t i;
90
91 if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
92 segment_hdr_t *seg_hdr = (segment_hdr_t *)p->seg_desc->seg_info.seg_base_addr;
93 if (p->lockfile) {
94 unlink(p->lockfile);
95 }
96 for(i = 0; i < p->num_locks * 2; i++) {
97 pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, i);
98 if (0 != pthread_mutex_destroy(mutex)) {
99 PMIX_ERROR_LOG(PMIX_ERROR);
100 }
101 }
102 }
103 if (p->lockfile) {
104 free(p->lockfile);
105 }
106 if (p->seg_desc) {
107 pmix_common_dstor_delete_sm_desc(p->seg_desc);
108 }
109 }
110
111 PMIX_CLASS_INSTANCE(lock_item_t,
112 pmix_list_item_t,
113 ncon, ldes);
114
115 pmix_status_t pmix_gds_ds21_lock_init(pmix_common_dstor_lock_ctx_t *ctx, const char *base_path, const char * name,
116 uint32_t local_size, uid_t uid, bool setuid)
117 {
118 pthread_mutexattr_t attr;
119 size_t size;
120 uint32_t i;
121 int page_size = pmix_common_dstor_getpagesize();
122 segment_hdr_t *seg_hdr;
123 lock_item_t *lock_item = NULL;
124 lock_ctx_t *lock_ctx = (lock_ctx_t*)*ctx;
125 pmix_list_t *lock_tracker;
126 pmix_status_t rc = PMIX_SUCCESS;
127
128 if (NULL == *ctx) {
129 lock_ctx = (lock_ctx_t*)malloc(sizeof(lock_ctx_t));
130 if (NULL == lock_ctx) {
131 rc = PMIX_ERR_INIT;
132 PMIX_ERROR_LOG(rc);
133 goto error;
134 }
135 memset(lock_ctx, 0, sizeof(lock_ctx_t));
136 PMIX_CONSTRUCT(&lock_ctx->lock_traker, pmix_list_t);
137 *ctx = lock_ctx;
138 }
139
140 lock_tracker = &lock_ctx->lock_traker;
141 lock_item = PMIX_NEW(lock_item_t);
142
143 if (NULL == lock_item) {
144 rc = PMIX_ERR_INIT;
145 PMIX_ERROR_LOG(rc);
146 goto error;
147 }
148 pmix_list_append(lock_tracker, &lock_item->super);
149
150 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
151 "%s:%d:%s local_size %d", __FILE__, __LINE__, __func__, local_size));
152
153 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
154 size_t seg_align_size;
155 size_t seg_hdr_size;
156
157 if (0 != (seg_align_size = pmix_common_dstor_getcacheblocksize())) {
158 seg_align_size = (sizeof(pthread_mutex_t) / seg_align_size + 1)
159 * seg_align_size;
160 } else {
161 seg_align_size = sizeof(pthread_mutex_t);
162 }
163
164 seg_hdr_size = ((sizeof(segment_hdr_t)
165 + sizeof(int32_t) * local_size)
166 / seg_align_size + 1) * seg_align_size;
167
168 size = ((seg_hdr_size
169 + 2 * local_size * seg_align_size)
170 / page_size + 1) * page_size;
171
172 lock_item->seg_desc = pmix_common_dstor_create_new_lock_seg(base_path,
173 size, name, 0, uid, setuid);
174 if (NULL == lock_item->seg_desc) {
175 rc = PMIX_ERR_OUT_OF_RESOURCE;
176 PMIX_ERROR_LOG(rc);
177 goto error;
178 }
179
180 if (0 != pthread_mutexattr_init(&attr)) {
181 rc = PMIX_ERR_INIT;
182 PMIX_ERROR_LOG(rc);
183 goto error;
184 }
185 if (0 != pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
186 pthread_mutexattr_destroy(&attr);
187 rc = PMIX_ERR_INIT;
188 PMIX_ERROR_LOG(rc);
189 goto error;
190 }
191
192 segment_hdr_t *seg_hdr = (segment_hdr_t*)lock_item->seg_desc->seg_info.seg_base_addr;
193 seg_hdr->num_locks = local_size;
194 seg_hdr->seg_size = size;
195 seg_hdr->align_size = seg_align_size;
196 seg_hdr->mutex_offs = seg_hdr_size;
197
198 lock_item->lockfile = strdup(lock_item->seg_desc->seg_info.seg_name);
199 lock_item->num_locks = local_size;
200 lock_item->mutex = _GET_MUTEX_ARR_PTR(seg_hdr);
201
202 for(i = 0; i < local_size * 2; i++) {
203 pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, i);
204 if (0 != pthread_mutex_init(mutex, &attr)) {
205 pthread_mutexattr_destroy(&attr);
206 rc = PMIX_ERR_INIT;
207 PMIX_ERROR_LOG(rc);
208 goto error;
209 }
210 }
211 if (0 != pthread_mutexattr_destroy(&attr)) {
212 rc = PMIX_ERR_INIT;
213 PMIX_ERROR_LOG(PMIX_ERR_INIT);
214 goto error;
215 }
216 }
217 else {
218 pmix_atomic_int32_t *lock_idx_ptr;
219 bool idx_found = false;
220
221 size = pmix_common_dstor_getpagesize();
222 lock_item->seg_desc = pmix_common_dstor_attach_new_lock_seg(base_path, size, name, 0);
223 if (NULL == lock_item->seg_desc) {
224 rc = PMIX_ERR_NOT_FOUND;
225 goto error;
226 }
227 seg_hdr = (segment_hdr_t*)lock_item->seg_desc->seg_info.seg_base_addr;
228
229 if (seg_hdr->seg_size > size) {
230 size = seg_hdr->seg_size;
231 pmix_common_dstor_delete_sm_desc(lock_item->seg_desc);
232 lock_item->seg_desc = pmix_common_dstor_attach_new_lock_seg(base_path, size, name, 0);
233 if (NULL == lock_item->seg_desc) {
234 rc = PMIX_ERR_NOT_FOUND;
235 goto error;
236 }
237 seg_hdr = (segment_hdr_t*)lock_item->seg_desc->seg_info.seg_base_addr;
238 }
239
240 lock_item->num_locks = seg_hdr->num_locks;
241 lock_idx_ptr = _GET_IDX_ARR_PTR(seg_hdr);
242 lock_item->mutex = _GET_MUTEX_ARR_PTR(seg_hdr);
243
244 for (i = 0; i < lock_item->num_locks; i++) {
245 int32_t expected = 0;
246 if (pmix_atomic_compare_exchange_strong_32(&lock_idx_ptr[i], &expected, 1)) {
247 lock_item->lock_idx = i;
248 lock_item->lockfile = strdup(lock_item->seg_desc->seg_info.seg_name);
249 idx_found = true;
250 break;
251 }
252 }
253
254 if (false == idx_found) {
255 rc = PMIX_ERR_NOT_FOUND;
256 goto error;
257 }
258 }
259
260 return rc;
261
262 error:
263 if (NULL != lock_item) {
264 pmix_list_remove_item(lock_tracker, &lock_item->super);
265 PMIX_RELEASE(lock_item);
266 lock_item = NULL;
267 }
268 *ctx = NULL;
269
270 return rc;
271 }
272
273 void pmix_ds21_lock_finalize(pmix_common_dstor_lock_ctx_t *lock_ctx)
274 {
275 lock_item_t *lock_item, *item_next;
276 pmix_list_t *lock_tracker = &((lock_ctx_t*)*lock_ctx)->lock_traker;
277
278 if (NULL == lock_tracker) {
279 return;
280 }
281
282 PMIX_LIST_FOREACH_SAFE(lock_item, item_next, lock_tracker, lock_item_t) {
283 pmix_list_remove_item(lock_tracker, &lock_item->super);
284 PMIX_RELEASE(lock_item);
285 }
286 if (pmix_list_is_empty(lock_tracker)) {
287 PMIX_LIST_DESTRUCT(lock_tracker);
288 free(lock_tracker);
289 lock_tracker = NULL;
290 }
291 *lock_ctx = NULL;
292 }
293
294 pmix_status_t pmix_ds21_lock_wr_get(pmix_common_dstor_lock_ctx_t lock_ctx)
295 {
296 lock_item_t *lock_item;
297 pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
298 uint32_t num_locks;
299 uint32_t i;
300 pmix_status_t rc;
301 segment_hdr_t *seg_hdr;
302
303 if (NULL == lock_tracker) {
304 rc = PMIX_ERR_NOT_FOUND;
305 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
306 return rc;
307 }
308
309 PMIX_LIST_FOREACH(lock_item, lock_tracker, lock_item_t) {
310 num_locks = lock_item->num_locks;
311 seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
312
313
314
315
316
317
318 for (i = 0; i < num_locks; i++) {
319 pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, 2*i);
320 if (0 != pthread_mutex_lock(mutex)) {
321 return PMIX_ERROR;
322 }
323 }
324
325
326
327
328
329
330
331 for(i = 0; i < num_locks; i++) {
332 pthread_mutex_t *mutex = _GET_MUTEX_PTR(seg_hdr, 2*i + 1);
333 if (0 != pthread_mutex_lock(mutex)) {
334 return PMIX_ERROR;
335 }
336 }
337 }
338 return PMIX_SUCCESS;
339 }
340
341 pmix_status_t pmix_ds21_lock_wr_rel(pmix_common_dstor_lock_ctx_t lock_ctx)
342 {
343 lock_item_t *lock_item;
344 pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
345 uint32_t num_locks;
346 uint32_t i;
347 pmix_status_t rc;
348 segment_hdr_t *seg_hdr;
349
350 if (NULL == lock_tracker) {
351 rc = PMIX_ERR_NOT_FOUND;
352 PMIX_ERROR_LOG(rc);
353 return rc;
354 }
355
356 PMIX_LIST_FOREACH(lock_item, lock_tracker, lock_item_t) {
357 seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
358 num_locks = lock_item->num_locks;
359
360
361
362 for(i=0; i<num_locks; i++) {
363 if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*i))) {
364 return PMIX_ERROR;
365 }
366 if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*i + 1))) {
367 return PMIX_ERROR;
368 }
369 }
370 }
371
372 return PMIX_SUCCESS;
373 }
374
375 pmix_status_t pmix_ds21_lock_rd_get(pmix_common_dstor_lock_ctx_t lock_ctx)
376 {
377 lock_item_t *lock_item;
378 pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
379 uint32_t idx;
380 pmix_status_t rc;
381 segment_hdr_t *seg_hdr;
382
383 if (NULL == lock_tracker) {
384 rc = PMIX_ERR_NOT_FOUND;
385 PMIX_ERROR_LOG(rc);
386 return rc;
387 }
388
389 lock_item = (lock_item_t*)pmix_list_get_first(lock_tracker);
390 idx = lock_item->lock_idx;
391 seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
392
393
394
395
396
397
398 if (0 != pthread_mutex_lock(_GET_MUTEX_PTR(seg_hdr, 2*idx))) {
399 return PMIX_ERROR;
400 }
401
402
403 if (0 != pthread_mutex_lock(_GET_MUTEX_PTR(seg_hdr, 2*idx + 1))) {
404 return PMIX_ERROR;
405 }
406
407
408 if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*idx))) {
409 return PMIX_ERROR;
410 }
411
412 return PMIX_SUCCESS;
413 }
414
415 pmix_status_t pmix_ds21_lock_rd_rel(pmix_common_dstor_lock_ctx_t lock_ctx)
416 {
417 lock_item_t *lock_item;
418 pmix_list_t *lock_tracker = &((lock_ctx_t*)lock_ctx)->lock_traker;
419 pmix_status_t rc;
420 uint32_t idx;
421 segment_hdr_t *seg_hdr;
422
423 if (NULL == lock_tracker) {
424 rc = PMIX_ERR_NOT_FOUND;
425 PMIX_ERROR_LOG(rc);
426 return rc;
427 }
428
429 lock_item = (lock_item_t*)pmix_list_get_first(lock_tracker);
430 seg_hdr = (segment_hdr_t *)lock_item->seg_desc->seg_info.seg_base_addr;
431 idx = lock_item->lock_idx;
432
433
434 if (0 != pthread_mutex_unlock(_GET_MUTEX_PTR(seg_hdr, 2*idx + 1))) {
435 return PMIX_SUCCESS;
436 }
437
438 return PMIX_SUCCESS;
439 }