This source file includes the following definitions.
- opal_btl_usnic_recv_call
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 #include "opal_config.h"
25
26 #include <unistd.h>
27
28 #include "opal_stdint.h"
29 #include "opal/mca/memchecker/base/base.h"
30 #include "opal/constants.h"
31
32 #include "opal/mca/btl/btl.h"
33 #include "opal/mca/btl/base/base.h"
34
35 #include "btl_usnic.h"
36 #include "btl_usnic_frag.h"
37 #include "btl_usnic_endpoint.h"
38 #include "btl_usnic_module.h"
39 #include "btl_usnic_proc.h"
40 #include "btl_usnic_ack.h"
41 #include "btl_usnic_recv.h"
42 #include "btl_usnic_util.h"
43
44
45
46
47
48
49 void opal_btl_usnic_recv_call(opal_btl_usnic_module_t *module,
50 opal_btl_usnic_recv_segment_t *seg,
51 opal_btl_usnic_channel_t *channel)
52 {
53 opal_btl_usnic_segment_t *bseg;
54 mca_btl_active_message_callback_t* reg;
55 opal_btl_usnic_endpoint_t *endpoint;
56 opal_btl_usnic_btl_chunk_header_t *chunk_hdr;
57 opal_btl_usnic_btl_header_t *hdr;
58 uint32_t window_index;
59 int rc;
60 #if MSGDEBUG1
61 char local_ip[IPV4STRADDRLEN];
62 char remote_ip[IPV4STRADDRLEN];
63 #endif
64
65 bseg = &seg->rs_base;
66
67 ++module->stats.num_total_recvs;
68
69
70 opal_memchecker_base_mem_defined((void*)(seg->rs_protocol_header),
71 seg->rs_len);
72
73
74 endpoint = seg->rs_endpoint;
75 if (FAKE_RECV_DROP || OPAL_UNLIKELY(NULL == endpoint)) {
76
77 #if MSGDEBUG1
78 opal_output(0, "=== Unknown sender; dropped: seq %" UDSEQ,
79 bseg->us_btl_header->pkt_seq);
80 #endif
81 ++module->stats.num_unk_recvs;
82 goto repost_no_endpoint;
83 }
84
85 #if MSGDEBUG1
86 struct opal_btl_usnic_modex_t *modex;
87
88 modex = &module->local_modex;
89 opal_btl_usnic_snprintf_ipv4_addr(local_ip, sizeof(local_ip),
90 modex->ipv4_addr,
91 modex->netmask);
92 modex = &endpoint->endpoint_remote_modex;
93 opal_btl_usnic_snprintf_ipv4_addr(remote_ip, sizeof(remote_ip),
94 modex->ipv4_addr,
95 modex->netmask);
96 #endif
97
98
99
100 if (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG == bseg->us_btl_header->payload_type) {
101
102
103 rc = opal_btl_usnic_recv_frag_bookkeeping(module, seg, channel);
104 if (rc != 0) {
105 return;
106 }
107
108 hdr = seg->rs_base.us_btl_header;
109
110 #if MSGDEBUG1
111 opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ ", len=%d\n",
112 (void*) endpoint, hdr->pkt_seq, hdr->payload_len);
113 #if 0
114
115 opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ " from %s to %s: GOOD! (rel seq %d, lowest seq %" UDSEQ ", highest seq: %" UDSEQ ", rwstart %d) seg %p, module %p\n",
116 (void*) endpoint,
117 seg->rs_base.us_btl_header->pkt_seq,
118 remote_ip, local_ip,
119 window_index,
120 endpoint->endpoint_next_contig_seq_to_recv,
121 endpoint->endpoint_highest_seq_rcvd,
122 endpoint->endpoint_rfstart,
123 (void*) seg, (void*) module);
124 if (hdr->put_addr != NULL) {
125 opal_output(0, " put_addr = %p\n",
126 seg->rs_base.us_btl_header->put_addr);
127 }
128 #endif
129 #endif
130
131
132
133
134
135
136
137 if (hdr->put_addr == NULL) {
138 reg = mca_btl_base_active_message_trigger + hdr->tag;
139 seg->rs_segment.seg_len = hdr->payload_len;
140 #if MSGDEBUG2
141 opal_output(0, "small recv complete, pass up %u bytes, tag=%d\n",
142 (unsigned)bseg->us_btl_header->payload_len,
143 (int)bseg->us_btl_header->tag);
144 #endif
145 reg->cbfunc(&module->super, hdr->tag, &seg->rs_desc, reg->cbdata);
146
147
148
149
150 } else {
151 #if MSGDEBUG1
152 opal_output(0, "Copy %d PUT bytes to %p\n",
153 seg->rs_base.us_btl_header->payload_len,
154 (void*)seg->rs_base.us_btl_header->put_addr);
155 #endif
156 memcpy(seg->rs_base.us_btl_header->put_addr,
157 seg->rs_base.us_payload.raw,
158 seg->rs_base.us_btl_header->payload_len);
159 }
160
161
162 return;
163 }
164
165
166
167 if (OPAL_BTL_USNIC_PAYLOAD_TYPE_CHUNK == bseg->us_btl_header->payload_type) {
168 int frag_index;
169 opal_btl_usnic_rx_frag_info_t *fip;
170
171
172 if (OPAL_UNLIKELY(opal_btl_usnic_check_rx_seq(endpoint, seg,
173 &window_index) != 0)) {
174 goto repost;
175 }
176
177 #if MSGDEBUG1
178 opal_output(0, "<-- Received CHUNK fid %d ep %p, seq %" UDSEQ " from %s to %s: GOOD! (rel seq %d, lowest seq %" UDSEQ ", highest seq: %" UDSEQ ", rwstart %d) seg %p, module %p\n",
179 seg->rs_base.us_btl_chunk_header->ch_frag_id,
180 (void*) endpoint,
181 seg->rs_base.us_btl_chunk_header->ch_hdr.pkt_seq,
182 remote_ip, local_ip,
183 window_index,
184 endpoint->endpoint_next_contig_seq_to_recv,
185 endpoint->endpoint_highest_seq_rcvd,
186 endpoint->endpoint_rfstart,
187 (void*) seg, (void*) module);
188 #endif
189
190
191
192
193
194 chunk_hdr = seg->rs_base.us_btl_chunk_header;
195 frag_index = chunk_hdr->ch_frag_id % MAX_ACTIVE_FRAGS;
196 fip = &(endpoint->endpoint_rx_frag_info[frag_index]);
197
198
199 if (0 == fip->rfi_frag_id) {
200 fip->rfi_frag_id = chunk_hdr->ch_frag_id;
201 fip->rfi_frag_size = chunk_hdr->ch_frag_size;
202 if (chunk_hdr->ch_hdr.put_addr == NULL) {
203 int pool;
204
205 fip->rfi_data = NULL;
206
207
208
209
210 pool = usnic_fls(chunk_hdr->ch_frag_size-1);
211 if (pool >= module->first_pool &&
212 pool <= module->last_pool) {
213 opal_free_list_item_t* item;
214 opal_btl_usnic_rx_buf_t *rx_buf;
215 USNIC_COMPAT_FREE_LIST_GET(&module->module_recv_buffers[pool], item);
216 rx_buf = (opal_btl_usnic_rx_buf_t *)item;
217 if (OPAL_LIKELY(NULL != rx_buf)) {
218 fip->rfi_fl_elt = item;
219 fip->rfi_data = rx_buf->buf;
220 fip->rfi_data_pool = pool;
221 fip->rfi_data_in_pool = true;
222 }
223 }
224 if (fip->rfi_data == NULL) {
225 fip->rfi_data = malloc(chunk_hdr->ch_frag_size);
226 fip->rfi_data_in_pool = false;
227 }
228 if (fip->rfi_data == NULL) {
229 opal_btl_usnic_util_abort("malloc failed", __FILE__, __LINE__);
230 }
231 #if MSGDEBUG1
232 opal_output(0, "Start large recv to %p, size=%"PRIu32"\n",
233 (void *)fip->rfi_data, chunk_hdr->ch_frag_size);
234 #endif
235 } else {
236 #if MSGDEBUG1
237 opal_output(0, "Start PUT to %p\n",
238 (void *)chunk_hdr->ch_hdr.put_addr);
239 #endif
240 fip->rfi_data = chunk_hdr->ch_hdr.put_addr;
241 }
242 fip->rfi_bytes_left = chunk_hdr->ch_frag_size;
243 fip->rfi_frag_id = chunk_hdr->ch_frag_id;
244
245
246 } else if (fip->rfi_frag_id != chunk_hdr->ch_frag_id) {
247 ++module->stats.num_badfrag_recvs;
248 goto repost;
249 }
250 #if MSGDEBUG1
251 opal_output(0, "put_addr=%p, copy_addr=%p, off=%d\n",
252 chunk_hdr->ch_hdr.put_addr,
253 fip->rfi_data+chunk_hdr->ch_frag_offset,
254 chunk_hdr->ch_frag_offset);
255 #endif
256
257
258 ++module->stats.num_chunk_recvs;
259
260
261 assert(chunk_hdr->ch_frag_offset + chunk_hdr->ch_hdr.payload_len <=
262 fip->rfi_frag_size);
263 assert(fip->rfi_frag_size == chunk_hdr->ch_frag_size);
264
265
266 memcpy(fip->rfi_data + chunk_hdr->ch_frag_offset, (char *)(chunk_hdr+1),
267 chunk_hdr->ch_hdr.payload_len);
268
269
270 opal_btl_usnic_update_window(endpoint, window_index);
271
272 fip->rfi_bytes_left -= chunk_hdr->ch_hdr.payload_len;
273 if (0 == fip->rfi_bytes_left) {
274 mca_btl_base_descriptor_t desc;
275 mca_btl_base_segment_t segment;
276
277 segment.seg_addr.pval = fip->rfi_data;
278 segment.seg_len = fip->rfi_frag_size;
279 desc.USNIC_RECV_LOCAL = &segment;
280 desc.USNIC_RECV_LOCAL_COUNT = 1;
281
282
283 if (chunk_hdr->ch_hdr.put_addr == NULL) {
284
285
286 #if MSGDEBUG2
287 opal_output(0, "large recv complete, pass up %p, %u bytes, tag=%d\n",
288 desc.USNIC_RECV_LOCAL->seg_addr.pval,
289 (unsigned)desc.USNIC_RECV_LOCAL->seg_len,
290 (int)chunk_hdr->ch_hdr.tag);
291 #endif
292 reg = mca_btl_base_active_message_trigger +
293 chunk_hdr->ch_hdr.tag;
294
295
296 reg->cbfunc(&module->super, chunk_hdr->ch_hdr.tag,
297 &desc, reg->cbdata);
298
299
300 if (fip->rfi_data_in_pool) {
301 USNIC_COMPAT_FREE_LIST_RETURN(&module->module_recv_buffers[fip->rfi_data_pool],
302 fip->rfi_fl_elt);
303 } else {
304 free(fip->rfi_data);
305 }
306
307 #if MSGDEBUG1
308 } else {
309 opal_output(0, "PUT recv complete, no callback\n");
310 #endif
311 }
312
313
314 fip->rfi_frag_id = 0;
315
316
317 endpoint->endpoint_acktime = 0;
318 }
319 goto repost;
320 }
321
322
323
324 else if (OPAL_LIKELY(OPAL_BTL_USNIC_PAYLOAD_TYPE_ACK ==
325 bseg->us_btl_header->payload_type)) {
326 opal_btl_usnic_seq_t ack_seq;
327
328
329 ack_seq = bseg->us_btl_header->ack_seq;
330
331
332 ++module->stats.num_ack_recvs;
333
334 #if MSGDEBUG1
335 opal_output(0, " Received ACK for sequence number %" UDSEQ " from %s to %s\n",
336 bseg->us_btl_header->ack_seq, remote_ip, local_ip);
337 #endif
338 OPAL_THREAD_LOCK(&btl_usnic_lock);
339 opal_btl_usnic_handle_ack(endpoint, ack_seq);
340 OPAL_THREAD_UNLOCK(&btl_usnic_lock);
341 goto repost;
342 }
343
344
345
346 else {
347 ++module->stats.num_unk_recvs;
348 if (module->stats.num_unk_recvs < 10) {
349 opal_output_verbose(15, USNIC_OUT, "unrecognized payload type %d", bseg->us_btl_header->payload_type);
350 opal_output_verbose(15, USNIC_OUT, "base = %p, proto = %p, hdr = %p", bseg->us_list.ptr, seg->rs_protocol_header, (void*) bseg->us_btl_header);
351 opal_btl_usnic_dump_hex(15, USNIC_OUT, bseg->us_list.ptr, 96+sizeof(*bseg->us_btl_header));
352 }
353 goto repost;
354 }
355
356
357 repost:
358
359
360 if (endpoint->endpoint_exiting && ENDPOINT_DRAINED(endpoint)) {
361 OBJ_RELEASE(endpoint);
362 }
363 repost_no_endpoint:
364 ++module->stats.num_recv_reposts;
365
366
367 seg->rs_next = channel->repost_recv_head;
368 channel->repost_recv_head = seg;
369 }