This source file includes the following definitions.
- mca_pml_ucx_request_reset
- mca_pml_ucx_set_send_status
- mca_pml_ucx_set_recv_status
- mca_pml_ucx_set_recv_status_safe
1
2
3
4
5
6
7
8
9
10
11
12
13 #ifndef PML_UCX_REQUEST_H_
14 #define PML_UCX_REQUEST_H_
15
16 #include "pml_ucx.h"
17 #include "pml_ucx_datatype.h"
18
19
/*
 * Bit flags describing the state of a PML/UCX request.  Each constant
 * occupies its own bit so the values can be OR-ed together.
 * NOTE(review): presumably stored in the `flags` member of the persistent
 * request structure -- confirm against the users of these constants.
 */
enum {
    MCA_PML_UCX_REQUEST_FLAG_SEND        = 0x1, /* bit 0 */
    MCA_PML_UCX_REQUEST_FLAG_FREE_CALLED = 0x2, /* bit 1 */
    MCA_PML_UCX_REQUEST_FLAG_COMPLETED   = 0x4  /* bit 2 */
};
25
26
27
28
29
30
31
32
33
/*
 * Layout of the 64-bit UCP tag, from most to least significant bits:
 *
 *   [63 .. 40]  MPI tag            (PML_UCX_TAG_BITS     = 24)
 *   [39 .. 20]  source rank        (PML_UCX_RANK_BITS    = 20)
 *   [19 ..  0]  communicator id    (PML_UCX_CONTEXT_BITS = 20)
 *
 * Matching masks:
 *   - ANY_SOURCE_MASK      compares bit 63 and the communicator id only.
 *   - SPECIFIC_SOURCE_MASK compares bit 63, the rank and the communicator id.
 *   - TAG_MASK             adds bits 40..62, i.e. the rest of the MPI tag.
 *
 * Bit 63 is part of both source masks, so it is compared even when the tag
 * is a wildcard.
 * NOTE(review): presumably this keeps wildcard-tag receives from matching
 * negative (internal) tags -- confirm.
 */
#define PML_UCX_TAG_BITS                24
#define PML_UCX_RANK_BITS               20
#define PML_UCX_CONTEXT_BITS            20
#define PML_UCX_ANY_SOURCE_MASK         0x80000000000FFFFFUL
#define PML_UCX_SPECIFIC_SOURCE_MASK    0x800000FFFFFFFFFFUL
#define PML_UCX_TAG_MASK                0x7FFFFF0000000000UL
40
41
/*
 * Compose the 64-bit tag attached to an outgoing message:
 *
 *   (_tag << (RANK_BITS + CONTEXT_BITS)) | (my rank << CONTEXT_BITS) | context id
 *
 *   _tag   MPI tag; it is converted to uint64_t before shifting.
 *   _comm  communicator pointer; c_my_rank and c_contextid are read.
 *
 * _comm is expanded twice, so it should be free of side effects.
 */
#define PML_UCX_MAKE_SEND_TAG(_tag, _comm) \
    ( \
        (((uint64_t)(_tag)) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS)) | \
        (((uint64_t)(_comm)->c_my_rank) << PML_UCX_CONTEXT_BITS)           | \
        ((uint64_t)(_comm)->c_contextid) \
    )
46
47
/*
 * Build the UCP tag value and tag mask used to match an incoming message.
 *
 *   _ucp_tag       lvalue that receives the tag bits to match
 *   _ucp_tag_mask  lvalue that receives the mask of bits that are compared
 *   _tag           MPI tag, or MPI_ANY_TAG to leave the tag bits unmasked
 *   _src           source rank, or MPI_ANY_SOURCE to leave the rank unmasked
 *   _comm          communicator pointer; only c_contextid is read
 *
 * For MPI_ANY_SOURCE the rank field of _ucp_tag still gets the low bits of
 * _src, but PML_UCX_ANY_SOURCE_MASK does not cover that field, so those
 * bits are not compared.
 *
 * The body is wrapped in do { } while (0) so that an invocation followed by
 * ';' is exactly one statement and can be used in an unbraced if/else.
 * _src and _tag are each expanded twice; avoid arguments with side effects.
 */
