This source file includes the following definitions.
- pmix_ptl_base_setup_fork
- pmix_ptl_base_set_notification_cbfunc
- pmix_ptl_base_get_available_modules
- pmix_ptl_base_assign_module
- pmix_ptl_base_connect_to_peer
- post_recv
- pmix_ptl_base_register_recv
- cancel_recv
- pmix_ptl_base_cancel_recv
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 #include <src/include/pmix_config.h>
  21 
  22 #include <stdio.h>
  23 #ifdef HAVE_UNISTD_H
  24 #include <unistd.h>
  25 #endif
  26 
  27 #include "src/util/argv.h"
  28 #include "src/util/error.h"
  29 #include "src/include/pmix_globals.h"
  30 
  31 #include "src/mca/ptl/base/base.h"
  32 
  33 pmix_status_t pmix_ptl_base_setup_fork(const pmix_proc_t *proc, char ***env)
  34 {
  35     pmix_ptl_base_active_t *active;
  36     pmix_status_t rc;
  37 
  38     if (!pmix_ptl_globals.initialized) {
  39         return PMIX_ERR_INIT;
  40     }
  41 
  42     PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
  43         if (NULL != active->component->setup_fork) {
  44             rc = active->component->setup_fork(proc, env);
  45             if (PMIX_SUCCESS != rc && PMIX_ERR_NOT_AVAILABLE != rc) {
  46                 return rc;
  47             }
  48         }
  49     }
  50     return PMIX_SUCCESS;
  51 }
  52 
  53 pmix_status_t pmix_ptl_base_set_notification_cbfunc(pmix_ptl_cbfunc_t cbfunc)
  54 {
  55     pmix_ptl_posted_recv_t *req;
  56 
  57     
  58 
  59     req = PMIX_NEW(pmix_ptl_posted_recv_t);
  60     if (NULL == req) {
  61         return PMIX_ERR_NOMEM;
  62     }
  63     req->tag = 0;
  64     req->cbfunc = cbfunc;
  65     pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
  66                         "posting notification recv on tag %d", req->tag);
  67     
  68 
  69 
  70     pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
  71     return PMIX_SUCCESS;
  72 }
  73 
  74 char* pmix_ptl_base_get_available_modules(void)
  75 {
  76     pmix_ptl_base_active_t *active;
  77     char **tmp=NULL, *reply=NULL;
  78 
  79     if (!pmix_ptl_globals.initialized) {
  80         return NULL;
  81     }
  82 
  83     PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
  84         pmix_argv_append_nosize(&tmp, active->component->base.pmix_mca_component_name);
  85     }
  86     if (NULL != tmp) {
  87         reply = pmix_argv_join(tmp, ',');
  88         pmix_argv_free(tmp);
  89     }
  90     return reply;
  91 }
  92 
  93 
  94 pmix_ptl_module_t* pmix_ptl_base_assign_module(void)
  95 {
  96     pmix_ptl_base_active_t *active;
  97 
  98     if (!pmix_ptl_globals.initialized) {
  99         return NULL;
 100     }
 101 
 102     active = (pmix_ptl_base_active_t*)pmix_list_get_first(&pmix_ptl_globals.actives);
 103     return active->module;
 104 }
 105 
 106 pmix_status_t pmix_ptl_base_connect_to_peer(struct pmix_peer_t *peer,
 107                                             pmix_info_t info[], size_t ninfo)
 108 {
 109     pmix_peer_t *pr = (pmix_peer_t*)peer;
 110     pmix_ptl_base_active_t *active;
 111 
 112     PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
 113         if (NULL != active->module->connect_to_peer) {
 114             if (PMIX_SUCCESS == active->module->connect_to_peer(peer, info, ninfo)) {
 115                 pr->nptr->compat.ptl = active->module;
 116                 return PMIX_SUCCESS;
 117             }
 118         }
 119     }
 120 
 121     return PMIX_ERR_UNREACH;
 122 }
 123 
 124 
 125 static void post_recv(int fd, short args, void *cbdata)
 126 {
 127     pmix_ptl_posted_recv_t *req = (pmix_ptl_posted_recv_t*)cbdata;
 128     pmix_ptl_recv_t *msg, *nmsg;
 129     pmix_buffer_t buf;
 130 
 131     pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 132                         "posting recv on tag %d", req->tag);
 133 
 134     
 135     pmix_list_append(&pmix_ptl_globals.posted_recvs, &req->super);
 136 
 137     
 138 
 139     PMIX_LIST_FOREACH_SAFE(msg, nmsg, &pmix_ptl_globals.unexpected_msgs, pmix_ptl_recv_t) {
 140         if (msg->hdr.tag == req->tag || UINT_MAX == req->tag) {
 141             if (NULL != req->cbfunc) {
 142                 
 143                 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
 144                 if (NULL != msg->data) {
 145                     buf.base_ptr = (char*)msg->data;
 146                     buf.bytes_allocated = buf.bytes_used = msg->hdr.nbytes;
 147                     buf.unpack_ptr = buf.base_ptr;
 148                     buf.pack_ptr = ((char*)buf.base_ptr) + buf.bytes_used;
 149                 }
 150                 msg->data = NULL;  
 151                 req->cbfunc(msg->peer, &msg->hdr, &buf, req->cbdata);
 152                 PMIX_DESTRUCT(&buf);  
 153             }
 154             pmix_list_remove_item(&pmix_ptl_globals.unexpected_msgs, &msg->super);
 155             PMIX_RELEASE(msg);
 156         }
 157     }
 158 }
 159 
 160 pmix_status_t pmix_ptl_base_register_recv(struct pmix_peer_t *peer,
 161                                           pmix_ptl_cbfunc_t cbfunc,
 162                                           pmix_ptl_tag_t tag)
 163 {
 164     pmix_ptl_posted_recv_t *req;
 165 
 166     req = PMIX_NEW(pmix_ptl_posted_recv_t);
 167     if (NULL == req) {
 168         return PMIX_ERR_NOMEM;
 169     }
 170     req->tag = tag;
 171     req->cbfunc = cbfunc;
 172     
 173 
 174     pmix_event_assign(&(req->ev), pmix_globals.evbase, -1,
 175                       EV_WRITE, post_recv, req);
 176     pmix_event_active(&(req->ev), EV_WRITE, 1);
 177     return PMIX_SUCCESS;
 178 }
 179 
 180 static void cancel_recv(int fd, short args, void *cbdata)
 181 {
 182     pmix_ptl_posted_recv_t *req = (pmix_ptl_posted_recv_t*)cbdata;
 183     pmix_ptl_posted_recv_t *rcv;
 184 
 185     PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
 186         if (rcv->tag == req->tag) {
 187             pmix_list_remove_item(&pmix_ptl_globals.posted_recvs, &rcv->super);
 188             PMIX_RELEASE(rcv);
 189             PMIX_RELEASE(req);
 190             return;
 191         }
 192     }
 193     PMIX_RELEASE(req);
 194 }
 195 
 196 pmix_status_t pmix_ptl_base_cancel_recv(struct pmix_peer_t *peer,
 197                                         pmix_ptl_tag_t tag)
 198 {
 199     pmix_ptl_posted_recv_t *req;
 200 
 201     req = PMIX_NEW(pmix_ptl_posted_recv_t);
 202     if (NULL == req) {
 203         return PMIX_ERR_NOMEM;
 204     }
 205     req->tag = tag;
 206     
 207 
 208     pmix_event_assign(&(req->ev), pmix_globals.evbase, -1,
 209                       EV_WRITE, cancel_recv, req);
 210     pmix_event_active(&(req->ev), EV_WRITE, 1);
 211     return PMIX_SUCCESS;
 212 }