This source file includes the following definitions.
- __SENDER_BASED_METHOD_COPY
- __SENDER_BASED_METHOD_COPY
- vprotocol_pessimist_sb_progress_req
- vprotocol_pessimist_sb_progress_all_reqs
- __SENDER_BASED_METHOD_FLUSH
- vprotocol_pessimist_sender_based_copy_start
1
2
3
4
5
6
7
8
9
10
11 #ifndef __VPROTOCOL_PESSIMIST_SENDERBASED_H__
12 #define __VPROTOCOL_PESSIMIST_SENDERBASED_H__
13
14 #include "ompi_config.h"
15 #include "ompi/mca/pml/base/pml_base_sendreq.h"
16 #include "ompi/mca/pml/v/pml_v_output.h"
17 #include "vprotocol_pessimist_sender_based_types.h"
18 #include "vprotocol_pessimist_request.h"
19 #include "vprotocol_pessimist.h"
20
21 BEGIN_C_DECLS
22
23
24
/*
 * Set-up entry point of the sender-based message log; defined elsewhere.
 *
 * NOTE(review): the implementation is not visible from this header.
 * Presumably `mmapfile` names the backing file and `size` is the initial
 * length of the mapped window, with the int result being a status code --
 * confirm against the definition.
 */
25 int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size);
26
27
28
/*
 * Tear-down entry point of the sender-based message log; defined elsewhere.
 *
 * NOTE(review): presumably releases whatever
 * vprotocol_pessimist_sender_based_init() acquired -- confirm against the
 * definition.
 */
29 void vprotocol_pessimist_sender_based_finalize(void);
30
31
32
33
/*
 * Make room in the sender-based storage; defined elsewhere.
 *
 * vprotocol_pessimist_sender_based_copy_start() calls this with the packed
 * payload size (header size NOT included) when sb_available is smaller than
 * payload plus header, and reads sb_cursor right afterwards.
 *
 * NOTE(review): presumably this accounts for the header itself and updates
 * sb_cursor / sb_available -- confirm against the definition.
 */
34 void vprotocol_pessimist_sender_based_alloc(size_t len);
35
36
37
38
39
40 #if defined(SB_USE_PACK_METHOD)
41 static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *pmlreq)
42 {
43 if(0 != pmlreq->req_bytes_packed)
44 {
45 opal_convertor_t conv;
46 size_t max_data;
47 size_t zero = 0;
48 unsigned int iov_count = 1;
49 struct iovec iov;
50
51 max_data = iov.iov_len = pmlreq->req_bytes_packed;
52 iov.iov_base = (IOVBASE_TYPE *) VPESSIMIST_SEND_FTREQ(pmlreq)->sb.cursor;
53 opal_convertor_clone_with_position( &pmlreq->req_base.req_convertor,
54 &conv, 0, &zero );
55 opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
56 }
57 }
58
/* Pack method: flushing is a no-op, the macro expands to nothing. */
59 #define __SENDER_BASED_METHOD_FLUSH(REQ)
60
61
62
63
64
65 #elif defined(SB_USE_CONVERTOR_METHOD)
/*
 * Convertor advance routine used by the convertor method; defined elsewhere.
 * __SENDER_BASED_METHOD_COPY() installs it as the fAdvance member of a send
 * request's convertor.
 *
 * NOTE(review): the unnamed parameters presumably are the convertor, the
 * iovec array, the iovec count and the maximum data size, matching the
 * convertor's fAdvance slot -- confirm against the definition.
 */
66 int32_t vprotocol_pessimist_sender_based_convertor_advance(opal_convertor_t*,
67 struct iovec*,
68 uint32_t*,
69 size_t*);
70
/*
 * Convertor-method copy: hook the convertor of send request REQ.
 *
 * The convertor's current flags and fAdvance are saved in the request's
 * sb.conv_flags / sb.conv_advance. CONVERTOR_NO_OP is then cleared from the
 * flags and vprotocol_pessimist_sender_based_convertor_advance is installed
 * as fAdvance.
 *
 * REQ is evaluated twice; pass an expression without side effects.
 */
#define __SENDER_BASED_METHOD_COPY(REQ) do {                                   \
    opal_convertor_t *sb_conv_ = &(REQ)->req_base.req_convertor;               \
    mca_vprotocol_pessimist_send_request_t *sb_ftreq_ =                        \
        VPESSIMIST_SEND_FTREQ(REQ);                                            \
                                                                               \
    sb_ftreq_->sb.conv_advance = sb_conv_->fAdvance;                           \
    sb_ftreq_->sb.conv_flags = sb_conv_->flags;                                \
                                                                               \
    sb_conv_->fAdvance = vprotocol_pessimist_sender_based_convertor_advance;   \
    sb_conv_->flags &= ~CONVERTOR_NO_OP;                                       \
} while(0)
83
/* Convertor method: flushing is a no-op, the macro expands to nothing. */
84 #define __SENDER_BASED_METHOD_FLUSH(REQ)
85
/*
 * Derive a pessimist send-request pointer from convertor CONV: the address
 * held in (CONV)->clone_of plus sender_based.sb_conv_to_pessimist_offset,
 * cast to mca_vprotocol_pessimist_send_request_t *.
 * NOTE(review): presumably clone_of points at the convertor embedded in the
 * originating request -- confirm where the offset is computed.
 */