#define PML_UCX_MAKE_RECV_TAG(_ucp_tag, _ucp_tag_mask, _tag, _src, _comm) \
    do { \
        if ((_src) == MPI_ANY_SOURCE) { \
            _ucp_tag_mask = PML_UCX_ANY_SOURCE_MASK; \
        } else { \
            _ucp_tag_mask = PML_UCX_SPECIFIC_SOURCE_MASK; \
        } \
        \
        _ucp_tag = (((uint64_t)(_src) & UCS_MASK(PML_UCX_RANK_BITS)) << PML_UCX_CONTEXT_BITS) | \
                   (_comm)->c_contextid; \
        \
        if ((_tag) != MPI_ANY_TAG) { \
            _ucp_tag_mask |= PML_UCX_TAG_MASK; \
            _ucp_tag      |= ((uint64_t)(_tag)) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS); \
        } \
    } while (0)
64
/*
 * Extract the source-rank field from a 64-bit tag: drop the context-id
 * bits, then keep the low PML_UCX_RANK_BITS bits of what remains.
 */
#define PML_UCX_TAG_GET_SOURCE(_tag) \
    ( ((_tag) >> PML_UCX_CONTEXT_BITS) & UCS_MASK(PML_UCX_RANK_BITS) )
67
68
/*
 * Extract the MPI tag field from a 64-bit tag by shifting out the context-id
 * and rank fields.  No mask is applied, so the result depends on the type of
 * _tag: a signed argument keeps its sign through the shift on the usual
 * arithmetic-shift implementations.
 */
#define PML_UCX_TAG_GET_MPI_TAG(_tag) \
    ( (_tag) >> (PML_UCX_CONTEXT_BITS + PML_UCX_RANK_BITS) )
71
72
/*
 * Allocate an ompi_message_t and fill it in from a matched UCP message.
 *
 *   _comm      value stored in the message's `comm` field
 *   _ucp_msg   value stored in the message's `req_ptr` field
 *   _info      pointer whose `sender_tag` and `length` fields are read
 *   _message   pointer to the location that receives the new message
 *
 * WARNING: if ompi_message_alloc() returns NULL this macro executes
 * `return OMPI_ERR_OUT_OF_RESOURCE;` in the calling function, so it may only
 * be used inside functions that return an OMPI status code.
 * NOTE(review): that early return leaves _ucp_msg untouched -- confirm
 * whether the caller is expected to release it.
 *
 * The body is wrapped in do { } while (0) so that an invocation followed by
 * ';' is exactly one statement, and the temporary uses a macro-private name
 * so it cannot capture a caller argument that happens to be called `msg`.
 */
#define PML_UCX_MESSAGE_NEW(_comm, _ucp_msg, _info, _message) \
    do { \
        struct ompi_message_t *pml_ucx_new_msg_ = ompi_message_alloc(); \
        if (pml_ucx_new_msg_ == NULL) { \
            return OMPI_ERR_OUT_OF_RESOURCE; \
        } \
        \
        pml_ucx_new_msg_->comm    = (_comm); \
        pml_ucx_new_msg_->req_ptr = (_ucp_msg); \
        pml_ucx_new_msg_->peer    = PML_UCX_TAG_GET_SOURCE((_info)->sender_tag); \
        pml_ucx_new_msg_->count   = (_info)->length; \
        *(_message)               = pml_ucx_new_msg_; \
    } while (0)
87
88
/*
 * Hand the message referenced by *_message back via ompi_message_return()
 * and reset the caller's handle to MPI_MESSAGE_NULL.
 *
 *   _message   pointer to the message handle; it is expanded twice, so it
 *              should be free of side effects.
 *
 * The body is wrapped in do { } while (0) so that an invocation followed by
 * ';' is exactly one statement and can be used in an unbraced if/else.
 */
