This source file includes the following definitions:
- opal_common_ucx_mem_release_cb
- opal_common_ucx_mca_var_register
- opal_common_ucx_mca_register
- opal_common_ucx_mca_deregister
- opal_common_ucx_empty_complete_cb
- opal_common_ucx_mca_fence_complete_cb
- opal_common_ucx_mca_proc_added
- opal_common_ucx_mca_pmix_fence_nb
- opal_common_ucx_mca_pmix_fence
- opal_common_ucx_wait_all_requests
- opal_common_ucx_del_procs_nofence
- opal_common_ucx_del_procs
1
2
3
4
5
6
7
8
9
10 #include "opal_config.h"
11
12 #include "common_ucx.h"
13 #include "opal/mca/base/mca_base_var.h"
14 #include "opal/mca/base/mca_base_framework.h"
15 #include "opal/mca/pmix/pmix.h"
16 #include "opal/memoryhooks/memory.h"
17
18 #include <ucm/api/ucm.h>
19
20
21
22 extern mca_base_framework_t opal_memory_base_framework;
23
24 opal_common_ucx_module_t opal_common_ucx = {
25 .verbose = 0,
26 .progress_iterations = 100,
27 .registered = 0,
28 .opal_mem_hooks = 0
29 };
30
/*
 * Release callback registered with opal_mem_hooks_register_release() when the
 * OPAL memory hooks are selected. It reports the released address range
 * [buf, buf + length) to UCM through ucm_vm_munmap().
 *
 * cbdata and from_alloc are part of the hook signature but are not used.
 */
static void opal_common_ucx_mem_release_cb(void *buf, size_t length,
                                           void *cbdata, bool from_alloc)
{
    (void)cbdata;
    (void)from_alloc;

    ucm_vm_munmap(buf, length);
}
36
37 OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component)
38 {
39 static int registered = 0;
40 static int hook_index;
41 static int verbose_index;
42 static int progress_index;
43 if (!registered) {
44 verbose_index = mca_base_var_register("opal", "opal_common", "ucx", "verbose",
45 "Verbose level of the UCX components",
46 MCA_BASE_VAR_TYPE_INT, NULL, 0,
47 MCA_BASE_VAR_FLAG_SETTABLE, OPAL_INFO_LVL_3,
48 MCA_BASE_VAR_SCOPE_LOCAL,
49 &opal_common_ucx.verbose);
50 progress_index = mca_base_var_register("opal", "opal_common", "ucx", "progress_iterations",
51 "Set number of calls of internal UCX progress "
52 "calls per opal_progress call",
53 MCA_BASE_VAR_TYPE_INT, NULL, 0,
54 MCA_BASE_VAR_FLAG_SETTABLE, OPAL_INFO_LVL_3,
55 MCA_BASE_VAR_SCOPE_LOCAL,
56 &opal_common_ucx.progress_iterations);
57 hook_index = mca_base_var_register("opal", "opal_common", "ucx", "opal_mem_hooks",
58 "Use OPAL memory hooks, instead of UCX internal "
59 "memory hooks", MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
60 OPAL_INFO_LVL_3,
61 MCA_BASE_VAR_SCOPE_LOCAL,
62 &opal_common_ucx.opal_mem_hooks);
63 registered = 1;
64 }
65 if (component) {
66 mca_base_var_register_synonym(verbose_index, component->mca_project_name,
67 component->mca_type_name,
68 component->mca_component_name,
69 "verbose", 0);
70 mca_base_var_register_synonym(progress_index, component->mca_project_name,
71 component->mca_type_name,
72 component->mca_component_name,
73 "progress_iterations", 0);
74 mca_base_var_register_synonym(hook_index, component->mca_project_name,
75 component->mca_type_name,
76 component->mca_component_name,
77 "opal_mem_hooks", 0);
78 }
79 }
80
81 OPAL_DECLSPEC void opal_common_ucx_mca_register(void)
82 {
83 int ret;
84
85 opal_common_ucx.registered++;
86 if (opal_common_ucx.registered > 1) {
87
88 return;
89 }
90
91 opal_common_ucx.output = opal_output_open(NULL);
92 opal_output_set_verbosity(opal_common_ucx.output, opal_common_ucx.verbose);
93
94
95 if (opal_common_ucx.opal_mem_hooks) {
96 ret = mca_base_framework_open(&opal_memory_base_framework, 0);
97 if (OPAL_SUCCESS != ret) {
98
99 MCA_COMMON_UCX_VERBOSE(1, "failed to initialize memory base framework: %d, "
100 "memory hooks will not be used", ret);
101 return;
102 }
103
104 if ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) ==
105 ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) &
106 opal_mem_hooks_support_level())) {
107 MCA_COMMON_UCX_VERBOSE(1, "%s", "using OPAL memory hooks as external events");
108 ucm_set_external_event(UCM_EVENT_VM_UNMAPPED);
109 opal_mem_hooks_register_release(opal_common_ucx_mem_release_cb, NULL);
110 }
111 }
112 }
113
114 OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void)
115 {
116
117 opal_common_ucx.registered--;
118 assert(opal_common_ucx.registered >= 0);
119 if (opal_common_ucx.registered) {
120 return;
121 }
122 opal_mem_hooks_unregister_release(opal_common_ucx_mem_release_cb);
123 opal_output_close(opal_common_ucx.output);
124 }
125
126 void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status)
127 {
128 }
129
/*
 * PMIx fence completion callback: sets the int flag passed as cbdata to 1.
 *
 * The store goes through a volatile lvalue. opal_common_ucx_mca_pmix_fence()
 * hands over the address of a `volatile int` that it polls, and writing a
 * volatile object through a non-volatile lvalue is undefined behaviour
 * (C11 6.7.3p6). Writing a plain int through a volatile lvalue is always fine,
 * so opal_common_ucx_mca_pmix_fence_nb() callers are unaffected.
 *
 * @param status  fence completion status (not inspected).
 * @param fenced  pointer to the int (possibly volatile-qualified) flag to set.
 */
