This source file includes the following definitions.
- compare_ranks
- ompi_osc_sm_group_ranks
- ompi_osc_sm_fence
- ompi_osc_sm_start
- ompi_osc_sm_complete
- ompi_osc_sm_post
- ompi_osc_sm_wait
- ompi_osc_sm_test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include "ompi_config.h"
17
18 #include "opal/sys/atomic.h"
19 #include "ompi/mca/osc/osc.h"
20 #include "ompi/mca/osc/base/base.h"
21 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
22
23 #include "osc_sm.h"
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*
 * qsort() comparator for arrays of int ranks.
 *
 * Returns a negative value, zero, or a positive value (exactly -1, 0 or 1)
 * when *ptra is less than, equal to, or greater than *ptrb.
 */
static int compare_ranks (const void *ptra, const void *ptrb)
{
    const int lhs = *(const int *) ptra;
    const int rhs = *(const int *) ptrb;

    /* each comparison yields 0 or 1, so the difference is -1, 0 or 1;
       unlike lhs - rhs this cannot overflow */
    return (lhs > rhs) - (lhs < rhs);
}
51
52
53
54
55
56
57
58
59
60
61
62
63 static int *ompi_osc_sm_group_ranks (ompi_group_t *group, ompi_group_t *sub_group)
64 {
65 int size = ompi_group_size(sub_group);
66 int *ranks1, *ranks2;
67 int ret;
68
69 ranks1 = calloc (size, sizeof(int));
70 ranks2 = calloc (size, sizeof(int));
71 if (NULL == ranks1 || NULL == ranks2) {
72 free (ranks1);
73 free (ranks2);
74 return NULL;
75 }
76
77 for (int i = 0 ; i < size ; ++i) {
78 ranks1[i] = i;
79 }
80
81 ret = ompi_group_translate_ranks (sub_group, size, ranks1, group, ranks2);
82 free (ranks1);
83 if (OMPI_SUCCESS != ret) {
84 free (ranks2);
85 return NULL;
86 }
87
88 qsort (ranks2, size, sizeof (int), compare_ranks);
89
90 return ranks2;
91 }
92
93
94 int
95 ompi_osc_sm_fence(int assert, struct ompi_win_t *win)
96 {
97 ompi_osc_sm_module_t *module =
98 (ompi_osc_sm_module_t*) win->w_osc_module;
99
100
101 opal_atomic_mb();
102
103 if (module->global_state->use_barrier_for_fence) {
104 return module->comm->c_coll->coll_barrier(module->comm,
105 module->comm->c_coll->coll_barrier_module);
106 } else {
107 module->my_sense = !module->my_sense;
108 pthread_mutex_lock(&module->global_state->mtx);
109 module->global_state->count--;
110 if (module->global_state->count == 0) {
111 module->global_state->count = ompi_comm_size(module->comm);
112 module->global_state->sense = module->my_sense;
113 pthread_cond_broadcast(&module->global_state->cond);
114 } else {
115 while (module->global_state->sense != module->my_sense) {
116 pthread_cond_wait(&module->global_state->cond, &module->global_state->mtx);
117 }
118 }
119 pthread_mutex_unlock(&module->global_state->mtx);
120
121 return OMPI_SUCCESS;
122 }
123 }
124
125 int
126 ompi_osc_sm_start(struct ompi_group_t *group,
127 int assert,
128 struct ompi_win_t *win)
129 {
130 ompi_osc_sm_module_t *module =
131 (ompi_osc_sm_module_t*) win->w_osc_module;
132 int my_rank = ompi_comm_rank (module->comm);
133 void *_tmp_ptr = NULL;
134
135 OBJ_RETAIN(group);
136
137 if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&module->start_group, (void *) &_tmp_ptr, group)) {
138 OBJ_RELEASE(group);
139 return OMPI_ERR_RMA_SYNC;
140 }
141
142 if (0 == (assert & MPI_MODE_NOCHECK)) {
143 int size;
144
145 int *ranks = ompi_osc_sm_group_ranks (module->comm->c_local_group, group);
146 if (NULL == ranks) {
147 return OMPI_ERR_OUT_OF_RESOURCE;
148 }
149
150 size = ompi_group_size(module->start_group);
151
152 for (int i = 0 ; i < size ; ++i) {
153 int rank_byte = ranks[i] >> OSC_SM_POST_BITS;
154 osc_sm_post_type_t rank_bit = ((osc_sm_post_type_t) 1) << (ranks[i] & 0x3f);
155
156
157 while (!(module->posts[my_rank][rank_byte] & rank_bit)) {
158 opal_progress();
159 opal_atomic_mb();
160 }
161
162 opal_atomic_rmb ();
163
164 #if OPAL_HAVE_ATOMIC_MATH_64
165 (void) opal_atomic_fetch_xor_64 ((opal_atomic_int64_t *) module->posts[my_rank] + rank_byte, rank_bit);
166 #else
167 (void) opal_atomic_fetch_xor_32 ((opal_atomic_int32_t *) module->posts[my_rank] + rank_byte, rank_bit);
168 #endif
169 }
170
171 free (ranks);
172 }
173
174 opal_atomic_mb();
175 return OMPI_SUCCESS;
176 }
177
178
179 int
180 ompi_osc_sm_complete(struct ompi_win_t *win)
181 {
182 ompi_osc_sm_module_t *module =
183 (ompi_osc_sm_module_t*) win->w_osc_module;
184 ompi_group_t *group;
185 int gsize;
186
187
188 opal_atomic_mb();
189
190 group = module->start_group;
191 if (NULL == group || !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR((opal_atomic_intptr_t *) &module->start_group, (opal_atomic_intptr_t *) &group, 0)) {
192 return OMPI_ERR_RMA_SYNC;
193 }
194
195 opal_atomic_mb();
196
197 int *ranks = ompi_osc_sm_group_ranks (module->comm->c_local_group, group);
198 if (NULL == ranks) {
199 return OMPI_ERR_OUT_OF_RESOURCE;
200 }
201
202 gsize = ompi_group_size(group);
203 for (int i = 0 ; i < gsize ; ++i) {
204 (void) opal_atomic_add_fetch_32(&module->node_states[ranks[i]].complete_count, 1);
205 }
206
207 free (ranks);
208
209 OBJ_RELEASE(group);
210
211 opal_atomic_mb();
212 return OMPI_SUCCESS;
213 }
214
215
216 int
217 ompi_osc_sm_post(struct ompi_group_t *group,
218 int assert,
219 struct ompi_win_t *win)
220 {
221 ompi_osc_sm_module_t *module =
222 (ompi_osc_sm_module_t*) win->w_osc_module;
223 int my_rank = ompi_comm_rank (module->comm);
224 int my_byte = my_rank >> 6;
225 uint64_t my_bit = ((uint64_t) 1) << (my_rank & 0x3f);
226 int gsize;
227
228 OPAL_THREAD_LOCK(&module->lock);
229
230 if (NULL != module->post_group) {
231 OPAL_THREAD_UNLOCK(&module->lock);
232 return OMPI_ERR_RMA_SYNC;
233 }
234
235 module->post_group = group;
236
237 OBJ_RETAIN(group);
238
239 if (0 == (assert & MPI_MODE_NOCHECK)) {
240 int *ranks = ompi_osc_sm_group_ranks (module->comm->c_local_group, group);
241 if (NULL == ranks) {
242 return OMPI_ERR_OUT_OF_RESOURCE;
243 }
244
245 module->my_node_state->complete_count = 0;
246 opal_atomic_mb();
247
248 gsize = ompi_group_size(module->post_group);
249 for (int i = 0 ; i < gsize ; ++i) {
250 #if OPAL_HAVE_ATOMIC_MATH_64
251 (void) opal_atomic_fetch_add_64 ((opal_atomic_int64_t *) module->posts[ranks[i]] + my_byte, my_bit);
252 #else
253 (void) opal_atomic_fetch_add_32 ((opal_atomic_int32_t *) module->posts[ranks[i]] + my_byte, my_bit);
254 #endif
255 }
256
257 opal_atomic_wmb ();
258
259 free (ranks);
260
261 opal_progress ();
262 }
263
264 OPAL_THREAD_UNLOCK(&module->lock);
265
266 return OMPI_SUCCESS;
267 }
268
269
270 int
271 ompi_osc_sm_wait(struct ompi_win_t *win)
272 {
273 ompi_osc_sm_module_t *module =
274 (ompi_osc_sm_module_t*) win->w_osc_module;
275 ompi_group_t *group;
276
277 OPAL_THREAD_LOCK(&module->lock);
278
279 if (NULL == module->post_group) {
280 OPAL_THREAD_UNLOCK(&module->lock);
281 return OMPI_ERR_RMA_SYNC;
282 }
283
284 group = module->post_group;
285
286 int size = ompi_group_size (group);
287
288 while (module->my_node_state->complete_count != size) {
289 opal_progress();
290 opal_atomic_mb();
291 }
292
293 OBJ_RELEASE(group);
294 module->post_group = NULL;
295
296 OPAL_THREAD_UNLOCK(&module->lock);
297
298
299 opal_atomic_mb();
300
301 return OMPI_SUCCESS;
302 }
303
304
305 int
306 ompi_osc_sm_test(struct ompi_win_t *win,
307 int *flag)
308 {
309 ompi_osc_sm_module_t *module =
310 (ompi_osc_sm_module_t*) win->w_osc_module;
311
312 OPAL_THREAD_LOCK(&module->lock);
313
314 if (NULL == module->post_group) {
315 OPAL_THREAD_UNLOCK(&module->lock);
316 return OMPI_ERR_RMA_SYNC;
317 }
318
319 int size = ompi_group_size(module->post_group);
320
321 if (module->my_node_state->complete_count == size) {
322 OBJ_RELEASE(module->post_group);
323 module->post_group = NULL;
324 *flag = 1;
325 } else {
326 *flag = 0;
327 }
328
329 OPAL_THREAD_UNLOCK(&module->lock);
330
331
332 opal_atomic_mb();
333
334 return OMPI_SUCCESS;
335 }