#define PML_UCX_MESSAGE_RELEASE(_message) \
    do { \
        ompi_message_return(*(_message)); \
        *(_message) = MPI_MESSAGE_NULL; \
    } while (0)
94
95
96 struct pml_ucx_persistent_request {
97 ompi_request_t ompi;
98 ompi_request_t *tmp_req;
99 unsigned flags;
100 void *buffer;
101 size_t count;
102 union {
103 ucp_datatype_t datatype;
104 ompi_datatype_t *ompi_datatype;
105 } datatype;
106 ucp_tag_t tag;
107 struct {
108 mca_pml_base_send_mode_t mode;
109 ucp_ep_h ep;
110 } send;
111 struct {
112 ucp_tag_t tag_mask;
113 } recv;
114 };
115
116
117 void mca_pml_ucx_send_completion(void *request, ucs_status_t status);
118
119 void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
120 ucp_tag_recv_info_t *info);
121
122 void mca_pml_ucx_psend_completion(void *request, ucs_status_t status);
123
124 void mca_pml_ucx_bsend_completion(void *request, ucs_status_t status);
125
126 void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
127 ucp_tag_recv_info_t *info);
128
129 void mca_pml_ucx_persistent_request_complete(mca_pml_ucx_persistent_request_t *preq,
130 ompi_request_t *tmp_req);
131
132 void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req);
133
134 void mca_pml_ucx_request_init(void *request);
135
136 void mca_pml_ucx_request_cleanup(void *request);
137
138
139 static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
140 {
141 req->req_complete = REQUEST_PENDING;
142 }
143
144 static void mca_pml_ucx_set_send_status(ompi_status_public_t* mpi_status,
145 ucs_status_t status)
146 {
147 if (OPAL_LIKELY(status == UCS_OK)) {
148 mpi_status->MPI_ERROR = MPI_SUCCESS;
149 mpi_status->_cancelled = false;
150 } else if (status == UCS_ERR_CANCELED) {
151 mpi_status->_cancelled = true;
152 } else {
153 mpi_status->MPI_ERROR = MPI_ERR_INTERN;
154 }
155 }
156
157 static inline void mca_pml_ucx_set_recv_status(ompi_status_public_t* mpi_status,
158 ucs_status_t ucp_status,
159 const ucp_tag_recv_info_t *info)
160 {
161 int64_t tag;
162
163 if (OPAL_LIKELY(ucp_status == UCS_OK)) {
164 tag = info->sender_tag;
165 mpi_status->MPI_ERROR = MPI_SUCCESS;
166 mpi_status->MPI_SOURCE = PML_UCX_TAG_GET_SOURCE(tag);
167 mpi_status->MPI_TAG = PML_UCX_TAG_GET_MPI_TAG(tag);
168 mpi_status->_cancelled = false;
169 mpi_status->_ucount = info->length;
170 } else if (ucp_status == UCS_ERR_MESSAGE_TRUNCATED) {
171 mpi_status->MPI_ERROR = MPI_ERR_TRUNCATE;
172 } else if (ucp_status == UCS_ERR_CANCELED) {
173 mpi_status->MPI_ERROR = MPI_SUCCESS;
174 mpi_status->_cancelled = true;
175 } else {
176 mpi_status->MPI_ERROR = MPI_ERR_INTERN;
177 }
178 }
179
180 static inline void mca_pml_ucx_set_recv_status_safe(ompi_status_public_t* mpi_status,
181 ucs_status_t ucp_status,
182 const ucp_tag_recv_info_t *info)
183 {
184 if (mpi_status != MPI_STATUS_IGNORE) {
185 mca_pml_ucx_set_recv_status(mpi_status, ucp_status, info);
186 }
187 }
188
189 OBJ_CLASS_DECLARATION(mca_pml_ucx_persistent_request_t);
190
191
192 #endif