This source file includes following definitions.
- ompi_request_default_wait
- ompi_request_default_wait_any
- ompi_request_default_wait_all
- ompi_request_default_wait_some
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 #include "ompi_config.h"
  29 #include "ompi/constants.h"
  30 #include "ompi/request/request.h"
  31 #include "ompi/request/request_default.h"
  32 #include "ompi/request/grequest.h"
  33 
  34 #include "ompi/mca/crcp/crcp.h"
  35 
  36 int ompi_request_default_wait(
  37     ompi_request_t ** req_ptr,
  38     ompi_status_public_t * status)
  39 {
  40     ompi_request_t *req = *req_ptr;
  41 
  42     ompi_request_wait_completion(req);
  43 
  44     OMPI_CRCP_REQUEST_COMPLETE(req);
  45 
  46     
  47 
  48 
  49     if (OMPI_REQUEST_GEN == req->req_type) {
  50         ompi_grequest_invoke_query(req, &req->req_status);
  51     }
  52     if( MPI_STATUS_IGNORE != status ) {
  53         
  54 
  55         status->MPI_TAG    = req->req_status.MPI_TAG;
  56         status->MPI_SOURCE = req->req_status.MPI_SOURCE;
  57         status->_ucount    = req->req_status._ucount;
  58         status->_cancelled = req->req_status._cancelled;
  59     }
  60     if( req->req_persistent ) {
  61         if( req->req_state == OMPI_REQUEST_INACTIVE ) {
  62             if (MPI_STATUS_IGNORE != status) {
  63                 *status = ompi_status_empty;
  64             }
  65             return OMPI_SUCCESS;
  66         }
  67         req->req_state = OMPI_REQUEST_INACTIVE;
  68         return req->req_status.MPI_ERROR;
  69     }
  70 
  71     
  72 
  73     if (MPI_SUCCESS != req->req_status.MPI_ERROR) {
  74         return req->req_status.MPI_ERROR;
  75     }
  76 
  77     
  78 
  79 
  80     return ompi_request_free(req_ptr);
  81 }
  82 
  83 
  84 int ompi_request_default_wait_any(size_t count,
  85                                   ompi_request_t ** requests,
  86                                   int *index,
  87                                   ompi_status_public_t * status)
  88 {
  89     size_t i, completed = count, num_requests_null_inactive = 0;
  90     int rc = OMPI_SUCCESS;
  91     ompi_request_t *request=NULL;
  92     ompi_wait_sync_t sync;
  93 
  94     if (OPAL_UNLIKELY(0 == count)) {
  95         *index = MPI_UNDEFINED;
  96         return OMPI_SUCCESS;
  97     }
  98 
  99     WAIT_SYNC_INIT(&sync, 1);
 100 
 101     num_requests_null_inactive = 0;
 102     for (i = 0; i < count; i++) {
 103         void *_tmp_ptr = REQUEST_PENDING;
 104 
 105         request = requests[i];
 106 
 107         
 108 
 109 
 110         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 111             num_requests_null_inactive++;
 112             continue;
 113         }
 114 
 115         if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync) ) {
 116             assert(REQUEST_COMPLETE(request));
 117             completed = i;
 118             *index = i;
 119             goto after_sync_wait;
 120         }
 121     }
 122 
 123     if(num_requests_null_inactive == count) {
 124         *index = MPI_UNDEFINED;
 125         if (MPI_STATUS_IGNORE != status) {
 126             *status = ompi_status_empty;
 127         }
 128         
 129         WAIT_SYNC_RELEASE_NOWAIT(&sync);
 130         return rc;
 131     }
 132 
 133     SYNC_WAIT(&sync);
 134 
 135   after_sync_wait:
 136     
 137 
 138 
 139 
 140     for(i = completed-1; (i+1) > 0; i--) {
 141         void *tmp_ptr = &sync;
 142 
 143         request = requests[i];
 144 
 145         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 146             continue;
 147         }
 148         
 149 
 150 
 151 
 152 
 153         if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &tmp_ptr, REQUEST_PENDING) ) {
 154             *index = i;
 155         }
 156     }
 157 
 158     if( *index == (int)completed ) {
 159         
 160 
 161 
 162 
 163         WAIT_SYNC_SIGNALLED(&sync);
 164     }
 165 
 166     request = requests[*index];
 167     assert( REQUEST_COMPLETE(request) );
 168 #if OPAL_ENABLE_FT_CR == 1
 169     if( opal_cr_is_enabled ) {
 170         OMPI_CRCP_REQUEST_COMPLETE(request);
 171     }
 172 #endif
 173     
 174 
 175     if (OMPI_REQUEST_GEN == request->req_type) {
 176         rc = ompi_grequest_invoke_query(request, &request->req_status);
 177     }
 178     if (MPI_STATUS_IGNORE != status) {
 179         
 180 
 181         int old_error = status->MPI_ERROR;
 182         *status = request->req_status;
 183         status->MPI_ERROR = old_error;
 184     }
 185     rc = request->req_status.MPI_ERROR;
 186     if( request->req_persistent ) {
 187         request->req_state = OMPI_REQUEST_INACTIVE;
 188     } else if (MPI_SUCCESS == rc) {
 189         
 190         
 191 
 192 
 193         rc = ompi_request_free(&requests[*index]);
 194     }
 195 
 196     WAIT_SYNC_RELEASE(&sync);
 197     return rc;
 198 }
 199 
 200 
 201 int ompi_request_default_wait_all( size_t count,
 202                                    ompi_request_t ** requests,
 203                                    ompi_status_public_t * statuses )
 204 {
 205     size_t i, completed = 0, failed = 0;
 206     ompi_request_t **rptr;
 207     ompi_request_t *request;
 208     int mpi_error = OMPI_SUCCESS;
 209     ompi_wait_sync_t sync;
 210 
 211     if (OPAL_UNLIKELY(0 == count)) {
 212         return OMPI_SUCCESS;
 213     }
 214 
 215     WAIT_SYNC_INIT(&sync, count);
 216     rptr = requests;
 217     for (i = 0; i < count; i++) {
 218         void *_tmp_ptr = REQUEST_PENDING;
 219 
 220         request = *rptr++;
 221 
 222         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 223             completed++;
 224             continue;
 225         }
 226 
 227         if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync)) {
 228             if( OPAL_UNLIKELY( MPI_SUCCESS != request->req_status.MPI_ERROR ) ) {
 229                 failed++;
 230             }
 231             completed++;
 232         }
 233     }
 234     if( failed > 0 ) {
 235         goto finish;
 236     }
 237 
 238     if( 0 != completed ) {
 239         wait_sync_update(&sync, completed, OPAL_SUCCESS);
 240     }
 241 
 242     
 243     mpi_error = SYNC_WAIT(&sync);
 244     if( OPAL_SUCCESS != mpi_error ) {
 245         
 246 
 247         failed++;
 248     }
 249 
 250  finish:
 251     rptr = requests;
 252     if (MPI_STATUSES_IGNORE != statuses) {
 253         
 254         for( i = 0; i < count; i++, rptr++ ) {
 255             void *_tmp_ptr = &sync;
 256 
 257             request = *rptr;
 258 
 259             if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 260                 statuses[i] = ompi_status_empty;
 261                 continue;
 262             }
 263 
 264             if( OPAL_UNLIKELY(0 < failed) ) {
 265                 
 266 
 267 
 268 
 269 
 270 
 271                 if( OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING ) ) {
 272                     
 273 
 274 
 275 
 276 
 277 
 278                     statuses[i].MPI_ERROR = MPI_ERR_PENDING;
 279                     mpi_error = MPI_ERR_IN_STATUS;
 280                     continue;
 281                 }
 282             }
 283             assert( REQUEST_COMPLETE(request) );
 284 
 285             if( opal_cr_is_enabled) {
 286                 OMPI_CRCP_REQUEST_COMPLETE(request);
 287             }
 288 
 289             if (OMPI_REQUEST_GEN == request->req_type) {
 290                 ompi_grequest_invoke_query(request, &request->req_status);
 291             }
 292 
 293             statuses[i] = request->req_status;
 294 
 295             if( request->req_persistent ) {
 296                 request->req_state = OMPI_REQUEST_INACTIVE;
 297                 continue;
 298             }
 299             
 300             if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
 301                 
 302 
 303 
 304                 int tmp = ompi_request_free(rptr);
 305                 if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
 306                     mpi_error = tmp;
 307                 }
 308             }
 309             if( statuses[i].MPI_ERROR != OMPI_SUCCESS) {
 310                 mpi_error = MPI_ERR_IN_STATUS;
 311             }
 312         }
 313     } else {
 314         int rc;
 315         
 316         for( i = 0; i < count; i++, rptr++ ) {
 317             void *_tmp_ptr = &sync;
 318 
 319             request = *rptr;
 320 
 321             if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 322                 rc = ompi_status_empty.MPI_ERROR;
 323                 goto absorb_error_and_continue;
 324             }
 325             
 326 
 327 
 328 
 329             if( OPAL_UNLIKELY(0 < failed) ) {
 330                 
 331 
 332 
 333                  if( OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING ) ) {
 334                     
 335 
 336 
 337 
 338 
 339 
 340                     rc = MPI_ERR_PENDING;
 341                     goto absorb_error_and_continue;
 342                  }
 343             }
 344             assert( REQUEST_COMPLETE(request) );
 345 
 346             if( opal_cr_is_enabled) {
 347                 OMPI_CRCP_REQUEST_COMPLETE(request);
 348             }
 349 
 350             
 351 
 352             if (OMPI_REQUEST_GEN == request->req_type) {
 353                 rc = ompi_grequest_invoke_query(request, &request->req_status);
 354             }
 355 
 356             rc = request->req_status.MPI_ERROR;
 357 
 358             if( request->req_persistent ) {
 359                 request->req_state = OMPI_REQUEST_INACTIVE;
 360             } else if (MPI_SUCCESS == rc) {
 361                 
 362                 int tmp = ompi_request_free(rptr);
 363                 if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
 364                     mpi_error = tmp;
 365                 }
 366             }
 367     absorb_error_and_continue:
 368             
 369 
 370 
 371 
 372 
 373 
 374 
 375             if( OMPI_SUCCESS == mpi_error && rc != OMPI_SUCCESS) {
 376                 mpi_error = MPI_ERR_IN_STATUS;
 377             }
 378         }
 379     }
 380     WAIT_SYNC_RELEASE(&sync);
 381     return mpi_error;
 382 }
 383 
 384 
 385 int ompi_request_default_wait_some(size_t count,
 386                                    ompi_request_t ** requests,
 387                                    int * outcount,
 388                                    int * indices,
 389                                    ompi_status_public_t * statuses)
 390 {
 391     size_t num_requests_null_inactive, num_requests_done, num_active_reqs;
 392     int rc = MPI_SUCCESS;
 393     ompi_request_t **rptr = NULL;
 394     ompi_request_t *request = NULL;
 395     ompi_wait_sync_t sync;
 396     size_t sync_sets = 0, sync_unsets = 0;
 397 
 398     if (OPAL_UNLIKELY(0 == count)) {
 399         *outcount = MPI_UNDEFINED;
 400         return OMPI_SUCCESS;
 401     }
 402 
 403     WAIT_SYNC_INIT(&sync, 1);
 404 
 405     *outcount = 0;
 406 
 407     rptr = requests;
 408     num_requests_null_inactive = 0;
 409     num_requests_done = 0;
 410     num_active_reqs = 0;
 411     for (size_t i = 0; i < count; i++, rptr++) {
 412         void *_tmp_ptr = REQUEST_PENDING;
 413 
 414         request = *rptr;
 415         
 416 
 417 
 418 
 419         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 420             num_requests_null_inactive++;
 421             continue;
 422         }
 423         indices[num_active_reqs] = OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync);
 424         if( !indices[num_active_reqs] ) {
 425             
 426             assert( REQUEST_COMPLETE(request) );
 427             num_requests_done++;
 428         }
 429         num_active_reqs++;
 430     }
 431 
 432     if(num_requests_null_inactive == count) {
 433         *outcount = MPI_UNDEFINED;
 434         
 435         WAIT_SYNC_RELEASE_NOWAIT(&sync);
 436         return rc;
 437     }
 438 
 439     sync_sets = num_active_reqs - num_requests_done;
 440     if( 0 == num_requests_done ) {
 441         
 442         SYNC_WAIT(&sync);
 443     }
 444 
 445     
 446     
 447 
 448     rptr = requests;
 449     num_requests_done = 0;
 450     num_active_reqs = 0;
 451     for (size_t i = 0; i < count; i++, rptr++) {
 452         void *_tmp_ptr = &sync;
 453 
 454         request = *rptr;
 455 
 456         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 457             continue;
 458         }
 459         
 460 
 461 
 462 
 463 
 464 
 465 
 466 
 467 
 468 
 469 
 470 
 471 
 472         if( !indices[num_active_reqs] ) {
 473             indices[num_requests_done++] = i;
 474         } else if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING) ) {
 475             indices[num_requests_done++] = i;
 476         }
 477         num_active_reqs++;
 478     }
 479     sync_unsets = num_active_reqs - num_requests_done;
 480 
 481     if( sync_sets == sync_unsets ){
 482         
 483 
 484 
 485         WAIT_SYNC_SIGNALLED(&sync);
 486     }
 487 
 488     WAIT_SYNC_RELEASE(&sync);
 489 
 490     *outcount = num_requests_done;
 491 
 492     for (size_t i = 0; i < num_requests_done; i++) {
 493         request = requests[indices[i]];
 494         assert( REQUEST_COMPLETE(request) );
 495 
 496 #if OPAL_ENABLE_FT_CR == 1
 497         if( opal_cr_is_enabled) {
 498             OMPI_CRCP_REQUEST_COMPLETE(request);
 499         }
 500 #endif
 501 
 502         
 503 
 504         if (OMPI_REQUEST_GEN == request->req_type) {
 505             ompi_grequest_invoke_query(request, &request->req_status);
 506         }
 507         if (MPI_STATUSES_IGNORE != statuses) {
 508             statuses[i] = request->req_status;
 509         }
 510 
 511         if (MPI_SUCCESS != request->req_status.MPI_ERROR) {
 512             rc = MPI_ERR_IN_STATUS;
 513         }
 514 
 515         if( request->req_persistent ) {
 516             request->req_state = OMPI_REQUEST_INACTIVE;
 517         } else {
 518             
 519             if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
 520                 int tmp;
 521                 tmp = ompi_request_free(&(requests[indices[i]]));
 522                 if (OMPI_SUCCESS != tmp) {
 523                     return tmp;
 524                 }
 525             }
 526         }
 527     }
 528 
 529     return rc;
 530 }