This source file includes following definitions.
- send_callback
- main
1 #include "orte_config.h"
2
3 #include <stdio.h>
4 #include <signal.h>
5 #include <math.h>
6 #include <sys/time.h>
7
8 #include "opal/runtime/opal_progress.h"
9
10 #include "orte/util/proc_info.h"
11 #include "orte/util/name_fns.h"
12 #include "orte/runtime/orte_globals.h"
13 #include "orte/mca/rml/rml.h"
14 #include "orte/mca/errmgr/errmgr.h"
15
16 #include "orte/runtime/runtime.h"
17 #include "orte/runtime/orte_wait.h"
18 #include "orte/util/attr.h"
19
20 #define MY_TAG 12345
21 #define MAX_COUNT 3
22
23 static bool msg_recvd;
24 static volatile bool msg_active;
25
26 static void send_callback(int status, orte_process_name_t *peer,
27 opal_buffer_t* buffer, orte_rml_tag_t tag,
28 void* cbdata)
29
30 {
31 OBJ_RELEASE(buffer);
32 if (ORTE_SUCCESS != status) {
33 exit(1);
34 }
35 msg_active = false;
36 }
37
38
39 int
40 main(int argc, char *argv[]){
41 int count;
42 int msgsize;
43 uint8_t *msg;
44 int i, j, rc;
45 orte_process_name_t peer;
46 double maxpower;
47 opal_buffer_t *buf;
48 orte_rml_recv_cb_t blob;
49 int conduit_id = 0;
50 struct timeval start, end;
51 opal_list_t *conduit_attr;
52
53
54
55
56
57 orte_init(&argc, &argv, ORTE_PROC_NON_MPI);
58
59
60 conduit_attr = OBJ_NEW(opal_list_t);
61 if( ORTE_SUCCESS ==
62 ( orte_set_attribute( conduit_attr, ORTE_RML_PROVIDER_ATTRIB, ORTE_ATTR_GLOBAL,"sockets",OPAL_STRING))) {
63 if( ORTE_SUCCESS ==
64 ( orte_set_attribute( conduit_attr, ORTE_RML_INCLUDE_COMP_ATTRIB, ORTE_ATTR_GLOBAL,"ofi",OPAL_STRING))) {
65 opal_output(0, "%s calling open_conduit with ORTE_RML_INCLUDE_COMP_ATTRIB and ORTE_RML_OFI_PROV_NAME_ATTRIB",
66 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
67 conduit_id = orte_rml_API_open_conduit(conduit_attr);
68 if (0 > conduit_id ) {
69 opal_output(0, "Conduit could not be opened for OFI, exiting");
70 return;
71 }
72 }
73 }
74
75 opal_output(0, "Using conduit-id %d ", conduit_id);
76
77 if (argc > 1) {
78 count = atoi(argv[1]);
79 if (count < 0) {
80 count = INT_MAX-1;
81 }
82 } else {
83 count = MAX_COUNT;
84 }
85
86 peer.jobid = ORTE_PROC_MY_NAME->jobid;
87 peer.vpid = ORTE_PROC_MY_NAME->vpid + 1;
88 if (peer.vpid == orte_process_info.num_procs) {
89 peer.vpid = 0;
90 }
91
92 gettimeofday(&start, NULL);
93 for (j=1; j < count+1; j++) {
94
95 if (ORTE_PROC_MY_NAME->vpid == 0) {
96
97 buf = OBJ_NEW(opal_buffer_t);
98
99 maxpower = (double)(j%7);
100 msgsize = (int)pow(10.0, maxpower);
101 opal_output(0, "Ring %d message size %d bytes", j, msgsize);
102 msg = (uint8_t*)malloc(msgsize);
103 opal_dss.pack(buf, msg, msgsize, OPAL_BYTE);
104 free(msg);
105 orte_rml.send_buffer_nb(conduit_id,&peer, buf, MY_TAG, orte_rml_send_callback, NULL);
106
107
108 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
109 blob.active = true;
110 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
111 ORTE_RML_NON_PERSISTENT,
112 orte_rml_recv_callback, &blob);
113 ORTE_WAIT_FOR_COMPLETION(blob.active);
114 OBJ_DESTRUCT(&blob);
115
116 opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
117 } else {
118
119 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
120 blob.active = true;
121 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
122 ORTE_RML_NON_PERSISTENT,
123 orte_rml_recv_callback, &blob);
124 ORTE_WAIT_FOR_COMPLETION(blob.active);
125
126 opal_output(0, "%s received message %d from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j, ORTE_NAME_PRINT(&blob.name));
127
128
129 buf = OBJ_NEW(opal_buffer_t);
130 opal_dss.copy_payload(buf, &blob.data);
131 OBJ_DESTRUCT(&blob);
132 msg_active = true;
133 orte_rml.send_buffer_nb(conduit_id,&peer, buf, MY_TAG, send_callback, NULL);
134 ORTE_WAIT_FOR_COMPLETION(msg_active);
135 }
136 }
137 gettimeofday(&end, NULL);
138 orte_finalize();
139 printf("start: %d secs, %d usecs\n",start.tv_sec,start.tv_usec);
140 printf("end: %d secs, %d usecs\n",end.tv_sec,end.tv_usec);
141 printf("Total minutes = %d, Total seconds = %d", (end.tv_sec - start.tv_sec)/60, (end.tv_sec - start.tv_sec) );
142 return 0;
143 }