#define VPESSIMIST_CONV_REQ(CONV)                                              \
    ((mca_vprotocol_pessimist_send_request_t *)                                \
     (((uintptr_t) ((CONV)->clone_of)) +                                       \
      mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset))
89
90
91
92
93
94 #elif defined(SB_USE_PROGRESS_METHOD)
95 static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *req)
96 {
97 if(req->req_bytes_packed)
98 {
99 mca_vprotocol_pessimist_send_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
100 ftreq->sb.bytes_progressed = 0;
101 opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
102 &ftreq->list_item);
103 }
104 }
105
106 static inline int vprotocol_pessimist_sb_progress_req(mca_pml_base_send_request_t *req)
107 {
108 mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
109 size_t max_data = 0;
110
111 if(ftreq->sb.bytes_progressed < req->req_bytes_packed)
112 {
113 opal_convertor_t conv;
114 unsigned int iov_count = 1;
115 struct iovec iov;
116 uintptr_t position = ftreq->sb.bytes_progressed;
117 max_data = req->req_bytes_packed - ftreq->sb.bytes_progressed;
118 iov.iov_len = max_data;
119 iov.iov_base = (IOVBASE_TYPE *) (ftreq->sb.cursor + position);
120
121 V_OUTPUT_VERBOSE(80, "pessimist:\tsb\tprgress\t%"PRIpclock"\tsize %lu from position %lu", ftreq->reqid, max_data, position);
122 opal_convertor_clone_with_position(&req->req_base.req_convertor,
123 &conv, 0, &position );
124 opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
125 ftreq->sb.bytes_progressed += max_data;
126 }
127 return max_data;
128 }
129
130 static inline int vprotocol_pessimist_sb_progress_all_reqs(void)
131 {
132 int ret = 0;
133
134
135 if(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq))
136 {
137 mca_vprotocol_pessimist_request_t *ftreq = (mca_vprotocol_pessimist_request_t *)
138 opal_list_remove_first(&mca_vprotocol_pessimist.sender_based.sb_sendreq);
139 if(vprotocol_pessimist_sb_progress_req(VPROTOCOL_SEND_REQ(ftreq)))
140 ret = 1;
141 opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
142 &ftreq->list_item);
143 }
144 return ret;
145 }
146
147 static inline void __SENDER_BASED_METHOD_FLUSH(ompi_request_t *req)
148 {
149 mca_pml_base_send_request_t *pmlreq = (mca_pml_base_send_request_t *) req;
150
151 if((pmlreq->req_base.req_type == MCA_PML_REQUEST_SEND) &&
152 pmlreq->req_bytes_packed)
153 {
154 mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
155 assert(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq));
156 opal_list_remove_item(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
157 (opal_list_item_t *) ftreq);
158 vprotocol_pessimist_sb_progress_req(pmlreq);
159 assert(pmlreq->req_bytes_packed == ftreq->sb.bytes_progressed);
160 }
161 }
162
163 #endif
164
165
166
167
168
169 static inline void vprotocol_pessimist_sender_based_copy_start(ompi_request_t *req)
170 {
171 vprotocol_pessimist_sender_based_header_t *sbhdr;
172 mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
173 mca_pml_base_send_request_t *pmlreq = (mca_pml_base_send_request_t *) req;
174
175
176 if(mca_vprotocol_pessimist.sender_based.sb_available <
177 pmlreq->req_bytes_packed +
178 sizeof(vprotocol_pessimist_sender_based_header_t))
179 {
180 vprotocol_pessimist_sender_based_alloc(pmlreq->req_bytes_packed);
181 }
182
183
184
185 ftreq->sb.cursor = mca_vprotocol_pessimist.sender_based.sb_cursor;
186 #if 1
187 mca_vprotocol_pessimist.sender_based.sb_cursor +=
188 sizeof(vprotocol_pessimist_sender_based_header_t) +
189 pmlreq->req_bytes_packed;
190 mca_vprotocol_pessimist.sender_based.sb_available -=
191 sizeof(vprotocol_pessimist_sender_based_header_t) +
192 pmlreq->req_bytes_packed;
193 #endif
194 sbhdr = (vprotocol_pessimist_sender_based_header_t *) ftreq->sb.cursor;
195 sbhdr->size = pmlreq->req_bytes_packed;
196 sbhdr->dst = pmlreq->req_base.req_peer;
197 sbhdr->tag = pmlreq->req_base.req_tag;
198 sbhdr->contextid = pmlreq->req_base.req_comm->c_contextid;
199 sbhdr->sequence = pmlreq->req_base.req_sequence;
200 ftreq->sb.cursor += sizeof(vprotocol_pessimist_sender_based_header_t);
201 V_OUTPUT_VERBOSE(70, "pessimist:\tsb\tsend\t%"PRIpclock"\tsize %lu (+%lu header)", VPESSIMIST_FTREQ(req)->reqid, (long unsigned)pmlreq->req_bytes_packed, (long unsigned)sizeof(vprotocol_pessimist_sender_based_header_t));
202
203
204 __SENDER_BASED_METHOD_COPY(pmlreq);
205 }
206
207
208
/*
 * Method-independent name for flushing a request: forwards REQ to whichever
 * __SENDER_BASED_METHOD_FLUSH the SB_USE_*_METHOD selection above defined.
 */
209 #define vprotocol_pessimist_sender_based_flush(REQ) __SENDER_BASED_METHOD_FLUSH(REQ)
210
211 END_C_DECLS
212
213 #endif
214