This source file includes the following definitions.
- mca_btl_tcp_frag_eager_constructor
- mca_btl_tcp_frag_max_constructor
- mca_btl_tcp_frag_user_constructor
- mca_btl_tcp_frag_dump
- mca_btl_tcp_frag_send
- mca_btl_tcp_frag_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 #include "opal_config.h"
31
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
34 #endif
35 #ifdef HAVE_SYS_UIO_H
36 #include <sys/uio.h>
37 #endif
38 #ifdef HAVE_NET_UIO_H
39 #include <net/uio.h>
40 #endif
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44
45 #include "opal/opal_socket_errno.h"
46 #include "opal/mca/btl/base/btl_base_error.h"
47 #include "opal/util/show_help.h"
48
49 #include "btl_tcp_frag.h"
50 #include "btl_tcp_endpoint.h"
51 #include "btl_tcp_proc.h"
52
53
54 static void mca_btl_tcp_frag_eager_constructor(mca_btl_tcp_frag_t* frag)
55 {
56 frag->size = mca_btl_tcp_module.super.btl_eager_limit;
57 frag->my_list = &mca_btl_tcp_component.tcp_frag_eager;
58 }
59
60 static void mca_btl_tcp_frag_max_constructor(mca_btl_tcp_frag_t* frag)
61 {
62 frag->size = mca_btl_tcp_module.super.btl_max_send_size;
63 frag->my_list = &mca_btl_tcp_component.tcp_frag_max;
64 }
65
66 static void mca_btl_tcp_frag_user_constructor(mca_btl_tcp_frag_t* frag)
67 {
68 frag->size = 0;
69 frag->my_list = &mca_btl_tcp_component.tcp_frag_user;
70 }
71
72
73 OBJ_CLASS_INSTANCE(
74 mca_btl_tcp_frag_t,
75 mca_btl_base_descriptor_t,
76 NULL,
77 NULL);
78
79 OBJ_CLASS_INSTANCE(
80 mca_btl_tcp_frag_eager_t,
81 mca_btl_base_descriptor_t,
82 mca_btl_tcp_frag_eager_constructor,
83 NULL);
84
85 OBJ_CLASS_INSTANCE(
86 mca_btl_tcp_frag_max_t,
87 mca_btl_base_descriptor_t,
88 mca_btl_tcp_frag_max_constructor,
89 NULL);
90
91 OBJ_CLASS_INSTANCE(
92 mca_btl_tcp_frag_user_t,
93 mca_btl_base_descriptor_t,
94 mca_btl_tcp_frag_user_constructor,
95 NULL);
96
97 size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t* frag, char* msg, char* buf, size_t length)
98 {
99 int i, used;
100
101 used = snprintf(buf, length, "%s frag %p iov_cnt %d iov_idx %d size %lu\n",
102 msg, (void*)frag, (int)frag->iov_cnt, (int)frag->iov_idx, frag->size);
103 if ((size_t)used >= length) return length;
104 for( i = 0; i < (int)frag->iov_cnt; i++ ) {
105 used += snprintf(&buf[used], length - used, "[%s%p:%lu] ",
106 (i < (int)frag->iov_idx ? "*" : ""),
107 frag->iov[i].iov_base, frag->iov[i].iov_len);
108 if ((size_t)used >= length) return length;
109 }
110 return used;
111 }
112
113 bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
114 {
115 ssize_t cnt;
116 size_t i, num_vecs;
117
118
119 do {
120 cnt = writev(sd, frag->iov_ptr, frag->iov_cnt);
121 if(cnt < 0) {
122 switch(opal_socket_errno) {
123 case EINTR:
124 continue;
125 case EWOULDBLOCK:
126 return false;
127 case EFAULT:
128 BTL_ERROR(("mca_btl_tcp_frag_send: writev error (%p, %lu)\n\t%s(%lu)\n",
129 frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len,
130 strerror(opal_socket_errno), (unsigned long) frag->iov_cnt));
131 frag->endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
132 mca_btl_tcp_endpoint_close(frag->endpoint);
133 return false;
134 default:
135 BTL_ERROR(("mca_btl_tcp_frag_send: writev failed: %s (%d)",
136 strerror(opal_socket_errno),
137 opal_socket_errno));
138 frag->endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
139 mca_btl_tcp_endpoint_close(frag->endpoint);
140 return false;
141 }
142 }
143 } while(cnt < 0);
144
145
146 num_vecs = frag->iov_cnt;
147 for( i = 0; i < num_vecs; i++) {
148 if(cnt >= (ssize_t)frag->iov_ptr->iov_len) {
149 cnt -= frag->iov_ptr->iov_len;
150 frag->iov_ptr++;
151 frag->iov_idx++;
152 frag->iov_cnt--;
153 } else {
154 frag->iov_ptr->iov_base = (opal_iov_base_ptr_t)
155 (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
156 frag->iov_ptr->iov_len -= cnt;
157 OPAL_OUTPUT_VERBOSE((100, opal_btl_base_framework.framework_output,
158 "%s:%d write %ld bytes on socket %d\n",
159 __FILE__, __LINE__, cnt, sd));
160 break;
161 }
162 }
163 return (frag->iov_cnt == 0);
164 }
165
166 bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
167 {
168 mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
169 ssize_t cnt;
170 int32_t i, num_vecs, dont_copy_data = 0;
171
172 repeat:
173 num_vecs = frag->iov_cnt;
174 #if MCA_BTL_TCP_ENDPOINT_CACHE
175 if( 0 != btl_endpoint->endpoint_cache_length ) {
176 size_t length;
177
178
179
180
181 cnt = length = btl_endpoint->endpoint_cache_length;
182 for( i = 0; i < (int)frag->iov_cnt; i++ ) {
183 if( length > frag->iov_ptr[i].iov_len )
184 length = frag->iov_ptr[i].iov_len;
185 if( (0 == dont_copy_data) || (length < frag->iov_ptr[i].iov_len) ) {
186 memcpy( frag->iov_ptr[i].iov_base, btl_endpoint->endpoint_cache_pos, length );
187 } else {
188 frag->segments[0].seg_addr.pval = btl_endpoint->endpoint_cache_pos;
189 frag->iov_ptr[i].iov_base = btl_endpoint->endpoint_cache_pos;
190 }
191 btl_endpoint->endpoint_cache_pos += length;
192 btl_endpoint->endpoint_cache_length -= length;
193 length = btl_endpoint->endpoint_cache_length;
194 if( 0 == length ) {
195 btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
196 break;
197 }
198 }
199 goto advance_iov_position;
200 }
201
202
203
204 frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache_pos;
205 frag->iov_ptr[num_vecs].iov_len =
206 mca_btl_tcp_component.tcp_endpoint_cache - btl_endpoint->endpoint_cache_length;
207 num_vecs++;
208 #endif
209
210
211 do {
212 cnt = readv(sd, frag->iov_ptr, num_vecs);
213 if( 0 < cnt ) goto advance_iov_position;
214 if( cnt == 0 ) {
215 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
216 mca_btl_tcp_endpoint_close(btl_endpoint);
217 return false;
218 }
219 switch(opal_socket_errno) {
220 case EINTR:
221 continue;
222 case EWOULDBLOCK:
223 return false;
224 case EFAULT:
225 BTL_ERROR(("mca_btl_tcp_frag_recv: readv error (%p, %lu)\n\t%s(%lu)\n",
226 frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len,
227 strerror(opal_socket_errno), (unsigned long) frag->iov_cnt));
228 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
229 mca_btl_tcp_endpoint_close(btl_endpoint);
230 return false;
231
232 case ECONNRESET:
233 opal_show_help("help-mpi-btl-tcp.txt", "peer hung up",
234 true, opal_process_info.nodename,
235 getpid(),
236 btl_endpoint->endpoint_proc->proc_opal->proc_hostname);
237 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
238 mca_btl_tcp_endpoint_close(btl_endpoint);
239 return false;
240
241 default:
242 BTL_ERROR(("mca_btl_tcp_frag_recv: readv failed: %s (%d)",
243 strerror(opal_socket_errno),
244 opal_socket_errno));
245 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
246 mca_btl_tcp_endpoint_close(btl_endpoint);
247 return false;
248 }
249 } while( cnt < 0 );
250
251 advance_iov_position:
252
253 num_vecs = frag->iov_cnt;
254 for( i = 0; i < num_vecs; i++ ) {
255 if( cnt < (ssize_t)frag->iov_ptr->iov_len ) {
256 frag->iov_ptr->iov_base = (opal_iov_base_ptr_t)
257 (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
258 frag->iov_ptr->iov_len -= cnt;
259 cnt = 0;
260 break;
261 }
262 cnt -= frag->iov_ptr->iov_len;
263 frag->iov_idx++;
264 frag->iov_ptr++;
265 frag->iov_cnt--;
266 }
267 #if MCA_BTL_TCP_ENDPOINT_CACHE
268 btl_endpoint->endpoint_cache_length = cnt;
269 #endif
270
271
272 if(frag->iov_cnt == 0) {
273 if (btl_endpoint->endpoint_nbo && frag->iov_idx == 1) MCA_BTL_TCP_HDR_NTOH(frag->hdr);
274 switch(frag->hdr.type) {
275 case MCA_BTL_TCP_HDR_TYPE_SEND:
276 if(frag->iov_idx == 1 && frag->hdr.size) {
277 frag->segments[0].seg_addr.pval = frag+1;
278 frag->segments[0].seg_len = frag->hdr.size;
279 frag->iov[1].iov_base = (IOVBASE_TYPE*)(frag->segments[0].seg_addr.pval);
280 frag->iov[1].iov_len = frag->hdr.size;
281 frag->iov_cnt++;
282 goto repeat;
283 }
284 break;
285 case MCA_BTL_TCP_HDR_TYPE_PUT:
286 if(frag->iov_idx == 1) {
287 frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->segments;
288 frag->iov[1].iov_len = frag->hdr.count * sizeof(mca_btl_base_segment_t);
289 frag->iov_cnt++;
290 goto repeat;
291 } else if (frag->iov_idx == 2) {
292 for( i = 0; i < frag->hdr.count; i++ ) {
293 if (btl_endpoint->endpoint_nbo) MCA_BTL_BASE_SEGMENT_NTOH(frag->segments[i]);
294 frag->iov[i+2].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
295 frag->iov[i+2].iov_len = frag->segments[i].seg_len;
296 }
297 frag->iov_cnt += frag->hdr.count;
298 goto repeat;
299 }
300 break;
301 case MCA_BTL_TCP_HDR_TYPE_GET:
302 default:
303 break;
304 }
305 return true;
306 }
307 return false;
308 }
309