This source file includes following definitions.
- vprotocol_pessimist_matching_log_prepare
- vprotocol_pessimist_matching_log_finish
- vprotocol_pessimist_event_flush
- vprotocol_pessimist_delivery_log
1
2
3
4
5
6
7
8
9
10
11 #ifndef __VPROTOCOL_PESSIMIST_EVENTLOG_H__
12 #define __VPROTOCOL_PESSIMIST_EVENTLOG_H__
13
14 #include "vprotocol_pessimist.h"
15 #include "vprotocol_pessimist_request.h"
16 #include "vprotocol_pessimist_eventlog_protocol.h"
17
18 BEGIN_C_DECLS
19
20
21
22
23 int vprotocol_pessimist_event_logger_connect(int el_rank, ompi_communicator_t **el_comm);
24
25
26
27
28 int vprotocol_pessimist_event_logger_disconnect(ompi_communicator_t *el_comm);
29
30
31
32
33
34
35
36
37
38
39
40 static inline void vprotocol_pessimist_matching_log_prepare(ompi_request_t *req)
41 {
42 mca_pml_base_request_t *pmlreq = (mca_pml_base_request_t *) req;
43 if(MPI_ANY_SOURCE == pmlreq->req_peer)
44 {
45 mca_vprotocol_pessimist_event_t *event;
46 VPESSIMIST_MATCHING_EVENT_NEW(event);
47 event->req = pmlreq;
48 VPESSIMIST_RECV_FTREQ(req)->event = event;
49 opal_list_append(&mca_vprotocol_pessimist.pending_events,
50 (opal_list_item_t *) event);
51 }
52 }
53
54
55
56
57
58 static inline void vprotocol_pessimist_matching_log_finish(ompi_request_t *req)
59 {
60 mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_FTREQ(req);
61 if(ftreq->event)
62 {
63 mca_vprotocol_pessimist_event_t *event;
64 vprotocol_pessimist_matching_event_t *mevent;
65
66 V_OUTPUT_VERBOSE(70, "pessimist:\tlog\tmatch\t%"PRIpclock"\tsrc %d\tseq %"PRIpclock, ftreq->reqid, req->req_status.MPI_SOURCE, ((mca_pml_base_request_t *) req)->req_sequence);
67 event = ftreq->event;
68 mevent = &(event->u_event.e_matching);
69 mevent->reqid = ftreq->reqid;
70 mevent->src = req->req_status.MPI_SOURCE;
71 ftreq->event = NULL;
72 event->req = NULL;
73 }
74 }
75
76 #include "ompi/request/request_default.h"
77
78
79 #define __VPROTOCOL_PESSIMIST_SEND_BUFFER() do { \
80 if(OPAL_UNLIKELY(mca_vprotocol_pessimist.event_buffer_length)) \
81 { \
82 int rc; \
83 ompi_request_t *req; \
84 vprotocol_pessimist_clock_t max_clock; \
85 if(OPAL_UNLIKELY(ompi_comm_invalid(mca_vprotocol_pessimist.el_comm))) \
86 { \
87 rc = vprotocol_pessimist_event_logger_connect(0, \
88 &mca_vprotocol_pessimist.el_comm); \
89 if(OMPI_SUCCESS != rc) \
90 OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc, \
91 __FILE__ ": failed to connect to an Event Logger"); \
92 } \
93 rc = mca_pml_v.host_pml.pml_irecv(&max_clock, \
94 1, MPI_UNSIGNED_LONG_LONG, 0, \
95 VPROTOCOL_PESSIMIST_EVENTLOG_ACK, \
96 mca_vprotocol_pessimist.el_comm, &req); \
97 rc = mca_pml_v.host_pml.pml_send(mca_vprotocol_pessimist.event_buffer,\
98 mca_vprotocol_pessimist.event_buffer_length * \
99 sizeof(vprotocol_pessimist_mem_event_t), MPI_BYTE, 0, \
100 VPROTOCOL_PESSIMIST_EVENTLOG_PUT_EVENTS_CMD, \
101 MCA_PML_BASE_SEND_STANDARD, mca_vprotocol_pessimist.el_comm); \
102 if(OPAL_UNLIKELY(MPI_SUCCESS != rc)) \
103 OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc, \
104 __FILE__ ": failed logging a set of recovery event"); \
105 mca_vprotocol_pessimist.event_buffer_length = 0; \
106 rc = mca_pml_v.host_request_fns.req_wait(&req, MPI_STATUS_IGNORE); \
107 if(OPAL_UNLIKELY(MPI_SUCCESS != rc)) \
108 OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc, \
109 __FILE__ ": failed logging a set of recovery event"); \
110 } \
111 } while(0)
112
113
114
115
116
117 static inline void vprotocol_pessimist_event_flush(void)
118 {
119 if(OPAL_UNLIKELY(!opal_list_is_empty(&mca_vprotocol_pessimist.pending_events)))
120 {
121 mca_vprotocol_pessimist_event_t *event;
122 mca_vprotocol_pessimist_event_t *prv_event;
123
124 for(event =
125 (mca_vprotocol_pessimist_event_t *)
126 opal_list_get_first(&mca_vprotocol_pessimist.pending_events);
127 event !=
128 (mca_vprotocol_pessimist_event_t *)
129 opal_list_get_end(&mca_vprotocol_pessimist.pending_events);
130 event =
131 (mca_vprotocol_pessimist_event_t *)
132 opal_list_get_next(event))
133 {
134 if(event->u_event.e_matching.src == -1)
135 {
136
137
138 assert(event->type == VPROTOCOL_PESSIMIST_EVENT_TYPE_MATCHING);
139 if(event->req->req_ompi.req_status.MPI_SOURCE == -1)
140 {
141 V_OUTPUT_VERBOSE(41, "pessimist:\tlog\tel\t%"PRIpclock"\tnot matched yet (%d)", event->u_event.e_matching.reqid, event->u_event.e_matching.src);
142 continue;
143 }
144 event->u_event.e_matching.src =
145 event->req->req_ompi.req_status.MPI_SOURCE;
146 }
147
148 V_OUTPUT_VERBOSE(40, "pessimist:\tlog\tel\t%"PRIpclock"\tfrom %d\tsent to EL", event->u_event.e_matching.reqid, event->u_event.e_matching.src);
149 mca_vprotocol_pessimist.event_buffer[mca_vprotocol_pessimist.event_buffer_length++] =
150 event->u_event;
151 if(mca_vprotocol_pessimist.event_buffer_length ==
152 mca_vprotocol_pessimist.event_buffer_max_length)
153 __VPROTOCOL_PESSIMIST_SEND_BUFFER();
154 assert(mca_vprotocol_pessimist.event_buffer_length < mca_vprotocol_pessimist.event_buffer_max_length);
155 prv_event = (mca_vprotocol_pessimist_event_t *)
156 opal_list_remove_item(&mca_vprotocol_pessimist.pending_events,
157 (opal_list_item_t *) event);
158 VPESSIMIST_EVENT_RETURN(event);
159 event = prv_event;
160 }
161 }
162 __VPROTOCOL_PESSIMIST_SEND_BUFFER();
163 }
164
165
166
167
168
169
170
171 #define VPROTOCOL_PESSIMIST_MATCHING_REPLAY(src) do { \
172 if(mca_vprotocol_pessimist.replay && ((src) == MPI_ANY_SOURCE)) \
173 vprotocol_pessimist_matching_replay(&(src)); \
174 } while(0)
175 void vprotocol_pessimist_matching_replay(int *src);
176
177
178
179
180
181
182
183
184 static inline void vprotocol_pessimist_delivery_log(ompi_request_t *req)
185 {
186 mca_vprotocol_pessimist_event_t *event;
187 vprotocol_pessimist_delivery_event_t *devent;
188
189 if(req == NULL)
190 {
191
192 V_OUTPUT_VERBOSE(70, "pessimist:\tlog\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
193 event = (mca_vprotocol_pessimist_event_t*)
194 opal_list_get_last(&mca_vprotocol_pessimist.pending_events);
195 if(event->type == VPROTOCOL_PESSIMIST_EVENT_TYPE_DELIVERY &&
196 event->u_event.e_delivery.reqid == 0)
197 {
198
199 event->u_event.e_delivery.probeid = mca_vprotocol_pessimist.clock++;
200 }
201 else
202 {
203
204
205 VPESSIMIST_DELIVERY_EVENT_NEW(event);
206 devent = &(event->u_event.e_delivery);
207 devent->probeid = mca_vprotocol_pessimist.clock++;
208 devent->reqid = 0;
209 opal_list_append(&mca_vprotocol_pessimist.pending_events,
210 (opal_list_item_t *) event);
211 }
212 }
213 else
214 {
215
216 V_OUTPUT_VERBOSE(70, "pessimist:\tlog\tdeliver\t%"PRIpclock"\treq %"PRIpclock, mca_vprotocol_pessimist.clock, VPESSIMIST_FTREQ(req)->reqid);
217 VPESSIMIST_DELIVERY_EVENT_NEW(event);
218 devent = &(event->u_event.e_delivery);
219 devent->probeid = mca_vprotocol_pessimist.clock++;
220 devent->reqid = VPESSIMIST_FTREQ(req)->reqid;
221 opal_list_append(&mca_vprotocol_pessimist.pending_events,
222 (opal_list_item_t *) event);
223 }
224 }
225
226
227
228
229
230
231
232
233
234 #define VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(n, reqs, outcount, i, status) do {\
235 if(mca_vprotocol_pessimist.replay) \
236 vprotocol_pessimist_delivery_replay(n, reqs, outcount, i, status); \
237 } while(0)
238 void vprotocol_pessimist_delivery_replay(size_t, ompi_request_t **,
239 int *, int *, ompi_status_public_t *);
240
241 END_C_DECLS
242
243 #endif