This source file includes the following definitions.
- ompi_osc_pt2pt_frag_constructor
- frag_send_cb
- frag_send
- ompi_osc_pt2pt_frag_start
- ompi_osc_pt2pt_flush_active_frag
- ompi_osc_pt2pt_frag_flush_pending
- ompi_osc_pt2pt_frag_flush_pending_all
- ompi_osc_pt2pt_frag_flush_target
- ompi_osc_pt2pt_frag_flush_all
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include "osc_pt2pt.h"
17 #include "osc_pt2pt_frag.h"
18 #include "osc_pt2pt_data_move.h"
19
20 static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag)
21 {
22 frag->buffer = frag->super.ptr;
23 }
24
25 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, opal_free_list_item_t,
26 ompi_osc_pt2pt_frag_constructor, NULL);
27
28 static int frag_send_cb (ompi_request_t *request)
29 {
30 ompi_osc_pt2pt_frag_t *frag =
31 (ompi_osc_pt2pt_frag_t*) request->req_complete_cb_data;
32 ompi_osc_pt2pt_module_t *module = frag->module;
33
34 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
35 "osc pt2pt: frag_send complete to %d, frag = %p, request = %p",
36 frag->target, (void *) frag, (void *) request));
37
38 mark_outgoing_completion(module);
39 opal_free_list_return (&mca_osc_pt2pt_component.frags, &frag->super);
40
41 ompi_request_free (&request);
42
43 return 1;
44 }
45
46 static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag)
47 {
48 int count;
49
50 count = (int)((uintptr_t) frag->top - (uintptr_t) frag->buffer);
51
52 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
53 "osc pt2pt: frag_send called to %d, frag = %p, count = %d",
54 frag->target, (void *) frag, count));
55
56 OSC_PT2PT_HTON(frag->header, module, frag->target);
57 return ompi_osc_pt2pt_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_PT2PT_FRAG_TAG,
58 module->comm, frag_send_cb, frag);
59 }
60
61
62 int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module,
63 ompi_osc_pt2pt_frag_t *frag)
64 {
65 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, frag->target);
66 int ret;
67
68 assert(0 == frag->pending && peer->active_frag != (intptr_t) frag);
69
70
71
72 ompi_osc_signal_outgoing (module, frag->target, 1);
73
74
75
76 if (!ompi_osc_pt2pt_peer_sends_active (module, frag->target) || opal_list_get_size (&peer->queued_frags)) {
77 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "queuing fragment to peer %d",
78 frag->target));
79 OPAL_THREAD_SCOPED_LOCK(&peer->lock,
80 opal_list_append(&peer->queued_frags, (opal_list_item_t *) frag));
81 return OMPI_SUCCESS;
82 }
83
84 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "sending fragment to peer %d",
85 frag->target));
86
87 ret = frag_send(module, frag);
88
89 opal_condition_broadcast(&module->cond);
90
91 return ret;
92 }
93
94 static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_peer_t *peer)
95 {
96 ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
97 int ret = OMPI_SUCCESS;
98
99 if (NULL == active_frag) {
100
101 return OMPI_SUCCESS;
102 }
103
104 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
105 "osc pt2pt: flushing active fragment to target %d. pending: %d",
106 active_frag->target, active_frag->pending));
107
108 if (opal_atomic_compare_exchange_strong_ptr (&peer->active_frag, (intptr_t *) &active_frag, 0)) {
109 if (0 != OPAL_THREAD_ADD_FETCH32(&active_frag->pending, -1)) {
110
111 return OMPI_ERR_RMA_SYNC;
112 }
113
114 ompi_osc_signal_outgoing (module, active_frag->target, 1);
115 ret = frag_send (module, active_frag);
116 }
117
118 return ret;
119 }
120
121 int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target)
122 {
123 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
124 ompi_osc_pt2pt_frag_t *frag;
125 int ret = OMPI_SUCCESS;
126
127
128 OPAL_THREAD_LOCK(&peer->lock);
129 while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
130 ret = frag_send(module, frag);
131 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
132 break;
133 }
134 }
135 OPAL_THREAD_UNLOCK(&peer->lock);
136
137 return ret;
138 }
139
140 int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module)
141 {
142 int ret = OPAL_SUCCESS;
143
144 for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
145 ret = ompi_osc_pt2pt_frag_flush_pending (module, i);
146 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
147 return ret;
148 }
149 }
150
151 return ret;
152 }
153
154 int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
155 {
156 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
157 int ret = OMPI_SUCCESS;
158
159 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
160 "osc pt2pt: frag flush to target target %d. queue fragments: %lu",
161 target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
162
163 ret = ompi_osc_pt2pt_frag_flush_pending (module, target);
164 if (OMPI_SUCCESS != ret) {
165
166 return ret;
167 }
168
169
170
171 ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
172
173 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
174 "osc pt2pt: frag flush target %d finished", target));
175
176 return ret;
177 }
178
179 int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
180 {
181 int ret = OMPI_SUCCESS;
182
183 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
184 "osc pt2pt: frag flush all begin"));
185
186
187 for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
188 ret = ompi_osc_pt2pt_frag_flush_target (module, i);
189 if (OMPI_SUCCESS != ret) {
190 break;
191 }
192 }
193
194 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
195 "osc pt2pt: frag flush all done. ret: %d", ret));
196
197 return ret;
198 }