This source file includes the following definitions.
- orte_rml_base_post_recv
- msg_match_recv
- orte_rml_base_process_msg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "orte_config.h"
32
33 #include <string.h>
34
35 #include "orte/constants.h"
36 #include "orte/types.h"
37
38 #include "opal/dss/dss.h"
39 #include "opal/util/output.h"
40 #include "opal/util/timings.h"
41 #include "opal/class/opal_list.h"
42
43 #include "orte/mca/errmgr/errmgr.h"
44 #include "orte/runtime/orte_globals.h"
45 #include "orte/runtime/orte_wait.h"
46 #include "orte/util/name_fns.h"
47 #include "orte/util/nidmap.h"
48 #include "orte/util/threads.h"
49
50 #include "orte/mca/rml/rml.h"
51 #include "orte/mca/rml/base/base.h"
52 #include "orte/mca/rml/base/rml_contact.h"
53
54
55 static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all);
56
57
58 void orte_rml_base_post_recv(int sd, short args, void *cbdata)
59 {
60 orte_rml_recv_request_t *req = (orte_rml_recv_request_t*)cbdata;
61 orte_rml_posted_recv_t *post, *recv;
62 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
63
64 ORTE_ACQUIRE_OBJECT(req);
65
66 opal_output_verbose(5, orte_rml_base_framework.framework_output,
67 "%s posting recv",
68 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
69
70 if (NULL == req) {
71
72
73 opal_output(0, "%s CANNOT POST NULL RML RECV REQUEST",
74 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
75 return;
76 }
77 post = req->post;
78
79
80
81
82 if (req->cancel) {
83 OPAL_LIST_FOREACH(recv, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
84 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &post->peer, &recv->peer) &&
85 post->tag == recv->tag) {
86 opal_output_verbose(5, orte_rml_base_framework.framework_output,
87 "%s canceling recv %d for peer %s",
88 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
89 post->tag, ORTE_NAME_PRINT(&recv->peer));
90
91 opal_list_remove_item(&orte_rml_base.posted_recvs, &recv->super);
92 OBJ_RELEASE(recv);
93 break;
94 }
95 }
96 OBJ_RELEASE(req);
97 return;
98 }
99
100
101 OPAL_LIST_FOREACH(recv, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
102 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &post->peer, &recv->peer) &&
103 post->tag == recv->tag) {
104 opal_output(0, "%s TWO RECEIVES WITH SAME PEER %s AND TAG %d - ABORTING",
105 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
106 ORTE_NAME_PRINT(&post->peer), post->tag);
107 abort();
108 }
109 }
110
111 opal_output_verbose(5, orte_rml_base_framework.framework_output,
112 "%s posting %s recv on tag %d for peer %s",
113 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
114 (post->persistent) ? "persistent" : "non-persistent",
115 post->tag, ORTE_NAME_PRINT(&post->peer));
116
117 opal_list_append(&orte_rml_base.posted_recvs, &post->super);
118 req->post = NULL;
119
120 msg_match_recv(post, post->persistent);
121
122
123 OBJ_RELEASE(req);
124 }
125
126 static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all)
127 {
128 opal_list_item_t *item, *next;
129 orte_rml_recv_t *msg;
130 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
131
132
133
134
135
136 item = opal_list_get_first(&orte_rml_base.unmatched_msgs);
137 while (item != opal_list_get_end(&orte_rml_base.unmatched_msgs)) {
138 next = opal_list_get_next(item);
139 msg = (orte_rml_recv_t*)item;
140 opal_output_verbose(5, orte_rml_base_framework.framework_output,
141 "%s checking recv for %s against unmatched msg from %s",
142 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
143 ORTE_NAME_PRINT(&rcv->peer),
144 ORTE_NAME_PRINT(&msg->sender));
145
146
147
148
149 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &rcv->peer) &&
150 msg->tag == rcv->tag) {
151 ORTE_RML_ACTIVATE_MESSAGE(msg);
152 opal_list_remove_item(&orte_rml_base.unmatched_msgs, item);
153 if (!get_all) {
154 break;
155 }
156 }
157 item = next;
158 }
159 }
160
161 void orte_rml_base_process_msg(int fd, short flags, void *cbdata)
162 {
163 orte_rml_recv_t *msg = (orte_rml_recv_t*)cbdata;
164 orte_rml_posted_recv_t *post;
165 orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
166 opal_buffer_t buf;
167
168 ORTE_ACQUIRE_OBJECT(msg);
169
170 OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
171 "%s message received from %s for tag %d",
172 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
173 ORTE_NAME_PRINT(&msg->sender),
174 msg->tag));
175
176
177 if (ORTE_RML_TAG_WARMUP_CONNECTION == msg->tag) {
178 if (!orte_nidmap_communicated) {
179 opal_buffer_t * buffer = OBJ_NEW(opal_buffer_t);
180 int rc;
181 if (NULL == buffer) {
182 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
183 return;
184 }
185
186 if (ORTE_SUCCESS != (rc = orte_util_nidmap_create(orte_node_pool, buffer))) {
187 ORTE_ERROR_LOG(rc);
188 OBJ_RELEASE(buffer);
189 return;
190 }
191
192 if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&msg->sender, buffer,
193 ORTE_RML_TAG_NODE_REGEX_REPORT,
194 orte_rml_send_callback, NULL))) {
195 ORTE_ERROR_LOG(rc);
196 OBJ_RELEASE(buffer);
197 return;
198 }
199 OBJ_RELEASE(msg);
200 return;
201 }
202 }
203
204
205 OPAL_LIST_FOREACH(post, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
206
207
208
209 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &post->peer) &&
210 msg->tag == post->tag) {
211
212 if (post->buffer_data) {
213
214 OBJ_CONSTRUCT(&buf, opal_buffer_t);
215 opal_dss.load(&buf, msg->iov.iov_base, msg->iov.iov_len);
216
217 msg->iov.iov_base = NULL;
218 post->cbfunc.buffer(ORTE_SUCCESS, &msg->sender, &buf, msg->tag, post->cbdata);
219
220
221
222 OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
223 "%s message received bytes from %s for tag %d called callback",
224 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
225 ORTE_NAME_PRINT(&msg->sender),
226 msg->tag));
227 OBJ_DESTRUCT(&buf);
228 } else {
229
230 post->cbfunc.iov(ORTE_SUCCESS, &msg->sender, &msg->iov, 1, msg->tag, post->cbdata);
231
232
233
234
235 }
236
237 OBJ_RELEASE(msg);
238 OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
239 "%s message tag %d on released",
240 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
241 post->tag));
242
243 if (!post->persistent) {
244 opal_list_remove_item(&orte_rml_base.posted_recvs, &post->super);
245
246
247
248
249 OBJ_RELEASE(post);
250
251 }
252 return;
253 }
254 }
255
256
257
258 OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
259 "%s message received bytes from %s for tag %d Not Matched adding to unmatched msgs",
260 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
261 ORTE_NAME_PRINT(&msg->sender),
262 msg->tag));
263 opal_list_append(&orte_rml_base.unmatched_msgs, &msg->super);
264 }