static void opal_common_ucx_mca_fence_complete_cb(int status, void *fenced)
{
    (void)status;
    *(volatile int *)fenced = 1;
}
134
/*
 * Hook invoked when processes are added.
 *
 * When UCM provides ucm_test_events(), check whether UCX can receive
 * VM_UNMAPPED events. If it cannot, warn once per process. The warning
 * suggests enabling the OPAL memory hooks unless they are already enabled.
 * Until a warning has been issued, every call repeats the check. Without
 * ucm_test_events() this function is a no-op.
 */
void opal_common_ucx_mca_proc_added(void)
{
#if HAVE_DECL_UCM_TEST_EVENTS
    static int warned = 0;
    /* Points at a string literal, which must not be modified: use const. */
    static const char *mem_hooks_suggestion =
        "Pls try adding --mca opal_common_ucx_opal_mem_hooks 1 "
        "to mpirun/oshrun command line to resolve this issue.";
    ucs_status_t status;

    if (!warned) {
        status = ucm_test_events(UCM_EVENT_VM_UNMAPPED);
        if (status != UCS_OK) {
            MCA_COMMON_UCX_WARN("UCX is unable to handle VM_UNMAP event. "
                                "This may cause performance degradation or data "
                                "corruption. %s",
                                opal_common_ucx.opal_mem_hooks ? "" : mem_hooks_suggestion);
            warned = 1;
        }
    }
#endif
}
155
156 OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
157 {
158 return opal_pmix.fence_nb(NULL, 0, opal_common_ucx_mca_fence_complete_cb, (void *)fenced);
159 }
160
161 OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
162 {
163 volatile int fenced = 0;
164 int ret = OPAL_SUCCESS;
165
166 if (OPAL_SUCCESS != (ret = opal_pmix.fence_nb(NULL, 0,
167 opal_common_ucx_mca_fence_complete_cb, (void*)&fenced))){
168 return ret;
169 }
170
171 while (!fenced) {
172 ucp_worker_progress(worker);
173 }
174
175 return ret;
176 }
177
178
179 static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker_h worker)
180 {
181 int i;
182
183 MCA_COMMON_UCX_VERBOSE(2, "waiting for %d disconnect requests", count);
184 for (i = 0; i < count; ++i) {
185 opal_common_ucx_wait_request(reqs[i], worker, "ucp_disconnect_nb");
186 reqs[i] = NULL;
187 }
188 }
189
190 OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
191 size_t my_rank, size_t max_disconnect, ucp_worker_h worker) {
192 size_t num_reqs;
193 size_t max_reqs;
194 void *dreq, **dreqs;
195 size_t i;
196 size_t n;
197
198 MCA_COMMON_UCX_ASSERT(procs || !count);
199 MCA_COMMON_UCX_ASSERT(max_disconnect > 0);
200
201 max_reqs = (max_disconnect > count) ? count : max_disconnect;
202
203 dreqs = malloc(sizeof(*dreqs) * max_reqs);
204 if (dreqs == NULL) {
205 return OPAL_ERR_OUT_OF_RESOURCE;
206 }
207
208 num_reqs = 0;
209
210 for (i = 0; i < count; ++i) {
211 n = (i + my_rank) % count;
212 if (procs[n].ep == NULL) {
213 continue;
214 }
215
216 MCA_COMMON_UCX_VERBOSE(2, "disconnecting from rank %zu", procs[n].vpid);
217 dreq = ucp_disconnect_nb(procs[n].ep);
218 if (dreq != NULL) {
219 if (UCS_PTR_IS_ERR(dreq)) {
220 MCA_COMMON_UCX_ERROR("ucp_disconnect_nb(%zu) failed: %s", procs[n].vpid,
221 ucs_status_string(UCS_PTR_STATUS(dreq)));
222 continue;
223 } else {
224 dreqs[num_reqs++] = dreq;
225 if (num_reqs >= max_disconnect) {
226 opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
227 num_reqs = 0;
228 }
229 }
230 }
231 }
232
233
234
235 opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
236 free(dreqs);
237
238 return OPAL_SUCCESS;
239 }
240
241 OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
242 size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
243 {
244 opal_common_ucx_del_procs_nofence(procs, count, my_rank, max_disconnect, worker);
245
246 return opal_common_ucx_mca_pmix_fence(worker);
247 }
248