This source file includes following definitions.
- close_channel_callback
- open_channel_callback
- send_callback
- recv_callback
- channel_send_callback
- main
1 #include "orte_config.h"
2
3 #include <stdio.h>
4 #include <signal.h>
5 #include <math.h>
6
7 #include "opal/runtime/opal_progress.h"
8
9 #include "orte/util/proc_info.h"
10 #include "orte/util/name_fns.h"
11 #include "orte/runtime/orte_globals.h"
12 #include "orte/mca/rml/rml.h"
13 #include "orte/mca/errmgr/errmgr.h"
14
15 #include "orte/runtime/runtime.h"
16 #include "orte/runtime/orte_wait.h"
17 #include "orte/mca/qos/qos.h"
18 #include "orte/util/attr.h"
19
20 #define MY_TAG 12345
21 #define MAX_COUNT 3
22
23 static volatile bool msgs_recvd;
24 static volatile bool channel_inactive = false;
25 static volatile bool channel_active = false;
26 static volatile bool msg_active = false;
27 static volatile orte_rml_channel_num_t channel;
28 static volatile int num_msgs_recvd = 0;
29 static volatile int num_msgs_sent = 0;
30
31 static void close_channel_callback(int status,
32 orte_rml_channel_num_t channel_num,
33 orte_process_name_t * peer,
34 opal_list_t *qos_attributes,
35 void * cbdata)
36 {
37 if (ORTE_SUCCESS != status)
38 opal_output(0, "close channel not successful status =%d", status);
39 else
40 opal_output(0, "close channel successful - channel num = %d", channel_num);
41 channel_active = false;
42 }
43
44 static void open_channel_callback(int status,
45 orte_rml_channel_num_t channel_num,
46 orte_process_name_t * peer,
47 opal_list_t *qos_attributes,
48 void * cbdata)
49 {
50 if (ORTE_SUCCESS != status) {
51 opal_output(0, "open channel not successful status =%d", status);
52
53 } else {
54 channel = channel_num;
55 opal_output(0, "Open channel successful - channel num = %d", channel_num);
56
57 }
58 channel_inactive = false;
59 }
60
61 static void send_callback(int status, orte_process_name_t *peer,
62 opal_buffer_t* buffer, orte_rml_tag_t tag,
63 void* cbdata)
64
65 {
66 OBJ_RELEASE(buffer);
67 num_msgs_sent++;
68 if (ORTE_SUCCESS != status) {
69 opal_output(0, "rml_send_nb not successful status =%d", status);
70 }
71 if(num_msgs_sent == 5)
72 msg_active = false;
73 }
74
75 static void recv_callback(int status, orte_process_name_t *sender,
76 opal_buffer_t* buffer, orte_rml_tag_t tag,
77 void* cbdata)
78
79 {
80
81 num_msgs_recvd++;
82 opal_output(0, "recv_callback received msg =%d", num_msgs_recvd);
83 if ( num_msgs_recvd == 5) {
84 num_msgs_recvd =0;
85 msgs_recvd = false;
86
87 }
88
89 }
90
91 static void channel_send_callback (int status, orte_rml_channel_num_t channel,
92 opal_buffer_t * buffer, orte_rml_tag_t tag,
93 void *cbdata)
94 {
95 OBJ_RELEASE(buffer);
96 if (ORTE_SUCCESS != status) {
97 opal_output(0, "send_nb_channel not successful status =%d", status);
98 }
99 msg_active = false;
100 }
101
102
103 int main(int argc, char *argv[]){
104 int count;
105 int msgsize;
106 int *type, type_val;
107 int *i, j, rc, n;
108 orte_process_name_t peer;
109 double maxpower;
110 opal_buffer_t *buf;
111 orte_rml_recv_cb_t blob;
112 opal_list_t *qos_attributes;
113 int window;
114 uint32_t timeout = 1;
115 bool retry = false;
116 uint8_t *msg;
117
118
119
120 orte_init(&argc, &argv, ORTE_PROC_NON_MPI);
121
122 if (argc > 1) {
123 count = atoi(argv[1]);
124 if (count < 0) {
125 count = INT_MAX-1;
126 }
127 } else {
128 count = MAX_COUNT;
129 }
130
131 peer.jobid = ORTE_PROC_MY_NAME->jobid;
132 peer.vpid = ORTE_PROC_MY_NAME->vpid + 1;
133 if (peer.vpid == orte_process_info.num_procs) {
134 peer.vpid = 0;
135 }
136 type_val = orte_qos_ack;
137 type = &type_val;
138 window = 5;
139 count =3;
140 qos_attributes = OBJ_NEW (opal_list_t);
141 if (ORTE_SUCCESS == (rc = orte_set_attribute( qos_attributes,
142 ORTE_QOS_TYPE, ORTE_ATTR_GLOBAL, (void*)type, OPAL_UINT8))) {
143 type = &window;
144 if (ORTE_SUCCESS == (rc = orte_set_attribute(qos_attributes, ORTE_QOS_WINDOW_SIZE,
145 ORTE_ATTR_GLOBAL, (void*) type, OPAL_UINT32))) {
146
147
148 type = &timeout;
149 orte_set_attribute (qos_attributes, ORTE_QOS_ACK_NACK_TIMEOUT, ORTE_ATTR_GLOBAL,
150 (void*)type, OPAL_UINT32);
151 orte_set_attribute (qos_attributes, ORTE_QOS_MSG_RETRY, ORTE_ATTR_GLOBAL,
152 NULL, OPAL_BOOL);
153
154
155
156
157
158
159
160
161
162 channel_inactive = true;
163 orte_rml.open_channel ( &peer, qos_attributes, open_channel_callback, NULL);
164 opal_output(0, "%s process sent open channel request %d waiting for completion \n",
165 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
166 ORTE_WAIT_FOR_COMPLETION(channel_inactive);
167 opal_output(0, "%s open channel complete to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
168 ORTE_NAME_PRINT(&peer));
169 }
170 }
171 for (j = 0; j< count; j++)
172 {
173 if (ORTE_PROC_MY_NAME->vpid == 0)
174 {
175
176 msg_active = true;
177 for (n = 0; n< window; n++ )
178 {
179 buf = OBJ_NEW(opal_buffer_t);
180 maxpower = (double)(j%7);
181 msgsize = (int)pow(10.0, maxpower);
182 opal_output(0, "Ring %d message %d size %d bytes", j,n, msgsize);
183 msg = (uint8_t*)malloc(msgsize);
184 opal_dss.pack(buf, msg, msgsize, OPAL_BYTE);
185 free(msg);
186 orte_rml.send_buffer_channel_nb(channel, buf, MY_TAG, channel_send_callback, NULL);
187 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
188 blob.active = true;
189 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
190 ORTE_RML_NON_PERSISTENT,
191 orte_rml_recv_callback, &blob);
192 ORTE_WAIT_FOR_COMPLETION(blob.active);
193 OBJ_DESTRUCT(&blob);
194
195 }
196 ORTE_WAIT_FOR_COMPLETION(msg_active);
197 opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
198
199 }
200 else
201 {
202 msg_active = true;
203 for (n =0; n < window; n++) {
204 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
205 blob.active = true;
206 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
207 ORTE_RML_NON_PERSISTENT,
208 orte_rml_recv_callback, &blob);
209 ORTE_WAIT_FOR_COMPLETION(blob.active);
210 opal_output(0, "%s received message %d from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j,
211 ORTE_NAME_PRINT(&blob.name));
212
213 buf = OBJ_NEW(opal_buffer_t);
214 opal_dss.copy_payload(buf, &blob.data);
215 OBJ_DESTRUCT(&blob);
216 orte_rml.send_buffer_channel_nb(channel, buf, MY_TAG, channel_send_callback, NULL);
217 }
218 ORTE_WAIT_FOR_COMPLETION(msg_active);
219 opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
220
221 }
222 }
223 channel_active = true;
224 orte_rml.close_channel ( channel,close_channel_callback, NULL);
225 opal_output(0, "%s process sent close channel request waiting for completion \n",
226 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
227 ORTE_WAIT_FOR_COMPLETION(channel_active);
228 opal_output(0, "%s close channel complete to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
229 ORTE_NAME_PRINT(&peer));
230 orte_finalize();
231 return 0;
232 }