This source file includes the following definitions.
- vprotocol_pessimist_event_logger_connect
- vprotocol_pessimist_event_logger_disconnect
- vprotocol_pessimist_matching_replay
- vprotocol_pessimist_delivery_replay
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include "ompi_config.h"
16 #include "vprotocol_pessimist_eventlog.h"
17 #include "opal/mca/pmix/pmix.h"
18 #include "opal/util/printf.h"
19 #include "ompi/dpm/dpm.h"
20
21 int vprotocol_pessimist_event_logger_connect(int el_rank, ompi_communicator_t **el_comm)
22 {
23 int rc;
24 char *port;
25 int rank;
26 vprotocol_pessimist_clock_t connect_info[2];
27 opal_list_t results;
28 opal_pmix_pdata_t *pdat;
29
30 OBJ_CONSTRUCT(&results, opal_list_t);
31 pdat = OBJ_NEW(opal_pmix_pdata_t);
32 opal_asprintf(&pdat->value.key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank);
33 opal_list_append(&results, &pdat->super);
34
35 rc = opal_pmix.lookup(&results, NULL);
36 if (OPAL_SUCCESS != rc ||
37 OPAL_STRING != pdat->value.type ||
38 NULL == pdat->value.data.string) {
39 OPAL_LIST_DESTRUCT(&results);
40 return OMPI_ERR_NOT_FOUND;
41 }
42 port = strdup(pdat->value.data.string);
43 OPAL_LIST_DESTRUCT(&results);
44 V_OUTPUT_VERBOSE(45, "Found port < %s >", port);
45
46 rc = ompi_dpm_connect_accept(MPI_COMM_SELF, 0, port, true, el_comm);
47 if(OMPI_SUCCESS != rc) {
48 OMPI_ERROR_LOG(rc);
49 }
50
51
52 rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
53 rc = mca_pml_v.host_pml.pml_send(&rank, 1, MPI_INTEGER, 0,
54 VPROTOCOL_PESSIMIST_EVENTLOG_NEW_CLIENT_CMD,
55 MCA_PML_BASE_SEND_STANDARD,
56 mca_vprotocol_pessimist.el_comm);
57 if(OPAL_UNLIKELY(MPI_SUCCESS != rc))
58 OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc,
59 __FILE__ ": failed sending event logger handshake");
60 rc = mca_pml_v.host_pml.pml_recv(&connect_info, 2, MPI_UNSIGNED_LONG_LONG,
61 0, VPROTOCOL_PESSIMIST_EVENTLOG_NEW_CLIENT_CMD,
62 mca_vprotocol_pessimist.el_comm, MPI_STATUS_IGNORE);
63 if(OPAL_UNLIKELY(MPI_SUCCESS != rc)) \
64 OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc, \
65 __FILE__ ": failed receiving event logger handshake");
66
67 return rc;
68 }
69
70 int vprotocol_pessimist_event_logger_disconnect(ompi_communicator_t *el_comm)
71 {
72 ompi_dpm_disconnect(el_comm);
73 return OMPI_SUCCESS;
74 }
75
76 void vprotocol_pessimist_matching_replay(int *src) {
77 #if OPAL_ENABLE_DEBUG
78 vprotocol_pessimist_clock_t max = 0;
79 #endif
80 mca_vprotocol_pessimist_event_t *event;
81
82
83 for(event = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(&mca_vprotocol_pessimist.replay_events);
84 event != (mca_vprotocol_pessimist_event_t *) opal_list_get_end(&mca_vprotocol_pessimist.replay_events);
85 event = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(event))
86 {
87 vprotocol_pessimist_matching_event_t *mevent;
88
89 if(VPROTOCOL_PESSIMIST_EVENT_TYPE_MATCHING != event->type) continue;
90 mevent = &(event->u_event.e_matching);
91 if(mevent->reqid == mca_vprotocol_pessimist.clock)
92 {
93
94 V_OUTPUT_VERBOSE(70, "pessimist: replay\tmatch\t%"PRIpclock"\trecv is forced from %d", mevent->reqid, mevent->src);
95 (*src) = mevent->src;
96 opal_list_remove_item(&mca_vprotocol_pessimist.replay_events,
97 (opal_list_item_t *) event);
98 VPESSIMIST_EVENT_RETURN(event);
99 }
100 #if OPAL_ENABLE_DEBUG
101 else if(mevent->reqid > max)
102 max = mevent->reqid;
103 }
104
105
106 assert(((*src) != MPI_ANY_SOURCE) || (mca_vprotocol_pessimist.clock > max));
107 #else
108 }
109 #endif
110 }
111
112 void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
113 int *outcount, int *index,
114 ompi_status_public_t *status) {
115 mca_vprotocol_pessimist_event_t *event;
116
117 for(event = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(&mca_vprotocol_pessimist.replay_events);
118 event != (mca_vprotocol_pessimist_event_t *) opal_list_get_end(&mca_vprotocol_pessimist.replay_events);
119 event = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(event))
120 {
121 vprotocol_pessimist_delivery_event_t *devent;
122
123 if(VPROTOCOL_PESSIMIST_EVENT_TYPE_DELIVERY != event->type) continue;
124 devent = &(event->u_event.e_delivery);
125 if(devent->probeid < mca_vprotocol_pessimist.clock)
126 {
127
128 V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
129 *index = MPI_UNDEFINED;
130 *outcount = 0;
131 mca_vprotocol_pessimist.clock++;
132
133 return;
134 }
135 else if(devent->probeid == mca_vprotocol_pessimist.clock)
136 {
137 int i;
138 for(i = 0; i < (int) n; i++)
139 {
140 if(VPESSIMIST_FTREQ(reqs[i])->reqid == devent->reqid)
141 {
142 V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\t%"PRIpclock, devent->probeid, devent->reqid);
143 opal_list_remove_item(&mca_vprotocol_pessimist.replay_events,
144 (opal_list_item_t *) event);
145 VPESSIMIST_EVENT_RETURN(event);
146 *index = i;
147 *outcount = 1;
148 mca_vprotocol_pessimist.clock++;
149 ompi_request_wait(&reqs[i], status);
150 return;
151 }
152 }
153 V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
154 assert(devent->reqid == 0);
155 *index = MPI_UNDEFINED;
156 *outcount = 0;
157 mca_vprotocol_pessimist.clock++;
158 opal_list_remove_item(&mca_vprotocol_pessimist.replay_events,
159 (opal_list_item_t *) event);
160 VPESSIMIST_EVENT_RETURN(event);
161 return;
162 }
163 }
164 V_OUTPUT_VERBOSE(50, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnot forced", mca_vprotocol_pessimist.clock);
165 }