This source file includes following definitions.
- mca_pml_ucx_request_free
- mca_pml_ucx_request_cancel
- mca_pml_ucx_send_completion
- mca_pml_ucx_bsend_completion
- mca_pml_ucx_recv_completion
- mca_pml_ucx_persistent_request_detach
- mca_pml_ucx_persistent_request_complete
- mca_pml_ucx_preq_completion
- mca_pml_ucx_psend_completion
- mca_pml_ucx_precv_completion
- mca_pml_ucx_request_init_common
- mca_pml_ucx_request_init
- mca_pml_ucx_request_cleanup
- mca_pml_ucx_persistent_request_free
- mca_pml_ucx_persistent_request_cancel
- mca_pml_ucx_persisternt_request_construct
- mca_pml_ucx_persisternt_request_destruct
- mca_pml_completed_request_free
- mca_pml_completed_request_cancel
- mca_pml_ucx_completed_request_init
1
2
3
4
5
6
7
8
9
10
11
12
13 #include "pml_ucx_request.h"
14 #include "ompi/mca/pml/base/pml_base_bsend.h"
15 #include "ompi/message/message.h"
16 #include <inttypes.h>
17
18
19 static int mca_pml_ucx_request_free(ompi_request_t **rptr)
20 {
21 ompi_request_t *req = *rptr;
22
23 PML_UCX_VERBOSE(9, "free request *%p=%p", (void*)rptr, (void*)req);
24
25 *rptr = MPI_REQUEST_NULL;
26 mca_pml_ucx_request_reset(req);
27 ucp_request_free(req);
28 return OMPI_SUCCESS;
29 }
30
31 static int mca_pml_ucx_request_cancel(ompi_request_t *req, int flag)
32 {
33 ucp_request_cancel(ompi_pml_ucx.ucp_worker, req);
34 return OMPI_SUCCESS;
35 }
36
37 void mca_pml_ucx_send_completion(void *request, ucs_status_t status)
38 {
39 ompi_request_t *req = request;
40
41 PML_UCX_VERBOSE(8, "send request %p completed with status %s", (void*)req,
42 ucs_status_string(status));
43
44 mca_pml_ucx_set_send_status(&req->req_status, status);
45 PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
46 ompi_request_complete(req, true);
47 }
48
49 void mca_pml_ucx_bsend_completion(void *request, ucs_status_t status)
50 {
51 ompi_request_t *req = request;
52
53 PML_UCX_VERBOSE(8, "bsend request %p buffer %p completed with status %s", (void*)req,
54 req->req_complete_cb_data, ucs_status_string(status));
55 mca_pml_base_bsend_request_free(req->req_complete_cb_data);
56 req->req_complete_cb_data = NULL;
57 mca_pml_ucx_set_send_status(&req->req_status, status);
58 PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
59 mca_pml_ucx_request_free(&req);
60 }
61
62 void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
63 ucp_tag_recv_info_t *info)
64 {
65 ompi_request_t *req = request;
66
67 PML_UCX_VERBOSE(8, "receive request %p completed with status %s tag %"PRIx64" len %zu",
68 (void*)req, ucs_status_string(status), info->sender_tag,
69 info->length);
70
71 mca_pml_ucx_set_recv_status(&req->req_status, status, info);
72 PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
73 ompi_request_complete(req, true);
74 }
75
76 static void mca_pml_ucx_persistent_request_detach(mca_pml_ucx_persistent_request_t *preq,
77 ompi_request_t *tmp_req)
78 {
79 tmp_req->req_complete_cb_data = NULL;
80 preq->tmp_req = NULL;
81 }
82
83 inline void
84 mca_pml_ucx_persistent_request_complete(mca_pml_ucx_persistent_request_t *preq,
85 ompi_request_t *tmp_req)
86 {
87 preq->ompi.req_status = tmp_req->req_status;
88 mca_pml_ucx_request_reset(tmp_req);
89 mca_pml_ucx_persistent_request_detach(preq, tmp_req);
90 ucp_request_free(tmp_req);
91 ompi_request_complete(&preq->ompi, true);
92 }
93
94 static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req)
95 {
96 mca_pml_ucx_persistent_request_t *preq;
97
98 ompi_request_complete(tmp_req, false);
99 preq = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data;
100 if (preq != NULL) {
101 PML_UCX_ASSERT(preq->tmp_req != NULL);
102 mca_pml_ucx_persistent_request_complete(preq, tmp_req);
103 }
104 }
105
106 void mca_pml_ucx_psend_completion(void *request, ucs_status_t status)
107 {
108 ompi_request_t *tmp_req = request;
109
110 PML_UCX_VERBOSE(8, "persistent send request %p completed with status %s",
111 (void*)tmp_req, ucs_status_string(status));
112
113 mca_pml_ucx_set_send_status(&tmp_req->req_status, status);
114 mca_pml_ucx_preq_completion(tmp_req);
115 }
116
117 void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
118 ucp_tag_recv_info_t *info)
119 {
120 ompi_request_t *tmp_req = request;
121
122 PML_UCX_VERBOSE(8, "persistent receive request %p completed with status %s tag %"PRIx64" len %zu",
123 (void*)tmp_req, ucs_status_string(status), info->sender_tag,
124 info->length);
125
126 mca_pml_ucx_set_recv_status(&tmp_req->req_status, status, info);
127 mca_pml_ucx_preq_completion(tmp_req);
128 }
129
130 static void mca_pml_ucx_request_init_common(ompi_request_t* ompi_req,
131 bool req_persistent,
132 ompi_request_state_t state,
133 ompi_request_free_fn_t req_free,
134 ompi_request_cancel_fn_t req_cancel)
135 {
136 OMPI_REQUEST_INIT(ompi_req, req_persistent);
137 ompi_req->req_type = OMPI_REQUEST_PML;
138 ompi_req->req_state = state;
139 ompi_req->req_start = mca_pml_ucx_start;
140 ompi_req->req_free = req_free;
141 ompi_req->req_cancel = req_cancel;
142
143
144
145
146
147 ompi_req->req_complete_cb_data = NULL;
148 }
149
150 void mca_pml_ucx_request_init(void *request)
151 {
152 ompi_request_t* ompi_req = request;
153 OBJ_CONSTRUCT(ompi_req, ompi_request_t);
154 mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
155 mca_pml_ucx_request_free,
156 mca_pml_ucx_request_cancel);
157 }
158
159 void mca_pml_ucx_request_cleanup(void *request)
160 {
161 ompi_request_t* ompi_req = request;
162 ompi_req->req_state = OMPI_REQUEST_INVALID;
163 OMPI_REQUEST_FINI(ompi_req);
164 OBJ_DESTRUCT(ompi_req);
165 }
166
167 static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr)
168 {
169 mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)*rptr;
170 ompi_request_t *tmp_req = preq->tmp_req;
171
172 preq->ompi.req_state = OMPI_REQUEST_INVALID;
173 if (tmp_req != NULL) {
174 mca_pml_ucx_persistent_request_detach(preq, tmp_req);
175 ucp_request_free(tmp_req);
176 }
177 if ((preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) &&
178 (MCA_PML_BASE_SEND_BUFFERED == preq->send.mode)) {
179 OBJ_RELEASE(preq->datatype.ompi_datatype);
180 }
181 PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &preq->ompi.super);
182 *rptr = MPI_REQUEST_NULL;
183 return OMPI_SUCCESS;
184 }
185
186 static int mca_pml_ucx_persistent_request_cancel(ompi_request_t *req, int flag)
187 {
188 mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)req;
189
190 if (preq->tmp_req != NULL) {
191 ucp_request_cancel(ompi_pml_ucx.ucp_worker, preq->tmp_req);
192 }
193 return OMPI_SUCCESS;
194 }
195
196 static void mca_pml_ucx_persisternt_request_construct(mca_pml_ucx_persistent_request_t* req)
197 {
198 mca_pml_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE,
199 mca_pml_ucx_persistent_request_free,
200 mca_pml_ucx_persistent_request_cancel);
201 req->tmp_req = NULL;
202 }
203
204 static void mca_pml_ucx_persisternt_request_destruct(mca_pml_ucx_persistent_request_t* req)
205 {
206 req->ompi.req_state = OMPI_REQUEST_INVALID;
207 OMPI_REQUEST_FINI(&req->ompi);
208 }
209
210 OBJ_CLASS_INSTANCE(mca_pml_ucx_persistent_request_t,
211 ompi_request_t,
212 mca_pml_ucx_persisternt_request_construct,
213 mca_pml_ucx_persisternt_request_destruct);
214
215 static int mca_pml_completed_request_free(struct ompi_request_t** rptr)
216 {
217 *rptr = MPI_REQUEST_NULL;
218 return OMPI_SUCCESS;
219 }
220
221 static int mca_pml_completed_request_cancel(struct ompi_request_t* ompi_req, int flag)
222 {
223 return OMPI_SUCCESS;
224 }
225
226 void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req)
227 {
228 mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
229 mca_pml_completed_request_free,
230 mca_pml_completed_request_cancel);
231 ompi_req->req_mpi_object.comm = &ompi_mpi_comm_world.comm;
232 ompi_request_complete(ompi_req, false);
233 }
234