This source file includes the following definitions.
- orte_rml_base_register
- orte_rml_base_close
- orte_rml_base_open
- orte_rml_base_select
- orte_rml_send_callback
- orte_rml_recv_callback
- xfer_cons
- send_cons
- send_req_cons
- send_req_des
- recv_cons
- recv_des
- rcv_cons
- rcv_des
- prcv_cons
- prq_cons
- prq_des
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include "orte_config.h"
19
20 #include <string.h>
21
22 #include "opal/dss/dss.h"
23 #include "orte/mca/mca.h"
24 #include "opal/mca/base/mca_base_component_repository.h"
25 #include "opal/util/output.h"
26
27 #include "orte/mca/errmgr/errmgr.h"
28 #include "orte/mca/rml/rml.h"
29 #include "orte/mca/state/state.h"
30 #include "orte/runtime/orte_wait.h"
31 #include "orte/util/name_fns.h"
32 #include "orte/util/threads.h"
33
34 #include "orte/mca/rml/base/base.h"
35
36
37
38
39 #include "orte/mca/rml/base/static-components.h"
40
41
42
/* The active RML module. orte_rml_base_select() overwrites this with a copy
 * of the module chosen by the MCA selection logic; until then it is zeroed. */
orte_rml_base_module_t orte_rml = {0};

/* Framework-wide RML base state. Fields used in this file: max_retries,
 * timing (only when OPAL_ENABLE_TIMING is set), posted_recvs, unmatched_msgs. */
orte_rml_base_t orte_rml_base = {{{0}}};
46
47 static int orte_rml_base_register(mca_base_register_flag_t flags)
48 {
49 orte_rml_base.max_retries = 3;
50 mca_base_var_register("orte", "rml", "base", "max_retries",
51 "Max #times to retry sending a message",
52 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
53 OPAL_INFO_LVL_9,
54 MCA_BASE_VAR_SCOPE_READONLY,
55 &orte_rml_base.max_retries);
56
57 #if OPAL_ENABLE_TIMING
58 orte_rml_base.timing = false;
59 (void) mca_base_var_register ("orte", "rml", "base", "timing",
60 "Enable RML timings",
61 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
62 OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
63 &orte_rml_base.timing);
64 #endif
65
66 return ORTE_SUCCESS;
67 }
68
69 static int orte_rml_base_close(void)
70 {
71 OPAL_LIST_DESTRUCT(&orte_rml_base.posted_recvs);
72 return mca_base_framework_components_close(&orte_rml_base_framework, NULL);
73 }
74
75 static int orte_rml_base_open(mca_base_open_flag_t flags)
76 {
77
78
79 OBJ_CONSTRUCT(&orte_rml_base.posted_recvs, opal_list_t);
80 OBJ_CONSTRUCT(&orte_rml_base.unmatched_msgs, opal_list_t);
81
82
83 return mca_base_framework_components_open(&orte_rml_base_framework, flags);
84 }
85
/* Declare the "rml" framework of the "orte" project, wiring in the register,
 * open and close functions defined above and the statically built component
 * list. NOTE(review): this macro is presumably what defines
 * orte_rml_base_framework, which the functions in this file reference --
 * confirm against the MCA base headers. */
MCA_BASE_FRAMEWORK_DECLARE(orte, rml, "ORTE Run-Time Messaging Layer",
                           orte_rml_base_register, orte_rml_base_open, orte_rml_base_close,
                           mca_rml_base_static_components, 0);
