This source file includes following definitions.
- pmix_ptl_base_set_nonblocking
- pmix_ptl_base_set_blocking
- pmix_ptl_base_send_blocking
- pmix_ptl_base_recv_blocking
- pmix_ptl_base_connect
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 #include <src/include/pmix_config.h>
  21 #include "include/pmix_stdint.h"
  22 
  23 #include <stdio.h>
  24 #ifdef HAVE_UNISTD_H
  25 #include <unistd.h>
  26 #endif
  27 #ifdef HAVE_FCNTL_H
  28 #include <fcntl.h>
  29 #endif
  30 #ifdef HAVE_SYS_SOCKET_H
  31 #include <sys/socket.h>
  32 #endif
  33 
  34 #include "include/pmix_socket_errno.h"
  35 #include "src/util/argv.h"
  36 #include "src/util/error.h"
  37 #include "src/util/getid.h"
  38 #include "src/util/strnlen.h"
  39 #include "src/include/pmix_globals.h"
  40 #include "src/client/pmix_client_ops.h"
  41 #include "src/server/pmix_server_ops.h"
  42 
  43 #include "src/mca/ptl/base/base.h"
  44 
  45 pmix_status_t pmix_ptl_base_set_nonblocking(int sd)
  46 {
  47     int flags;
  48      
  49     if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
  50         pmix_output(0, "ptl:base:set_nonblocking: fcntl(F_GETFL) failed: %s (%d)\n",
  51                     strerror(pmix_socket_errno),
  52                     pmix_socket_errno);
  53     } else {
  54         flags |= O_NONBLOCK;
  55         if(fcntl(sd, F_SETFL, flags) < 0)
  56             pmix_output(0, "ptl:base:set_nonblocking: fcntl(F_SETFL) failed: %s (%d)\n",
  57                         strerror(pmix_socket_errno),
  58                         pmix_socket_errno);
  59     }
  60     return PMIX_SUCCESS;
  61 }
  62 
  63 pmix_status_t pmix_ptl_base_set_blocking(int sd)
  64 {
  65     int flags;
  66      
  67     if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
  68         pmix_output(0, "ptl:base:set_blocking: fcntl(F_GETFL) failed: %s (%d)\n",
  69                     strerror(pmix_socket_errno),
  70                     pmix_socket_errno);
  71     } else {
  72         flags &= ~(O_NONBLOCK);
  73         if(fcntl(sd, F_SETFL, flags) < 0)
  74             pmix_output(0, "ptl:base:set_blocking: fcntl(F_SETFL) failed: %s (%d)\n",
  75                         strerror(pmix_socket_errno),
  76                         pmix_socket_errno);
  77     }
  78     return PMIX_SUCCESS;
  79 }
  80 
  81 
  82 
  83 
  84 
  85 pmix_status_t pmix_ptl_base_send_blocking(int sd, char *ptr, size_t size)
  86 {
  87     size_t cnt = 0;
  88     int retval;
  89 
  90     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
  91                         "send blocking of %"PRIsize_t" bytes to socket %d",
  92                         size, sd );
  93     while (cnt < size) {
  94         retval = send(sd, (char*)ptr+cnt, size-cnt, 0);
  95         if (retval < 0) {
  96             if (EAGAIN == pmix_socket_errno ||
  97                 EWOULDBLOCK == pmix_socket_errno) {
  98                 
  99                 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 100                                     "blocking_send received error %d:%s from remote - cycling",
 101                                     pmix_socket_errno, strerror(pmix_socket_errno));
 102                 continue;
 103             }
 104             if (pmix_socket_errno != EINTR) {
 105                 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 106                                     "ptl:base:peer_send_blocking: send() to socket %d failed: %s (%d)\n",
 107                                     sd, strerror(pmix_socket_errno),
 108                                     pmix_socket_errno);
 109                 return PMIX_ERR_UNREACH;
 110             }
 111             continue;
 112         }
 113         cnt += retval;
 114     }
 115 
 116     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 117                         "blocking send complete to socket %d", sd);
 118     return PMIX_SUCCESS;
 119 }
 120 
 121 
 122 
 123 
 124 
 125 pmix_status_t pmix_ptl_base_recv_blocking(int sd, char *data, size_t size)
 126 {
 127     size_t cnt = 0;
 128 
 129     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 130                         "waiting for blocking recv of %"PRIsize_t" bytes", size);
 131 
 132     while (cnt < size) {
 133         int retval = recv(sd, (char *)data+cnt, size-cnt, MSG_WAITALL);
 134 
 135         
 136         if (retval == 0) {
 137             pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 138                                 "ptl:base:recv_blocking: remote closed connection");
 139             return PMIX_ERR_UNREACH;
 140         }
 141 
 142         
 143         if (retval < 0) {
 144             if (EAGAIN == pmix_socket_errno ||
 145                 EWOULDBLOCK == pmix_socket_errno) {
 146                 
 147                 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 148                                     "blocking_recv received error %d:%s from remote - cycling",
 149                                     pmix_socket_errno, strerror(pmix_socket_errno));
 150                 return PMIX_ERR_TEMP_UNAVAILABLE;
 151             }
 152             if (pmix_socket_errno != EINTR ) {
 153                 
 154 
 155 
 156 
 157 
 158 
 159 
 160 
 161 
 162 
 163 
 164 
 165 
 166 
 167                 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 168                                     "blocking_recv received error %d:%s from remote - aborting",
 169                                     pmix_socket_errno, strerror(pmix_socket_errno));
 170                 return PMIX_ERR_UNREACH;
 171             }
 172             continue;
 173         }
 174         cnt += retval;
 175     }
 176 
 177     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 178                         "blocking receive complete from remote");
 179     return PMIX_SUCCESS;
 180 }
 181 
 182 #define PMIX_MAX_RETRIES 10
 183 
 184 pmix_status_t pmix_ptl_base_connect(struct sockaddr_storage *addr,
 185                                     pmix_socklen_t addrlen, int *fd)
 186 {
 187     int sd = -1;
 188     int retries = 0;
 189 
 190     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 191                         "ptl_base_connect: attempting to connect to server");
 192 
 193     while (retries < PMIX_MAX_RETRIES) {
 194         retries++;
 195         
 196         sd = socket(addr->ss_family, SOCK_STREAM, 0);
 197         if (sd < 0) {
 198             pmix_output(0, "pmix:create_socket: socket() failed: %s (%d)\n",
 199                         strerror(pmix_socket_errno),
 200                         pmix_socket_errno);
 201             continue;
 202         }
 203         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 204                             "pmix_ptl_base_connect: attempting to connect to server on socket %d", sd);
 205         
 206         if (connect(sd, (struct sockaddr*)addr, addrlen) < 0) {
 207             if (pmix_socket_errno == ETIMEDOUT) {
 208                 
 209                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 210                                     "timeout connecting to server");
 211                 CLOSE_THE_SOCKET(sd);
 212                 continue;
 213             }
 214 
 215             
 216 
 217 
 218 
 219 
 220             if (ECONNABORTED == pmix_socket_errno) {
 221                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 222                                     "connection to server aborted by OS - retrying");
 223                 CLOSE_THE_SOCKET(sd);
 224                 continue;
 225             } else {
 226                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 227                                     "Connect failed: %s (%d)", strerror(pmix_socket_errno),
 228                                     pmix_socket_errno);
 229                 CLOSE_THE_SOCKET(sd);
 230                 continue;
 231             }
 232         } else {
 233             
 234             break;
 235         }
 236     }
 237 
 238     if (retries == PMIX_MAX_RETRIES || sd < 0){
 239         
 240 
 241         if (0 <= sd) {
 242             CLOSE_THE_SOCKET(sd);
 243         }
 244         return PMIX_ERR_UNREACH;
 245     }
 246     *fd = sd;
 247 
 248     return PMIX_SUCCESS;
 249 }