This source file includes the following definitions.
- send_callback
- main
#include "orte_config.h"

#include <limits.h>
#include <math.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "opal/runtime/opal_progress.h"

#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"

#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
18
/* RML tag used for every message sent and received by this test. */
#define MY_TAG 12345
/* Number of trips around the ring when no count is given on the command line. */
#define MAX_COUNT 3

/*
 * Set to true by main() just before it forwards a message and cleared by
 * send_callback() once that send has completed; main() waits on it with
 * ORTE_WAIT_FOR_COMPLETION.
 */
static volatile bool msg_active;
24
25 static void send_callback(int status, orte_process_name_t *peer,
26 opal_buffer_t* buffer, orte_rml_tag_t tag,
27 void* cbdata)
28
29 {
30 OBJ_RELEASE(buffer);
31 if (ORTE_SUCCESS != status) {
32 exit(1);
33 }
34 msg_active = false;
35 }
36
37
38 int
39 main(int argc, char *argv[]){
40 int count;
41 int msgsize;
42 uint8_t *msg;
43 int i, j, rc;
44 orte_process_name_t peer;
45 double maxpower;
46 opal_buffer_t *buf;
47 orte_rml_recv_cb_t blob;
48 int sock_conduit_id = 0;
49 struct timeval start, end;
50
51
52
53 orte_init(&argc, &argv, ORTE_PROC_NON_MPI);
54
55 if (argc > 1) {
56 count = atoi(argv[1]);
57 if (count < 0) {
58 count = INT_MAX-1;
59 }
60 } else {
61 count = MAX_COUNT;
62 }
63
64 peer.jobid = ORTE_PROC_MY_NAME->jobid;
65 peer.vpid = ORTE_PROC_MY_NAME->vpid + 1;
66 if (peer.vpid == orte_process_info.num_procs) {
67 peer.vpid = 0;
68 }
69
70 gettimeofday(&start, NULL);
71 for (j=1; j < count+1; j++) {
72
73 if (ORTE_PROC_MY_NAME->vpid == 0) {
74
75 buf = OBJ_NEW(opal_buffer_t);
76
77 maxpower = (double)(j%7);
78 msgsize = (int)pow(10.0, maxpower);
79 opal_output(0, "Ring %d message size %d bytes", j, msgsize);
80 msg = (uint8_t*)malloc(msgsize);
81 opal_dss.pack(buf, msg, msgsize, OPAL_BYTE);
82 free(msg);
83 orte_rml.send_buffer_transport_nb(sock_conduit_id,&peer, buf, MY_TAG, orte_rml_send_callback, NULL);
84
85
86 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
87 blob.active = true;
88 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
89 ORTE_RML_NON_PERSISTENT,
90 orte_rml_recv_callback, &blob);
91 ORTE_WAIT_FOR_COMPLETION(blob.active);
92 OBJ_DESTRUCT(&blob);
93
94 opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
95 } else {
96
97 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
98 blob.active = true;
99 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
100 ORTE_RML_NON_PERSISTENT,
101 orte_rml_recv_callback, &blob);
102 ORTE_WAIT_FOR_COMPLETION(blob.active);
103
104 opal_output(0, "%s received message %d from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j, ORTE_NAME_PRINT(&blob.name));
105
106
107 buf = OBJ_NEW(opal_buffer_t);
108 opal_dss.copy_payload(buf, &blob.data);
109 OBJ_DESTRUCT(&blob);
110 msg_active = true;
111 orte_rml.send_buffer_transport_nb(sock_conduit_id,&peer, buf, MY_TAG, send_callback, NULL);
112 ORTE_WAIT_FOR_COMPLETION(msg_active);
113 }
114 }
115 gettimeofday(&end, NULL);
116 orte_finalize();
117 printf("start: %d secs, %d usecs\n",start.tv_sec,start.tv_usec);
118 printf("end: %d secs, %d usecs\n",end.tv_sec,end.tv_usec);
119 printf("Total minutes = %d, Total seconds = %d", (end.tv_sec - start.tv_sec)/60, (end.tv_sec - start.tv_sec) );
120 return 0;
121 }