This source file includes the following definitions.
- mca_btl_tcp_add_procs
- mca_btl_tcp_del_procs
- mca_btl_tcp_alloc
- mca_btl_tcp_free
- mca_btl_tcp_prepare_src
- mca_btl_tcp_send
- fake_rdma_complete
- mca_btl_tcp_put
- mca_btl_tcp_get
- mca_btl_tcp_finalize
- mca_btl_tcp_dump
- mca_btl_tcp_recv_blocking
- mca_btl_tcp_send_blocking
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "opal_config.h"
27 #include <string.h>
28 #include "opal/class/opal_bitmap.h"
29 #include "opal/mca/btl/btl.h"
30 #include "opal/datatype/opal_convertor.h"
31 #include "opal/mca/mpool/base/base.h"
32 #include "opal/mca/mpool/mpool.h"
33 #include "opal/mca/btl/base/btl_base_error.h"
34 #include "opal/opal_socket_errno.h"
35
36 #include "btl_tcp.h"
37 #include "btl_tcp_frag.h"
38 #include "btl_tcp_proc.h"
39 #include "btl_tcp_endpoint.h"
40
41
42 mca_btl_tcp_module_t mca_btl_tcp_module = {
43 .super = {
44 .btl_component = &mca_btl_tcp_component.super,
45 .btl_add_procs = mca_btl_tcp_add_procs,
46 .btl_del_procs = mca_btl_tcp_del_procs,
47 .btl_finalize = mca_btl_tcp_finalize,
48 .btl_alloc = mca_btl_tcp_alloc,
49 .btl_free = mca_btl_tcp_free,
50 .btl_prepare_src = mca_btl_tcp_prepare_src,
51 .btl_send = mca_btl_tcp_send,
52 .btl_put = mca_btl_tcp_put,
53 .btl_dump = mca_btl_base_dump,
54 .btl_ft_event = mca_btl_tcp_ft_event
55 },
56 .tcp_endpoints_mutex = OPAL_MUTEX_STATIC_INIT
57 };
58
59
60
61
62
63 int mca_btl_tcp_add_procs( struct mca_btl_base_module_t* btl,
64 size_t nprocs,
65 struct opal_proc_t **procs,
66 struct mca_btl_base_endpoint_t** peers,
67 opal_bitmap_t* reachable )
68 {
69 mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*)btl;
70 const opal_proc_t* my_proc;
71 int i, rc;
72
73
74 if( NULL == (my_proc = opal_proc_local_get()) )
75 return OPAL_ERR_OUT_OF_RESOURCE;
76
77 for(i = 0; i < (int) nprocs; i++) {
78
79 struct opal_proc_t* opal_proc = procs[i];
80 mca_btl_tcp_proc_t* tcp_proc;
81 mca_btl_base_endpoint_t* tcp_endpoint;
82 bool existing_found = false;
83
84
85 if( my_proc == opal_proc ) {
86 continue;
87 }
88
89 if(NULL == (tcp_proc = mca_btl_tcp_proc_create(opal_proc))) {
90 continue;
91 }
92
93
94
95
96
97
98
99 OPAL_THREAD_LOCK(&tcp_proc->proc_lock);
100
101 for (uint32_t j = 0 ; j < (uint32_t)tcp_proc->proc_endpoint_count ; ++j) {
102 tcp_endpoint = tcp_proc->proc_endpoints[j];
103 if (tcp_endpoint->endpoint_btl == tcp_btl) {
104 existing_found = true;
105 break;
106 }
107 }
108
109 if (!existing_found) {
110
111
112
113
114 tcp_endpoint = OBJ_NEW(mca_btl_tcp_endpoint_t);
115 if(NULL == tcp_endpoint) {
116 OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
117 return OPAL_ERR_OUT_OF_RESOURCE;
118 }
119
120 tcp_endpoint->endpoint_btl = tcp_btl;
121 rc = mca_btl_tcp_proc_insert(tcp_proc, tcp_endpoint);
122 if(rc != OPAL_SUCCESS) {
123 OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
124 OBJ_RELEASE(tcp_endpoint);
125 continue;
126 }
127
128 OPAL_THREAD_LOCK(&tcp_btl->tcp_endpoints_mutex);
129 opal_list_append(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
130 OPAL_THREAD_UNLOCK(&tcp_btl->tcp_endpoints_mutex);
131 }
132
133 OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
134
135 if (NULL != reachable) {
136 opal_bitmap_set_bit(reachable, i);
137 }
138
139 peers[i] = tcp_endpoint;
140 }
141
142 return OPAL_SUCCESS;
143 }
144
145 int mca_btl_tcp_del_procs(struct mca_btl_base_module_t* btl,
146 size_t nprocs,
147 struct opal_proc_t **procs,
148 struct mca_btl_base_endpoint_t ** endpoints)
149 {
150 mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*)btl;
151 size_t i;
152
153 OPAL_THREAD_LOCK(&tcp_btl->tcp_endpoints_mutex);
154 for( i = 0; i < nprocs; i++ ) {
155 mca_btl_tcp_endpoint_t* tcp_endpoint = endpoints[i];
156 opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
157 OBJ_RELEASE(tcp_endpoint);
158 }
159 OPAL_THREAD_UNLOCK(&tcp_btl->tcp_endpoints_mutex);
160 return OPAL_SUCCESS;
161 }
162
163
164
165
166
167
168
169
170
171 mca_btl_base_descriptor_t* mca_btl_tcp_alloc(
172 struct mca_btl_base_module_t* btl,
173 struct mca_btl_base_endpoint_t* endpoint,
174 uint8_t order,
175 size_t size,
176 uint32_t flags)
177 {
178 mca_btl_tcp_frag_t* frag = NULL;
179
180 if(size <= btl->btl_eager_limit) {
181 MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
182 } else if (size <= btl->btl_max_send_size) {
183 MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
184 }
185 if( OPAL_UNLIKELY(NULL == frag) ) {
186 return NULL;
187 }
188
189 frag->segments[0].seg_len = size;
190 frag->segments[0].seg_addr.pval = frag+1;
191
192 frag->base.des_segments = frag->segments;
193 frag->base.des_segment_count = 1;
194 frag->base.des_flags = flags;
195 frag->base.order = MCA_BTL_NO_ORDER;
196 frag->btl = (mca_btl_tcp_module_t*)btl;
197 return (mca_btl_base_descriptor_t*)frag;
198 }
199
200
201
202
203
204
205 int mca_btl_tcp_free(
206 struct mca_btl_base_module_t* btl,
207 mca_btl_base_descriptor_t* des)
208 {
209 mca_btl_tcp_frag_t* frag = (mca_btl_tcp_frag_t*)des;
210 MCA_BTL_TCP_FRAG_RETURN(frag);
211 return OPAL_SUCCESS;
212 }
213
214
215
216
217
218
219
220
221 mca_btl_base_descriptor_t* mca_btl_tcp_prepare_src(
222 struct mca_btl_base_module_t* btl,
223 struct mca_btl_base_endpoint_t* endpoint,
224 struct opal_convertor_t* convertor,
225 uint8_t order,
226 size_t reserve,
227 size_t* size,
228 uint32_t flags)
229 {
230 mca_btl_tcp_frag_t* frag;
231 struct iovec iov;
232 uint32_t iov_count = 1;
233 size_t max_data = *size;
234 int rc;
235
236 if( OPAL_UNLIKELY(max_data > UINT32_MAX) ) {
237 max_data = (size_t)UINT32_MAX;
238 }
239
240
241
242
243 if (max_data+reserve <= btl->btl_eager_limit) {
244 MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
245 } else {
246
247
248
249
250 MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
251 }
252 if( OPAL_UNLIKELY(NULL == frag) ) {
253 return NULL;
254 }
255
256 frag->segments[0].seg_addr.pval = (frag + 1);
257 frag->segments[0].seg_len = reserve;
258
259 frag->base.des_segment_count = 1;
260 if(opal_convertor_need_buffers(convertor)) {
261
262 if (max_data + reserve > frag->size) {
263 max_data = frag->size - reserve;
264 }
265 iov.iov_len = max_data;
266 iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)(frag->segments[0].seg_addr.pval)) + reserve);
267
268 rc = opal_convertor_pack(convertor, &iov, &iov_count, &max_data );
269 if( OPAL_UNLIKELY(rc < 0) ) {
270 mca_btl_tcp_free(btl, &frag->base);
271 return NULL;
272 }
273
274 frag->segments[0].seg_len += max_data;
275
276 } else {
277
278 iov.iov_len = max_data;
279 iov.iov_base = NULL;
280
281 rc = opal_convertor_pack(convertor, &iov, &iov_count, &max_data );
282 if( OPAL_UNLIKELY(rc < 0) ) {
283 mca_btl_tcp_free(btl, &frag->base);
284 return NULL;
285 }
286
287 frag->segments[1].seg_addr.pval = iov.iov_base;
288 frag->segments[1].seg_len = max_data;
289 frag->base.des_segment_count = 2;
290 }
291
292 frag->base.des_segments = frag->segments;
293 frag->base.des_flags = flags;
294 frag->base.order = MCA_BTL_NO_ORDER;
295 *size = max_data;
296 return &frag->base;
297 }
298
299
300
301
302
303
304
305
306
307
308 int mca_btl_tcp_send( struct mca_btl_base_module_t* btl,
309 struct mca_btl_base_endpoint_t* endpoint,
310 struct mca_btl_base_descriptor_t* descriptor,
311 mca_btl_base_tag_t tag )
312 {
313 mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*) btl;
314 mca_btl_tcp_frag_t* frag = (mca_btl_tcp_frag_t*)descriptor;
315 int i;
316
317 frag->btl = tcp_btl;
318 frag->endpoint = endpoint;
319 frag->rc = 0;
320 frag->iov_idx = 0;
321 frag->iov_cnt = 1;
322 frag->iov_ptr = frag->iov;
323 frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
324 frag->iov[0].iov_len = sizeof(frag->hdr);
325 frag->hdr.size = 0;
326 for( i = 0; i < (int)frag->base.des_segment_count; i++) {
327 frag->hdr.size += frag->segments[i].seg_len;
328 frag->iov[i+1].iov_len = frag->segments[i].seg_len;
329 frag->iov[i+1].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
330 frag->iov_cnt++;
331 }
332 frag->hdr.base.tag = tag;
333 frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND;
334 frag->hdr.count = 0;
335 if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
336 return mca_btl_tcp_endpoint_send(endpoint,frag);
337 }
338
339 static void fake_rdma_complete (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint,
340 mca_btl_base_descriptor_t *desc, int rc)
341 {
342 mca_btl_tcp_frag_t *frag = (mca_btl_tcp_frag_t *) desc;
343
344 frag->cb.func (btl, endpoint, frag->segments[0].seg_addr.pval, NULL, frag->cb.context, frag->cb.data,
345 rc);
346 }
347
348
349
350
351
352 int mca_btl_tcp_put (mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint, void *local_address,
353 uint64_t remote_address, mca_btl_base_registration_handle_t *local_handle,
354 mca_btl_base_registration_handle_t *remote_handle, size_t size, int flags,
355 int order, mca_btl_base_rdma_completion_fn_t cbfunc, void *cbcontext, void *cbdata)
356 {
357 mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*) btl;
358 mca_btl_tcp_frag_t *frag = NULL;
359 int i;
360
361 MCA_BTL_TCP_FRAG_ALLOC_USER(frag);
362 if( OPAL_UNLIKELY(NULL == frag) ) {
363 return OPAL_ERR_OUT_OF_RESOURCE;
364 }
365
366 frag->endpoint = endpoint;
367
368 frag->segments->seg_len = size;
369 frag->segments->seg_addr.pval = local_address;
370
371 frag->base.des_segments = frag->segments;
372 frag->base.des_segment_count = 1;
373 frag->base.order = MCA_BTL_NO_ORDER;
374
375 frag->segments[0].seg_addr.pval = local_address;
376 frag->segments[0].seg_len = size;
377
378 frag->segments[1].seg_addr.lval = remote_address;
379 frag->segments[1].seg_len = size;
380 if (endpoint->endpoint_nbo) MCA_BTL_BASE_SEGMENT_HTON(frag->segments[1]);
381
382 frag->base.des_flags = MCA_BTL_DES_FLAGS_BTL_OWNERSHIP | MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
383 frag->base.des_cbfunc = fake_rdma_complete;
384
385 frag->cb.func = cbfunc;
386 frag->cb.data = cbdata;
387 frag->cb.context = cbcontext;
388
389 frag->btl = tcp_btl;
390 frag->endpoint = endpoint;
391 frag->rc = 0;
392 frag->iov_idx = 0;
393 frag->hdr.size = 0;
394 frag->iov_cnt = 2;
395 frag->iov_ptr = frag->iov;
396 frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
397 frag->iov[0].iov_len = sizeof(frag->hdr);
398 frag->iov[1].iov_base = (IOVBASE_TYPE*) (frag->segments + 1);
399 frag->iov[1].iov_len = sizeof(mca_btl_base_segment_t);
400 for( i = 0; i < (int)frag->base.des_segment_count; i++ ) {
401 frag->hdr.size += frag->segments[i].seg_len;
402 frag->iov[i+2].iov_len = frag->segments[i].seg_len;
403 frag->iov[i+2].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
404 frag->iov_cnt++;
405 }
406 frag->hdr.base.tag = MCA_BTL_TAG_BTL;
407 frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT;
408 frag->hdr.count = 1;
409 if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
410 return ((i = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OPAL_SUCCESS : i);
411 }
412
413
414
415
416
417
418 int mca_btl_tcp_get (mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint, void *local_address,
419 uint64_t remote_address, mca_btl_base_registration_handle_t *local_handle,
420 mca_btl_base_registration_handle_t *remote_handle, size_t size, int flags,
421 int order, mca_btl_base_rdma_completion_fn_t cbfunc, void *cbcontext, void *cbdata)
422 {
423 mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*) btl;
424 mca_btl_tcp_frag_t* frag = NULL;
425 int rc;
426
427 MCA_BTL_TCP_FRAG_ALLOC_USER(frag);
428 if( OPAL_UNLIKELY(NULL == frag) ) {
429 return OPAL_ERR_OUT_OF_RESOURCE;;
430 }
431
432 frag->endpoint = endpoint;
433
434 frag->segments->seg_len = size;
435 frag->segments->seg_addr.pval = local_address;
436
437 frag->base.des_segments = frag->segments;
438 frag->base.des_segment_count = 1;
439 frag->base.order = MCA_BTL_NO_ORDER;
440
441 frag->segments[0].seg_addr.pval = local_address;
442 frag->segments[0].seg_len = size;
443
444 frag->segments[1].seg_addr.lval = remote_address;
445 frag->segments[1].seg_len = size;
446
447
448
449 frag->base.des_flags = MCA_BTL_DES_FLAGS_BTL_OWNERSHIP | MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
450 frag->base.des_cbfunc = fake_rdma_complete;
451
452 frag->cb.func = cbfunc;
453 frag->cb.data = cbdata;
454 frag->cb.context = cbcontext;
455
456 frag->btl = tcp_btl;
457 frag->endpoint = endpoint;
458 frag->rc = 0;
459 frag->iov_idx = 0;
460 frag->hdr.size = 0;
461 frag->iov_cnt = 2;
462 frag->iov_ptr = frag->iov;
463 frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
464 frag->iov[0].iov_len = sizeof(frag->hdr);
465 frag->iov[1].iov_base = (IOVBASE_TYPE*) &frag->segments[1];
466 frag->iov[1].iov_len = sizeof(mca_btl_base_segment_t);
467 frag->hdr.base.tag = MCA_BTL_TAG_BTL;
468 frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET;
469 frag->hdr.count = 1;
470 if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
471 return ((rc = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OPAL_SUCCESS : rc);
472 }
473
474
475
476
477
478
479 int mca_btl_tcp_finalize(struct mca_btl_base_module_t* btl)
480 {
481 mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*) btl;
482 opal_list_item_t* item;
483
484
485
486
487 for( item = opal_list_remove_first(&tcp_btl->tcp_endpoints);
488 item != NULL;
489 item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) {
490 mca_btl_tcp_endpoint_t *endpoint = (mca_btl_tcp_endpoint_t*)item;
491 OBJ_RELEASE(endpoint);
492 }
493 free(tcp_btl);
494 return OPAL_SUCCESS;
495 }
496
497 void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl,
498 struct mca_btl_base_endpoint_t* endpoint,
499 int verbose)
500 {
501 mca_btl_tcp_module_t* btl = (mca_btl_tcp_module_t*)base_btl;
502 mca_btl_base_err("%s TCP %p kernel_id %d\n"
503 #if MCA_BTL_TCP_STATISTICS
504 " | statistics: sent %lu recv %lu\n"
505 #endif
506 " | latency %u bandwidth %u\n",
507 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), (void*)btl, btl->tcp_ifkindex,
508 #if MCA_BTL_TCP_STATISTICS
509 btl->tcp_bytes_sent, btl->btl_bytes_recv,
510 #endif
511 btl->super.btl_latency, btl->super.btl_bandwidth);
512 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
513 if( NULL != endpoint ) {
514 MCA_BTL_TCP_ENDPOINT_DUMP(10, endpoint, false, "TCP");
515
516 } else if( verbose ) {
517 opal_list_item_t *item;
518
519 OPAL_THREAD_LOCK(&btl->tcp_endpoints_mutex);
520 for(item = opal_list_get_first(&btl->tcp_endpoints);
521 item != opal_list_get_end(&btl->tcp_endpoints);
522 item = opal_list_get_next(item)) {
523 MCA_BTL_TCP_ENDPOINT_DUMP(10, (mca_btl_base_endpoint_t*)item, false, "TCP");
524 }
525 OPAL_THREAD_UNLOCK(&btl->tcp_endpoints_mutex);
526 }
527 #endif
528 }
529
530
531
532
533
534
535
536
537
538
539
540
541 int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
542 {
543 unsigned char* ptr = (unsigned char*)data;
544 size_t cnt = 0;
545 while (cnt < size) {
546 int retval = recv(sd, ((char *)ptr) + cnt, size - cnt, 0);
547
548 if (0 == retval) {
549 OPAL_OUTPUT_VERBOSE((100, opal_btl_base_framework.framework_output,
550 "remote peer unexpectedly closed connection while I was waiting for a blocking message"));
551 break;
552 }
553
554
555 if (retval < 0) {
556 if (opal_socket_errno != EINTR &&
557 opal_socket_errno != EAGAIN &&
558 opal_socket_errno != EWOULDBLOCK) {
559 BTL_ERROR(("recv(%d) failed: %s (%d)", sd, strerror(opal_socket_errno), opal_socket_errno));
560 break;
561 }
562 continue;
563 }
564 cnt += retval;
565 }
566 return cnt;
567 }
568
569
570
571
572
573
574
575
576 int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size)
577 {
578 unsigned char* ptr = (unsigned char*)data;
579 size_t cnt = 0;
580 while(cnt < size) {
581 int retval = send(sd, ((const char *)ptr) + cnt, size - cnt, 0);
582 if (retval < 0) {
583 if (opal_socket_errno != EINTR &&
584 opal_socket_errno != EAGAIN &&
585 opal_socket_errno != EWOULDBLOCK) {
586 BTL_ERROR(("send() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno));
587 return -1;
588 }
589 continue;
590 }
591 cnt += retval;
592 }
593 return cnt;
594 }