root/ompi/request/grequestx.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. grequestx_progress
  2. ompi_grequestx_start
  3. ompi_grequestx_class_create
  4. ompi_grequestx_class_allocate

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2016 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2012 Cisco Systems, Inc.  All rights reserved.
  13  * Copyright (c) 2009      Sun Microsystems, Inc.  All rights reserved.
  14  * Copyright (c) 2018      Research Organization for Information Science
  15  *                         and Technology (RIST).  All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "ompi_config.h"
  24 #include "ompi/communicator/communicator.h"
  25 #include "ompi/request/grequest.h"
  26 #include "ompi/mpi/fortran/base/fint_2_int.h"
  27 #include "ompi/request/grequestx.h"
  28 
  29 static bool requests_initialized = false;
  30 static opal_list_t requests;
  31 static opal_atomic_int32_t active_requests = 0;
  32 static bool in_progress = false;
  33 static opal_mutex_t lock;
  34 
  35 static int grequestx_progress(void) {
  36     ompi_grequest_t *request, *next;
  37 
  38     OPAL_THREAD_LOCK(&lock);
  39     if (!in_progress) {
  40         in_progress = true;
  41 
  42         OPAL_LIST_FOREACH_SAFE(request, next, &requests, ompi_grequest_t) {
  43             MPI_Status status;
  44             OPAL_THREAD_UNLOCK(&lock);
  45             request->greq_poll.c_poll(request->greq_state, &status);
  46             if (REQUEST_COMPLETE(&request->greq_base)) {
  47                 OPAL_THREAD_LOCK(&lock);
  48                 opal_list_remove_item(&requests, &request->greq_base.super.super);
  49                 OPAL_THREAD_UNLOCK(&lock);
  50             }
  51             OPAL_THREAD_LOCK(&lock);
  52         }
  53     }
  54     OPAL_THREAD_UNLOCK(&lock);
  55 
  56     return OMPI_SUCCESS;
  57 }
  58 
  59 int ompi_grequestx_start(
  60     MPI_Grequest_query_function *gquery_fn,
  61     MPI_Grequest_free_function *gfree_fn,
  62     MPI_Grequest_cancel_function *gcancel_fn,
  63     ompi_grequestx_poll_function *gpoll_fn,
  64     void* extra_state,
  65     ompi_request_t** request)
  66 {
  67     int rc;
  68 
  69     rc = ompi_grequest_start(gquery_fn, gfree_fn, gcancel_fn, extra_state, request);
  70     if (OMPI_SUCCESS != rc) {
  71         return rc;
  72     }
  73     ((ompi_grequest_t *)*request)->greq_poll.c_poll = gpoll_fn;
  74 
  75     if (!requests_initialized) {
  76         OBJ_CONSTRUCT(&requests, opal_list_t);
  77         OBJ_CONSTRUCT(&lock, opal_mutex_t);
  78         requests_initialized = true;
  79     }
  80 
  81     OPAL_THREAD_LOCK(&lock);
  82     opal_list_append(&requests, &((*request)->super.super));
  83     OPAL_THREAD_UNLOCK(&lock);
  84     int32_t tmp = OPAL_THREAD_ADD_FETCH32(&active_requests, 1);
  85     if (1 == tmp) {
  86         opal_progress_register(grequestx_progress);
  87     }
  88 
  89     return OMPI_SUCCESS;
  90 }
  91 
  92 
  93 struct grequestx_class {
  94     opal_object_t super;
  95     MPI_Grequest_query_function *gquery_fn;
  96     MPI_Grequest_free_function *gfree_fn;
  97     MPI_Grequest_cancel_function *gcancel_fn;
  98     ompi_grequestx_poll_function *gpoll_fn;
  99     ompi_grequestx_wait_function *gwait_fn;
 100 } ;
 101 
 102 typedef struct grequestx_class grequestx_class;
 103 
 104 static int next_class = 0;
 105 
 106 static OBJ_CLASS_INSTANCE(grequestx_class, opal_object_t, NULL, NULL);
 107 
 108 static opal_pointer_array_t classes;
 109 
 110 int ompi_grequestx_class_create(
 111     MPI_Grequest_query_function *gquery_fn,
 112     MPI_Grequest_free_function *gfree_fn,
 113     MPI_Grequest_cancel_function *gcancel_fn,
 114     ompi_grequestx_poll_function *gpoll_fn,
 115     ompi_grequestx_wait_function *gwait_fn,
 116     ompi_grequestx_class *greq_class)
 117 {
 118     grequestx_class * class = OBJ_NEW(grequestx_class);
 119     class->gquery_fn = gquery_fn;
 120     class->gfree_fn = gfree_fn;
 121     class->gcancel_fn = gcancel_fn;
 122     class->gpoll_fn = gpoll_fn;
 123     class->gwait_fn = gwait_fn;
 124 
 125     if (0 == next_class) {
 126         OBJ_CONSTRUCT(&classes, opal_pointer_array_t);
 127     }
 128     opal_pointer_array_add(&classes, class);
 129     next_class ++;
 130 
 131     return OMPI_SUCCESS;
 132 }
 133 
 134 int ompi_grequestx_class_allocate(
 135     ompi_grequestx_class greq_class,
 136     void *extra_state,
 137     ompi_request_t **request)
 138 {
 139     grequestx_class *class = opal_pointer_array_get_item(&classes, greq_class);
 140     ompi_grequestx_start(class->gquery_fn, class->gfree_fn, class->gcancel_fn, class->gpoll_fn, extra_state, request);
 141 
 142     return OMPI_SUCCESS;
 143 }

/* [<][>][^][v][top][bottom][index][help] */