This source file includes following definitions.
- recv_nb
- recv_buffer_nb
- recv_cancel
- oob_ping
- rml_oob_open
- rml_oob_close
- component_query
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "orte_config.h"
27 #include "orte/constants.h"
28
29 #ifdef HAVE_NETINET_IN_H
30 #include <netinet/in.h>
31 #endif
32 #ifdef HAVE_ARPA_INET_H
33 #include <arpa/inet.h>
34 #endif
35
36 #include "opal/mca/base/base.h"
37 #include "opal/util/output.h"
38 #include "opal/util/argv.h"
39 #include "opal/mca/backtrace/backtrace.h"
40 #include "opal/mca/event/event.h"
41
42 #if OPAL_ENABLE_FT_CR == 1
43 #include "orte/mca/rml/rml.h"
44 #include "orte/mca/state/state.h"
45 #endif
46 #include "orte/mca/rml/base/base.h"
47 #include "orte/mca/rml/rml_types.h"
48 #include "orte/mca/routed/routed.h"
49 #include "orte/mca/errmgr/errmgr.h"
50 #include "orte/util/name_fns.h"
51 #include "orte/runtime/orte_globals.h"
52
53 #include "orte/mca/oob/oob.h"
54 #include "orte/mca/oob/base/base.h"
55 #include "orte/mca/routed/routed.h"
56 #include "rml_oob.h"
57
58 static int rml_oob_open(void);
59 static int rml_oob_close(void);
60 static int component_query(mca_base_module_t **module, int *priority);
61
62
63
64
65 orte_rml_component_t mca_rml_oob_component = {
66
67
68
69 .base = {
70 ORTE_RML_BASE_VERSION_3_0_0,
71
72 .mca_component_name = "oob",
73 MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
74 ORTE_RELEASE_VERSION),
75 .mca_open_component = rml_oob_open,
76 .mca_close_component = rml_oob_close,
77 .mca_query_component = component_query,
78
79 },
80 .data = {
81
82 MCA_BASE_METADATA_PARAM_CHECKPOINT
83 },
84 .priority = 5
85 };
86
87
88 static void recv_nb(orte_process_name_t* peer,
89 orte_rml_tag_t tag,
90 bool persistent,
91 orte_rml_callback_fn_t cbfunc,
92 void* cbdata)
93 {
94 orte_rml_recv_request_t *req;
95
96 opal_output_verbose(10, orte_rml_base_framework.framework_output,
97 "%s rml_recv_nb for peer %s tag %d",
98 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
99 ORTE_NAME_PRINT(peer), tag);
100
101
102
103 req = OBJ_NEW(orte_rml_recv_request_t);
104 req->post->buffer_data = false;
105 req->post->peer.jobid = peer->jobid;
106 req->post->peer.vpid = peer->vpid;
107 req->post->tag = tag;
108 req->post->persistent = persistent;
109 req->post->cbfunc.iov = cbfunc;
110 req->post->cbdata = cbdata;
111 ORTE_THREADSHIFT(req, orte_event_base, orte_rml_base_post_recv, ORTE_MSG_PRI);
112 }
113 static void recv_buffer_nb(orte_process_name_t* peer,
114 orte_rml_tag_t tag,
115 bool persistent,
116 orte_rml_buffer_callback_fn_t cbfunc,
117 void* cbdata)
118 {
119 orte_rml_recv_request_t *req;
120
121 opal_output_verbose(10, orte_rml_base_framework.framework_output,
122 "%s rml_recv_buffer_nb for peer %s tag %d",
123 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
124 ORTE_NAME_PRINT(peer), tag);
125
126
127
128 req = OBJ_NEW(orte_rml_recv_request_t);
129 req->post->buffer_data = true;
130 req->post->peer.jobid = peer->jobid;
131 req->post->peer.vpid = peer->vpid;
132 req->post->tag = tag;
133 req->post->persistent = persistent;
134 req->post->cbfunc.buffer = cbfunc;
135 req->post->cbdata = cbdata;
136 ORTE_THREADSHIFT(req, orte_event_base, orte_rml_base_post_recv, ORTE_MSG_PRI);
137 }
138 static void recv_cancel(orte_process_name_t* peer, orte_rml_tag_t tag)
139 {
140 orte_rml_recv_request_t *req;
141
142 opal_output_verbose(10, orte_rml_base_framework.framework_output,
143 "%s rml_recv_cancel for peer %s tag %d",
144 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
145 ORTE_NAME_PRINT(peer), tag);
146
147 ORTE_ACQUIRE_OBJECT(orte_event_base_active);
148 if (!orte_event_base_active) {
149
150 return;
151 }
152
153
154
155 req = OBJ_NEW(orte_rml_recv_request_t);
156 req->cancel = true;
157 req->post->peer.jobid = peer->jobid;
158 req->post->peer.vpid = peer->vpid;
159 req->post->tag = tag;
160 ORTE_THREADSHIFT(req, orte_event_base, orte_rml_base_post_recv, ORTE_MSG_PRI);
161 }
162 static int oob_ping(const char* uri, const struct timeval* tv)
163 {
164 return ORTE_ERR_UNREACH;
165 }
166
167 static orte_rml_base_module_t base_module = {
168 .component = (struct orte_rml_component_t*)&mca_rml_oob_component,
169 .ping = oob_ping,
170 .send_nb = orte_rml_oob_send_nb,
171 .send_buffer_nb = orte_rml_oob_send_buffer_nb,
172 .recv_nb = recv_nb,
173 .recv_buffer_nb = recv_buffer_nb,
174 .recv_cancel = recv_cancel,
175 .purge = NULL
176 };
177
178 static int rml_oob_open(void)
179 {
180 return ORTE_SUCCESS;
181 }
182
183
184 static int rml_oob_close(void)
185 {
186 return ORTE_SUCCESS;
187 }
188
189 static int component_query(mca_base_module_t **module, int *priority)
190 {
191 *priority = 50;
192 *module = (mca_base_module_t *) &base_module;
193 return ORTE_SUCCESS;
194 }