1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 #ifndef MCA_RML_BASE_H
41 #define MCA_RML_BASE_H
42
43 #include "orte_config.h"
44
45 #include "opal/dss/dss_types.h"
46 #include "orte/mca/mca.h"
47 #include "opal/util/timings.h"
48 #include "opal/class/opal_pointer_array.h"
49
50 #include "orte/runtime/orte_globals.h"
51 #include "orte/mca/routed/routed.h"
52
53 #include "orte/mca/rml/rml.h"
54
55
56 BEGIN_C_DECLS
57
58
59
60
61 ORTE_DECLSPEC extern mca_base_framework_t orte_rml_base_framework;
62
63 ORTE_DECLSPEC int orte_rml_base_select(void);
64
65
66
67
68
69
70 typedef struct {
71 opal_list_t posted_recvs;
72 opal_list_t unmatched_msgs;
73 int max_retries;
74 #if OPAL_ENABLE_TIMING
75 bool timing;
76 #endif
77 } orte_rml_base_t;
78 ORTE_DECLSPEC extern orte_rml_base_t orte_rml_base;
79
80
81
82 typedef struct {
83 opal_list_item_t super;
84 orte_process_name_t dst;
85 orte_process_name_t origin;
86 int status;
87 orte_rml_tag_t tag;
88 int retries;
89
90
91 union {
92 orte_rml_callback_fn_t iov;
93 orte_rml_buffer_callback_fn_t buffer;
94 } cbfunc;
95 void *cbdata;
96
97
98 struct iovec *iov;
99 int count;
100
101 opal_buffer_t *buffer;
102
103 uint32_t seq_num;
104
105
106
107 char *data;
108 } orte_rml_send_t;
109 OBJ_CLASS_DECLARATION(orte_rml_send_t);
110
111
112 typedef struct {
113 opal_object_t super;
114 opal_event_t ev;
115 orte_rml_send_t send;
116 } orte_rml_send_request_t;
117 OBJ_CLASS_DECLARATION(orte_rml_send_request_t);
118
119
120 typedef struct {
121 opal_list_item_t super;
122 opal_event_t ev;
123 orte_process_name_t sender;
124 orte_rml_tag_t tag;
125 uint32_t seq_num;
126 struct iovec iov;
127 } orte_rml_recv_t;
128 OBJ_CLASS_DECLARATION(orte_rml_recv_t);
129
130 typedef struct {
131 opal_list_item_t super;
132 bool buffer_data;
133 orte_process_name_t peer;
134 orte_rml_tag_t tag;
135 bool persistent;
136 union {
137 orte_rml_callback_fn_t iov;
138 orte_rml_buffer_callback_fn_t buffer;
139 } cbfunc;
140 void *cbdata;
141 } orte_rml_posted_recv_t;
142 OBJ_CLASS_DECLARATION(orte_rml_posted_recv_t);
143
144
145 typedef struct {
146 opal_object_t super;
147 opal_event_t ev;
148 bool cancel;
149 orte_rml_posted_recv_t *post;
150 } orte_rml_recv_request_t;
151 OBJ_CLASS_DECLARATION(orte_rml_recv_request_t);
152
153
154 typedef struct {
155 opal_object_t object;
156 opal_event_t ev;
157 orte_rml_tag_t tag;
158 struct iovec* iov;
159 int count;
160 opal_buffer_t *buffer;
161 union {
162 orte_rml_callback_fn_t iov;
163 orte_rml_buffer_callback_fn_t buffer;
164 } cbfunc;
165 void *cbdata;
166 } orte_self_send_xfer_t;
167 OBJ_CLASS_DECLARATION(orte_self_send_xfer_t);
168
/*
 * Wrap an incoming payload in a freshly created orte_rml_recv_t and pass
 * it to orte_rml_base_process_msg through orte_event_base.
 *
 *   p  pointer to the sender's name; its jobid and vpid are copied
 *      (the expression is evaluated twice)
 *   t  tag stored on the message (also printed in the verbose output,
 *      so the expression is evaluated twice)
 *   s  sequence number stored on the message
 *   b  payload pointer, stored in iov.iov_base; the data is not copied
 *   l  payload length, stored in iov.iov_len
 *
 * The temporary has a macro-specific name on purpose: with a generic
 * name such as "msg", an argument expression that mentions a caller
 * variable of the same name would bind to the uninitialized temporary
 * instead of the caller's object.
 *
 * The result of OBJ_NEW is used without a NULL check.
 *
 * NOTE(review): the expansion ends in "while(0);".  The trailing
 * semicolon is kept because call sites are not visible from this header;
 * it means the macro cannot be the sole statement of an unbraced
 * if/else branch.
 */
