root/orte/test/system/ofi_conduit_stress.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. send_callback
  2. main

   1 #include "orte_config.h"
   2 
   3 #include <stdio.h>
   4 #include <signal.h>
   5 #include <math.h>
   6 #include <sys/time.h>
   7 
   8 #include "opal/runtime/opal_progress.h"
   9 
  10 #include "orte/util/proc_info.h"
  11 #include "orte/util/name_fns.h"
  12 #include "orte/runtime/orte_globals.h"
  13 #include "orte/mca/rml/rml.h"
  14 #include "orte/mca/errmgr/errmgr.h"
  15 
  16 #include "orte/runtime/runtime.h"
  17 #include "orte/runtime/orte_wait.h"
  18 #include "orte/util/attr.h"
  19 
  20 #define MY_TAG 12345
  21 #define MAX_COUNT 3
  22 
  23 static bool msg_recvd;
  24 static volatile bool msg_active;
  25 
  26 static void send_callback(int status, orte_process_name_t *peer,
  27                           opal_buffer_t* buffer, orte_rml_tag_t tag,
  28                           void* cbdata)
  29 
  30 {
  31     OBJ_RELEASE(buffer);
  32     if (ORTE_SUCCESS != status) {
  33         exit(1);
  34     }
  35     msg_active = false;
  36 }
  37 
  38 
  39 int
  40 main(int argc, char *argv[]){
  41     int count;
  42     int msgsize;
  43     uint8_t *msg;
  44     int i, j, rc;
  45     orte_process_name_t peer;
  46     double maxpower;
  47     opal_buffer_t *buf;
  48     orte_rml_recv_cb_t blob;
  49     int conduit_id = 0;  //use the first available conduit
  50     struct timeval start, end;
  51     opal_list_t *conduit_attr;
  52     
  53 
  54     /*
  55      * Init
  56      */
  57     orte_init(&argc, &argv, ORTE_PROC_NON_MPI);
  58 
  59     
  60     conduit_attr = OBJ_NEW(opal_list_t);
  61    if( ORTE_SUCCESS == 
  62             ( orte_set_attribute( conduit_attr, ORTE_RML_PROVIDER_ATTRIB, ORTE_ATTR_GLOBAL,"sockets",OPAL_STRING)))   {
  63     if( ORTE_SUCCESS == 
  64             ( orte_set_attribute( conduit_attr, ORTE_RML_INCLUDE_COMP_ATTRIB, ORTE_ATTR_GLOBAL,"ofi",OPAL_STRING)))   {
  65         opal_output(0, "%s calling open_conduit with ORTE_RML_INCLUDE_COMP_ATTRIB and ORTE_RML_OFI_PROV_NAME_ATTRIB",
  66                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
  67         conduit_id = orte_rml_API_open_conduit(conduit_attr);
  68         if (0 > conduit_id ) {
  69             opal_output(0, "Conduit could not be opened for OFI, exiting");
  70             return;
  71         }
  72      }
  73    }
  74 
  75     opal_output(0, "Using conduit-id %d ", conduit_id);
  76 
  77     if (argc > 1) {
  78         count = atoi(argv[1]);
  79         if (count < 0) {
  80             count = INT_MAX-1;
  81         }
  82     } else {
  83         count = MAX_COUNT;
  84     }
  85 
  86     peer.jobid = ORTE_PROC_MY_NAME->jobid;
  87     peer.vpid = ORTE_PROC_MY_NAME->vpid + 1;
  88     if (peer.vpid == orte_process_info.num_procs) {
  89         peer.vpid = 0;
  90     }
  91 
  92     gettimeofday(&start, NULL);
  93     for (j=1; j < count+1; j++) {
  94         /* rank0 starts ring */
  95         if (ORTE_PROC_MY_NAME->vpid == 0) {
  96             /* setup the initiating buffer - put random sized message in it */
  97             buf = OBJ_NEW(opal_buffer_t);
  98 
  99             maxpower = (double)(j%7);
 100             msgsize = (int)pow(10.0, maxpower);
 101             opal_output(0, "Ring %d message size %d bytes", j, msgsize);
 102             msg = (uint8_t*)malloc(msgsize);
 103             opal_dss.pack(buf, msg, msgsize, OPAL_BYTE);
 104             free(msg);
 105             orte_rml.send_buffer_nb(conduit_id,&peer, buf, MY_TAG, orte_rml_send_callback, NULL);
 106 
 107             /* wait for it to come around */
 108             OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
 109             blob.active = true;
 110             orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
 111                                     ORTE_RML_NON_PERSISTENT,
 112                                     orte_rml_recv_callback, &blob);
 113             ORTE_WAIT_FOR_COMPLETION(blob.active);
 114             OBJ_DESTRUCT(&blob);
 115 
 116             opal_output(0, "%s Ring %d completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
 117         } else {
 118             /* wait for msg */
 119             OBJ_CONSTRUCT(&blob, orte_rml_recv_cb_t);
 120             blob.active = true;
 121             orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, MY_TAG,
 122                                     ORTE_RML_NON_PERSISTENT,
 123                                     orte_rml_recv_callback, &blob);
 124             ORTE_WAIT_FOR_COMPLETION(blob.active);
 125 
 126             opal_output(0, "%s received message %d from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j, ORTE_NAME_PRINT(&blob.name));
 127 
 128             /* send it along */
 129             buf = OBJ_NEW(opal_buffer_t);
 130             opal_dss.copy_payload(buf, &blob.data);
 131             OBJ_DESTRUCT(&blob);
 132             msg_active = true;
 133             orte_rml.send_buffer_nb(conduit_id,&peer, buf, MY_TAG, send_callback, NULL);
 134             ORTE_WAIT_FOR_COMPLETION(msg_active);
 135         }
 136     }
 137     gettimeofday(&end, NULL);
 138     orte_finalize();
 139     printf("start: %d secs, %d usecs\n",start.tv_sec,start.tv_usec);
 140     printf("end: %d secs, %d usecs\n",end.tv_sec,end.tv_usec);
 141     printf("Total minutes = %d, Total seconds = %d", (end.tv_sec - start.tv_sec)/60, (end.tv_sec - start.tv_sec)   );
 142     return 0;
 143 }

/* [<][>][^][v][top][bottom][index][help] */