89
90
91
92
93 int orte_rml_base_select(void)
94 {
95 orte_rml_component_t *best_component = NULL;
96 orte_rml_base_module_t *best_module = NULL;
97
98
99
100
101 if( OPAL_SUCCESS != mca_base_select("rml", orte_rml_base_framework.framework_output,
102 &orte_rml_base_framework.framework_components,
103 (mca_base_module_t **) &best_module,
104 (mca_base_component_t **) &best_component, NULL) ) {
105
106
107 return ORTE_ERROR;
108 }
109
110
111 orte_rml = *best_module;
112
113 return ORTE_SUCCESS;
114 }
115
116 void orte_rml_send_callback(int status, orte_process_name_t *peer,
117 opal_buffer_t* buffer, orte_rml_tag_t tag,
118 void* cbdata)
119
120 {
121 OBJ_RELEASE(buffer);
122 if (ORTE_SUCCESS != status) {
123 opal_output_verbose(2, orte_rml_base_framework.framework_output,
124 "%s UNABLE TO SEND MESSAGE TO %s TAG %d: %s",
125 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
126 ORTE_NAME_PRINT(peer), tag,
127 ORTE_ERROR_NAME(status));
128 if (ORTE_ERR_NO_PATH_TO_TARGET == status) {
129 ORTE_ACTIVATE_PROC_STATE(peer, ORTE_PROC_STATE_NO_PATH_TO_TARGET);
130 } else if (ORTE_ERR_ADDRESSEE_UNKNOWN == status) {
131 ORTE_ACTIVATE_PROC_STATE(peer, ORTE_PROC_STATE_PEER_UNKNOWN);
132 } else {
133 ORTE_ACTIVATE_PROC_STATE(peer, ORTE_PROC_STATE_UNABLE_TO_SEND_MSG);
134 }
135 }
136 }
137
138 void orte_rml_recv_callback(int status, orte_process_name_t* sender,
139 opal_buffer_t *buffer,
140 orte_rml_tag_t tag, void *cbdata)
141 {
142 orte_rml_recv_cb_t *blob = (orte_rml_recv_cb_t*)cbdata;
143
144 ORTE_ACQUIRE_OBJECT(blob);
145
146 blob->name.jobid = sender->jobid;
147 blob->name.vpid = sender->vpid;
148
149 opal_dss.copy_payload(&blob->data, buffer);
150
151 ORTE_POST_OBJECT(blob);
152 blob->active = false;
153 }
154
155
156
157 static void xfer_cons(orte_self_send_xfer_t *xfer)
158 {
159 xfer->iov = NULL;
160 xfer->cbfunc.iov = NULL;
161 xfer->buffer = NULL;
162 xfer->cbfunc.buffer = NULL;
163 xfer->cbdata = NULL;
164 }
165 OBJ_CLASS_INSTANCE(orte_self_send_xfer_t,
166 opal_object_t,
167 xfer_cons, NULL);
168
169 static void send_cons(orte_rml_send_t *ptr)
170 {
171 ptr->retries = 0;
172 ptr->cbdata = NULL;
173 ptr->iov = NULL;
174 ptr->buffer = NULL;
175 ptr->data = NULL;
176 ptr->seq_num = 0xFFFFFFFF;
177 }
178 OBJ_CLASS_INSTANCE(orte_rml_send_t,
179 opal_list_item_t,
180 send_cons, NULL);
181
182
183 static void send_req_cons(orte_rml_send_request_t *ptr)
184 {
185 OBJ_CONSTRUCT(&ptr->send, orte_rml_send_t);
186 }
187 static void send_req_des(orte_rml_send_request_t *ptr)
188 {
189 OBJ_DESTRUCT(&ptr->send);
190 }
191 OBJ_CLASS_INSTANCE(orte_rml_send_request_t,
192 opal_object_t,
193 send_req_cons, send_req_des);
194
195 static void recv_cons(orte_rml_recv_t *ptr)
196 {
197 ptr->iov.iov_base = NULL;
198 ptr->iov.iov_len = 0;
199 }
200 static void recv_des(orte_rml_recv_t *ptr)
201 {
202 if (NULL != ptr->iov.iov_base) {
203 free(ptr->iov.iov_base);
204 }
205 }
206 OBJ_CLASS_INSTANCE(orte_rml_recv_t,
207 opal_list_item_t,
208 recv_cons, recv_des);
209
210 static void rcv_cons(orte_rml_recv_cb_t *ptr)
211 {
212 OBJ_CONSTRUCT(&ptr->data, opal_buffer_t);
213 ptr->active = false;
214 }
215 static void rcv_des(orte_rml_recv_cb_t *ptr)
216 {
217 OBJ_DESTRUCT(&ptr->data);
218 }
219 OBJ_CLASS_INSTANCE(orte_rml_recv_cb_t, opal_object_t,
220 rcv_cons, rcv_des);
221
222 static void prcv_cons(orte_rml_posted_recv_t *ptr)
223 {
224 ptr->cbdata = NULL;
225 }
226 OBJ_CLASS_INSTANCE(orte_rml_posted_recv_t,
227 opal_list_item_t,
228 prcv_cons, NULL);
229
230 static void prq_cons(orte_rml_recv_request_t *ptr)
231 {
232 ptr->cancel = false;
233 ptr->post = OBJ_NEW(orte_rml_posted_recv_t);
234 }
235 static void prq_des(orte_rml_recv_request_t *ptr)
236 {
237 if (NULL != ptr->post) {
238 OBJ_RELEASE(ptr->post);
239 }
240 }
241 OBJ_CLASS_INSTANCE(orte_rml_recv_request_t,
242 opal_object_t,
243 prq_cons, prq_des);