#define ORTE_RML_POST_MESSAGE(p, t, s, b, l)                                \
    do {                                                                    \
        orte_rml_recv_t *rml_post_msg_;                                     \
        opal_output_verbose(5, orte_rml_base_framework.framework_output,    \
                            "%s Message posted at %s:%d for tag %d",        \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),             \
                            __FILE__, __LINE__, (t));                       \
        rml_post_msg_ = OBJ_NEW(orte_rml_recv_t);                           \
        rml_post_msg_->sender.jobid = (p)->jobid;                           \
        rml_post_msg_->sender.vpid = (p)->vpid;                             \
        rml_post_msg_->tag = (t);                                           \
        rml_post_msg_->seq_num = (s);                                       \
        rml_post_msg_->iov.iov_base = (IOVBASE_TYPE*)(b);                   \
        rml_post_msg_->iov.iov_len = (l);                                   \
        /* set up the event and mark it active for processing */            \
        opal_event_set(orte_event_base, &rml_post_msg_->ev, -1,             \
                       OPAL_EV_WRITE,                                       \
                       orte_rml_base_process_msg, rml_post_msg_);           \
        opal_event_set_priority(&rml_post_msg_->ev, ORTE_MSG_PRI);          \
        opal_event_active(&rml_post_msg_->ev, OPAL_EV_WRITE, 1);            \
    } while(0);
190
/*
 * Pass an already populated message object to orte_rml_base_process_msg
 * through orte_event_base.
 *
 *   m  pointer to an object with an "ev" event member; the expression
 *      is evaluated several times, so it must be free of side effects
 *
 * NOTE(review): the expansion ends in "while(0);", so the macro cannot
 * be the sole statement of an unbraced if/else branch.
 */
#define ORTE_RML_ACTIVATE_MESSAGE(m)                                \
    do {                                                            \
        /* set up the event and mark it active for processing */    \
        opal_event_set(orte_event_base, &(m)->ev, -1,               \
                       OPAL_EV_WRITE,                               \
                       orte_rml_base_process_msg, (m));             \
        opal_event_set_priority(&(m)->ev, ORTE_MSG_PRI);            \
        opal_event_active(&(m)->ev, OPAL_EV_WRITE, 1);              \
    } while(0);
200
/*
 * Report completion of a send and drop one reference to its descriptor.
 *
 *   m  pointer to the send descriptor (see orte_rml_send_t); the
 *      expression is evaluated several times and is handed to
 *      OBJ_RELEASE exactly as written
 *
 * When m->iov is non-NULL the iovec callback is invoked, provided it is
 * set; otherwise the buffer callback is invoked, provided it is set.
 * The callback receives m->status, the destination name, the payload,
 * the tag and m->cbdata.
 *
 * NOTE(review): the expansion ends in "while(0);", so the macro cannot
 * be the sole statement of an unbraced if/else branch.
 */
#define ORTE_RML_SEND_COMPLETE(m)                                       \
    do {                                                                \
        opal_output_verbose(5, orte_rml_base_framework.framework_output, \
                            "%s-%s Send message complete at %s:%d",     \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),         \
                            ORTE_NAME_PRINT(&((m)->dst)),               \
                            __FILE__, __LINE__);                        \
        if (NULL == (m)->iov) {                                         \
            /* buffer flavour of the send */                            \
            if (NULL != (m)->cbfunc.buffer) {                           \
                (m)->cbfunc.buffer((m)->status, &((m)->dst),            \
                                   (m)->buffer,                         \
                                   (m)->tag, (m)->cbdata);              \
            }                                                           \
        } else if (NULL != (m)->cbfunc.iov) {                           \
            /* iovec flavour of the send */                             \
            (m)->cbfunc.iov((m)->status,                                \
                            &((m)->dst),                                \
                            (m)->iov, (m)->count,                       \
                            (m)->tag, (m)->cbdata);                     \
        }                                                               \
        OBJ_RELEASE(m);                                                 \
    } while(0);
223
224
225 ORTE_DECLSPEC void orte_rml_base_post_recv(int sd, short args, void *cbdata);
226 ORTE_DECLSPEC void orte_rml_base_process_msg(int fd, short flags, void *cbdata);
227
228
229 END_C_DECLS
230
231 #endif