This source file includes the following definitions.
- opal_btl_usnic_post_recv_list
- lookup_sender
- opal_btl_usnic_update_window
- opal_btl_usnic_check_rx_seq
- opal_btl_usnic_recv_fast
- opal_btl_usnic_recv_frag_bookkeeping
- opal_btl_usnic_recv
1
2
3
4
5
6
7
8
9
10 #ifndef BTL_USNIC_RECV_H
11 #define BTL_USNIC_RECV_H
12
13 #include "btl_usnic.h"
14 #include "btl_usnic_util.h"
15 #include "btl_usnic_frag.h"
16 #include "btl_usnic_proc.h"
17
18
/*
 * General receive handler.  Only declared in this header; the definition
 * is not visible here.  The inline receive paths in this file
 * (opal_btl_usnic_recv_fast() and opal_btl_usnic_recv()) pass a segment to
 * it whenever the segment does not satisfy their fast-path conditions.
 */
19 void opal_btl_usnic_recv_call(opal_btl_usnic_module_t *module,
20 opal_btl_usnic_recv_segment_t *rseg,
21 opal_btl_usnic_channel_t *channel);
22
23 static inline int
24 opal_btl_usnic_post_recv_list(opal_btl_usnic_channel_t *channel)
25 {
26 struct iovec iov;
27 struct fi_msg msg;
28 uint64_t flag;
29 opal_btl_usnic_recv_segment_t *rseg;
30 int rc;
31
32 msg.msg_iov = &iov;
33 msg.iov_count = 1;
34 for (rseg = channel->repost_recv_head; NULL != rseg; rseg = rseg->rs_next) {
35 msg.context = rseg;
36 iov.iov_base = rseg->rs_protocol_header;
37 iov.iov_len = rseg->rs_len;
38
39 ++channel->rx_post_cnt;
40 if (OPAL_UNLIKELY((channel->rx_post_cnt & 15) == 0)) {
41 flag = 0;
42 } else {
43 flag = FI_MORE;
44 }
45
46 rc = fi_recvmsg(channel->ep, &msg, flag);
47 if (0 != rc) {
48 return rc;
49 }
50 }
51 channel->repost_recv_head = NULL;
52
53 return 0;
54 }
55
56
57
58
59 static inline opal_btl_usnic_endpoint_t *
60 lookup_sender(opal_btl_usnic_module_t *module, opal_btl_usnic_segment_t *seg)
61 {
62 int ret;
63 opal_btl_usnic_endpoint_t *sender;
64
65
66
67
68
69
70
71
72
73
74 ret = opal_hash_table_get_value_uint64(&module->senders,
75 seg->us_btl_header->sender,
76 (void**) &sender);
77 if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
78 return sender;
79 }
80
81
82
83 sender = opal_btl_usnic_proc_lookup_endpoint(module,
84 seg->us_btl_header->sender);
85 if (NULL != sender) {
86 opal_hash_table_set_value_uint64(&module->senders,
87 seg->us_btl_header->sender, sender);
88 return sender;
89 }
90
91
92 return NULL;
93 }
94
95
96
97
98
99
100 static inline void
101 opal_btl_usnic_update_window(
102 opal_btl_usnic_endpoint_t *endpoint,
103 uint32_t window_index)
104 {
105 uint32_t i;
106
107
108 #if MSGDEBUG1
109 opal_output(0, "ep: %p, ack_needed = %s\n", (void*)endpoint, endpoint->endpoint_ack_needed?"true":"false");
110 #endif
111 if (!endpoint->endpoint_ack_needed) {
112 opal_btl_usnic_add_to_endpoints_needing_ack(endpoint);
113 }
114
115
116 if (0 == endpoint->endpoint_acktime) {
117 endpoint->endpoint_acktime = get_nsec() + 50000;
118 }
119
120
121
122
123
124 endpoint->endpoint_rcvd_segs[window_index] = true;
125
126
127
128
129 i = endpoint->endpoint_rfstart;
130 while (endpoint->endpoint_rcvd_segs[i]) {
131 endpoint->endpoint_rcvd_segs[i] = false;
132 endpoint->endpoint_next_contig_seq_to_recv++;
133 i = WINDOW_SIZE_MOD(i + 1);
134
135 #if MSGDEBUG1
136 opal_output(0, "Advance window to %d; next seq to send %" UDSEQ, i,
137 endpoint->endpoint_next_contig_seq_to_recv);
138 #endif
139 }
140 endpoint->endpoint_rfstart = i;
141 }
142
143 static inline int
144 opal_btl_usnic_check_rx_seq(
145 opal_btl_usnic_endpoint_t *endpoint,
146 opal_btl_usnic_recv_segment_t *seg,
147 uint32_t *window_index)
148 {
149 uint32_t i;
150 opal_btl_usnic_seq_t seq;
151 int delta;
152
153
154
155
156 if (seg->rs_base.us_btl_header->ack_present) {
157 #if MSGDEBUG1
158 opal_output(0, "Handle piggy-packed ACK seq %"UDSEQ"\n", seg->rs_base.us_btl_header->ack_seq);
159 #endif
160 OPAL_THREAD_LOCK(&btl_usnic_lock);
161 opal_btl_usnic_handle_ack(endpoint,
162 seg->rs_base.us_btl_header->ack_seq);
163 OPAL_THREAD_UNLOCK(&btl_usnic_lock);
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 seq = seg->rs_base.us_btl_header->pkt_seq;
190 delta = SEQ_DIFF(seq, endpoint->endpoint_next_contig_seq_to_recv);
191 if (delta < 0 || delta >= WINDOW_SIZE) {
192 #if MSGDEBUG1
193 opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ " outside of window (%" UDSEQ " - %" UDSEQ "), %p, module %p -- DROPPED\n",
194 (void*)endpoint, seg->rs_base.us_btl_header->pkt_seq,
195 endpoint->endpoint_next_contig_seq_to_recv,
196 (endpoint->endpoint_next_contig_seq_to_recv +
197 WINDOW_SIZE - 1),
198 (void*) seg,
199 (void*) endpoint->endpoint_module);
200 #endif
201
202
203 if (delta < 0) {
204 ++endpoint->endpoint_module->stats.num_oow_low_recvs;
205 } else {
206 ++endpoint->endpoint_module->stats.num_oow_high_recvs;
207 }
208 goto dup_needs_ack;
209 }
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234 i = SEQ_DIFF(seq, endpoint->endpoint_next_contig_seq_to_recv);
235 i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
236 if (endpoint->endpoint_rcvd_segs[i]) {
237 #if MSGDEBUG1
238 opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ ", seg %p: duplicate -- DROPPED\n",
239 (void*) endpoint, seg->rs_base.us_btl_header->pkt_seq, (void*) seg);
240 #endif
241
242
243 assert(SEQ_LE(seq, endpoint->endpoint_highest_seq_rcvd));
244
245
246 assert (SEQ_GT(seq, endpoint->endpoint_next_contig_seq_to_recv - 1));
247
248
249 ++endpoint->endpoint_module->stats.num_dup_recvs;
250 goto dup_needs_ack;
251 }
252
253
254 if (SEQ_GT(seq, endpoint->endpoint_highest_seq_rcvd)) {
255 endpoint->endpoint_highest_seq_rcvd = seq;
256 }
257
258 *window_index = i;
259 return 0;
260
261 dup_needs_ack:
262 if (!endpoint->endpoint_ack_needed) {
263 opal_btl_usnic_add_to_endpoints_needing_ack(endpoint);
264 }
265 return -1;
266 }
267
268
269
270
271
272
273
274
275 static inline void
276 opal_btl_usnic_recv_fast(opal_btl_usnic_module_t *module,
277 opal_btl_usnic_recv_segment_t *seg,
278 opal_btl_usnic_channel_t *channel)
279 {
280 opal_btl_usnic_segment_t *bseg;
281 mca_btl_active_message_callback_t* reg;
282 opal_btl_usnic_seq_t seq;
283 opal_btl_usnic_endpoint_t *endpoint;
284 int delta;
285 int i;
286
287
288 opal_memchecker_base_mem_defined(seg->rs_protocol_header, seg->rs_len);
289
290 bseg = &seg->rs_base;
291
292
293 endpoint = lookup_sender(module, bseg);
294 seg->rs_endpoint = endpoint;
295
296 #if 0
297 opal_output(0, "fast recv %d bytes:\n", bseg->us_btl_header->payload_len + sizeof(opal_btl_usnic_btl_header_t));
298 opal_btl_usnic_dump_hex(15, USNIC_OUT, bseg->us_btl_header, bseg->us_btl_header->payload_len + sizeof(opal_btl_usnic_btl_header_t));
299 #endif
300
301
302
303
304 if (endpoint != NULL && !endpoint->endpoint_exiting &&
305 (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG ==
306 bseg->us_btl_header->payload_type) &&
307 seg->rs_base.us_btl_header->put_addr == NULL) {
308
309 seq = seg->rs_base.us_btl_header->pkt_seq;
310 delta = SEQ_DIFF(seq, endpoint->endpoint_next_contig_seq_to_recv);
311 if (delta < 0 || delta >= WINDOW_SIZE) {
312 goto drop;
313 }
314
315 i = seq - endpoint->endpoint_next_contig_seq_to_recv;
316 i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
317 if (endpoint->endpoint_rcvd_segs[i]) {
318 goto drop;
319 }
320
321
322
323
324
325
326
327 reg = mca_btl_base_active_message_trigger + bseg->us_btl_header->tag;
328 seg->rs_segment.seg_len = bseg->us_btl_header->payload_len;
329 reg->cbfunc(&module->super, bseg->us_btl_header->tag,
330 &seg->rs_desc, reg->cbdata);
331
332 drop:
333 channel->chan_deferred_recv = seg;
334 }
335
336
337 else {
338 opal_btl_usnic_recv_call(module, seg, channel);
339 }
340 }
341
342
343
344 static inline int
345 opal_btl_usnic_recv_frag_bookkeeping(
346 opal_btl_usnic_module_t* module,
347 opal_btl_usnic_recv_segment_t *seg,
348 opal_btl_usnic_channel_t *channel)
349 {
350 opal_btl_usnic_endpoint_t* endpoint;
351 uint32_t window_index;
352 int rc;
353
354 endpoint = seg->rs_endpoint;
355
356
357 opal_memchecker_base_mem_defined(
358 (void*)(seg->rs_protocol_header), seg->rs_len);
359
360 ++module->stats.num_total_recvs;
361
362
363 rc = opal_btl_usnic_check_rx_seq(endpoint, seg, &window_index);
364 if (OPAL_UNLIKELY(rc != 0)) {
365 goto repost;
366 }
367
368 ++module->stats.num_frag_recvs;
369
370 opal_btl_usnic_update_window(endpoint, window_index);
371
372 repost:
373
374 if (endpoint->endpoint_exiting && ENDPOINT_DRAINED(endpoint)) {
375 OBJ_RELEASE(endpoint);
376 }
377
378 ++module->stats.num_recv_reposts;
379
380
381 seg->rs_next = channel->repost_recv_head;
382 channel->repost_recv_head = seg;
383
384 return rc;
385 }
386
387
388
389
390
391 static inline void
392 opal_btl_usnic_recv(opal_btl_usnic_module_t *module,
393 opal_btl_usnic_recv_segment_t *seg,
394 opal_btl_usnic_channel_t *channel)
395 {
396 opal_btl_usnic_segment_t *bseg;
397 mca_btl_active_message_callback_t* reg;
398 opal_btl_usnic_endpoint_t *endpoint;
399 int rc;
400
401
402 opal_memchecker_base_mem_defined(seg->rs_protocol_header, seg->rs_len);
403
404 bseg = &seg->rs_base;
405
406
407 endpoint = lookup_sender(module, bseg);
408 seg->rs_endpoint = endpoint;
409
410
411
412
413
414 if (endpoint != NULL && !endpoint->endpoint_exiting &&
415 (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG ==
416 bseg->us_btl_header->payload_type) &&
417 seg->rs_base.us_btl_header->put_addr == NULL) {
418
419 MSGDEBUG1_OUT("<-- Received FRAG (fastpath) ep %p, seq %" UDSEQ ", len=%" PRIu16 "\n",
420 (void*) endpoint, bseg->us_btl_header->pkt_seq,
421 bseg->us_btl_header->payload_len);
422
423
424 rc = opal_btl_usnic_recv_frag_bookkeeping(module, seg, channel);
425 if (rc != 0) {
426 return;
427 }
428
429
430
431
432
433
434
435 reg = mca_btl_base_active_message_trigger + bseg->us_btl_header->tag;
436 seg->rs_segment.seg_len = bseg->us_btl_header->payload_len;
437 reg->cbfunc(&module->super, bseg->us_btl_header->tag,
438 &seg->rs_desc, reg->cbdata);
439
440 }
441
442
443 else {
444 opal_btl_usnic_recv_call(module, seg, channel);
445 }
446 }
447
448 #endif