This source file includes the following definitions.
- pmix_ptl_base_set_nonblocking
- pmix_ptl_base_set_blocking
- pmix_ptl_base_send_blocking
- pmix_ptl_base_recv_blocking
- pmix_ptl_base_connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 #include <src/include/pmix_config.h>
21 #include "include/pmix_stdint.h"
22
23 #include <stdio.h>
24 #ifdef HAVE_UNISTD_H
25 #include <unistd.h>
26 #endif
27 #ifdef HAVE_FCNTL_H
28 #include <fcntl.h>
29 #endif
30 #ifdef HAVE_SYS_SOCKET_H
31 #include <sys/socket.h>
32 #endif
33
34 #include "include/pmix_socket_errno.h"
35 #include "src/util/argv.h"
36 #include "src/util/error.h"
37 #include "src/util/getid.h"
38 #include "src/util/strnlen.h"
39 #include "src/include/pmix_globals.h"
40 #include "src/client/pmix_client_ops.h"
41 #include "src/server/pmix_server_ops.h"
42
43 #include "src/mca/ptl/base/base.h"
44
45 pmix_status_t pmix_ptl_base_set_nonblocking(int sd)
46 {
47 int flags;
48
49 if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
50 pmix_output(0, "ptl:base:set_nonblocking: fcntl(F_GETFL) failed: %s (%d)\n",
51 strerror(pmix_socket_errno),
52 pmix_socket_errno);
53 } else {
54 flags |= O_NONBLOCK;
55 if(fcntl(sd, F_SETFL, flags) < 0)
56 pmix_output(0, "ptl:base:set_nonblocking: fcntl(F_SETFL) failed: %s (%d)\n",
57 strerror(pmix_socket_errno),
58 pmix_socket_errno);
59 }
60 return PMIX_SUCCESS;
61 }
62
63 pmix_status_t pmix_ptl_base_set_blocking(int sd)
64 {
65 int flags;
66
67 if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
68 pmix_output(0, "ptl:base:set_blocking: fcntl(F_GETFL) failed: %s (%d)\n",
69 strerror(pmix_socket_errno),
70 pmix_socket_errno);
71 } else {
72 flags &= ~(O_NONBLOCK);
73 if(fcntl(sd, F_SETFL, flags) < 0)
74 pmix_output(0, "ptl:base:set_blocking: fcntl(F_SETFL) failed: %s (%d)\n",
75 strerror(pmix_socket_errno),
76 pmix_socket_errno);
77 }
78 return PMIX_SUCCESS;
79 }
80
81
82
83
84
85 pmix_status_t pmix_ptl_base_send_blocking(int sd, char *ptr, size_t size)
86 {
87 size_t cnt = 0;
88 int retval;
89
90 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
91 "send blocking of %"PRIsize_t" bytes to socket %d",
92 size, sd );
93 while (cnt < size) {
94 retval = send(sd, (char*)ptr+cnt, size-cnt, 0);
95 if (retval < 0) {
96 if (EAGAIN == pmix_socket_errno ||
97 EWOULDBLOCK == pmix_socket_errno) {
98
99 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
100 "blocking_send received error %d:%s from remote - cycling",
101 pmix_socket_errno, strerror(pmix_socket_errno));
102 continue;
103 }
104 if (pmix_socket_errno != EINTR) {
105 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
106 "ptl:base:peer_send_blocking: send() to socket %d failed: %s (%d)\n",
107 sd, strerror(pmix_socket_errno),
108 pmix_socket_errno);
109 return PMIX_ERR_UNREACH;
110 }
111 continue;
112 }
113 cnt += retval;
114 }
115
116 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
117 "blocking send complete to socket %d", sd);
118 return PMIX_SUCCESS;
119 }
120
121
122
123
124
125 pmix_status_t pmix_ptl_base_recv_blocking(int sd, char *data, size_t size)
126 {
127 size_t cnt = 0;
128
129 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
130 "waiting for blocking recv of %"PRIsize_t" bytes", size);
131
132 while (cnt < size) {
133 int retval = recv(sd, (char *)data+cnt, size-cnt, MSG_WAITALL);
134
135
136 if (retval == 0) {
137 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
138 "ptl:base:recv_blocking: remote closed connection");
139 return PMIX_ERR_UNREACH;
140 }
141
142
143 if (retval < 0) {
144 if (EAGAIN == pmix_socket_errno ||
145 EWOULDBLOCK == pmix_socket_errno) {
146
147 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
148 "blocking_recv received error %d:%s from remote - cycling",
149 pmix_socket_errno, strerror(pmix_socket_errno));
150 return PMIX_ERR_TEMP_UNAVAILABLE;
151 }
152 if (pmix_socket_errno != EINTR ) {
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
168 "blocking_recv received error %d:%s from remote - aborting",
169 pmix_socket_errno, strerror(pmix_socket_errno));
170 return PMIX_ERR_UNREACH;
171 }
172 continue;
173 }
174 cnt += retval;
175 }
176
177 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
178 "blocking receive complete from remote");
179 return PMIX_SUCCESS;
180 }
181
182 #define PMIX_MAX_RETRIES 10
183
184 pmix_status_t pmix_ptl_base_connect(struct sockaddr_storage *addr,
185 pmix_socklen_t addrlen, int *fd)
186 {
187 int sd = -1;
188 int retries = 0;
189
190 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
191 "ptl_base_connect: attempting to connect to server");
192
193 while (retries < PMIX_MAX_RETRIES) {
194 retries++;
195
196 sd = socket(addr->ss_family, SOCK_STREAM, 0);
197 if (sd < 0) {
198 pmix_output(0, "pmix:create_socket: socket() failed: %s (%d)\n",
199 strerror(pmix_socket_errno),
200 pmix_socket_errno);
201 continue;
202 }
203 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
204 "pmix_ptl_base_connect: attempting to connect to server on socket %d", sd);
205
206 if (connect(sd, (struct sockaddr*)addr, addrlen) < 0) {
207 if (pmix_socket_errno == ETIMEDOUT) {
208
209 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
210 "timeout connecting to server");
211 CLOSE_THE_SOCKET(sd);
212 continue;
213 }
214
215
216
217
218
219
220 if (ECONNABORTED == pmix_socket_errno) {
221 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
222 "connection to server aborted by OS - retrying");
223 CLOSE_THE_SOCKET(sd);
224 continue;
225 } else {
226 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
227 "Connect failed: %s (%d)", strerror(pmix_socket_errno),
228 pmix_socket_errno);
229 CLOSE_THE_SOCKET(sd);
230 continue;
231 }
232 } else {
233
234 break;
235 }
236 }
237
238 if (retries == PMIX_MAX_RETRIES || sd < 0){
239
240
241 if (0 <= sd) {
242 CLOSE_THE_SOCKET(sd);
243 }
244 return PMIX_ERR_UNREACH;
245 }
246 *fd = sd;
247
248 return PMIX_SUCCESS;
249 }