This source file includes the following definitions.
- ompi_comm_request_init
- ompi_comm_request_fini
- ompi_comm_request_schedule_append
- ompi_comm_request_progress
- ompi_comm_request_start
- ompi_comm_request_cancel
- ompi_comm_request_free
- ompi_comm_request_construct
- ompi_comm_request_destruct
- ompi_comm_request_get
- ompi_comm_request_return
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include "comm_request.h"
19
20 #include "opal/class/opal_free_list.h"
21 #include "opal/include/opal/sys/atomic.h"
22
23 static opal_free_list_t ompi_comm_requests;
24 static opal_list_t ompi_comm_requests_active;
25 static opal_mutex_t ompi_comm_request_mutex;
26 bool ompi_comm_request_progress_active = false;
27 bool ompi_comm_request_initialized = false;
28
29 typedef struct ompi_comm_request_item_t {
30 opal_list_item_t super;
31 ompi_comm_request_callback_fn_t callback;
32 ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ];
33 int subreq_count;
34 } ompi_comm_request_item_t;
35 OBJ_CLASS_DECLARATION(ompi_comm_request_item_t);
36
37 static int ompi_comm_request_progress (void);
38
39 void ompi_comm_request_init (void)
40 {
41 OBJ_CONSTRUCT(&ompi_comm_requests, opal_free_list_t);
42 (void) opal_free_list_init (&ompi_comm_requests, sizeof (ompi_comm_request_t), 8,
43 OBJ_CLASS(ompi_comm_request_t), 0, 0, 0, -1, 8,
44 NULL, 0, NULL, NULL, NULL);
45
46 OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
47 ompi_comm_request_progress_active = false;
48 OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
49 ompi_comm_request_initialized = true;
50 }
51
52 void ompi_comm_request_fini (void)
53 {
54 if (!ompi_comm_request_initialized) {
55 return;
56 }
57
58 ompi_comm_request_initialized = false;
59
60 opal_mutex_lock (&ompi_comm_request_mutex);
61 if (ompi_comm_request_progress_active) {
62 opal_progress_unregister (ompi_comm_request_progress);
63 }
64 opal_mutex_unlock (&ompi_comm_request_mutex);
65 OBJ_DESTRUCT(&ompi_comm_request_mutex);
66 OBJ_DESTRUCT(&ompi_comm_requests_active);
67 OBJ_DESTRUCT(&ompi_comm_requests);
68 }
69
70
71 int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
72 ompi_request_t *subreqs[], int subreq_count)
73 {
74 ompi_comm_request_item_t *request_item;
75 int i;
76
77 if (subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) {
78 return OMPI_ERR_BAD_PARAM;
79 }
80
81 request_item = OBJ_NEW(ompi_comm_request_item_t);
82 if (NULL == request_item) {
83 return OMPI_ERR_OUT_OF_RESOURCE;
84 }
85
86 request_item->callback = callback;
87
88 for (i = 0 ; i < subreq_count ; ++i) {
89 request_item->subreqs[i] = subreqs[i];
90 }
91
92 request_item->subreq_count = subreq_count;
93
94 opal_list_append (&request->schedule, &request_item->super);
95
96 return OMPI_SUCCESS;
97 }
98
99 static int ompi_comm_request_progress (void)
100 {
101 ompi_comm_request_t *request, *next;
102 static opal_atomic_int32_t progressing = 0;
103
104
105 if (opal_atomic_swap_32 (&progressing, 1)) {
106 return 0;
107 }
108
109 opal_mutex_lock (&ompi_comm_request_mutex);
110
111 OPAL_LIST_FOREACH_SAFE(request, next, &ompi_comm_requests_active, ompi_comm_request_t) {
112 int rc = OMPI_SUCCESS;
113
114 if (opal_list_get_size (&request->schedule)) {
115 ompi_comm_request_item_t *request_item = (ompi_comm_request_item_t *) opal_list_remove_first (&request->schedule);
116 int item_complete = true;
117
118
119 while (request_item->subreq_count) {
120 ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1];
121 if( REQUEST_COMPLETE(subreq) ) {
122 if (OMPI_SUCCESS != subreq->req_status.MPI_ERROR) {
123
124
125 request->super.req_status.MPI_ERROR = subreq->req_status.MPI_ERROR;
126 }
127 ompi_request_free (&subreq);
128 request_item->subreq_count--;
129 } else {
130 item_complete = false;
131 break;
132 }
133 }
134
135 if (item_complete) {
136 if (request_item->callback) {
137 opal_mutex_unlock (&ompi_comm_request_mutex);
138
139
140 rc = request_item->callback (request);
141 opal_mutex_lock (&ompi_comm_request_mutex);
142 }
143 OBJ_RELEASE(request_item);
144 } else {
145 opal_list_prepend (&request->schedule, &request_item->super);
146 }
147 }
148
149
150 if (0 == opal_list_get_size (&request->schedule)) {
151 opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
152 request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : rc;
153 ompi_request_complete (&request->super, true);
154 }
155 }
156
157 if (0 == opal_list_get_size (&ompi_comm_requests_active)) {
158
159 ompi_comm_request_progress_active = false;
160 opal_progress_unregister (ompi_comm_request_progress);
161 }
162
163 opal_mutex_unlock (&ompi_comm_request_mutex);
164 progressing = 0;
165
166 return 1;
167 }
168
169 void ompi_comm_request_start (ompi_comm_request_t *request)
170 {
171 opal_mutex_lock (&ompi_comm_request_mutex);
172 opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
173
174
175 if (!ompi_comm_request_progress_active) {
176 opal_progress_register (ompi_comm_request_progress);
177 ompi_comm_request_progress_active = true;
178 }
179
180 request->super.req_state = OMPI_REQUEST_ACTIVE;
181 request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
182
183 opal_mutex_unlock (&ompi_comm_request_mutex);
184 }
185
186 static int ompi_comm_request_cancel (struct ompi_request_t *ompi_req, int complete)
187 {
188 ompi_comm_request_t *tmp, *request = (ompi_comm_request_t *) ompi_req;
189 ompi_comm_request_item_t *item, *next;
190
191 opal_mutex_lock (&ompi_comm_request_mutex);
192
193 OPAL_LIST_FOREACH_SAFE(item, next, &request->schedule, ompi_comm_request_item_t) {
194 for (int i = 0 ; i < item->subreq_count ; ++i) {
195 ompi_request_cancel (item->subreqs[i]);
196 }
197
198 opal_list_remove_item (&request->schedule, &item->super);
199 OBJ_RELEASE(item);
200 }
201
202
203 OPAL_LIST_FOREACH(tmp, &ompi_comm_requests_active, ompi_comm_request_t) {
204 if (tmp == request) {
205 opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
206 break;
207 }
208 }
209
210 opal_mutex_unlock (&ompi_comm_request_mutex);
211
212 return MPI_ERR_REQUEST;
213 }
214
215 static int ompi_comm_request_free (struct ompi_request_t **ompi_req)
216 {
217 ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req;
218
219 if( !REQUEST_COMPLETE(*ompi_req) ) {
220 return MPI_ERR_REQUEST;
221 }
222
223 OMPI_REQUEST_FINI(*ompi_req);
224 ompi_comm_request_return (request);
225
226 *ompi_req = MPI_REQUEST_NULL;
227
228 return OMPI_SUCCESS;
229 }
230
231 static void ompi_comm_request_construct (ompi_comm_request_t *request)
232 {
233 request->context = NULL;
234
235 request->super.req_type = OMPI_REQUEST_COMM;
236 request->super.req_status._cancelled = 0;
237 request->super.req_free = ompi_comm_request_free;
238 request->super.req_cancel = ompi_comm_request_cancel;
239
240 OBJ_CONSTRUCT(&request->schedule, opal_list_t);
241 }
242
243 static void ompi_comm_request_destruct (ompi_comm_request_t *request)
244 {
245 OBJ_DESTRUCT(&request->schedule);
246 }
247
/* Communicator request class: derives from ompi_request_t and uses the
 * constructor/destructor defined above. */
OBJ_CLASS_INSTANCE(ompi_comm_request_t,
                   ompi_request_t,
                   ompi_comm_request_construct,
                   ompi_comm_request_destruct);

/* Schedule item class: a plain list item with no constructor/destructor. */
OBJ_CLASS_INSTANCE(ompi_comm_request_item_t,
                   opal_list_item_t,
                   NULL,
                   NULL);
253
254 ompi_comm_request_t *ompi_comm_request_get (void)
255 {
256 opal_free_list_item_t *item;
257
258 item = opal_free_list_get (&ompi_comm_requests);
259 if (OPAL_UNLIKELY(NULL == item)) {
260 return NULL;
261 }
262
263 OMPI_REQUEST_INIT((ompi_request_t *) item, false);
264
265 return (ompi_comm_request_t *) item;
266 }
267
268 void ompi_comm_request_return (ompi_comm_request_t *request)
269 {
270 if (request->context) {
271 OBJ_RELEASE (request->context);
272 request->context = NULL;
273 }
274
275 OMPI_REQUEST_FINI(&request->super);
276 opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request);
277 }
278