This source file includes the following definitions.
- pmix_ptl_base_setup_fork
- pmix_ptl_base_set_notification_cbfunc
- pmix_ptl_base_get_available_modules
- pmix_ptl_base_assign_module
- pmix_ptl_base_connect_to_peer
- post_recv
- pmix_ptl_base_register_recv
- cancel_recv
- pmix_ptl_base_cancel_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 #include <src/include/pmix_config.h>
21
22 #include <stdio.h>
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include "src/util/argv.h"
28 #include "src/util/error.h"
29 #include "src/include/pmix_globals.h"
30
31 #include "src/mca/ptl/base/base.h"
32
33 pmix_status_t pmix_ptl_base_setup_fork(const pmix_proc_t *proc, char ***env)
34 {
35 pmix_ptl_base_active_t *active;
36 pmix_status_t rc;
37
38 if (!pmix_ptl_globals.initialized) {
39 return PMIX_ERR_INIT;
40 }
41
42 PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
43 if (NULL != active->component->setup_fork) {
44 rc = active->component->setup_fork(proc, env);
45 if (PMIX_SUCCESS != rc && PMIX_ERR_NOT_AVAILABLE != rc) {
46 return rc;
47 }
48 }
49 }
50 return PMIX_SUCCESS;
51 }
52
53 pmix_status_t pmix_ptl_base_set_notification_cbfunc(pmix_ptl_cbfunc_t cbfunc)
54 {
55 pmix_ptl_posted_recv_t *req;
56
57
58
59 req = PMIX_NEW(pmix_ptl_posted_recv_t);
60 if (NULL == req) {
61 return PMIX_ERR_NOMEM;
62 }
63 req->tag = 0;
64 req->cbfunc = cbfunc;
65 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
66 "posting notification recv on tag %d", req->tag);
67
68
69
70 pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
71 return PMIX_SUCCESS;
72 }
73
74 char* pmix_ptl_base_get_available_modules(void)
75 {
76 pmix_ptl_base_active_t *active;
77 char **tmp=NULL, *reply=NULL;
78
79 if (!pmix_ptl_globals.initialized) {
80 return NULL;
81 }
82
83 PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
84 pmix_argv_append_nosize(&tmp, active->component->base.pmix_mca_component_name);
85 }
86 if (NULL != tmp) {
87 reply = pmix_argv_join(tmp, ',');
88 pmix_argv_free(tmp);
89 }
90 return reply;
91 }
92
93
94 pmix_ptl_module_t* pmix_ptl_base_assign_module(void)
95 {
96 pmix_ptl_base_active_t *active;
97
98 if (!pmix_ptl_globals.initialized) {
99 return NULL;
100 }
101
102 active = (pmix_ptl_base_active_t*)pmix_list_get_first(&pmix_ptl_globals.actives);
103 return active->module;
104 }
105
106 pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *peer,
107 pmix_info_t info[], size_t ninfo)
108 {
109 pmix_peer_t *pr = (pmix_peer_t*)peer;
110 pmix_ptl_base_active_t *active;
111
112 PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
113 if (NULL != active->module->connect_to_peer) {
114 if (PMIX_SUCCESS == active->module->connect_to_peer(peer, info, ninfo)) {
115 pr->nptr->compat.ptl = active->module;
116 return PMIX_SUCCESS;
117 }
118 }
119 }
120
121 return PMIX_ERR_UNREACH;
122 }
123
124
125 static void post_recv(int fd, short args, void *cbdata)
126 {
127 pmix_ptl_posted_recv_t *req = (pmix_ptl_posted_recv_t*)cbdata;
128 pmix_ptl_recv_t *msg, *nmsg;
129 pmix_buffer_t buf;
130
131 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
132 "posting recv on tag %d", req->tag);
133
134
135 pmix_list_append(&pmix_ptl_globals.posted_recvs, &req->super);
136
137
138
139 PMIX_LIST_FOREACH_SAFE(msg, nmsg, &pmix_ptl_globals.unexpected_msgs, pmix_ptl_recv_t) {
140 if (msg->hdr.tag == req->tag || UINT_MAX == req->tag) {
141 if (NULL != req->cbfunc) {
142
143 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
144 if (NULL != msg->data) {
145 buf.base_ptr = (char*)msg->data;
146 buf.bytes_allocated = buf.bytes_used = msg->hdr.nbytes;
147 buf.unpack_ptr = buf.base_ptr;
148 buf.pack_ptr = ((char*)buf.base_ptr) + buf.bytes_used;
149 }
150 msg->data = NULL;
151 req->cbfunc(msg->peer, &msg->hdr, &buf, req->cbdata);
152 PMIX_DESTRUCT(&buf);
153 }
154 pmix_list_remove_item(&pmix_ptl_globals.unexpected_msgs, &msg->super);
155 PMIX_RELEASE(msg);
156 }
157 }
158 }
159
160 pmix_status_t pmix_ptl_base_register_recv(struct pmix_peer_t *peer,
161 pmix_ptl_cbfunc_t cbfunc,
162 pmix_ptl_tag_t tag)
163 {
164 pmix_ptl_posted_recv_t *req;
165
166 req = PMIX_NEW(pmix_ptl_posted_recv_t);
167 if (NULL == req) {
168 return PMIX_ERR_NOMEM;
169 }
170 req->tag = tag;
171 req->cbfunc = cbfunc;
172
173
174 pmix_event_assign(&(req->ev), pmix_globals.evbase, -1,
175 EV_WRITE, post_recv, req);
176 pmix_event_active(&(req->ev), EV_WRITE, 1);
177 return PMIX_SUCCESS;
178 }
179
180 static void cancel_recv(int fd, short args, void *cbdata)
181 {
182 pmix_ptl_posted_recv_t *req = (pmix_ptl_posted_recv_t*)cbdata;
183 pmix_ptl_posted_recv_t *rcv;
184
185 PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
186 if (rcv->tag == req->tag) {
187 pmix_list_remove_item(&pmix_ptl_globals.posted_recvs, &rcv->super);
188 PMIX_RELEASE(rcv);
189 PMIX_RELEASE(req);
190 return;
191 }
192 }
193 PMIX_RELEASE(req);
194 }
195
196 pmix_status_t pmix_ptl_base_cancel_recv(struct pmix_peer_t *peer,
197 pmix_ptl_tag_t tag)
198 {
199 pmix_ptl_posted_recv_t *req;
200
201 req = PMIX_NEW(pmix_ptl_posted_recv_t);
202 if (NULL == req) {
203 return PMIX_ERR_NOMEM;
204 }
205 req->tag = tag;
206
207
208 pmix_event_assign(&(req->ev), pmix_globals.evbase, -1,
209 EV_WRITE, cancel_recv, req);
210 pmix_event_active(&(req->ev), EV_WRITE, 1);
211 return PMIX_SUCCESS;
212 }