root/ompi/communicator/comm_request.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ompi_comm_request_init
  2. ompi_comm_request_fini
  3. ompi_comm_request_schedule_append
  4. ompi_comm_request_progress
  5. ompi_comm_request_start
  6. ompi_comm_request_cancel
  7. ompi_comm_request_free
  8. ompi_comm_request_construct
  9. ompi_comm_request_destruct
  10. ompi_comm_request_get
  11. ompi_comm_request_return

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2013-2018 Los Alamos National Security, LLC.  All rights
   4  *                         reseved.
   5  * Copyright (c) 2015      Research Organization for Information Science
   6  *                         and Technology (RIST). All rights reserved.
   7  * Copyright (c) 2004-2016 The University of Tennessee and The University
   8  *                         of Tennessee Research Foundation.  All rights
   9  *                         reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include "comm_request.h"
  19 
  20 #include "opal/class/opal_free_list.h"
  21 #include "opal/include/opal/sys/atomic.h"
  22 
  23 static opal_free_list_t ompi_comm_requests;
  24 static opal_list_t ompi_comm_requests_active;
  25 static opal_mutex_t ompi_comm_request_mutex;
  26 bool ompi_comm_request_progress_active = false;
  27 bool ompi_comm_request_initialized = false;
  28 
  29 typedef struct ompi_comm_request_item_t {
  30     opal_list_item_t super;
  31     ompi_comm_request_callback_fn_t callback;
  32     ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ];
  33     int subreq_count;
  34 } ompi_comm_request_item_t;
  35 OBJ_CLASS_DECLARATION(ompi_comm_request_item_t);
  36 
  37 static int ompi_comm_request_progress (void);
  38 
  39 void ompi_comm_request_init (void)
  40 {
  41     OBJ_CONSTRUCT(&ompi_comm_requests, opal_free_list_t);
  42     (void) opal_free_list_init (&ompi_comm_requests, sizeof (ompi_comm_request_t), 8,
  43                                 OBJ_CLASS(ompi_comm_request_t), 0, 0, 0, -1, 8,
  44                                 NULL, 0, NULL, NULL, NULL);
  45 
  46     OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
  47     ompi_comm_request_progress_active = false;
  48     OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
  49     ompi_comm_request_initialized = true;
  50 }
  51 
  52 void ompi_comm_request_fini (void)
  53 {
  54     if (!ompi_comm_request_initialized) {
  55         return;
  56     }
  57 
  58     ompi_comm_request_initialized = false;
  59 
  60     opal_mutex_lock (&ompi_comm_request_mutex);
  61     if (ompi_comm_request_progress_active) {
  62         opal_progress_unregister (ompi_comm_request_progress);
  63     }
  64     opal_mutex_unlock (&ompi_comm_request_mutex);
  65     OBJ_DESTRUCT(&ompi_comm_request_mutex);
  66     OBJ_DESTRUCT(&ompi_comm_requests_active);
  67     OBJ_DESTRUCT(&ompi_comm_requests);
  68 }
  69 
  70 
  71 int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
  72                             ompi_request_t *subreqs[], int subreq_count)
  73 {
  74     ompi_comm_request_item_t *request_item;
  75     int i;
  76 
  77     if (subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) {
  78         return OMPI_ERR_BAD_PARAM;
  79     }
  80 
  81     request_item = OBJ_NEW(ompi_comm_request_item_t);
  82     if (NULL == request_item) {
  83         return OMPI_ERR_OUT_OF_RESOURCE;
  84     }
  85 
  86     request_item->callback = callback;
  87 
  88     for (i = 0 ; i < subreq_count ; ++i) {
  89         request_item->subreqs[i] = subreqs[i];
  90     }
  91 
  92     request_item->subreq_count = subreq_count;
  93 
  94     opal_list_append (&request->schedule, &request_item->super);
  95 
  96     return OMPI_SUCCESS;
  97 }
  98 
  99 static int ompi_comm_request_progress (void)
 100 {
 101     ompi_comm_request_t *request, *next;
 102     static opal_atomic_int32_t progressing = 0;
 103 
 104     /* don't allow re-entry */
 105     if (opal_atomic_swap_32 (&progressing, 1)) {
 106         return 0;
 107     }
 108 
 109     opal_mutex_lock (&ompi_comm_request_mutex);
 110 
 111     OPAL_LIST_FOREACH_SAFE(request, next, &ompi_comm_requests_active, ompi_comm_request_t) {
 112         int rc = OMPI_SUCCESS;
 113 
 114         if (opal_list_get_size (&request->schedule)) {
 115             ompi_comm_request_item_t *request_item = (ompi_comm_request_item_t *) opal_list_remove_first (&request->schedule);
 116             int item_complete = true;
 117 
 118             /* don't call ompi_request_test_all as it causes a recursive call into opal_progress */
 119             while (request_item->subreq_count) {
 120                 ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1];
 121                 if( REQUEST_COMPLETE(subreq) ) {
 122                     if (OMPI_SUCCESS != subreq->req_status.MPI_ERROR) {
 123                         /* Let it continue but mark it as failed, so
 124                          * that it does some subreqs cleanup */
 125                         request->super.req_status.MPI_ERROR = subreq->req_status.MPI_ERROR;
 126                     }
 127                     ompi_request_free (&subreq);
 128                     request_item->subreq_count--;
 129                 } else {
 130                     item_complete = false;
 131                     break;
 132                 }
 133             }
 134 
 135             if (item_complete) {
 136                 if (request_item->callback) {
 137                     opal_mutex_unlock (&ompi_comm_request_mutex);
 138                     /* the callback should check for errors in the request
 139                      * status. */
 140                     rc = request_item->callback (request);
 141                     opal_mutex_lock (&ompi_comm_request_mutex);
 142                 }
 143                 OBJ_RELEASE(request_item);
 144             } else {
 145                 opal_list_prepend (&request->schedule, &request_item->super);
 146             }
 147         }
 148 
 149         /* if the request schedule is empty then the request is complete */
 150         if (0 == opal_list_get_size (&request->schedule)) {
 151             opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
 152             request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : rc;
 153             ompi_request_complete (&request->super, true);
 154         }
 155     }
 156 
 157     if (0 == opal_list_get_size (&ompi_comm_requests_active)) {
 158         /* no more active requests. disable this progress function */
 159         ompi_comm_request_progress_active = false;
 160         opal_progress_unregister (ompi_comm_request_progress);
 161     }
 162 
 163     opal_mutex_unlock (&ompi_comm_request_mutex);
 164     progressing = 0;
 165 
 166     return 1;
 167 }
 168 
 169 void ompi_comm_request_start (ompi_comm_request_t *request)
 170 {
 171     opal_mutex_lock (&ompi_comm_request_mutex);
 172     opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
 173 
 174     /* check if we need to start the communicator request progress function */
 175     if (!ompi_comm_request_progress_active) {
 176         opal_progress_register (ompi_comm_request_progress);
 177         ompi_comm_request_progress_active = true;
 178     }
 179 
 180     request->super.req_state = OMPI_REQUEST_ACTIVE;
 181     request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
 182 
 183     opal_mutex_unlock (&ompi_comm_request_mutex);
 184 }
 185 
 186 static int ompi_comm_request_cancel (struct ompi_request_t *ompi_req, int complete)
 187 {
 188     ompi_comm_request_t *tmp, *request = (ompi_comm_request_t *) ompi_req;
 189     ompi_comm_request_item_t *item, *next;
 190 
 191     opal_mutex_lock (&ompi_comm_request_mutex);
 192 
 193     OPAL_LIST_FOREACH_SAFE(item, next, &request->schedule, ompi_comm_request_item_t) {
 194         for (int i = 0 ; i < item->subreq_count ; ++i) {
 195             ompi_request_cancel (item->subreqs[i]);
 196         }
 197 
 198         opal_list_remove_item (&request->schedule, &item->super);
 199         OBJ_RELEASE(item);
 200     }
 201 
 202     /* remove the request for the list of active requests */
 203     OPAL_LIST_FOREACH(tmp, &ompi_comm_requests_active, ompi_comm_request_t) {
 204         if (tmp == request) {
 205             opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
 206             break;
 207         }
 208     }
 209 
 210     opal_mutex_unlock (&ompi_comm_request_mutex);
 211 
 212     return MPI_ERR_REQUEST;
 213 }
 214 
 215 static int ompi_comm_request_free (struct ompi_request_t **ompi_req)
 216 {
 217     ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req;
 218 
 219     if( !REQUEST_COMPLETE(*ompi_req) ) {
 220         return MPI_ERR_REQUEST;
 221     }
 222 
 223     OMPI_REQUEST_FINI(*ompi_req);
 224     ompi_comm_request_return (request);
 225 
 226     *ompi_req = MPI_REQUEST_NULL;
 227 
 228     return OMPI_SUCCESS;
 229 }
 230 
 231 static void ompi_comm_request_construct (ompi_comm_request_t *request)
 232 {
 233     request->context = NULL;
 234 
 235     request->super.req_type = OMPI_REQUEST_COMM;
 236     request->super.req_status._cancelled = 0;
 237     request->super.req_free = ompi_comm_request_free;
 238     request->super.req_cancel = ompi_comm_request_cancel;
 239 
 240     OBJ_CONSTRUCT(&request->schedule, opal_list_t);
 241 }
 242 
 243 static void ompi_comm_request_destruct (ompi_comm_request_t *request)
 244 {
 245     OBJ_DESTRUCT(&request->schedule);
 246 }
 247 
 248 OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t,
 249                    ompi_comm_request_construct,
 250                    ompi_comm_request_destruct);
 251 
 252 OBJ_CLASS_INSTANCE(ompi_comm_request_item_t, opal_list_item_t, NULL, NULL);
 253 
 254 ompi_comm_request_t *ompi_comm_request_get (void)
 255 {
 256     opal_free_list_item_t *item;
 257 
 258     item = opal_free_list_get (&ompi_comm_requests);
 259     if (OPAL_UNLIKELY(NULL == item)) {
 260         return NULL;
 261     }
 262 
 263     OMPI_REQUEST_INIT((ompi_request_t *) item, false);
 264 
 265     return (ompi_comm_request_t *) item;
 266 }
 267 
 268 void ompi_comm_request_return (ompi_comm_request_t *request)
 269 {
 270     if (request->context) {
 271         OBJ_RELEASE (request->context);
 272         request->context = NULL;
 273     }
 274 
 275     OMPI_REQUEST_FINI(&request->super);
 276     opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request);
 277 }
 278 

/* [<][>][^][v][top][bottom][index][help] */