This source file includes following definitions.
- ompi_request_default_wait
- ompi_request_default_wait_any
- ompi_request_default_wait_all
- ompi_request_default_wait_some
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include "ompi_config.h"
29 #include "ompi/constants.h"
30 #include "ompi/request/request.h"
31 #include "ompi/request/request_default.h"
32 #include "ompi/request/grequest.h"
33
34 #include "ompi/mca/crcp/crcp.h"
35
36 int ompi_request_default_wait(
37 ompi_request_t ** req_ptr,
38 ompi_status_public_t * status)
39 {
40 ompi_request_t *req = *req_ptr;
41
42 ompi_request_wait_completion(req);
43
44 OMPI_CRCP_REQUEST_COMPLETE(req);
45
46
47
48
49 if (OMPI_REQUEST_GEN == req->req_type) {
50 ompi_grequest_invoke_query(req, &req->req_status);
51 }
52 if( MPI_STATUS_IGNORE != status ) {
53
54
55 status->MPI_TAG = req->req_status.MPI_TAG;
56 status->MPI_SOURCE = req->req_status.MPI_SOURCE;
57 status->_ucount = req->req_status._ucount;
58 status->_cancelled = req->req_status._cancelled;
59 }
60 if( req->req_persistent ) {
61 if( req->req_state == OMPI_REQUEST_INACTIVE ) {
62 if (MPI_STATUS_IGNORE != status) {
63 *status = ompi_status_empty;
64 }
65 return OMPI_SUCCESS;
66 }
67 req->req_state = OMPI_REQUEST_INACTIVE;
68 return req->req_status.MPI_ERROR;
69 }
70
71
72
73 if (MPI_SUCCESS != req->req_status.MPI_ERROR) {
74 return req->req_status.MPI_ERROR;
75 }
76
77
78
79
80 return ompi_request_free(req_ptr);
81 }
82
83
84 int ompi_request_default_wait_any(size_t count,
85 ompi_request_t ** requests,
86 int *index,
87 ompi_status_public_t * status)
88 {
89 size_t i, completed = count, num_requests_null_inactive = 0;
90 int rc = OMPI_SUCCESS;
91 ompi_request_t *request=NULL;
92 ompi_wait_sync_t sync;
93
94 if (OPAL_UNLIKELY(0 == count)) {
95 *index = MPI_UNDEFINED;
96 return OMPI_SUCCESS;
97 }
98
99 WAIT_SYNC_INIT(&sync, 1);
100
101 num_requests_null_inactive = 0;
102 for (i = 0; i < count; i++) {
103 void *_tmp_ptr = REQUEST_PENDING;
104
105 request = requests[i];
106
107
108
109
110 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
111 num_requests_null_inactive++;
112 continue;
113 }
114
115 if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync) ) {
116 assert(REQUEST_COMPLETE(request));
117 completed = i;
118 *index = i;
119 goto after_sync_wait;
120 }
121 }
122
123 if(num_requests_null_inactive == count) {
124 *index = MPI_UNDEFINED;
125 if (MPI_STATUS_IGNORE != status) {
126 *status = ompi_status_empty;
127 }
128
129 WAIT_SYNC_RELEASE_NOWAIT(&sync);
130 return rc;
131 }
132
133 SYNC_WAIT(&sync);
134
135 after_sync_wait:
136
137
138
139
140 for(i = completed-1; (i+1) > 0; i--) {
141 void *tmp_ptr = &sync;
142
143 request = requests[i];
144
145 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
146 continue;
147 }
148
149
150
151
152
153 if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &tmp_ptr, REQUEST_PENDING) ) {
154 *index = i;
155 }
156 }
157
158 if( *index == (int)completed ) {
159
160
161
162
163 WAIT_SYNC_SIGNALLED(&sync);
164 }
165
166 request = requests[*index];
167 assert( REQUEST_COMPLETE(request) );
168 #if OPAL_ENABLE_FT_CR == 1
169 if( opal_cr_is_enabled ) {
170 OMPI_CRCP_REQUEST_COMPLETE(request);
171 }
172 #endif
173
174
175 if (OMPI_REQUEST_GEN == request->req_type) {
176 rc = ompi_grequest_invoke_query(request, &request->req_status);
177 }
178 if (MPI_STATUS_IGNORE != status) {
179
180
181 int old_error = status->MPI_ERROR;
182 *status = request->req_status;
183 status->MPI_ERROR = old_error;
184 }
185 rc = request->req_status.MPI_ERROR;
186 if( request->req_persistent ) {
187 request->req_state = OMPI_REQUEST_INACTIVE;
188 } else if (MPI_SUCCESS == rc) {
189
190
191
192
193 rc = ompi_request_free(&requests[*index]);
194 }
195
196 WAIT_SYNC_RELEASE(&sync);
197 return rc;
198 }
199
200
201 int ompi_request_default_wait_all( size_t count,
202 ompi_request_t ** requests,
203 ompi_status_public_t * statuses )
204 {
205 size_t i, completed = 0, failed = 0;
206 ompi_request_t **rptr;
207 ompi_request_t *request;
208 int mpi_error = OMPI_SUCCESS;
209 ompi_wait_sync_t sync;
210
211 if (OPAL_UNLIKELY(0 == count)) {
212 return OMPI_SUCCESS;
213 }
214
215 WAIT_SYNC_INIT(&sync, count);
216 rptr = requests;
217 for (i = 0; i < count; i++) {
218 void *_tmp_ptr = REQUEST_PENDING;
219
220 request = *rptr++;
221
222 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
223 completed++;
224 continue;
225 }
226
227 if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync)) {
228 if( OPAL_UNLIKELY( MPI_SUCCESS != request->req_status.MPI_ERROR ) ) {
229 failed++;
230 }
231 completed++;
232 }
233 }
234 if( failed > 0 ) {
235 goto finish;
236 }
237
238 if( 0 != completed ) {
239 wait_sync_update(&sync, completed, OPAL_SUCCESS);
240 }
241
242
243 mpi_error = SYNC_WAIT(&sync);
244 if( OPAL_SUCCESS != mpi_error ) {
245
246
247 failed++;
248 }
249
250 finish:
251 rptr = requests;
252 if (MPI_STATUSES_IGNORE != statuses) {
253
254 for( i = 0; i < count; i++, rptr++ ) {
255 void *_tmp_ptr = &sync;
256
257 request = *rptr;
258
259 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
260 statuses[i] = ompi_status_empty;
261 continue;
262 }
263
264 if( OPAL_UNLIKELY(0 < failed) ) {
265
266
267
268
269
270
271 if( OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING ) ) {
272
273
274
275
276
277
278 statuses[i].MPI_ERROR = MPI_ERR_PENDING;
279 mpi_error = MPI_ERR_IN_STATUS;
280 continue;
281 }
282 }
283 assert( REQUEST_COMPLETE(request) );
284
285 if( opal_cr_is_enabled) {
286 OMPI_CRCP_REQUEST_COMPLETE(request);
287 }
288
289 if (OMPI_REQUEST_GEN == request->req_type) {
290 ompi_grequest_invoke_query(request, &request->req_status);
291 }
292
293 statuses[i] = request->req_status;
294
295 if( request->req_persistent ) {
296 request->req_state = OMPI_REQUEST_INACTIVE;
297 continue;
298 }
299
300 if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
301
302
303
304 int tmp = ompi_request_free(rptr);
305 if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
306 mpi_error = tmp;
307 }
308 }
309 if( statuses[i].MPI_ERROR != OMPI_SUCCESS) {
310 mpi_error = MPI_ERR_IN_STATUS;
311 }
312 }
313 } else {
314 int rc;
315
316 for( i = 0; i < count; i++, rptr++ ) {
317 void *_tmp_ptr = &sync;
318
319 request = *rptr;
320
321 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
322 rc = ompi_status_empty.MPI_ERROR;
323 goto absorb_error_and_continue;
324 }
325
326
327
328
329 if( OPAL_UNLIKELY(0 < failed) ) {
330
331
332
333 if( OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING ) ) {
334
335
336
337
338
339
340 rc = MPI_ERR_PENDING;
341 goto absorb_error_and_continue;
342 }
343 }
344 assert( REQUEST_COMPLETE(request) );
345
346 if( opal_cr_is_enabled) {
347 OMPI_CRCP_REQUEST_COMPLETE(request);
348 }
349
350
351
352 if (OMPI_REQUEST_GEN == request->req_type) {
353 rc = ompi_grequest_invoke_query(request, &request->req_status);
354 }
355
356 rc = request->req_status.MPI_ERROR;
357
358 if( request->req_persistent ) {
359 request->req_state = OMPI_REQUEST_INACTIVE;
360 } else if (MPI_SUCCESS == rc) {
361
362 int tmp = ompi_request_free(rptr);
363 if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
364 mpi_error = tmp;
365 }
366 }
367 absorb_error_and_continue:
368
369
370
371
372
373
374
375 if( OMPI_SUCCESS == mpi_error && rc != OMPI_SUCCESS) {
376 mpi_error = MPI_ERR_IN_STATUS;
377 }
378 }
379 }
380 WAIT_SYNC_RELEASE(&sync);
381 return mpi_error;
382 }
383
384
385 int ompi_request_default_wait_some(size_t count,
386 ompi_request_t ** requests,
387 int * outcount,
388 int * indices,
389 ompi_status_public_t * statuses)
390 {
391 size_t num_requests_null_inactive, num_requests_done, num_active_reqs;
392 int rc = MPI_SUCCESS;
393 ompi_request_t **rptr = NULL;
394 ompi_request_t *request = NULL;
395 ompi_wait_sync_t sync;
396 size_t sync_sets = 0, sync_unsets = 0;
397
398 if (OPAL_UNLIKELY(0 == count)) {
399 *outcount = MPI_UNDEFINED;
400 return OMPI_SUCCESS;
401 }
402
403 WAIT_SYNC_INIT(&sync, 1);
404
405 *outcount = 0;
406
407 rptr = requests;
408 num_requests_null_inactive = 0;
409 num_requests_done = 0;
410 num_active_reqs = 0;
411 for (size_t i = 0; i < count; i++, rptr++) {
412 void *_tmp_ptr = REQUEST_PENDING;
413
414 request = *rptr;
415
416
417
418
419 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
420 num_requests_null_inactive++;
421 continue;
422 }
423 indices[num_active_reqs] = OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync);
424 if( !indices[num_active_reqs] ) {
425
426 assert( REQUEST_COMPLETE(request) );
427 num_requests_done++;
428 }
429 num_active_reqs++;
430 }
431
432 if(num_requests_null_inactive == count) {
433 *outcount = MPI_UNDEFINED;
434
435 WAIT_SYNC_RELEASE_NOWAIT(&sync);
436 return rc;
437 }
438
439 sync_sets = num_active_reqs - num_requests_done;
440 if( 0 == num_requests_done ) {
441
442 SYNC_WAIT(&sync);
443 }
444
445
446
447
448 rptr = requests;
449 num_requests_done = 0;
450 num_active_reqs = 0;
451 for (size_t i = 0; i < count; i++, rptr++) {
452 void *_tmp_ptr = &sync;
453
454 request = *rptr;
455
456 if( request->req_state == OMPI_REQUEST_INACTIVE ) {
457 continue;
458 }
459
460
461
462
463
464
465
466
467
468
469
470
471
472 if( !indices[num_active_reqs] ) {
473 indices[num_requests_done++] = i;
474 } else if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING) ) {
475 indices[num_requests_done++] = i;
476 }
477 num_active_reqs++;
478 }
479 sync_unsets = num_active_reqs - num_requests_done;
480
481 if( sync_sets == sync_unsets ){
482
483
484
485 WAIT_SYNC_SIGNALLED(&sync);
486 }
487
488 WAIT_SYNC_RELEASE(&sync);
489
490 *outcount = num_requests_done;
491
492 for (size_t i = 0; i < num_requests_done; i++) {
493 request = requests[indices[i]];
494 assert( REQUEST_COMPLETE(request) );
495
496 #if OPAL_ENABLE_FT_CR == 1
497 if( opal_cr_is_enabled) {
498 OMPI_CRCP_REQUEST_COMPLETE(request);
499 }
500 #endif
501
502
503
504 if (OMPI_REQUEST_GEN == request->req_type) {
505 ompi_grequest_invoke_query(request, &request->req_status);
506 }
507 if (MPI_STATUSES_IGNORE != statuses) {
508 statuses[i] = request->req_status;
509 }
510
511 if (MPI_SUCCESS != request->req_status.MPI_ERROR) {
512 rc = MPI_ERR_IN_STATUS;
513 }
514
515 if( request->req_persistent ) {
516 request->req_state = OMPI_REQUEST_INACTIVE;
517 } else {
518
519 if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
520 int tmp;
521 tmp = ompi_request_free(&(requests[indices[i]]));
522 if (OMPI_SUCCESS != tmp) {
523 return tmp;
524 }
525 }
526 }
527 }
528
529 return rc;
530 }