root/ompi/request/req_wait.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ompi_request_default_wait
  2. ompi_request_default_wait_any
  3. ompi_request_default_wait_all
  4. ompi_request_default_wait_some

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2018 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2006-2008 Cisco Systems, Inc.  All rights reserved.
  14  * Copyright (c) 2010-2012 Oracle and/or its affiliates.  All rights reserved.
  15  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2016-2017 Los Alamos National Security, LLC. All rights
  17  *                         reserved.
  18  * Copyright (c) 2016      Mellanox Technologies. All rights reserved.
  19  * Copyright (c) 2016      Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * $COPYRIGHT$
  22  *
  23  * Additional copyrights may follow
  24  *
  25  * $HEADER$
  26  */
  27 
  28 #include "ompi_config.h"
  29 #include "ompi/constants.h"
  30 #include "ompi/request/request.h"
  31 #include "ompi/request/request_default.h"
  32 #include "ompi/request/grequest.h"
  33 
  34 #include "ompi/mca/crcp/crcp.h"
  35 
  36 int ompi_request_default_wait(
  37     ompi_request_t ** req_ptr,
  38     ompi_status_public_t * status)
  39 {
  40     ompi_request_t *req = *req_ptr;
  41 
  42     ompi_request_wait_completion(req);
  43 
  44     OMPI_CRCP_REQUEST_COMPLETE(req);
  45 
  46     /* return status.  If it's a generalized request, we *have* to
  47        invoke the query_fn, even if the user procided STATUS_IGNORE.
  48        MPI-2:8.2. */
  49     if (OMPI_REQUEST_GEN == req->req_type) {
  50         ompi_grequest_invoke_query(req, &req->req_status);
  51     }
  52     if( MPI_STATUS_IGNORE != status ) {
  53         /* Do *NOT* set status->MPI_ERROR here!  See MPI-1.1 doc, sec
  54            3.2.5, p.22 */
  55         status->MPI_TAG    = req->req_status.MPI_TAG;
  56         status->MPI_SOURCE = req->req_status.MPI_SOURCE;
  57         status->_ucount    = req->req_status._ucount;
  58         status->_cancelled = req->req_status._cancelled;
  59     }
  60     if( req->req_persistent ) {
  61         if( req->req_state == OMPI_REQUEST_INACTIVE ) {
  62             if (MPI_STATUS_IGNORE != status) {
  63                 *status = ompi_status_empty;
  64             }
  65             return OMPI_SUCCESS;
  66         }
  67         req->req_state = OMPI_REQUEST_INACTIVE;
  68         return req->req_status.MPI_ERROR;
  69     }
  70 
  71     /* If there was an error, don't free the request -- just return
  72        the single error. */
  73     if (MPI_SUCCESS != req->req_status.MPI_ERROR) {
  74         return req->req_status.MPI_ERROR;
  75     }
  76 
  77     /* If there's an error while freeing the request, assume that the
  78        request is still there.  Otherwise, Bad Things will happen
  79        later! */
  80     return ompi_request_free(req_ptr);
  81 }
  82 
  83 
  84 int ompi_request_default_wait_any(size_t count,
  85                                   ompi_request_t ** requests,
  86                                   int *index,
  87                                   ompi_status_public_t * status)
  88 {
  89     size_t i, completed = count, num_requests_null_inactive = 0;
  90     int rc = OMPI_SUCCESS;
  91     ompi_request_t *request=NULL;
  92     ompi_wait_sync_t sync;
  93 
  94     if (OPAL_UNLIKELY(0 == count)) {
  95         *index = MPI_UNDEFINED;
  96         return OMPI_SUCCESS;
  97     }
  98 
  99     WAIT_SYNC_INIT(&sync, 1);
 100 
 101     num_requests_null_inactive = 0;
 102     for (i = 0; i < count; i++) {
 103         void *_tmp_ptr = REQUEST_PENDING;
 104 
 105         request = requests[i];
 106 
 107         /* Check for null or completed persistent request. For
 108          * MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
 109          */
 110         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 111             num_requests_null_inactive++;
 112             continue;
 113         }
 114 
 115         if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync) ) {
 116             assert(REQUEST_COMPLETE(request));
 117             completed = i;
 118             *index = i;
 119             goto after_sync_wait;
 120         }
 121     }
 122 
 123     if(num_requests_null_inactive == count) {
 124         *index = MPI_UNDEFINED;
 125         if (MPI_STATUS_IGNORE != status) {
 126             *status = ompi_status_empty;
 127         }
 128         /* No signal-in-flight can be in this case */
 129         WAIT_SYNC_RELEASE_NOWAIT(&sync);
 130         return rc;
 131     }
 132 
 133     SYNC_WAIT(&sync);
 134 
 135   after_sync_wait:
 136     /* recheck the complete status and clean up the sync primitives.
 137      * Do it backward to return the earliest complete request to the
 138      * user.
 139      */
 140     for(i = completed-1; (i+1) > 0; i--) {
 141         void *tmp_ptr = &sync;
 142 
 143         request = requests[i];
 144 
 145         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 146             continue;
 147         }
 148         /* Atomically mark the request as pending. If this succeed then
 149          * the request was not completed, and it is now marked as pending.
 150          * Otherwise, the request has been completed meanwhile, and it
 151          * has been atomically marked as REQUEST_COMPLETE.
 152          */
 153         if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &tmp_ptr, REQUEST_PENDING) ) {
 154             *index = i;
 155         }
 156     }
 157 
 158     if( *index == (int)completed ) {
 159         /* Only one request has triggered. There was no in-flight
 160          * completions. Drop the signalled flag so we won't block
 161          * in WAIT_SYNC_RELEASE 
 162          */
 163         WAIT_SYNC_SIGNALLED(&sync);
 164     }
 165 
 166     request = requests[*index];
 167     assert( REQUEST_COMPLETE(request) );
 168 #if OPAL_ENABLE_FT_CR == 1
 169     if( opal_cr_is_enabled ) {
 170         OMPI_CRCP_REQUEST_COMPLETE(request);
 171     }
 172 #endif
 173     /* Per note above, we have to call gen request query_fn even
 174        if STATUS_IGNORE was provided */
 175     if (OMPI_REQUEST_GEN == request->req_type) {
 176         rc = ompi_grequest_invoke_query(request, &request->req_status);
 177     }
 178     if (MPI_STATUS_IGNORE != status) {
 179         /* Do *NOT* set status->MPI_ERROR here!  See MPI-1.1 doc,
 180            sec 3.2.5, p.22 */
 181         int old_error = status->MPI_ERROR;
 182         *status = request->req_status;
 183         status->MPI_ERROR = old_error;
 184     }
 185     rc = request->req_status.MPI_ERROR;
 186     if( request->req_persistent ) {
 187         request->req_state = OMPI_REQUEST_INACTIVE;
 188     } else if (MPI_SUCCESS == rc) {
 189         /* Only free the request if there is no error on it */
 190         /* If there's an error while freeing the request,
 191            assume that the request is still there.  Otherwise,
 192            Bad Things will happen later! */
 193         rc = ompi_request_free(&requests[*index]);
 194     }
 195 
 196     WAIT_SYNC_RELEASE(&sync);
 197     return rc;
 198 }
 199 
 200 
 201 int ompi_request_default_wait_all( size_t count,
 202                                    ompi_request_t ** requests,
 203                                    ompi_status_public_t * statuses )
 204 {
 205     size_t i, completed = 0, failed = 0;
 206     ompi_request_t **rptr;
 207     ompi_request_t *request;
 208     int mpi_error = OMPI_SUCCESS;
 209     ompi_wait_sync_t sync;
 210 
 211     if (OPAL_UNLIKELY(0 == count)) {
 212         return OMPI_SUCCESS;
 213     }
 214 
 215     WAIT_SYNC_INIT(&sync, count);
 216     rptr = requests;
 217     for (i = 0; i < count; i++) {
 218         void *_tmp_ptr = REQUEST_PENDING;
 219 
 220         request = *rptr++;
 221 
 222         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 223             completed++;
 224             continue;
 225         }
 226 
 227         if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync)) {
 228             if( OPAL_UNLIKELY( MPI_SUCCESS != request->req_status.MPI_ERROR ) ) {
 229                 failed++;
 230             }
 231             completed++;
 232         }
 233     }
 234     if( failed > 0 ) {
 235         goto finish;
 236     }
 237 
 238     if( 0 != completed ) {
 239         wait_sync_update(&sync, completed, OPAL_SUCCESS);
 240     }
 241 
 242     /* wait until all requests complete or until an error is triggered. */
 243     mpi_error = SYNC_WAIT(&sync);
 244     if( OPAL_SUCCESS != mpi_error ) {
 245         /* if we are in an error case, increase the failed to ensure
 246            proper cleanup during the requests completion. */
 247         failed++;
 248     }
 249 
 250  finish:
 251     rptr = requests;
 252     if (MPI_STATUSES_IGNORE != statuses) {
 253         /* fill out status and free request if required */
 254         for( i = 0; i < count; i++, rptr++ ) {
 255             void *_tmp_ptr = &sync;
 256 
 257             request = *rptr;
 258 
 259             if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 260                 statuses[i] = ompi_status_empty;
 261                 continue;
 262             }
 263 
 264             if( OPAL_UNLIKELY(0 < failed) ) {
 265                 /* if we have failed requests we skipped the waiting on the sync. Thus,
 266                  * some of the requests might not be properly completed, in which case
 267                  * we must detach all requests from the sync. However, if we can succesfully
 268                  * mark the request as pending then it is neither failed nor complete, and
 269                  * we must stop altering it.
 270                  */
 271                 if( OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING ) ) {
 272                     /*
 273                      * Per MPI 2.2 p 60:
 274                      * Allows requests to be marked as MPI_ERR_PENDING if they are
 275                      * "neither failed nor completed." Which can only happen if
 276                      * there was an error in one of the other requests.
 277                      */
 278                     statuses[i].MPI_ERROR = MPI_ERR_PENDING;
 279                     mpi_error = MPI_ERR_IN_STATUS;
 280                     continue;
 281                 }
 282             }
 283             assert( REQUEST_COMPLETE(request) );
 284 
 285             if( opal_cr_is_enabled) {
 286                 OMPI_CRCP_REQUEST_COMPLETE(request);
 287             }
 288 
 289             if (OMPI_REQUEST_GEN == request->req_type) {
 290                 ompi_grequest_invoke_query(request, &request->req_status);
 291             }
 292 
 293             statuses[i] = request->req_status;
 294 
 295             if( request->req_persistent ) {
 296                 request->req_state = OMPI_REQUEST_INACTIVE;
 297                 continue;
 298             }
 299             /* Only free the request if there is no error on it */
 300             if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
 301                 /* If there's an error while freeing the request,
 302                    assume that the request is still there.
 303                    Otherwise, Bad Things will happen later! */
 304                 int tmp = ompi_request_free(rptr);
 305                 if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
 306                     mpi_error = tmp;
 307                 }
 308             }
 309             if( statuses[i].MPI_ERROR != OMPI_SUCCESS) {
 310                 mpi_error = MPI_ERR_IN_STATUS;
 311             }
 312         }
 313     } else {
 314         int rc;
 315         /* free request if required */
 316         for( i = 0; i < count; i++, rptr++ ) {
 317             void *_tmp_ptr = &sync;
 318 
 319             request = *rptr;
 320 
 321             if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 322                 rc = ompi_status_empty.MPI_ERROR;
 323                 goto absorb_error_and_continue;
 324             }
 325             /*
 326              * Assert only if no requests were failed.
 327              * Since some may still be pending.
 328              */
 329             if( OPAL_UNLIKELY(0 < failed) ) {
 330                 /* If the request is still pending due to a failed request
 331                  * then skip it in this loop.
 332                  */
 333                  if( OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING ) ) {
 334                     /*
 335                      * Per MPI 2.2 p 60:
 336                      * Allows requests to be marked as MPI_ERR_PENDING if they are
 337                      * "neither failed nor completed." Which can only happen if
 338                      * there was an error in one of the other requests.
 339                      */
 340                     rc = MPI_ERR_PENDING;
 341                     goto absorb_error_and_continue;
 342                  }
 343             }
 344             assert( REQUEST_COMPLETE(request) );
 345 
 346             if( opal_cr_is_enabled) {
 347                 OMPI_CRCP_REQUEST_COMPLETE(request);
 348             }
 349 
 350             /* Per note above, we have to call gen request query_fn
 351                even if STATUSES_IGNORE was provided */
 352             if (OMPI_REQUEST_GEN == request->req_type) {
 353                 rc = ompi_grequest_invoke_query(request, &request->req_status);
 354             }
 355 
 356             rc = request->req_status.MPI_ERROR;
 357 
 358             if( request->req_persistent ) {
 359                 request->req_state = OMPI_REQUEST_INACTIVE;
 360             } else if (MPI_SUCCESS == rc) {
 361                 /* Only free the request if there is no error on it */
 362                 int tmp = ompi_request_free(rptr);
 363                 if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
 364                     mpi_error = tmp;
 365                 }
 366             }
 367     absorb_error_and_continue:
 368             /*
 369              * Per MPI 2.2 p34:
 370              * "It is possible for an MPI function to return MPI_ERR_IN_STATUS
 371              *  even when MPI_STATUS_IGNORE or MPI_STATUSES_IGNORE has been
 372              *  passed to that function."
 373              * So we should do so here as well.
 374              */
 375             if( OMPI_SUCCESS == mpi_error && rc != OMPI_SUCCESS) {
 376                 mpi_error = MPI_ERR_IN_STATUS;
 377             }
 378         }
 379     }
 380     WAIT_SYNC_RELEASE(&sync);
 381     return mpi_error;
 382 }
 383 
 384 
 385 int ompi_request_default_wait_some(size_t count,
 386                                    ompi_request_t ** requests,
 387                                    int * outcount,
 388                                    int * indices,
 389                                    ompi_status_public_t * statuses)
 390 {
 391     size_t num_requests_null_inactive, num_requests_done, num_active_reqs;
 392     int rc = MPI_SUCCESS;
 393     ompi_request_t **rptr = NULL;
 394     ompi_request_t *request = NULL;
 395     ompi_wait_sync_t sync;
 396     size_t sync_sets = 0, sync_unsets = 0;
 397 
 398     if (OPAL_UNLIKELY(0 == count)) {
 399         *outcount = MPI_UNDEFINED;
 400         return OMPI_SUCCESS;
 401     }
 402 
 403     WAIT_SYNC_INIT(&sync, 1);
 404 
 405     *outcount = 0;
 406 
 407     rptr = requests;
 408     num_requests_null_inactive = 0;
 409     num_requests_done = 0;
 410     num_active_reqs = 0;
 411     for (size_t i = 0; i < count; i++, rptr++) {
 412         void *_tmp_ptr = REQUEST_PENDING;
 413 
 414         request = *rptr;
 415         /*
 416          * Check for null or completed persistent request.
 417          * For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
 418          */
 419         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 420             num_requests_null_inactive++;
 421             continue;
 422         }
 423         indices[num_active_reqs] = OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, &sync);
 424         if( !indices[num_active_reqs] ) {
 425             /* If the request is completed go ahead and mark it as such */
 426             assert( REQUEST_COMPLETE(request) );
 427             num_requests_done++;
 428         }
 429         num_active_reqs++;
 430     }
 431 
 432     if(num_requests_null_inactive == count) {
 433         *outcount = MPI_UNDEFINED;
 434         /* nobody will signall us */
 435         WAIT_SYNC_RELEASE_NOWAIT(&sync);
 436         return rc;
 437     }
 438 
 439     sync_sets = num_active_reqs - num_requests_done;
 440     if( 0 == num_requests_done ) {
 441         /* One completed request is enough to satisfy the some condition */
 442         SYNC_WAIT(&sync);
 443     }
 444 
 445     /* Do the final counting and */
 446     /* Clean up the synchronization primitives */
 447 
 448     rptr = requests;
 449     num_requests_done = 0;
 450     num_active_reqs = 0;
 451     for (size_t i = 0; i < count; i++, rptr++) {
 452         void *_tmp_ptr = &sync;
 453 
 454         request = *rptr;
 455 
 456         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
 457             continue;
 458         }
 459         /* Here we have 3 possibilities:
 460          * a) request was found completed in the first loop
 461          *    => ( indices[i] == 0 )
 462          * b) request was completed between first loop and this check
 463          *    => ( indices[i] == 1 ) and we can NOT atomically mark the 
 464          *    request as pending.
 465          * c) request wasn't finished yet
 466          *    => ( indices[i] == 1 ) and we CAN  atomically mark the 
 467          *    request as pending.
 468          * NOTE that in any case (i >= num_requests_done) as latter grows
 469          * either slowly (in case of partial completion)
 470          * OR in parallel with `i` (in case of full set completion)  
 471          */
 472         if( !indices[num_active_reqs] ) {
 473             indices[num_requests_done++] = i;
 474         } else if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_PENDING) ) {
 475             indices[num_requests_done++] = i;
 476         }
 477         num_active_reqs++;
 478     }
 479     sync_unsets = num_active_reqs - num_requests_done;
 480 
 481     if( sync_sets == sync_unsets ){
 482         /* nobody knows about us,
 483          * set signa-in-progress flag to false
 484          */
 485         WAIT_SYNC_SIGNALLED(&sync);
 486     }
 487 
 488     WAIT_SYNC_RELEASE(&sync);
 489 
 490     *outcount = num_requests_done;
 491 
 492     for (size_t i = 0; i < num_requests_done; i++) {
 493         request = requests[indices[i]];
 494         assert( REQUEST_COMPLETE(request) );
 495 
 496 #if OPAL_ENABLE_FT_CR == 1
 497         if( opal_cr_is_enabled) {
 498             OMPI_CRCP_REQUEST_COMPLETE(request);
 499         }
 500 #endif
 501 
 502         /* Per note above, we have to call gen request query_fn even
 503            if STATUS_IGNORE was provided */
 504         if (OMPI_REQUEST_GEN == request->req_type) {
 505             ompi_grequest_invoke_query(request, &request->req_status);
 506         }
 507         if (MPI_STATUSES_IGNORE != statuses) {
 508             statuses[i] = request->req_status;
 509         }
 510 
 511         if (MPI_SUCCESS != request->req_status.MPI_ERROR) {
 512             rc = MPI_ERR_IN_STATUS;
 513         }
 514 
 515         if( request->req_persistent ) {
 516             request->req_state = OMPI_REQUEST_INACTIVE;
 517         } else {
 518             /* Only free the request if there was no error */
 519             if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
 520                 int tmp;
 521                 tmp = ompi_request_free(&(requests[indices[i]]));
 522                 if (OMPI_SUCCESS != tmp) {
 523                     return tmp;
 524                 }
 525             }
 526         }
 527     }
 528 
 529     return rc;
 530 }

/* [<][>][^][v][top][bottom][index][help] */