/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2015-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "coll_basic.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/op/op.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"


/*
 * allreduce_intra
 *
 * Function: - allreduce using other MPI collectives (reduce + bcast)
 * Accepts:  - same as MPI_Allreduce()
 * Returns:  - MPI_SUCCESS or error code
 */
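/*
 * At the MPI level, the decomposition below is equivalent to the
 * following sketch (illustration only; the code goes through the
 * communicator's selected coll modules rather than the MPI_* entry
 * points):
 *
 *     MPI_Reduce(sbuf, rbuf, count, dtype, op, 0, comm);
 *     MPI_Bcast(rbuf, count, dtype, 0, comm);
 */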
int
mca_coll_basic_allreduce_intra(const void *sbuf, void *rbuf, int count,
                               struct ompi_datatype_t *dtype,
                               struct ompi_op_t *op,
                               struct ompi_communicator_t *comm,
                               mca_coll_base_module_t *module)
{
    int err;

    /* Reduce to 0 and broadcast. */

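    /* With MPI_IN_PLACE, every rank's contribution lives in rbuf: the
     * root reduces in place, while the non-root ranks pass rbuf as the
     * send buffer (their receive buffer is ignored by reduce, hence
     * NULL). */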
    if (MPI_IN_PLACE == sbuf) {
        if (0 == ompi_comm_rank(comm)) {
            err = comm->c_coll->coll_reduce(MPI_IN_PLACE, rbuf, count, dtype,
                                            op, 0, comm,
                                            comm->c_coll->coll_reduce_module);
        } else {
            err = comm->c_coll->coll_reduce(rbuf, NULL, count, dtype,
                                            op, 0, comm,
                                            comm->c_coll->coll_reduce_module);
        }
    } else {
        err = comm->c_coll->coll_reduce(sbuf, rbuf, count, dtype, op, 0, comm,
                                        comm->c_coll->coll_reduce_module);
    }
    if (MPI_SUCCESS != err) {
        return err;
    }

    return comm->c_coll->coll_bcast(rbuf, count, dtype, 0, comm,
                                    comm->c_coll->coll_bcast_module);
}


/*
 * allreduce_inter
 *
 * Function: - allreduce for inter-communicators, built from
 *             point-to-point exchanges between the two groups
 * Accepts:  - same as MPI_Allreduce()
 * Returns:  - MPI_SUCCESS or error code
 */
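/*
 * Inter-communicator semantics: each group receives the reduction of
 * the remote group's contributions. A user-level sketch (illustrative
 * only; 'intercomm' stands for any inter-communicator, e.g. one built
 * with MPI_Intercomm_create):
 *
 *     MPI_Allreduce(&local_val, &remote_reduced, 1, MPI_INT, MPI_SUM,
 *                   intercomm);
 */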
int
mca_coll_basic_allreduce_inter(const void *sbuf, void *rbuf, int count,
                               struct ompi_datatype_t *dtype,
                               struct ompi_op_t *op,
                               struct ompi_communicator_t *comm,
                               mca_coll_base_module_t *module)
{
    int err, i, rank, root = 0, rsize, line;
    ptrdiff_t extent, dsize, gap;
    char *tmpbuf = NULL, *pml_buffer = NULL;
    ompi_request_t **reqs = NULL;

    rank = ompi_comm_rank(comm);
    rsize = ompi_comm_remote_size(comm);

    /* Determine the result of the remote group. We cannot use
     * coll_reduce on an inter-communicator, since that would require
     * establishing an order between the two groups (e.g. which group
     * provides the data and which one enters coll_reduce with
     * MPI_PROC_NULL as the root argument, etc.). Instead, both groups
     * execute the data exchange simultaneously. */
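    /* Message flow (groups A and B with roots a0 and b0; peer ranks
     * below are remote-group ranks, as usual on an inter-communicator):
     *
     *   Phase 1: a0 <-> b0 exchange their own contributions (sendrecv);
     *            a_i -> b0 and b_i -> a0 for i > 0; each root reduces
     *            what it receives, so a0 ends up holding B's reduction
     *            and b0 ends up holding A's reduction.
     *   Phase 2: a0 <-> b0 swap the reduced results, then each root
     *            forwards the result it received to ranks 1..rsize-1
     *            of its remote group.
     */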
    /*****************************************************************/
    if (rank == root) {
        err = ompi_datatype_type_extent(dtype, &extent);
        if (OMPI_SUCCESS != err) {
            return OMPI_ERROR;
        }
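        /* opal_datatype_span() gives the memory span needed to hold
         * 'count' elements plus the gap before the first element;
         * offsetting the allocation by -gap yields the pointer the
         * point-to-point calls expect for datatypes with a non-trivial
         * lower bound. */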
        dsize = opal_datatype_span(&dtype->super, count, &gap);
        tmpbuf = (char *) malloc(dsize);
        if (NULL == tmpbuf) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto exit; }
        pml_buffer = tmpbuf - gap;

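        /* Pre-allocate the requests used by the isends in the
         * distribution phase below. */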
        if (rsize > 1) {
            reqs = ompi_coll_base_comm_get_reqs(module->base_data, rsize - 1);
            if (NULL == reqs) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto exit; }
        }

        /* Do a send-recv between the two root procs. to avoid deadlock */
        err = ompi_coll_base_sendrecv_actual(sbuf, count, dtype, 0,
                                             MCA_COLL_BASE_TAG_ALLREDUCE,
                                             rbuf, count, dtype, 0,
                                             MCA_COLL_BASE_TAG_ALLREDUCE,
                                             comm, MPI_STATUS_IGNORE);
        if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }

        /* Loop receiving and calling reduction function (C or Fortran). */
        for (i = 1; i < rsize; i++) {
            err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }

            /* Perform the reduction */
            ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
        }
    } else {
        /* If not root, send data to the root. */
        err = MCA_PML_CALL(send(sbuf, count, dtype, root,
                                MCA_COLL_BASE_TAG_ALLREDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
    }


    /* Each root now holds the reduction of the remote group's data. To
     * distribute that data to all processes in the local group, the two
     * root processes exchange their results and then send them to every
     * other process in their remote group. */
    /***************************************************************************/
    if (rank == root) {
        /* sendrecv between the two roots */
        err = ompi_coll_base_sendrecv_actual(rbuf, count, dtype, 0,
                                             MCA_COLL_BASE_TAG_ALLREDUCE,
                                             pml_buffer, count, dtype, 0,
                                             MCA_COLL_BASE_TAG_ALLREDUCE,
                                             comm, MPI_STATUS_IGNORE);
        if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }

        /* Distribute the data to the other processes in the remote
         * group. Note that we start from 1 (not from zero), since the
         * remote rank zero already has the correct data AND starting at
         * 1 avoids a potential deadlock here.
         */
        if (rsize > 1) {
            for (i = 1; i < rsize; i++) {
                err = MCA_PML_CALL(isend(pml_buffer, count, dtype, i,
                                         MCA_COLL_BASE_TAG_ALLREDUCE,
                                         MCA_PML_BASE_SEND_STANDARD, comm,
                                         &reqs[i - 1]));
                if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
            }

            err = ompi_request_wait_all(rsize - 1, reqs, MPI_STATUSES_IGNORE);
            if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
        }
    } else {
        err = MCA_PML_CALL(recv(rbuf, count, dtype, root,
                                MCA_COLL_BASE_TAG_ALLREDUCE,
                                comm, MPI_STATUS_IGNORE));
        if (OMPI_SUCCESS != err) { line = __LINE__; goto exit; }
    }

 exit:
    if (MPI_SUCCESS != err) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                     "%s:%4d\tError occurred %d, rank %2d", __FILE__,
                     line, err, rank));
        (void)line;  /* silence compiler warning */
        /* reqs is only allocated on the roots; guard the cleanup so the
         * non-root error paths do not dereference NULL */
        if (NULL != reqs) {
            ompi_coll_base_free_reqs(reqs, rsize - 1);
        }
    }
    if (NULL != tmpbuf) {
        free(tmpbuf);
    }

    return err;
}