This source file includes following definitions.
- mca_pml_ob1_irecv_init
- mca_pml_ob1_irecv
- mca_pml_ob1_recv
- mca_pml_ob1_imrecv
- mca_pml_ob1_mrecv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "ompi_config.h"
28 #include "ompi/request/request.h"
29 #include "pml_ob1_recvreq.h"
30 #include "pml_ob1_recvfrag.h"
31 #include "ompi/peruse/peruse-internal.h"
32 #include "ompi/message/message.h"
33 #include "ompi/memchecker.h"
34
35
36
37
38
39
40
41
42 mca_pml_ob1_recv_request_t *mca_pml_ob1_recvreq = NULL;
43
44 int mca_pml_ob1_irecv_init(void *addr,
45 size_t count,
46 ompi_datatype_t * datatype,
47 int src,
48 int tag,
49 struct ompi_communicator_t *comm,
50 struct ompi_request_t **request)
51 {
52 mca_pml_ob1_recv_request_t *recvreq;
53 MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq);
54 if (NULL == recvreq)
55 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
56
57 recvreq->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
58 MCA_PML_OB1_RECV_REQUEST_INIT(recvreq,
59 addr,
60 count, datatype, src, tag, comm, true);
61
62 PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_ACTIVATE,
63 &((recvreq)->req_recv.req_base),
64 PERUSE_RECV);
65
66
67
68
69
70 recvreq->req_recv.req_base.req_pml_complete = true;
71
72 *request = (ompi_request_t *) recvreq;
73 return OMPI_SUCCESS;
74 }
75
76 int mca_pml_ob1_irecv(void *addr,
77 size_t count,
78 ompi_datatype_t * datatype,
79 int src,
80 int tag,
81 struct ompi_communicator_t *comm,
82 struct ompi_request_t **request)
83 {
84 mca_pml_ob1_recv_request_t *recvreq;
85 MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq);
86 if (NULL == recvreq)
87 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
88
89 recvreq->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
90 MCA_PML_OB1_RECV_REQUEST_INIT(recvreq,
91 addr,
92 count, datatype, src, tag, comm, false);
93
94 PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_ACTIVATE,
95 &((recvreq)->req_recv.req_base),
96 PERUSE_RECV);
97
98 MCA_PML_OB1_RECV_REQUEST_START(recvreq);
99 *request = (ompi_request_t *) recvreq;
100 return OMPI_SUCCESS;
101 }
102
103
104 int mca_pml_ob1_recv(void *addr,
105 size_t count,
106 ompi_datatype_t * datatype,
107 int src,
108 int tag,
109 struct ompi_communicator_t *comm,
110 ompi_status_public_t * status)
111 {
112 mca_pml_ob1_recv_request_t *recvreq = NULL;
113 int rc;
114
115 if (OPAL_LIKELY(!ompi_mpi_thread_multiple)) {
116 recvreq = mca_pml_ob1_recvreq;
117 mca_pml_ob1_recvreq = NULL;
118 }
119
120 if( OPAL_UNLIKELY(NULL == recvreq) ) {
121 MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq);
122 if (NULL == recvreq)
123 return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
124 }
125
126 recvreq->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
127 MCA_PML_OB1_RECV_REQUEST_INIT(recvreq, addr, count, datatype,
128 src, tag, comm, false);
129
130 PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_ACTIVATE,
131 &(recvreq->req_recv.req_base),
132 PERUSE_RECV);
133
134 MCA_PML_OB1_RECV_REQUEST_START(recvreq);
135 ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi);
136
137 if( true == recvreq->req_recv.req_base.req_pml_complete ) {
138
139 MEMCHECKER(
140 memchecker_call(&opal_memchecker_base_mem_defined,
141 recvreq->req_recv.req_base.req_addr,
142 recvreq->req_recv.req_base.req_count,
143 recvreq->req_recv.req_base.req_datatype);
144 );
145 }
146
147 if (NULL != status) {
148 *status = recvreq->req_recv.req_base.req_ompi.req_status;
149 }
150
151 rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR;
152
153 if (recvreq->req_recv.req_base.req_pml_complete) {
154
155
156 MEMCHECKER(
157 memchecker_call(&opal_memchecker_base_mem_defined,
158 recvreq->req_recv.req_base.req_addr,
159 recvreq->req_recv.req_base.req_count,
160 recvreq->req_recv.req_base.req_datatype);
161 );
162 }
163
164 if (OPAL_UNLIKELY(ompi_mpi_thread_multiple || NULL != mca_pml_ob1_recvreq)) {
165 MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq);
166 } else {
167 mca_pml_ob1_recv_request_fini (recvreq);
168 mca_pml_ob1_recvreq = recvreq;
169 }
170
171 return rc;
172 }
173
174
175 int
176 mca_pml_ob1_imrecv( void *buf,
177 size_t count,
178 ompi_datatype_t *datatype,
179 struct ompi_message_t **message,
180 struct ompi_request_t **request )
181 {
182 mca_pml_ob1_recv_frag_t* frag;
183 mca_pml_ob1_recv_request_t *recvreq;
184 mca_pml_ob1_hdr_t *hdr;
185 int src, tag;
186 ompi_communicator_t *comm;
187 mca_pml_ob1_comm_proc_t* proc;
188 uint64_t seq;
189
190
191
192 recvreq = (mca_pml_ob1_recv_request_t*) (*message)->req_ptr;
193 frag = (mca_pml_ob1_recv_frag_t*) recvreq->req_recv.req_base.req_addr;
194 src = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
195 tag = recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG;
196 comm = (*message)->comm;
197 seq = recvreq->req_recv.req_base.req_sequence;
198
199
200
201
202
203
204
205
206
207 OBJ_RETAIN(comm);
208 MCA_PML_BASE_RECV_REQUEST_FINI(&recvreq->req_recv);
209 recvreq->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
210 MCA_PML_OB1_RECV_REQUEST_INIT(recvreq,
211 buf,
212 count, datatype,
213 src, tag, comm, false);
214 OBJ_RELEASE(comm);
215
216 PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_ACTIVATE,
217 &((recvreq)->req_recv.req_base),
218 PERUSE_RECV);
219
220
221 recvreq->req_lock = 0;
222 recvreq->req_pipeline_depth = 0;
223 recvreq->req_bytes_received = 0;
224
225 recvreq->req_rdma_idx = 0;
226 recvreq->req_pending = false;
227 recvreq->req_ack_sent = false;
228
229 MCA_PML_BASE_RECV_START(&recvreq->req_recv);
230
231
232 recvreq->req_recv.req_base.req_sequence = seq;
233
234 proc = mca_pml_ob1_peer_lookup (comm, recvreq->req_recv.req_base.req_peer);
235 recvreq->req_recv.req_base.req_proc = proc->ompi_proc;
236 prepare_recv_req_converter(recvreq);
237
238
239
240
241 hdr = (mca_pml_ob1_hdr_t*)frag->segments->seg_addr.pval;
242 switch(hdr->hdr_common.hdr_type) {
243 case MCA_PML_OB1_HDR_TYPE_MATCH:
244 mca_pml_ob1_recv_request_progress_match(recvreq, frag->btl, frag->segments,
245 frag->num_segments);
246 break;
247 case MCA_PML_OB1_HDR_TYPE_RNDV:
248 mca_pml_ob1_recv_request_progress_rndv(recvreq, frag->btl, frag->segments,
249 frag->num_segments);
250 break;
251 case MCA_PML_OB1_HDR_TYPE_RGET:
252 mca_pml_ob1_recv_request_progress_rget(recvreq, frag->btl, frag->segments,
253 frag->num_segments);
254 break;
255 default:
256 assert(0);
257 }
258 MCA_PML_OB1_RECV_FRAG_RETURN(frag);
259
260 ompi_message_return(*message);
261 *message = MPI_MESSAGE_NULL;
262 *request = (ompi_request_t *) recvreq;
263
264 return OMPI_SUCCESS;
265 }
266
267
268 int
269 mca_pml_ob1_mrecv( void *buf,
270 size_t count,
271 ompi_datatype_t *datatype,
272 struct ompi_message_t **message,
273 ompi_status_public_t* status )
274 {
275 mca_pml_ob1_recv_frag_t* frag;
276 mca_pml_ob1_recv_request_t *recvreq;
277 mca_pml_ob1_hdr_t *hdr;
278 int src, tag, rc;
279 ompi_communicator_t *comm;
280 mca_pml_ob1_comm_proc_t* proc;
281 uint64_t seq;
282
283
284
285 comm = (*message)->comm;
286 recvreq = (mca_pml_ob1_recv_request_t*) (*message)->req_ptr;
287 frag = (mca_pml_ob1_recv_frag_t*) recvreq->req_recv.req_base.req_addr;
288 src = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
289 tag = recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG;
290 seq = recvreq->req_recv.req_base.req_sequence;
291
292
293
294
295
296
297
298
299
300 OBJ_RETAIN(comm);
301 MCA_PML_BASE_RECV_REQUEST_FINI(&recvreq->req_recv);
302 recvreq->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
303 MCA_PML_OB1_RECV_REQUEST_INIT(recvreq,
304 buf,
305 count, datatype,
306 src, tag, comm, false);
307 OBJ_RELEASE(comm);
308
309 PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_ACTIVATE,
310 &((recvreq)->req_recv.req_base),
311 PERUSE_RECV);
312
313
314 recvreq->req_lock = 0;
315 recvreq->req_pipeline_depth = 0;
316 recvreq->req_bytes_received = 0;
317 recvreq->req_rdma_cnt = 0;
318 recvreq->req_rdma_idx = 0;
319 recvreq->req_pending = false;
320
321 MCA_PML_BASE_RECV_START(&recvreq->req_recv);
322
323
324 recvreq->req_recv.req_base.req_sequence = seq;
325
326 proc = mca_pml_ob1_peer_lookup (comm, recvreq->req_recv.req_base.req_peer);
327 recvreq->req_recv.req_base.req_proc = proc->ompi_proc;
328 prepare_recv_req_converter(recvreq);
329
330
331
332
333 hdr = (mca_pml_ob1_hdr_t*)frag->segments->seg_addr.pval;
334 switch(hdr->hdr_common.hdr_type) {
335 case MCA_PML_OB1_HDR_TYPE_MATCH:
336 mca_pml_ob1_recv_request_progress_match(recvreq, frag->btl, frag->segments,
337 frag->num_segments);
338 break;
339 case MCA_PML_OB1_HDR_TYPE_RNDV:
340 mca_pml_ob1_recv_request_progress_rndv(recvreq, frag->btl, frag->segments,
341 frag->num_segments);
342 break;
343 case MCA_PML_OB1_HDR_TYPE_RGET:
344 mca_pml_ob1_recv_request_progress_rget(recvreq, frag->btl, frag->segments,
345 frag->num_segments);
346 break;
347 default:
348 assert(0);
349 }
350
351 ompi_message_return(*message);
352 *message = MPI_MESSAGE_NULL;
353 ompi_request_wait_completion(&(recvreq->req_recv.req_base.req_ompi));
354
355 MCA_PML_OB1_RECV_FRAG_RETURN(frag);
356
357 if (NULL != status) {
358 *status = recvreq->req_recv.req_base.req_ompi.req_status;
359 }
360 rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR;
361 ompi_request_free( (ompi_request_t**)&recvreq );
362 return rc;
363 }
364