This source file includes the following definitions.
- ompi_osc_pt2pt_lock_self
- ompi_osc_pt2pt_unlock_self
- ompi_osc_pt2pt_lock_remote
- ompi_osc_pt2pt_unlock_remote
- ompi_osc_pt2pt_flush_remote
- ompi_osc_pt2pt_lock_internal_execute
- ompi_osc_pt2pt_lock_internal
- ompi_osc_pt2pt_unlock_internal
- ompi_osc_pt2pt_lock
- ompi_osc_pt2pt_unlock
- ompi_osc_pt2pt_lock_all
- ompi_osc_pt2pt_unlock_all
- ompi_osc_pt2pt_sync
- ompi_osc_pt2pt_flush_lock
- ompi_osc_pt2pt_flush
- ompi_osc_pt2pt_flush_all
- ompi_osc_pt2pt_flush_local
- ompi_osc_pt2pt_flush_local_all
- activate_lock
- queue_lock
- ompi_osc_pt2pt_lock_try_acquire
- ompi_osc_pt2pt_activate_next_lock
- ompi_osc_pt2pt_process_lock
- ompi_osc_pt2pt_process_lock_ack
- ompi_osc_pt2pt_process_flush_ack
- ompi_osc_pt2pt_process_unlock_ack
- ompi_osc_pt2pt_process_unlock
- ompi_osc_pt2pt_process_flush
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 #include "ompi_config.h"
26
27 #include "osc_pt2pt.h"
28 #include "osc_pt2pt_header.h"
29 #include "osc_pt2pt_data_move.h"
30 #include "osc_pt2pt_frag.h"
31
32 #include "mpi.h"
33 #include "opal/runtime/opal_progress.h"
34 #include "opal/threads/mutex.h"
35 #include "ompi/communicator/communicator.h"
36 #include "ompi/mca/osc/base/base.h"
37 #include "opal/include/opal_stdint.h"
38
39 static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type,
40 uint64_t lock_ptr);
41
42
43 struct ompi_osc_pt2pt_pending_lock_t {
44 opal_list_item_t super;
45 int peer;
46 int lock_type;
47 uint64_t lock_ptr;
48 };
49 typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;
50 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
51 NULL, NULL);
52
53 static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module);
54 static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr);
55 static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
56 int target);
57
58 static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
59 {
60 const int my_rank = ompi_comm_rank (module->comm);
61 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
62 int lock_type = lock->sync.lock.type;
63 bool acquired = false;
64
65 assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
66
67 (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);
68
69 acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
70 if (!acquired) {
71
72 queue_lock (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
73
74
75
76 ompi_osc_pt2pt_sync_wait_expected (lock);
77 }
78
79 ompi_osc_pt2pt_peer_set_locked (peer, true);
80 ompi_osc_pt2pt_peer_set_eager_active (peer, true);
81
82 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
83 "local lock aquired"));
84
85 return OMPI_SUCCESS;
86 }
87
88 static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
89 {
90 const int my_rank = ompi_comm_rank (module->comm);
91 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
92 int lock_type = lock->sync.lock.type;
93
94 (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);
95
96 assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
97
98 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
99 "ompi_osc_pt2pt_unlock_self: unlocking myself. lock state = %d", module->lock_status));
100
101 if (MPI_LOCK_EXCLUSIVE == lock_type) {
102 OPAL_THREAD_ADD_FETCH32(&module->lock_status, 1);
103 ompi_osc_pt2pt_activate_next_lock (module);
104 } else if (0 == OPAL_THREAD_ADD_FETCH32(&module->lock_status, -1)) {
105 ompi_osc_pt2pt_activate_next_lock (module);
106 }
107
108
109 opal_progress();
110
111 ompi_osc_pt2pt_peer_set_locked (peer, false);
112 ompi_osc_pt2pt_peer_set_eager_active (peer, false);
113
114 ompi_osc_pt2pt_sync_expected (lock);
115 }
116
117 int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
118 {
119 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
120 int lock_type = lock->sync.lock.type;
121 ompi_osc_pt2pt_header_lock_t lock_req;
122
123 int ret;
124
125 OPAL_THREAD_LOCK(&peer->lock);
126 if (ompi_osc_pt2pt_peer_locked (peer)) {
127 OPAL_THREAD_UNLOCK(&peer->lock);
128 return OMPI_SUCCESS;
129 }
130
131 (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);
132
133 assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
134
135
136 lock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ;
137 lock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
138 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT && OPAL_ENABLE_DEBUG
139 lock_req.padding[0] = 0;
140 lock_req.padding[1] = 0;
141 #endif
142 lock_req.lock_type = lock_type;
143 lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
144 OSC_PT2PT_HTON(&lock_req, module, target);
145
146 ret = ompi_osc_pt2pt_control_send_unbuffered (module, target, &lock_req, sizeof (lock_req));
147 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
148 OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, -1);
149 } else {
150 ompi_osc_pt2pt_peer_set_locked (peer, true);
151 }
152
153 OPAL_THREAD_UNLOCK(&peer->lock);
154
155 return ret;
156 }
157
158 static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
159 {
160 int32_t frag_count = opal_atomic_swap_32 ((opal_atomic_int32_t *) module->epoch_outgoing_frag_count + target, -1);
161 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
162 int lock_type = lock->sync.lock.type;
163 ompi_osc_pt2pt_header_unlock_t unlock_req;
164 int ret;
165
166 (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);
167
168 assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
169
170 unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ;
171 unlock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
172 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT && OPAL_ENABLE_DEBUG
173 unlock_req.padding[0] = 0;
174 unlock_req.padding[1] = 0;
175 #endif
176 unlock_req.frag_count = frag_count;
177 unlock_req.lock_type = lock_type;
178 unlock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
179 OSC_PT2PT_HTON(&unlock_req, module, target);
180
181 if (peer->active_frag) {
182 ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
183 if (active_frag->remain_len < sizeof (unlock_req)) {
184
185 ++unlock_req.frag_count;
186 --module->epoch_outgoing_frag_count[target];
187 }
188 }
189
190 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
191 "osc pt2pt: unlocking target %d, frag count: %d", target,
192 unlock_req.frag_count));
193
194
195 ret = ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req));
196 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
197 return ret;
198 }
199
200 ompi_osc_pt2pt_peer_set_locked (peer, false);
201 ompi_osc_pt2pt_peer_set_eager_active (peer, false);
202
203 return ompi_osc_pt2pt_frag_flush_target(module, target);
204 }
205
206 static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
207 {
208 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
209 ompi_osc_pt2pt_header_flush_t flush_req;
210 int32_t frag_count = opal_atomic_swap_32 ((opal_atomic_int32_t *) module->epoch_outgoing_frag_count + target, -1);
211 int ret;
212
213 (void) OPAL_THREAD_ADD_FETCH32(&lock->sync_expected, 1);
214
215 assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
216
217 flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
218 flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
219 flush_req.frag_count = frag_count;
220 flush_req.lock_ptr = (uint64_t) (uintptr_t) lock;
221
222
223
224 if (peer->active_frag) {
225 ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
226 if (active_frag->remain_len < sizeof (flush_req)) {
227
228 ++flush_req.frag_count;
229 --module->epoch_outgoing_frag_count[target];
230 }
231 }
232
233 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "flushing to target %d, frag_count: %d",
234 target, flush_req.frag_count));
235
236
237 OSC_PT2PT_HTON(&flush_req, module, target);
238 ret = ompi_osc_pt2pt_control_send (module, target, &flush_req, sizeof (flush_req));
239 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
240 return ret;
241 }
242
243
244 return ompi_osc_pt2pt_frag_flush_target (module, target);
245 }
246
247 static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
248 {
249 int my_rank = ompi_comm_rank (module->comm);
250 int target = lock->sync.lock.target;
251 int assert = lock->sync.lock.assert;
252 int ret;
253
254 assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
255
256 if (0 == (assert & MPI_MODE_NOCHECK)) {
257 if (my_rank != target && target != -1) {
258 ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
259 } else {
260 ret = ompi_osc_pt2pt_lock_self (module, lock);
261 }
262
263 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
264
265 return ret;
266 }
267
268
269 } else {
270 lock->eager_send_active = true;
271 }
272
273 return OMPI_SUCCESS;
274 }
275
276 static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, ompi_win_t *win)
277 {
278 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
279 ompi_osc_pt2pt_sync_t *lock;
280 int ret = OMPI_SUCCESS;
281
282
283
284 if (-1 == target) {
285 if (module->all_sync.epoch_active) {
286 OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "osc/pt2pt: attempted "
287 "to lock all when active target epoch is %s and lock all epoch is %s. type %d",
288 (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type && module->all_sync.epoch_active) ?
289 "active" : "inactive",
290 (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) ? "active" : "inactive",
291 module->all_sync.type));
292 return OMPI_ERR_RMA_SYNC;
293 }
294 } else {
295 if (module->all_sync.epoch_active && (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type || MPI_LOCK_EXCLUSIVE == lock_type)) {
296
297
298 return OMPI_ERR_RMA_SYNC;
299 }
300 }
301
302
303
304 if (module->all_sync.epoch_active || (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type &&
305 (MPI_LOCK_EXCLUSIVE == lock_type || -1 == target))) {
306 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: attempted "
307 "to acquire a lock on %d with type %d when active sync is %s and lock "
308 "all epoch is %s", target, lock_type, module->all_sync.epoch_active ? "active" : "inactive",
309 (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type &&
310 (MPI_LOCK_EXCLUSIVE == lock_type || -1 == target)) ? "active" : "inactive"));
311 return OMPI_ERR_RMA_SYNC;
312 }
313
314 if (OMPI_OSC_PT2PT_SYNC_TYPE_FENCE == module->all_sync.type) {
315
316
317 module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE;
318 }
319
320 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
321 "osc pt2pt: lock %d %d", target, lock_type));
322
323
324 if (-1 != target) {
325 lock = ompi_osc_pt2pt_sync_allocate (module);
326 if (OPAL_UNLIKELY(NULL == lock)) {
327 return OMPI_ERR_OUT_OF_RESOURCE;
328 }
329
330 lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target);
331 } else {
332 lock = &module->all_sync;
333 }
334
335 lock->type = OMPI_OSC_PT2PT_SYNC_TYPE_LOCK;
336 lock->sync.lock.target = target;
337 lock->sync.lock.type = lock_type;
338 lock->sync.lock.assert = assert;
339 lock->num_peers = (-1 == target) ? ompi_comm_size (module->comm) : 1;
340 lock->sync_expected = 0;
341
342
343 OPAL_THREAD_LOCK(&module->lock);
344
345
346 if (ompi_osc_pt2pt_module_lock_find (module, target, NULL)) {
347 if (&module->all_sync != lock) {
348 ompi_osc_pt2pt_sync_return (lock);
349 }
350 OPAL_THREAD_UNLOCK(&module->lock);
351 return OMPI_ERR_RMA_CONFLICT;
352 }
353
354 ++module->passive_target_access_epoch;
355
356 ompi_osc_pt2pt_module_lock_insert (module, lock);
357
358 OPAL_THREAD_UNLOCK(&module->lock);
359
360 ret = ompi_osc_pt2pt_lock_internal_execute (module, lock);
361 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
362 OPAL_THREAD_SCOPED_LOCK(&module->lock, ompi_osc_pt2pt_module_lock_remove (module, lock));
363 if (&module->all_sync != lock) {
364 ompi_osc_pt2pt_sync_return (lock);
365 }
366 }
367
368 return ret;
369 }
370
371 static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
372 {
373 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
374 ompi_osc_pt2pt_sync_t *lock = NULL;
375 int my_rank = ompi_comm_rank (module->comm);
376 int ret = OMPI_SUCCESS;
377
378 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
379 "ompi_osc_pt2pt_unlock_internal: unlocking target %d", target));
380
381 OPAL_THREAD_LOCK(&module->lock);
382 lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL);
383 if (OPAL_UNLIKELY(NULL == lock)) {
384 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
385 "ompi_osc_pt2pt_unlock: target %d is not locked in window %s",
386 target, win->w_name));
387 OPAL_THREAD_UNLOCK(&module->lock);
388 return OMPI_ERR_RMA_SYNC;
389 }
390
391 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
392 "ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d",
393 lock->sync_expected));
394 OPAL_THREAD_UNLOCK(&module->lock);
395
396
397 ompi_osc_pt2pt_sync_wait_expected (lock);
398
399 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
400 "ompi_osc_pt2pt_unlock_internal: all lock acks received"));
401
402 if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) {
403 if (my_rank != target) {
404 if (-1 == target) {
405
406 for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
407 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, i);
408
409 if (my_rank == i || !ompi_osc_pt2pt_peer_locked (peer)) {
410 continue;
411 }
412
413 ret = ompi_osc_pt2pt_unlock_remote (module, i, lock);
414 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
415 return ret;
416 }
417 }
418
419 ompi_osc_pt2pt_unlock_self (module, lock);
420 } else {
421 ret = ompi_osc_pt2pt_unlock_remote (module, target, lock);
422 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
423 return ret;
424 }
425 }
426
427
428 ompi_osc_pt2pt_sync_wait_expected (lock);
429
430
431
432
433
434 OPAL_THREAD_LOCK(&module->lock);
435 while (module->outgoing_frag_count < 0) {
436 opal_condition_wait(&module->cond, &module->lock);
437 }
438 OPAL_THREAD_UNLOCK(&module->lock);
439
440 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
441 "ompi_osc_pt2pt_unlock: unlock of %d complete", target));
442 } else {
443 ompi_osc_pt2pt_unlock_self (module, lock);
444 }
445 } else {
446
447 ompi_osc_pt2pt_flush_lock (module, lock, target);
448 }
449
450 OPAL_THREAD_LOCK(&module->lock);
451 ompi_osc_pt2pt_module_lock_remove (module, lock);
452
453 if (-1 != lock->sync.lock.target) {
454 ompi_osc_pt2pt_sync_return (lock);
455 } else {
456 ompi_osc_pt2pt_sync_reset (lock);
457 }
458
459 --module->passive_target_access_epoch;
460
461 OPAL_THREAD_UNLOCK(&module->lock);
462
463 return ret;
464 }
465
466 int ompi_osc_pt2pt_lock(int lock_type, int target, int assert, ompi_win_t *win)
467 {
468 assert(target >= 0);
469
470 return ompi_osc_pt2pt_lock_internal (lock_type, target, assert, win);
471 }
472
/** MPI_Win_unlock entry point: release the lock held on @p target. */
int ompi_osc_pt2pt_unlock (int target, struct ompi_win_t *win)
{
    int rc = ompi_osc_pt2pt_unlock_internal (target, win);
    return rc;
}
477
478 int ompi_osc_pt2pt_lock_all(int assert, struct ompi_win_t *win)
479 {
480 return ompi_osc_pt2pt_lock_internal (MPI_LOCK_SHARED, -1, assert, win);
481 }
482
483
/** MPI_Win_unlock_all entry point: release the lock_all epoch (target -1). */
int ompi_osc_pt2pt_unlock_all (struct ompi_win_t *win)
{
    const int all_ranks = -1;
    return ompi_osc_pt2pt_unlock_internal (all_ranks, win);
}
488
489
490 int ompi_osc_pt2pt_sync (struct ompi_win_t *win)
491 {
492 opal_progress();
493 return OMPI_SUCCESS;
494 }
495
496 static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
497 int target)
498 {
499 int ret;
500 int my_rank = ompi_comm_rank (module->comm);
501
502
503
504 ompi_osc_pt2pt_sync_wait_expected (lock);
505
506 if (-1 == target) {
507
508 for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
509 if (i == my_rank) {
510 continue;
511 }
512
513 ret = ompi_osc_pt2pt_flush_remote (module, i, lock);
514 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
515 return ret;
516 }
517 }
518 } else {
519
520 ret = ompi_osc_pt2pt_flush_remote (module, target, lock);
521 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
522 return ret;
523 }
524 }
525
526
527 ompi_osc_pt2pt_sync_wait_expected (lock);
528 opal_condition_broadcast (&module->cond);
529
530 return OMPI_SUCCESS;
531 }
532
533 int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win)
534 {
535 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
536 ompi_osc_pt2pt_sync_t *lock;
537 int ret;
538
539 assert (0 <= target);
540
541
542 if (!module->passive_target_access_epoch) {
543 return OMPI_ERR_RMA_SYNC;
544 }
545
546 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
547 "ompi_osc_pt2pt_flush starting..."));
548
549 if (ompi_comm_rank (module->comm) == target) {
550
551 opal_progress ();
552 return OMPI_SUCCESS;
553 }
554
555 OPAL_THREAD_LOCK(&module->lock);
556 lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL);
557 if (NULL == lock) {
558 if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) {
559 lock = &module->all_sync;
560 }
561 }
562 OPAL_THREAD_UNLOCK(&module->lock);
563 if (OPAL_UNLIKELY(NULL == lock)) {
564 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
565 "ompi_osc_pt2pt_flush: target %d is not locked in window %s",
566 target, win->w_name));
567 ret = OMPI_ERR_RMA_SYNC;
568 } else {
569 ret = ompi_osc_pt2pt_flush_lock (module, lock, target);
570 }
571
572 return ret;
573 }
574
575
576 int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win)
577 {
578 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
579 ompi_osc_pt2pt_sync_t *lock;
580 int target, ret;
581 void *node;
582
583
584 if (OPAL_UNLIKELY(!module->passive_target_access_epoch)) {
585 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
586 "ompi_osc_pt2pt_flush_all: no targets are locked in window %s",
587 win->w_name));
588 return OMPI_ERR_RMA_SYNC;
589 }
590
591 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
592 "ompi_osc_pt2pt_flush_all entering..."));
593
594
595 ret = opal_hash_table_get_first_key_uint32 (&module->outstanding_locks, (uint32_t *) &target,
596 (void **) &lock, &node);
597 if (OPAL_SUCCESS == ret) {
598 do {
599 ret = ompi_osc_pt2pt_flush_lock (module, lock, lock->sync.lock.target);
600 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
601 break;
602 }
603
604 ret = opal_hash_table_get_next_key_uint32 (&module->outstanding_locks, (uint32_t *) &target,
605 (void **) lock, node, &node);
606 if (OPAL_SUCCESS != ret) {
607 ret = OPAL_SUCCESS;
608 break;
609 }
610 } while (1);
611 }
612
613 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
614 "ompi_osc_pt2pt_flush_all complete"));
615
616 return ret;
617 }
618
619
620 int ompi_osc_pt2pt_flush_local (int target, struct ompi_win_t *win)
621 {
622 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
623 int ret;
624
625
626 if (!module->passive_target_access_epoch) {
627 return OMPI_ERR_RMA_SYNC;
628 }
629
630 ret = ompi_osc_pt2pt_frag_flush_target(module, target);
631 if (OMPI_SUCCESS != ret) {
632 return ret;
633 }
634
635
636 OPAL_THREAD_LOCK(&module->lock);
637 while (module->outgoing_frag_count < 0) {
638 opal_condition_wait(&module->cond, &module->lock);
639 }
640 OPAL_THREAD_UNLOCK(&module->lock);
641
642
643 opal_progress ();
644
645 return OMPI_SUCCESS;
646 }
647
648
649 int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win)
650 {
651 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
652 int ret = OMPI_SUCCESS;
653
654
655 if (!module->passive_target_access_epoch) {
656 return OMPI_ERR_RMA_SYNC;
657 }
658
659 ret = ompi_osc_pt2pt_frag_flush_all(module);
660 if (OMPI_SUCCESS != ret) {
661 return ret;
662 }
663
664
665 OPAL_THREAD_LOCK(&module->lock);
666 while (module->outgoing_frag_count < 0) {
667 opal_condition_wait(&module->cond, &module->lock);
668 }
669 OPAL_THREAD_UNLOCK(&module->lock);
670
671
672 opal_progress ();
673
674 return OMPI_SUCCESS;
675 }
676
677
678
679 static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
680 uint64_t lock_ptr)
681 {
682 ompi_osc_pt2pt_sync_t *lock;
683
684 if (ompi_comm_rank (module->comm) != requestor) {
685 ompi_osc_pt2pt_header_lock_ack_t lock_ack;
686
687 lock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK;
688 lock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
689 lock_ack.source = ompi_comm_rank(module->comm);
690 lock_ack.lock_ptr = lock_ptr;
691 OSC_PT2PT_HTON(&lock_ack, module, requestor);
692
693 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
694 "osc pt2pt: sending lock to %d", requestor));
695
696
697
698 return ompi_osc_pt2pt_control_send_unbuffered (module, requestor, &lock_ack, sizeof (lock_ack));
699 }
700
701
702 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
703 "osc pt2pt: releasing local lock"));
704
705 lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ptr;
706 if (OPAL_UNLIKELY(NULL == lock)) {
707 OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output,
708 "lock could not be located"));
709 }
710
711 ompi_osc_pt2pt_sync_expected (lock);
712
713 return OMPI_SUCCESS;
714 }
715
716
717
718
719 static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor,
720 int lock_type, uint64_t lock_ptr)
721 {
722 ompi_osc_pt2pt_pending_lock_t *pending =
723 OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);
724 if (NULL == pending) {
725 return OMPI_ERR_OUT_OF_RESOURCE;
726 }
727
728 pending->peer = requestor;
729 pending->lock_type = lock_type;
730 pending->lock_ptr = lock_ptr;
731
732 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
733 "osc pt2pt: queueing lock request from %d", requestor));
734
735 OPAL_THREAD_SCOPED_LOCK(&module->locks_pending_lock, opal_list_append(&module->locks_pending, &pending->super));
736
737 return OMPI_SUCCESS;
738 }
739
740 static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type, uint64_t lock_ptr)
741 {
742 bool queue = false;
743
744 if (MPI_LOCK_SHARED == lock_type) {
745 int32_t lock_status = module->lock_status;
746
747 do {
748 if (lock_status < 0) {
749 queue = true;
750 break;
751 }
752
753 if (opal_atomic_compare_exchange_strong_32 (&module->lock_status, &lock_status, lock_status + 1)) {
754 break;
755 }
756 } while (1);
757 } else {
758 int32_t _tmp_value = 0;
759 queue = !opal_atomic_compare_exchange_strong_32 (&module->lock_status, &_tmp_value, -1);
760 }
761
762 if (queue) {
763 return false;
764 }
765
766 activate_lock(module, source, lock_ptr);
767
768
769 return true;
770 }
771
772 static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module) {
773
774 ompi_osc_pt2pt_pending_lock_t *pending_lock, *next;
775 int ret = OMPI_SUCCESS;
776
777 OPAL_THREAD_LOCK(&module->locks_pending_lock);
778 OPAL_LIST_FOREACH_SAFE(pending_lock, next, &module->locks_pending,
779 ompi_osc_pt2pt_pending_lock_t) {
780 bool acquired = ompi_osc_pt2pt_lock_try_acquire (module, pending_lock->peer, pending_lock->lock_type,
781 pending_lock->lock_ptr);
782 if (!acquired) {
783 break;
784 }
785
786 opal_list_remove_item (&module->locks_pending, &pending_lock->super);
787 OBJ_RELEASE(pending_lock);
788 }
789 OPAL_THREAD_UNLOCK(&module->locks_pending_lock);
790
791 return ret;
792 }
793
794
795
796
797
798 int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source,
799 ompi_osc_pt2pt_header_lock_t* lock_header)
800 {
801 bool acquired;
802
803 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
804 "ompi_osc_pt2pt_process_lock: processing lock request from %d. current lock state = %d",
805 source, module->lock_status));
806
807 acquired = ompi_osc_pt2pt_lock_try_acquire (module, source, lock_header->lock_type, lock_header->lock_ptr);
808
809 if (!acquired) {
810 queue_lock(module, source, lock_header->lock_type, lock_header->lock_ptr);
811 }
812
813 return OMPI_SUCCESS;
814 }
815
816
817
818
819 void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
820 ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header)
821 {
822 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, lock_ack_header->source);
823 ompi_osc_pt2pt_sync_t *lock;
824
825 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
826 "ompi_osc_pt2pt_process_lock_ack: processing lock ack from %d for lock %" PRIu64,
827 lock_ack_header->source, lock_ack_header->lock_ptr));
828
829 lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr;
830 assert (NULL != lock);
831
832 ompi_osc_pt2pt_peer_set_eager_active (peer, true);
833 ompi_osc_pt2pt_frag_flush_pending (module, peer->rank);
834
835 ompi_osc_pt2pt_sync_expected (lock);
836 }
837
838 void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int source,
839 ompi_osc_pt2pt_header_flush_ack_t *flush_ack_header) {
840 ompi_osc_pt2pt_sync_t *lock;
841
842 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
843 "ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock 0x%" PRIx64,
844 source, flush_ack_header->lock_ptr));
845
846 lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) flush_ack_header->lock_ptr;
847 assert (NULL != lock);
848
849 ompi_osc_pt2pt_sync_expected (lock);
850 }
851
852 void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int source,
853 ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header)
854 {
855 ompi_osc_pt2pt_sync_t *lock;
856
857 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
858 "ompi_osc_pt2pt_process_unlock_ack: processing unlock ack from %d",
859 source));
860
861
862 lock = (ompi_osc_pt2pt_sync_t *) (intptr_t) unlock_ack_header->lock_ptr;
863 assert (NULL != lock);
864
865 ompi_osc_pt2pt_sync_expected (lock);
866 }
867
868
869
870
871
872
873
874
875
876
877
878
879
880 int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
881 ompi_osc_pt2pt_header_unlock_t *unlock_header)
882 {
883 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
884 ompi_osc_pt2pt_header_unlock_ack_t unlock_ack;
885 int ret;
886
887 assert (NULL != peer);
888
889 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
890 "ompi_osc_pt2pt_process_unlock entering (passive_incoming_frag_count: %d)...",
891 peer->passive_incoming_frag_count));
892
893
894 if (0 != peer->passive_incoming_frag_count) {
895 return OMPI_ERR_WOULD_BLOCK;
896 }
897
898 unlock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK;
899 unlock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
900 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT && OPAL_ENABLE_DEBUG
901 unlock_ack.padding[0] = 0;
902 unlock_ack.padding[1] = 0;
903 unlock_ack.padding[2] = 0;
904 unlock_ack.padding[3] = 0;
905 unlock_ack.padding[4] = 0;
906 unlock_ack.padding[5] = 0;
907 #endif
908 unlock_ack.lock_ptr = unlock_header->lock_ptr;
909 OSC_PT2PT_HTON(&unlock_ack, module, source);
910
911 ret = ompi_osc_pt2pt_control_send_unbuffered (module, source, &unlock_ack, sizeof (unlock_ack));
912 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
913 return ret;
914 }
915
916 if (-1 == module->lock_status) {
917 OPAL_THREAD_ADD_FETCH32(&module->lock_status, 1);
918 ompi_osc_pt2pt_activate_next_lock (module);
919 } else if (0 == OPAL_THREAD_ADD_FETCH32(&module->lock_status, -1)) {
920 ompi_osc_pt2pt_activate_next_lock (module);
921 }
922
923 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
924 "osc pt2pt: finished processing unlock fragment"));
925
926 return ret;
927 }
928
929 int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source,
930 ompi_osc_pt2pt_header_flush_t *flush_header)
931 {
932 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
933 ompi_osc_pt2pt_header_flush_ack_t flush_ack;
934
935 assert (NULL != peer);
936
937 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
938 "ompi_osc_pt2pt_process_flush entering (passive_incoming_frag_count: %d)...",
939 peer->passive_incoming_frag_count));
940
941
942 if (0 != peer->passive_incoming_frag_count) {
943 return OMPI_ERR_WOULD_BLOCK;
944 }
945
946 flush_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK;
947 flush_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
948 flush_ack.lock_ptr = flush_header->lock_ptr;
949 OSC_PT2PT_HTON(&flush_ack, module, source);
950
951 return ompi_osc_pt2pt_control_send_unbuffered (module, source, &flush_ack, sizeof (flush_ack));
952 }