root/orte/test/system/oob_stress_channel.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. close_channel_callback
  2. open_channel_callback
  3. send_callback
  4. recv_callback
  5. channel_send_callback
  6. main

#include "orte_config.h"

#include <limits.h>
#include <math.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>

#include "opal/runtime/opal_progress.h"

#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"

#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/qos/qos.h"
#include "orte/util/attr.h"
  19 
  20 #define MY_TAG 12345
  21 #define MAX_COUNT 3
  22 
  23 static volatile bool msgs_recvd;
  24 static volatile bool channel_inactive = false;
  25 static volatile bool channel_active = false;
  26 static volatile bool msg_active = false;
  27 static volatile orte_rml_channel_num_t channel;
  28 static volatile int num_msgs_recvd = 0;
  29 static volatile int num_msgs_sent = 0;
  30 
  31 static void close_channel_callback(int status,
  32                                   orte_rml_channel_num_t channel_num,
  33                                   orte_process_name_t * peer,
  34                                   opal_list_t *qos_attributes,
  35                                   void * cbdata)
  36 {
  37     if (ORTE_SUCCESS != status)
  38         opal_output(0, "close channel not successful status =%d", status);
  39     else
  40         opal_output(0, "close channel successful - channel num = %d", channel_num);
  41     channel_active = false;
  42 }
  43 
  44 static void open_channel_callback(int status,
  45                                   orte_rml_channel_num_t channel_num,
  46                                   orte_process_name_t * peer,
  47                                   opal_list_t *qos_attributes,
  48                                   void * cbdata)
  49 {
  50     if (ORTE_SUCCESS != status) {
  51         opal_output(0, "open channel not successful status =%d", status);
  52 
  53     } else {
  54         channel = channel_num;
  55         opal_output(0, "Open channel successful - channel num = %d", channel_num);
  56 
  57     }
  58     channel_inactive = false;
  59 }
  60 
  61 static void send_callback(int status, orte_process_name_t *peer,
  62                           opal_buffer_t* buffer, orte_rml_tag_t tag,
  63                           void* cbdata)
  64 
  65 {
  66     OBJ_RELEASE(buffer);
  67     num_msgs_sent++;
  68     if (ORTE_SUCCESS != status) {
  69         opal_output(0, "rml_send_nb  not successful status =%d", status);
  70     }
  71     if(num_msgs_sent == 5)
  72         msg_active = false;
  73 }
  74 
  75 static void recv_callback(int status, orte_process_name_t *sender,
  76                           opal_buffer_t* buffer, orte_rml_tag_t tag,
  77                           void* cbdata)
  78 
  79 {
  80     //orte_rml_recv_cb_t *blob = (orte_rml_recv_cb_t*)cbdata;
  81     num_msgs_recvd++;
  82     opal_output(0, "recv_callback received msg =%d", num_msgs_recvd);
  83     if ( num_msgs_recvd == 5) {
  84         num_msgs_recvd =0;
  85         msgs_recvd = false;
  86 
  87     }
  88 
  89 }
  90 
  91 static void channel_send_callback (int status, orte_rml_channel_num_t channel,
  92                                    opal_buffer_t * buffer, orte_rml_tag_t tag,
  93                                    void *cbdata)
  94 {
  95     OBJ_RELEASE(buffer);
  96     if (ORTE_SUCCESS != status) {
  97         opal_output(0, "send_nb_channel not successful status =%d", status);
  98     }
  99     msg_active = false;
 100 }
 101 
 102 
 103 int main(int argc, char *argv[]){
 104     int count;
 105     int msgsize;
 106     int *type, type_val;
 107     int *i, j, rc, n;
 108     orte_process_name_t peer;
 109     double maxpower;
 110     opal_buffer_t *buf;
 111     orte_rml_recv_cb_t blob;
 112     opal_list_t *qos_attributes;
 113     int  window;
 114     uint32_t timeout = 1;
 115     bool retry = false;
 116     uint8_t *msg;
 117     /*
 118      * Init
 119      */
 120     orte_init(&argc, &argv, ORTE_PROC_NON_MPI);
 121 
 122     if (argc > 1) {
 123         count = atoi(argv[1]);
 124         if (count < 0) {
 125             count = INT_MAX-1;
 126         }
 127     } else {
 128         count = MAX_COUNT;
 129     }
 130 
 131     peer.jobid = ORTE_PROC_MY_NAME->jobid;
 132     peer.vpid = ORTE_PROC_MY_NAME->vpid + 1;
 133     if (peer.vpid == orte_process_info.num_procs) {
 134         peer.vpid = 0;
 135     }
 136     type_val = orte_qos_ack;
 137     type = &type_val;
 138     window = 5;
 139     count =3;
 140     qos_attributes = OBJ_NEW (opal_list_t);
 141     if (ORTE_SUCCESS == (rc = orte_set_attribute( qos_attributes,
 142                                   ORTE_QOS_TYPE, ORTE_ATTR_GLOBAL, (void*)type, OPAL_UINT8))) {
 143         type = &window;
 144         if (ORTE_SUCCESS == (rc = orte_set_attribute(qos_attributes, ORTE_QOS_WINDOW_SIZE,
 145                                       ORTE_ATTR_GLOBAL, (void*) type, OPAL_UINT32))) {
 146             //  orte_get_attribute( &qos_attributes, ORTE_QOS_WINDOW_SIZE, (void**)&type, OPAL_UINT32);
 147             // opal_output(0, "%s set attribute window =%d complete \n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), *type );
 148             type = &timeout;
 149             orte_set_attribute (qos_attributes, ORTE_QOS_ACK_NACK_TIMEOUT, ORTE_ATTR_GLOBAL,
 150                                     (void*)type, OPAL_UINT32);
 151             orte_set_attribute (qos_attributes, ORTE_QOS_MSG_RETRY, ORTE_ATTR_GLOBAL,
 152                                     NULL, OPAL_BOOL);
 153            /* Uncomment following lines to print channel attributes */
 154            /*
 155            opal_output(0, "%s set attribute retry =%d complete \n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), retry );
 156            orte_get_attribute( qos_attributes, ORTE_QOS_TYPE, (void**)&type, OPAL_UINT8);
 157            opal_output(0, "%s set attribute type =%d complete \n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), *type );
 158            orte_get_attribute( qos_attributes, ORTE_QOS_WINDOW_SIZE, (void**)&type, OPAL_UINT32);
 159            opal_output(0, "%s set attribute window =%d complete \n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), *type )
 160            orte_get_attribute( qos_attributes, ORTE_QOS_ACK_NACK_TIMEOUT, (void**)&type, OPAL_UINT32);
 161            opal_output(0, "%s set attribute timeout =%d complete \n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), *type );*/
 162            channel_inactive = true;
 163            orte_rml.open_channel ( &peer, qos_attributes, open_channel_callback, NULL);
 164            opal_output(0, "%s process sent open channel request %d waiting for completion \n",
 165                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
 166            ORTE_WAIT_FOR_COMPLETION(channel_inactive);
 167            opal_output(0, "%s open channel complete to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 168                                                                  ORTE_NAME_PRINT(&peer));
 169           }
 170     }
 171     for (j = 0; j< count; j++)
 172     {
 173         if (ORTE_PROC_MY_NAME->vpid == 0)
 174         {
 175             /* rank0 starts ring */
 176             msg_active = true;
 177             for (n = 0; n< window; n++ )
 178             {
 179                 buf = OBJ_NEW(opal_buffer_t);
 180                 maxpower = (double)(j%7);
 181                 msgsize = (int)pow(10.0, maxpower);
 182                 opal_output(0, "Ring %d message %d size %d bytes", j,n, msgsize);
 183                 msg = (uint8_t*)malloc(msgsize);
 184                 opal_dss.pack(buf, msg, msgsize, OPAL_BYTE);
 185                 free(msg);
 186                 orte_rml.send_buffer_channel_nb(channel, buf, MY_TAG, channel_send_callback, NULL);
 187                 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
 188                 blob.active = true;
 189                 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
 190                                         ORTE_RML_NON_PERSISTENT,
 191                                         orte_rml_recv_callback, &blob);
 192                 ORTE_WAIT_FOR_COMPLETION(blob.active);
 193                 OBJ_DESTRUCT(&blob);
 194                 //orte_rml.send_buffer_nb(&peer, buf,MY_TAG, send_callback, NULL)
 195             }
 196             ORTE_WAIT_FOR_COMPLETION(msg_active);
 197             opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
 198             //sleep(2);
 199         }
 200         else
 201         {
 202             msg_active = true;
 203             for (n =0; n < window; n++) {
 204                 OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
 205                 blob.active = true;
 206                 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
 207                                 ORTE_RML_NON_PERSISTENT,
 208                                 orte_rml_recv_callback, &blob);
 209                 ORTE_WAIT_FOR_COMPLETION(blob.active);
 210                 opal_output(0, "%s received message %d from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j,
 211                                 ORTE_NAME_PRINT(&blob.name));
 212                 /* send it along */
 213                buf = OBJ_NEW(opal_buffer_t);
 214                opal_dss.copy_payload(buf, &blob.data);
 215                OBJ_DESTRUCT(&blob);
 216                orte_rml.send_buffer_channel_nb(channel, buf, MY_TAG, channel_send_callback, NULL);
 217             }
 218             ORTE_WAIT_FOR_COMPLETION(msg_active);
 219             opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
 220             //sleep (2);
 221         }
 222     }
 223     channel_active = true;
 224     orte_rml.close_channel ( channel,close_channel_callback, NULL);
 225     opal_output(0, "%s process sent close channel request waiting for completion \n",
 226                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 227     ORTE_WAIT_FOR_COMPLETION(channel_active);
 228     opal_output(0, "%s close channel complete to %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 229                 ORTE_NAME_PRINT(&peer));
 230     orte_finalize();
 231     return 0;
 232 }

/* [<][>][^][v][top][bottom][index][help] */