This source file includes the following definitions:
- progress
- init_module_fns
- hcoll_rte_fns_setup
- recv_nb
- send_nb
- test
- ec_handle_compare
- get_ec_handles
- get_my_ec
- group_size
- my_rank
- ec_on_local_node
- get_world_group_handle
- jobid
- group_id
- request_free
- get_coll_handle
- coll_handle_test
- coll_handle_free
- coll_handle_complete
- world_rank
- ompi_combiner_2_hcoll_combiner
- get_mpi_type_envelope
- get_mpi_type_contents
- get_hcoll_type
- set_hcoll_type
- get_mpi_constants
#include "ompi_config.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <sys/types.h>
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
#endif
#include <fcntl.h>
#include <errno.h>

#include "coll_hcoll.h"

#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/datatype/ompi_datatype_internal.h"
#include "ompi/mca/pml/pml.h"

#include "hcoll/api/hcoll_dte.h"
#include "hcoll/api/hcoll_api.h"
#include "hcoll/api/hcoll_constants.h"
#include "coll_hcoll_dtypes.h"

static int recv_nb(dte_data_representation_t data,
                   uint32_t count,
                   void *buffer,
                   rte_ec_handle_t,
                   rte_grp_handle_t,
                   uint32_t tag,
                   rte_request_handle_t *req);

static int send_nb(dte_data_representation_t data,
                   uint32_t count,
                   void *buffer,
                   rte_ec_handle_t ec_h,
                   rte_grp_handle_t grp_h,
                   uint32_t tag, rte_request_handle_t *req);

static int test(rte_request_handle_t *request,
                int *completed);

static int ec_handle_compare(rte_ec_handle_t handle_1,
                             rte_grp_handle_t group_handle_1,
                             rte_ec_handle_t handle_2,
                             rte_grp_handle_t group_handle_2);

static int get_ec_handles(int num_ec,
                          int *ec_indexes,
                          rte_grp_handle_t,
                          rte_ec_handle_t *ec_handles);

#if 0
static int get_my_ec(rte_grp_handle_t, rte_ec_handle_t *ec_handle);
#endif

static int group_size(rte_grp_handle_t group);
static int my_rank(rte_grp_handle_t grp_h);
static int ec_on_local_node(rte_ec_handle_t ec, rte_grp_handle_t group);
static rte_grp_handle_t get_world_group_handle(void);
static uint32_t jobid(void);

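/* Let hcoll drive Open MPI's progress engine while it waits for completions. */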
static void progress(void){
    opal_progress();
}

static void* get_coll_handle(void);
static int coll_handle_test(void* handle);
static void coll_handle_free(void *handle);
static void coll_handle_complete(void *handle);
static int group_id(rte_grp_handle_t group);

static int world_rank(rte_grp_handle_t grp_h, rte_ec_handle_t ec);

#if HCOLL_API >= HCOLL_VERSION(3,6)
static int get_mpi_type_envelope(void *mpi_type, int *num_integers,
                                 int *num_addresses, int *num_datatypes,
                                 hcoll_mpi_type_combiner_t *combiner);
static int get_mpi_type_contents(void *mpi_type, int max_integers, int max_addresses,
                                 int max_datatypes, int *array_of_integers,
                                 void *array_of_addresses, void *array_of_datatypes);
static int get_hcoll_type(void *mpi_type, dte_data_representation_t *hcoll_type);
static int set_hcoll_type(void *mpi_type, dte_data_representation_t hcoll_type);
static int get_mpi_constants(size_t *mpi_datatype_size,
                             int *mpi_order_c, int *mpi_order_fortran,
                             int *mpi_distribute_block,
                             int *mpi_distribute_cyclic,
                             int *mpi_distribute_none,
                             int *mpi_distribute_dflt_darg);
#endif

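/* Populate the hcoll_rte_functions table with the callbacks defined in this file. */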
static void init_module_fns(void){
    hcoll_rte_functions.send_fn = send_nb;
    hcoll_rte_functions.recv_fn = recv_nb;
    hcoll_rte_functions.ec_cmp_fn = ec_handle_compare;
    hcoll_rte_functions.get_ec_handles_fn = get_ec_handles;
    hcoll_rte_functions.rte_group_size_fn = group_size;
    hcoll_rte_functions.test_fn = test;
    hcoll_rte_functions.rte_my_rank_fn = my_rank;
    hcoll_rte_functions.rte_ec_on_local_node_fn = ec_on_local_node;
    hcoll_rte_functions.rte_world_group_fn = get_world_group_handle;
    hcoll_rte_functions.rte_jobid_fn = jobid;
    hcoll_rte_functions.rte_progress_fn = progress;
    hcoll_rte_functions.rte_get_coll_handle_fn = get_coll_handle;
    hcoll_rte_functions.rte_coll_handle_test_fn = coll_handle_test;
    hcoll_rte_functions.rte_coll_handle_free_fn = coll_handle_free;
    hcoll_rte_functions.rte_coll_handle_complete_fn = coll_handle_complete;
    hcoll_rte_functions.rte_group_id_fn = group_id;
    hcoll_rte_functions.rte_world_rank_fn = world_rank;
#if HCOLL_API >= HCOLL_VERSION(3,6)
    hcoll_rte_functions.rte_get_mpi_type_envelope_fn = get_mpi_type_envelope;
    hcoll_rte_functions.rte_get_mpi_type_contents_fn = get_mpi_type_contents;
    hcoll_rte_functions.rte_get_hcoll_type_fn = get_hcoll_type;
    hcoll_rte_functions.rte_set_hcoll_type_fn = set_hcoll_type;
    hcoll_rte_functions.rte_get_mpi_constants_fn = get_mpi_constants;
#endif
}

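/* Install the RTE callbacks and set up the free list that backs hcoll collective handles. */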
void hcoll_rte_fns_setup(void)
{
    init_module_fns();
    OBJ_CONSTRUCT(&mca_coll_hcoll_component.requests, opal_free_list_t);
    opal_free_list_init(
        &(mca_coll_hcoll_component.requests),
        sizeof(ompi_request_t),
        /* element alignment */
        8,
        OBJ_CLASS(ompi_request_t),
        /* no payload buffer */
        0, 0,
        /* initial, maximum (-1 = unlimited), and per-allocation element counts */
        10,
        -1,
        10,
        /* no mpool, rcache, item-init callback, or context */
        NULL,
        0,
        NULL,
        NULL,
        NULL
        );
}

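/*
 * recv_nb: hcoll callback that posts a non-blocking receive through the PML.
 * Only inline DTE data representations are expected here; the transfer size is
 * the packed bit size of the representation times the count, converted to bytes.
 */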
static int recv_nb(struct dte_data_representation_t data,
                   uint32_t count,
                   void *buffer,
                   rte_ec_handle_t ec_h,
                   rte_grp_handle_t grp_h,
                   uint32_t tag,
                   rte_request_handle_t *req)
{
    ompi_communicator_t *comm = (ompi_communicator_t *)grp_h;

    if (NULL == ec_h.handle && -1 != ec_h.rank) {
        fprintf(stderr,"***Error in hcolrte_rml_recv_nb: wrong null argument: "
                "ec_h.handle = %p, ec_h.rank = %d\n",ec_h.handle,ec_h.rank);
        return HCOLL_ERROR;
    }
    assert(HCOL_DTE_IS_INLINE(data));

    size_t size;
    ompi_request_t *ompi_req;

    if (!buffer && !HCOL_DTE_IS_ZERO(data)) {
        fprintf(stderr, "***Error in hcolrte_rml_recv_nb: buffer pointer is NULL"
                " for non DTE_ZERO INLINE data representation\n");
        return HCOLL_ERROR;
    }
    size = (size_t)data.rep.in_line_rep.data_handle.in_line.packed_size*count/8;

    HCOL_VERBOSE(30,"PML_IRECV: dest = %d: buf = %p: size = %u: comm = %p",
                 ec_h.rank, buffer, (unsigned int)size, (void *)comm);
    if (MCA_PML_CALL(irecv(buffer,size,&(ompi_mpi_unsigned_char.dt),ec_h.rank,
                           tag,comm,&ompi_req)))
    {
        return HCOLL_ERROR;
    }
    req->data = (void *)ompi_req;
    req->status = HCOLRTE_REQUEST_ACTIVE;

    return HCOLL_SUCCESS;
}

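/*
 * send_nb: hcoll callback that posts a non-blocking standard send through the PML,
 * mirroring recv_nb. The posted request is handed back to hcoll through req->data.
 */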
static int send_nb(dte_data_representation_t data,
                   uint32_t count,
                   void *buffer,
                   rte_ec_handle_t ec_h,
                   rte_grp_handle_t grp_h,
                   uint32_t tag,
                   rte_request_handle_t *req)
{
    ompi_communicator_t *comm = (ompi_communicator_t *)grp_h;

    if (! ec_h.handle) {
        fprintf(stderr,"***Error in hcolrte_rml_send_nb: wrong null argument: "
                "ec_h.handle = %p, ec_h.rank = %d\n",ec_h.handle,ec_h.rank);
        return HCOLL_ERROR;
    }
    assert(HCOL_DTE_IS_INLINE(data));

    size_t size;
    ompi_request_t *ompi_req;
    if (!buffer && !HCOL_DTE_IS_ZERO(data)) {
        fprintf(stderr, "***Error in hcolrte_rml_send_nb: buffer pointer is NULL"
                " for non DTE_ZERO INLINE data representation\n");
        return HCOLL_ERROR;
    }
    size = (size_t)data.rep.in_line_rep.data_handle.in_line.packed_size*count/8;
    HCOL_VERBOSE(30,"PML_ISEND: dest = %d: buf = %p: size = %u: comm = %p",
                 ec_h.rank, buffer, (unsigned int)size, (void *)comm);
    if (MCA_PML_CALL(isend(buffer,size,&(ompi_mpi_unsigned_char.dt),ec_h.rank,
                           tag,MCA_PML_BASE_SEND_STANDARD,comm,&ompi_req)))
    {
        return HCOLL_ERROR;
    }
    req->data = (void *)ompi_req;
    req->status = HCOLRTE_REQUEST_ACTIVE;
    return HCOLL_SUCCESS;
}

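/*
 * test: hcoll callback that polls a point-to-point request for completion and
 * releases the underlying ompi_request_t once it has finished.
 */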
static int test(rte_request_handle_t *request,
                int *completed)
{
    ompi_request_t *ompi_req = (ompi_request_t *)request->data;
    if (HCOLRTE_REQUEST_ACTIVE != request->status){
        *completed = true;
        return HCOLL_SUCCESS;
    }

    *completed = REQUEST_COMPLETE(ompi_req);
    if (*completed){
        ompi_request_free(&ompi_req);
        request->status = HCOLRTE_REQUEST_DONE;
    }

    return HCOLL_SUCCESS;
}

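/* Two endpoint handles are considered equal when they wrap the same ompi_proc_t. */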
static int ec_handle_compare(rte_ec_handle_t handle_1,
                             rte_grp_handle_t group_handle_1,
                             rte_ec_handle_t handle_2,
                             rte_grp_handle_t group_handle_2)
{
    return handle_1.handle == handle_2.handle;
}

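/* Translate an array of rank indexes in the group (communicator) into endpoint handles. */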
static int get_ec_handles(int num_ec,
                          int *ec_indexes,
                          rte_grp_handle_t grp_h,
                          rte_ec_handle_t *ec_handles)
{
    int i;
    ompi_communicator_t *comm = (ompi_communicator_t *)grp_h;
    for (i=0; i<num_ec; i++){
        ompi_proc_t *proc = ompi_comm_peer_lookup(comm,ec_indexes[i]);
        ec_handles[i].rank = ec_indexes[i];
        ec_handles[i].handle = (void *)proc;
    }
    return HCOLL_SUCCESS;
}

#if 0
static int get_my_ec(rte_grp_handle_t grp_h, rte_ec_handle_t *ec_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *)grp_h;
    int my_rank = ompi_comm_rank(comm);
    ompi_proc_t *my_proc = ompi_comm_peer_lookup(comm,my_rank);
    ec_handle->handle = (void *)my_proc;
    ec_handle->rank = my_rank;
    return HCOLL_SUCCESS;
}
#endif

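/* Small accessors: a group handle is simply an ompi_communicator_t, so these map
 * directly onto the corresponding communicator and process queries. */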
static int group_size(rte_grp_handle_t grp_h)
{
    return ompi_comm_size((ompi_communicator_t *)grp_h);
}

static int my_rank(rte_grp_handle_t grp_h)
{
    return ompi_comm_rank((ompi_communicator_t *)grp_h);
}

static int ec_on_local_node(rte_ec_handle_t ec, rte_grp_handle_t group){
    ompi_proc_t *proc = (ompi_proc_t *)ec.handle;
    return OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags);
}

static rte_grp_handle_t get_world_group_handle(void)
{
    return (rte_grp_handle_t)&ompi_mpi_comm_world.comm;
}

static uint32_t jobid(void){
    return ORTE_PROC_MY_NAME->jobid;
}

static int group_id(rte_grp_handle_t group){
    return ((ompi_communicator_t *)group)->c_contextid;
}

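/*
 * Collective handle management: hcoll obtains ompi_request_t objects from the
 * component's free list, tests and completes them like ordinary requests, and
 * returns them to the free list when MPI frees the request.
 */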
static int
request_free(struct ompi_request_t **ompi_req)
{
    ompi_request_t *req = *ompi_req;
    if (!coll_handle_test(req)) {
        return OMPI_ERROR;
    }
    coll_handle_free(req);
    *ompi_req = &ompi_request_empty;
    return OMPI_SUCCESS;
}

static void* get_coll_handle(void)
{
    ompi_request_t *ompi_req;
    opal_free_list_item_t *item;
    item = opal_free_list_wait (&(mca_coll_hcoll_component.requests));
    if (OPAL_UNLIKELY(NULL == item)) {
        HCOL_ERROR("Wait for free list failed.\n");
        return NULL;
    }
    ompi_req = (ompi_request_t *)item;
    OMPI_REQUEST_INIT(ompi_req,false);
    ompi_req->req_complete_cb = NULL;
    ompi_req->req_status.MPI_ERROR = MPI_SUCCESS;
    ompi_req->req_state = OMPI_REQUEST_ACTIVE;
    ompi_req->req_free = request_free;
    ompi_req->req_type = OMPI_REQUEST_COLL;
    return (void *)ompi_req;
}

static int coll_handle_test(void* handle)
{
    ompi_request_t *ompi_req = (ompi_request_t *)handle;
    return REQUEST_COMPLETE(ompi_req);
}

static void coll_handle_free(void *handle){
    ompi_request_t *ompi_req = (ompi_request_t *)handle;
    opal_free_list_return (&mca_coll_hcoll_component.requests,
                           (opal_free_list_item_t *)ompi_req);
}

static void coll_handle_complete(void *handle)
{
    ompi_request_t *ompi_req = (ompi_request_t *)handle;
    ompi_request_complete(ompi_req,true);
}

static int world_rank(rte_grp_handle_t grp_h, rte_ec_handle_t ec){
    ompi_proc_t *proc = (ompi_proc_t *)ec.handle;
    return ((ompi_process_name_t*)&proc->super.proc_name)->vpid;
}

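/* Datatype introspection callbacks, used by hcoll 3.6 and newer to map derived
 * MPI datatypes onto hcoll's DTE representations. */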
#if HCOLL_API >= HCOLL_VERSION(3,6)
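/* Map an Open MPI datatype combiner onto the corresponding hcoll combiner;
 * unknown combiners fall through to HCOLL_MPI_COMBINER_LAST. */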
hcoll_mpi_type_combiner_t ompi_combiner_2_hcoll_combiner(int ompi_combiner) {
    switch (ompi_combiner)
    {
    case MPI_COMBINER_CONTIGUOUS:
        return HCOLL_MPI_COMBINER_CONTIGUOUS;
    case MPI_COMBINER_VECTOR:
        return HCOLL_MPI_COMBINER_VECTOR;
    case MPI_COMBINER_HVECTOR:
        return HCOLL_MPI_COMBINER_HVECTOR;
    case MPI_COMBINER_INDEXED:
        return HCOLL_MPI_COMBINER_INDEXED;
    case MPI_COMBINER_HINDEXED_INTEGER:
    case MPI_COMBINER_HINDEXED:
        return HCOLL_MPI_COMBINER_HINDEXED;
    case MPI_COMBINER_DUP:
        return HCOLL_MPI_COMBINER_DUP;
    case MPI_COMBINER_INDEXED_BLOCK:
        return HCOLL_MPI_COMBINER_INDEXED_BLOCK;
    case MPI_COMBINER_HINDEXED_BLOCK:
        return HCOLL_MPI_COMBINER_HINDEXED_BLOCK;
    case MPI_COMBINER_SUBARRAY:
        return HCOLL_MPI_COMBINER_SUBARRAY;
    case MPI_COMBINER_DARRAY:
        return HCOLL_MPI_COMBINER_DARRAY;
    case MPI_COMBINER_F90_REAL:
        return HCOLL_MPI_COMBINER_F90_REAL;
    case MPI_COMBINER_F90_COMPLEX:
        return HCOLL_MPI_COMBINER_F90_COMPLEX;
    case MPI_COMBINER_F90_INTEGER:
        return HCOLL_MPI_COMBINER_F90_INTEGER;
    case MPI_COMBINER_RESIZED:
        return HCOLL_MPI_COMBINER_RESIZED;
    case MPI_COMBINER_STRUCT:
    case MPI_COMBINER_STRUCT_INTEGER:
        return HCOLL_MPI_COMBINER_STRUCT;
    default:
        break;
    }
    return HCOLL_MPI_COMBINER_LAST;
}

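/* Expose MPI_Type_get_envelope/MPI_Type_get_contents style queries to hcoll via
 * ompi_datatype_get_args(). */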
static int get_mpi_type_envelope(void *mpi_type, int *num_integers,
                                 int *num_addresses, int *num_datatypes,
                                 hcoll_mpi_type_combiner_t *combiner) {
    int ompi_combiner, rc;
    rc = ompi_datatype_get_args( (ompi_datatype_t*)mpi_type, 0, num_integers, NULL,
                                 num_addresses, NULL,
                                 num_datatypes, NULL, &ompi_combiner);
    *combiner = ompi_combiner_2_hcoll_combiner(ompi_combiner);
    return rc == OMPI_SUCCESS ? HCOLL_SUCCESS : HCOLL_ERROR;
}

static int get_mpi_type_contents(void *mpi_type, int max_integers, int max_addresses,
                                 int max_datatypes, int *array_of_integers,
                                 void *array_of_addresses, void *array_of_datatypes) {
    int rc;
    rc = ompi_datatype_get_args( (ompi_datatype_t*)mpi_type, 1, &max_integers, array_of_integers,
                                 &max_addresses, array_of_addresses,
                                 &max_datatypes, array_of_datatypes, NULL );
    return rc == OMPI_SUCCESS ? HCOLL_SUCCESS : HCOLL_ERROR;
}

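/* get_hcoll_type queries the DTE representation associated with an MPI datatype
 * (trying to find an existing derived mapping); set_hcoll_type caches a
 * representation on the datatype through the component's type attribute keyval. */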
static int get_hcoll_type(void *mpi_type, dte_data_representation_t *hcoll_type) {
    *hcoll_type = ompi_dtype_2_hcoll_dtype((ompi_datatype_t*)mpi_type, TRY_FIND_DERIVED);
    return HCOL_DTE_IS_ZERO((*hcoll_type)) ? HCOLL_ERR_NOT_FOUND : HCOLL_SUCCESS;
}

static int set_hcoll_type(void *mpi_type, dte_data_representation_t hcoll_type) {
    int rc;
    mca_coll_hcoll_dtype_t *hcoll_dtype = (mca_coll_hcoll_dtype_t*)
        opal_free_list_get(&mca_coll_hcoll_component.dtypes);
    ompi_datatype_t *dtype = (ompi_datatype_t*)mpi_type;
    hcoll_dtype->type = hcoll_type;
    rc = ompi_attr_set_c(TYPE_ATTR, (void*)dtype, &(dtype->d_keyhash), hcoll_type_attr_keyval, (void *)hcoll_dtype, false);
    if (OMPI_SUCCESS != rc) {
        HCOL_VERBOSE(1,"hcoll ompi_attr_set_c failed for derived dtype");
        goto Cleanup;
    }
    return HCOLL_SUCCESS;
Cleanup:
    opal_free_list_return(&mca_coll_hcoll_component.dtypes,
                          &hcoll_dtype->super);
    return rc;
}

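/* Report the MPI constants (datatype handle size, array ordering, and DARRAY
 * distribution codes) that hcoll needs when reconstructing derived datatypes. */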
static int get_mpi_constants(size_t *mpi_datatype_size,
                             int *mpi_order_c, int *mpi_order_fortran,
                             int *mpi_distribute_block,
                             int *mpi_distribute_cyclic,
                             int *mpi_distribute_none,
                             int *mpi_distribute_dflt_darg) {
    *mpi_datatype_size = sizeof(MPI_Datatype);
    *mpi_order_c = MPI_ORDER_C;
    *mpi_order_fortran = MPI_ORDER_FORTRAN;
    *mpi_distribute_block = MPI_DISTRIBUTE_BLOCK;
    *mpi_distribute_cyclic = MPI_DISTRIBUTE_CYCLIC;
    *mpi_distribute_none = MPI_DISTRIBUTE_NONE;
    *mpi_distribute_dflt_darg = MPI_DISTRIBUTE_DFLT_DARG;
    return HCOLL_SUCCESS;
}

#endif