This source file includes the following definitions.
- mca_btl_tcp2_add_procs
- mca_btl_tcp2_del_procs
- mca_btl_tcp2_alloc
- mca_btl_tcp2_free
- mca_btl_tcp2_prepare_src
- mca_btl_tcp2_prepare_dst
- mca_btl_tcp2_send
- mca_btl_tcp2_put
- mca_btl_tcp2_get
- mca_btl_tcp2_finalize
- mca_btl_tcp_dump
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include <string.h>
25 #include "opal/class/opal_bitmap.h"
26 #include "ompi/mca/btl/btl.h"
27
28 #include "btl_tcp2.h"
29 #include "btl_tcp2_frag.h"
30 #include "btl_tcp2_proc.h"
31 #include "btl_tcp2_endpoint.h"
32 #include "opal/datatype/opal_convertor.h"
33 #include "ompi/mca/mpool/base/base.h"
34 #include "ompi/mca/mpool/mpool.h"
35 #include "btl_tcp.h"
36 #include "btl_tcp_frag.h"
37 #include "btl_tcp_proc.h"
38 #include "btl_tcp_endpoint.h"
39
40 mca_btl_tcp2_module_t mca_btl_tcp2_module = {
41 {
42 &mca_btl_tcp2_component.super,
43 0,
44 0,
45 0,
46 0,
47 0,
48 0,
49 0,
50 0,
51 0,
52 0,
53 mca_btl_tcp2_add_procs,
54 mca_btl_tcp2_del_procs,
55 NULL,
56 mca_btl_tcp2_finalize,
57 mca_btl_tcp2_alloc,
58 mca_btl_tcp2_free,
59 mca_btl_tcp2_prepare_src,
60 mca_btl_tcp2_prepare_dst,
61 mca_btl_tcp2_send,
62 NULL,
63 mca_btl_tcp_put,
64 NULL,
65 mca_btl_tcp_dump,
66 NULL,
67 NULL,
68 mca_btl_tcp2_ft_event
69 }
70 };
71
72
73
74
75
76 int mca_btl_tcp2_add_procs( struct mca_btl_base_module_t* btl,
77 size_t nprocs,
78 struct ompi_proc_t **ompi_procs,
79 struct mca_btl_base_endpoint_t** peers,
80 opal_bitmap_t* reachable )
81 {
82 mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*)btl;
83 ompi_proc_t* my_proc;
84 int i, rc;
85
86
87 my_proc = ompi_proc_local();
88 if( NULL == my_proc ) {
89 return OMPI_ERR_OUT_OF_RESOURCE;
90 }
91 for(i = 0; i < (int) nprocs; i++) {
92
93 struct ompi_proc_t* ompi_proc = ompi_procs[i];
94 mca_btl_tcp2_proc_t* tcp_proc;
95 mca_btl_base_endpoint_t* tcp_endpoint;
96
97
98 if( my_proc == ompi_proc ) {
99 continue;
100 }
101
102 if(NULL == (tcp_proc = mca_btl_tcp2_proc_create(ompi_proc))) {
103 return OMPI_ERR_OUT_OF_RESOURCE;
104 }
105
106
107
108
109
110
111
112 OPAL_THREAD_LOCK(&tcp_proc->proc_lock);
113
114
115
116
117
118 tcp_endpoint = OBJ_NEW(mca_btl_tcp2_endpoint_t);
119 if(NULL == tcp_endpoint) {
120 OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
121 return OMPI_ERR_OUT_OF_RESOURCE;
122 }
123
124 tcp_endpoint->endpoint_btl = tcp_btl;
125 rc = mca_btl_tcp2_proc_insert(tcp_proc, tcp_endpoint);
126 if(rc != OMPI_SUCCESS) {
127 OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
128 OBJ_RELEASE(tcp_endpoint);
129 continue;
130 }
131
132 opal_bitmap_set_bit(reachable, i);
133 OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
134 peers[i] = tcp_endpoint;
135 opal_list_append(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
136
137
138
139
140 #if !MCA_BTL_TCP_USES_PROGRESS_THREAD
141 opal_progress_event_users_increment();
142 #endif
143 }
144
145 return OMPI_SUCCESS;
146 }
147
148 int mca_btl_tcp2_del_procs(struct mca_btl_base_module_t* btl,
149 size_t nprocs,
150 struct ompi_proc_t **procs,
151 struct mca_btl_base_endpoint_t ** endpoints)
152 {
153 mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*)btl;
154 size_t i;
155 for(i=0; i<nprocs; i++) {
156 mca_btl_tcp2_endpoint_t* tcp_endpoint = endpoints[i];
157 if(tcp_endpoint->endpoint_proc != mca_btl_tcp2_proc_local()) {
158 opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
159 OBJ_RELEASE(tcp_endpoint);
160 }
161 #if !MCA_BTL_TCP_USES_PROGRESS_THREAD
162 opal_progress_event_users_decrement();
163 #endif
164 }
165 return OMPI_SUCCESS;
166 }
167
168
169
170
171
172
173
174
175
176 mca_btl_base_descriptor_t* mca_btl_tcp2_alloc(
177 struct mca_btl_base_module_t* btl,
178 struct mca_btl_base_endpoint_t* endpoint,
179 uint8_t order,
180 size_t size,
181 uint32_t flags)
182 {
183 mca_btl_tcp2_frag_t* frag = NULL;
184
185 if(size <= btl->btl_eager_limit) {
186 MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
187 } else if (size <= btl->btl_max_send_size) {
188 MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
189 }
190 if( OPAL_UNLIKELY(NULL == frag) ) {
191 return NULL;
192 }
193
194 frag->segments[0].seg_len = size;
195 frag->segments[0].seg_addr.pval = frag+1;
196
197 frag->base.des_src = frag->segments;
198 frag->base.des_src_cnt = 1;
199 frag->base.des_dst = NULL;
200 frag->base.des_dst_cnt = 0;
201 frag->base.des_flags = flags;
202 frag->base.order = MCA_BTL_NO_ORDER;
203 frag->btl = (mca_btl_tcp_module_t*)btl;
204 frag->endpoint = endpoint;
205 return (mca_btl_base_descriptor_t*)frag;
206 }
207
208
209
210
211
212
213 int mca_btl_tcp2_free(
214 struct mca_btl_base_module_t* btl,
215 mca_btl_base_descriptor_t* des)
216 {
217 mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)des;
218 MCA_BTL_TCP_FRAG_RETURN(frag);
219 return OMPI_SUCCESS;
220 }
221
222
223
224
225
226
227
228
229 mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_src(
230 struct mca_btl_base_module_t* btl,
231 struct mca_btl_base_endpoint_t* endpoint,
232 struct mca_mpool_base_registration_t* registration,
233 struct opal_convertor_t* convertor,
234 uint8_t order,
235 size_t reserve,
236 size_t* size,
237 uint32_t flags)
238 {
239 mca_btl_tcp2_frag_t* frag;
240 struct iovec iov;
241 uint32_t iov_count = 1;
242 size_t max_data = *size;
243 int rc;
244
245 if( OPAL_UNLIKELY(max_data > UINT32_MAX) ) {
246 max_data = (size_t)UINT32_MAX;
247 }
248
249
250
251
252 if (max_data+reserve <= btl->btl_eager_limit) {
253 MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
254 } else {
255
256
257
258
259 MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
260 }
261 if( OPAL_UNLIKELY(NULL == frag) ) {
262 return NULL;
263 }
264
265 frag->segments[0].seg_addr.pval = (frag + 1);
266 frag->segments[0].seg_len = reserve;
267
268 frag->base.des_src_cnt = 1;
269 if(opal_convertor_need_buffers(convertor)) {
270
271 if (max_data + reserve > frag->size) {
272 max_data = frag->size - reserve;
273 }
274 iov.iov_len = max_data;
275 iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)(frag->segments[0].seg_addr.pval)) + reserve);
276
277 rc = opal_convertor_pack(convertor, &iov, &iov_count, &max_data );
278 if( OPAL_UNLIKELY(rc < 0) ) {
279 mca_btl_tcp2_free(btl, &frag->base);
280 return NULL;
281 }
282
283 frag->segments[0].seg_len += max_data;
284
285 } else {
286
287 iov.iov_len = max_data;
288 iov.iov_base = NULL;
289
290 rc = opal_convertor_pack(convertor, &iov, &iov_count, &max_data );
291 if( OPAL_UNLIKELY(rc < 0) ) {
292 mca_btl_tcp2_free(btl, &frag->base);
293 return NULL;
294 }
295
296 frag->segments[1].seg_addr.pval = iov.iov_base;
297 frag->segments[1].seg_len = max_data;
298 frag->base.des_src_cnt = 2;
299 }
300
301 frag->base.des_src = frag->segments;
302 frag->base.des_dst = NULL;
303 frag->base.des_dst_cnt = 0;
304 frag->base.des_flags = flags;
305 frag->base.order = MCA_BTL_NO_ORDER;
306 *size = max_data;
307 return &frag->base;
308 }
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325 mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_dst(
326 struct mca_btl_base_module_t* btl,
327 struct mca_btl_base_endpoint_t* endpoint,
328 struct mca_mpool_base_registration_t* registration,
329 struct opal_convertor_t* convertor,
330 uint8_t order,
331 size_t reserve,
332 size_t* size,
333 uint32_t flags)
334 {
335 mca_btl_tcp2_frag_t* frag;
336
337 if( OPAL_UNLIKELY((*size) > UINT32_MAX) ) {
338 *size = (size_t)UINT32_MAX;
339 }
340 MCA_BTL_TCP_FRAG_ALLOC_USER(frag);
341 if( OPAL_UNLIKELY(NULL == frag) ) {
342 return NULL;
343 }
344
345 frag->segments->seg_len = *size;
346 opal_convertor_get_current_pointer( convertor, (void**)&(frag->segments->seg_addr.pval) );
347
348 frag->base.des_src = NULL;
349 frag->base.des_src_cnt = 0;
350 frag->base.des_dst = frag->segments;
351 frag->base.des_dst_cnt = 1;
352 frag->base.des_flags = flags;
353 frag->base.order = MCA_BTL_NO_ORDER;
354 return &frag->base;
355 }
356
357
358
359
360
361
362
363
364
365
366
367 int mca_btl_tcp2_send( struct mca_btl_base_module_t* btl,
368 struct mca_btl_base_endpoint_t* endpoint,
369 struct mca_btl_base_descriptor_t* descriptor,
370 mca_btl_base_tag_t tag )
371 {
372 mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
373 mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)descriptor;
374 int i;
375
376 frag->btl = tcp_btl;
377 frag->endpoint = endpoint;
378 frag->rc = 0;
379 frag->iov_idx = 0;
380 frag->iov_cnt = 1;
381 frag->iov_ptr = frag->iov;
382 frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
383 frag->iov[0].iov_len = sizeof(frag->hdr);
384 frag->hdr.size = 0;
385 for( i = 0; i < (int)frag->base.des_src_cnt; i++) {
386 frag->hdr.size += frag->segments[i].seg_len;
387 frag->iov[i+1].iov_len = frag->segments[i].seg_len;
388 frag->iov[i+1].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
389 frag->iov_cnt++;
390 }
391 frag->hdr.base.tag = tag;
392 frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND;
393 frag->hdr.count = 0;
394 if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
395 return mca_btl_tcp_endpoint_send(endpoint,frag);
396 }
397
398
399
400
401
402
403
404
405
406
407 int mca_btl_tcp2_put( mca_btl_base_module_t* btl,
408 mca_btl_base_endpoint_t* endpoint,
409 mca_btl_base_descriptor_t* descriptor )
410 {
411 mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
412 mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)descriptor;
413 int i;
414
415 frag->btl = tcp_btl;
416 frag->endpoint = endpoint;
417 frag->rc = 0;
418 frag->iov_idx = 0;
419 frag->hdr.size = 0;
420 frag->iov_cnt = 2;
421 frag->iov_ptr = frag->iov;
422 frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
423 frag->iov[0].iov_len = sizeof(frag->hdr);
424 frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->base.des_dst;
425 frag->iov[1].iov_len = frag->base.des_dst_cnt * sizeof(mca_btl_base_segment_t);
426 for( i = 0; i < (int)frag->base.des_src_cnt; i++ ) {
427 frag->hdr.size += frag->segments[i].seg_len;
428 frag->iov[i+2].iov_len = frag->segments[i].seg_len;
429 frag->iov[i+2].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
430 frag->iov_cnt++;
431 }
432 frag->hdr.base.tag = MCA_BTL_TAG_BTL;
433 frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT;
434 frag->hdr.count = frag->base.des_dst_cnt;
435 if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
436 return ((i = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i);
437 }
438
439
440
441
442
443
444
445
446
447
448
449 int mca_btl_tcp2_get(
450 mca_btl_base_module_t* btl,
451 mca_btl_base_endpoint_t* endpoint,
452 mca_btl_base_descriptor_t* descriptor)
453 {
454 mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
455 mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)descriptor;
456 int rc;
457
458 frag->btl = tcp_btl;
459 frag->endpoint = endpoint;
460 frag->rc = 0;
461 frag->iov_idx = 0;
462 frag->hdr.size = 0;
463 frag->iov_cnt = 2;
464 frag->iov_ptr = frag->iov;
465 frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
466 frag->iov[0].iov_len = sizeof(frag->hdr);
467 frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->base.des_src;
468 frag->iov[1].iov_len = frag->base.des_src_cnt * sizeof(mca_btl_base_segment_t);
469 frag->hdr.base.tag = MCA_BTL_TAG_BTL;
470 frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET;
471 frag->hdr.count = frag->base.des_src_cnt;
472 if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
473 return ((rc = mca_btl_tcp_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc);
474 }
475
476
477
478
479
480
481
482 int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl)
483 {
484 mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
485 opal_list_item_t* item;
486 for( item = opal_list_remove_first(&tcp_btl->tcp_endpoints);
487 item != NULL;
488 item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) {
489 mca_btl_tcp2_endpoint_t *endpoint = (mca_btl_tcp2_endpoint_t*)item;
490 OBJ_RELEASE(endpoint);
491 #if !MCA_BTL_TCP_USES_PROGRESS_THREAD
492 opal_progress_event_users_decrement();
493 #endif
494 }
495 free(tcp_btl);
496 return OMPI_SUCCESS;
497 }
498
499
500
501
502 void mca_btl_tcp_dump(struct mca_btl_base_module_t* base_btl,
503 struct mca_btl_base_endpoint_t* endpoint,
504 int verbose)
505 {
506 mca_btl_tcp_module_t* btl = (mca_btl_tcp_module_t*)base_btl;
507 mca_btl_base_err("%s TCP %p kernel_id %d\n"
508 #if MCA_BTL_TCP_STATISTICS
509 " | statistics: sent %lu recv %lu\n"
510 #endif
511 " | latency %u bandwidth %u\n",
512 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (void*)btl, btl->tcp_ifkindex,
513 #if MCA_BTL_TCP_STATISTICS
514 btl->tcp_bytes_sent, btl->btl_bytes_recv,
515 #endif
516 btl->super.btl_latency, btl->super.btl_bandwidth);
517 if( NULL != endpoint ) {
518 mca_btl_tcp_endpoint_dump( endpoint, "TCP" );
519 } else if( verbose ) {
520 opal_list_item_t *item;
521
522 for(item = opal_list_get_first(&btl->tcp_endpoints);
523 item != opal_list_get_end(&btl->tcp_endpoints);
524 item = opal_list_get_next(item)) {
525 mca_btl_tcp_endpoint_dump( (mca_btl_base_endpoint_t*)item, "TCP" );
526 }
527 }
528 }
529