This source file includes following definitions.
- component_register
- component_open
- component_close
- component_query
- setup_fork
- setup_listener
- split_and_resolve
- connection_handler
- process_cbfunc
- cnct_cbfunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 #include <src/include/pmix_config.h>
33 #include <pmix_common.h>
34
35 #ifdef HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef HAVE_SYS_TYPES_H
39 #include <sys/types.h>
40 #endif
41 #ifdef HAVE_SYS_STAT_H
42 #include <sys/stat.h>
43 #endif
44 #include <fcntl.h>
45 #ifdef HAVE_NETINET_IN_H
46 #include <netinet/in.h>
47 #endif
48 #ifdef HAVE_ARPA_INET_H
49 #include <arpa/inet.h>
50 #endif
51 #ifdef HAVE_NETDB_H
52 #include <netdb.h>
53 #endif
54 #include <ctype.h>
55
56 #include "src/include/pmix_socket_errno.h"
57 #include "src/util/argv.h"
58 #include "src/util/error.h"
59 #include "src/util/fd.h"
60 #include "src/util/net.h"
61 #include "src/util/os_path.h"
62 #include "src/util/parse_options.h"
63 #include "src/util/pif.h"
64 #include "src/util/pmix_environ.h"
65 #include "src/util/show_help.h"
66 #include "src/util/strnlen.h"
67 #include "src/common/pmix_iof.h"
68 #include "src/server/pmix_server_ops.h"
69 #include "src/mca/bfrops/base/base.h"
70 #include "src/mca/gds/base/base.h"
71 #include "src/mca/psec/base/base.h"
72
73 #include "src/mca/ptl/base/base.h"
74 #include "src/mca/ptl/tcp/ptl_tcp.h"
75
76 static pmix_status_t component_open(void);
77 static pmix_status_t component_close(void);
78 static int component_register(void);
79 static int component_query(pmix_mca_base_module_t **module, int *priority);
80 static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
81 bool *need_listener);
82 static pmix_status_t setup_fork(const pmix_proc_t *proc, char ***env);
83
84
85
86
87 PMIX_EXPORT pmix_ptl_tcp_component_t mca_ptl_tcp_component = {
88 .super = {
89 .base = {
90 PMIX_PTL_BASE_VERSION_1_0_0,
91
92
93 .pmix_mca_component_name = "tcp",
94 PMIX_MCA_BASE_MAKE_VERSION(component,
95 PMIX_MAJOR_VERSION,
96 PMIX_MINOR_VERSION,
97 PMIX_RELEASE_VERSION),
98
99
100 .pmix_mca_open_component = component_open,
101 .pmix_mca_close_component = component_close,
102 .pmix_mca_register_component_params = component_register,
103 .pmix_mca_query_component = component_query
104 },
105 .priority = 30,
106 .uri = NULL,
107 .setup_listener = setup_listener,
108 .setup_fork = setup_fork
109 },
110 .session_tmpdir = NULL,
111 .system_tmpdir = NULL,
112 .if_include = NULL,
113 .if_exclude = NULL,
114 .ipv4_port = 0,
115 .ipv6_port = 0,
116 .disable_ipv4_family = false,
117 .disable_ipv6_family = true,
118 .session_filename = NULL,
119 .nspace_filename = NULL,
120 .system_filename = NULL,
121 .rendezvous_filename = NULL,
122 .wait_to_connect = 4,
123 .max_retries = 2,
124 .report_uri = NULL,
125 .remote_connections = false,
126 .handshake_wait_time = 4,
127 .handshake_max_retries = 2
128 };
129
130 static char **split_and_resolve(char **orig_str, char *name);
131 static void connection_handler(int sd, short args, void *cbdata);
132 static void cnct_cbfunc(pmix_status_t status,
133 pmix_proc_t *proc, void *cbdata);
134
135 static int component_register(void)
136 {
137 pmix_mca_base_component_t *component = &mca_ptl_tcp_component.super.base;
138
139 (void)pmix_mca_base_component_var_register(component, "server_uri",
140 "URI of a server a tool wishes to connect to - either the "
141 "URI itself, or file:path-to-file-containing-uri",
142 PMIX_MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
143 PMIX_INFO_LVL_2,
144 PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
145 &mca_ptl_tcp_component.super.uri);
146
147 (void)pmix_mca_base_component_var_register(component, "report_uri",
148 "Output URI [- => stdout, + => stderr, or filename]",
149 PMIX_MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
150 PMIX_INFO_LVL_2,
151 PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
152 &mca_ptl_tcp_component.report_uri);
153
154 (void)pmix_mca_base_component_var_register(component, "remote_connections",
155 "Enable connections from remote tools",
156 PMIX_MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
157 PMIX_INFO_LVL_2,
158 PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
159 &mca_ptl_tcp_component.remote_connections);
160
161 (void)pmix_mca_base_component_var_register(component, "if_include",
162 "Comma-delimited list of devices and/or CIDR notation of TCP networks (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with ptl_tcp_if_exclude.",
163 PMIX_MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
164 PMIX_INFO_LVL_2,
165 PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
166 &mca_ptl_tcp_component.if_include);
167
168 (void)pmix_mca_base_component_var_register(component, "if_exclude",
169 "Comma-delimited list of devices and/or CIDR notation of TCP networks to NOT use -- all devices not matching these specifications will be used (e.g., \"eth0,192.168.0.0/16\"). If set to a non-default value, it is mutually exclusive with ptl_tcp_if_include.",
170 PMIX_MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
171 PMIX_INFO_LVL_2,
172 PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
173 &mca_ptl_tcp_component.if_exclude);
174
175
176 if (NULL != mca_ptl_tcp_component.if_include &&
177 NULL != mca_ptl_tcp_component.if_exclude) {
178
179
180 pmix_show_help("help-ptl-tcp.txt", "include-exclude", true,
181 mca_ptl_tcp_component.if_include,
182 mca_ptl_tcp_component.if_exclude);
183 return PMIX_ERR_NOT_AVAILABLE;
184 }
185
186 (void)pmix_mca_base_component_var_register(component, "ipv4_port",
187 "IPv4 port to be used",
188 PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
189 PMIX_INFO_LVL_4,
190 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
191 &mca_ptl_tcp_component.ipv4_port);
192
193 (void)pmix_mca_base_component_var_register(component, "ipv6_port",
194 "IPv6 port to be used",
195 PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
196 PMIX_INFO_LVL_4,
197 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
198 &mca_ptl_tcp_component.ipv6_port);
199
200 (void)pmix_mca_base_component_var_register(component, "disable_ipv4_family",
201 "Disable the IPv4 interfaces",
202 PMIX_MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
203 PMIX_INFO_LVL_4,
204 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
205 &mca_ptl_tcp_component.disable_ipv4_family);
206
207 (void)pmix_mca_base_component_var_register(component, "disable_ipv6_family",
208 "Disable the IPv6 interfaces",
209 PMIX_MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
210 PMIX_INFO_LVL_4,
211 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
212 &mca_ptl_tcp_component.disable_ipv6_family);
213
214 (void)pmix_mca_base_component_var_register(component, "connection_wait_time",
215 "Number of seconds to wait for the server connection file to appear",
216 PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
217 PMIX_INFO_LVL_4,
218 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
219 &mca_ptl_tcp_component.wait_to_connect);
220
221 (void)pmix_mca_base_component_var_register(component, "max_retries",
222 "Number of times to look for the connection file before quitting",
223 PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
224 PMIX_INFO_LVL_4,
225 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
226 &mca_ptl_tcp_component.max_retries);
227
228 (void)pmix_mca_base_component_var_register(component, "handshake_wait_time",
229 "Number of seconds to wait for the server reply to the handshake request",
230 PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
231 PMIX_INFO_LVL_4,
232 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
233 &mca_ptl_tcp_component.handshake_wait_time);
234
235 (void)pmix_mca_base_component_var_register(component, "handshake_max_retries",
236 "Number of times to retry the handshake request before giving up",
237 PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
238 PMIX_INFO_LVL_4,
239 PMIX_MCA_BASE_VAR_SCOPE_READONLY,
240 &mca_ptl_tcp_component.handshake_max_retries);
241
242 return PMIX_SUCCESS;
243 }
244
245 static char *urifile = NULL;
246
247 static pmix_status_t component_open(void)
248 {
249 char *tdir;
250
251 memset(&mca_ptl_tcp_component.connection, 0, sizeof(mca_ptl_tcp_component.connection));
252
253
254
255 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) ||
256 PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
257 mca_ptl_tcp_component.session_tmpdir = strdup(pmix_server_globals.tmpdir);
258 } else {
259 if (NULL != (tdir = getenv("PMIX_SERVER_TMPDIR"))) {
260 mca_ptl_tcp_component.session_tmpdir = strdup(tdir);
261 } else {
262 mca_ptl_tcp_component.session_tmpdir = strdup(pmix_tmp_directory());
263 }
264 }
265
266 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) ||
267 PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
268 mca_ptl_tcp_component.system_tmpdir = strdup(pmix_server_globals.system_tmpdir);
269 } else {
270 if (NULL != (tdir = getenv("PMIX_SYSTEM_TMPDIR"))) {
271 mca_ptl_tcp_component.system_tmpdir = strdup(tdir);
272 } else {
273 mca_ptl_tcp_component.system_tmpdir = strdup(pmix_tmp_directory());
274 }
275 }
276
277 if (NULL != mca_ptl_tcp_component.report_uri &&
278 0 != strcmp(mca_ptl_tcp_component.report_uri, "-") &&
279 0 != strcmp(mca_ptl_tcp_component.report_uri, "+")) {
280 urifile = strdup(mca_ptl_tcp_component.report_uri);
281 }
282 return PMIX_SUCCESS;
283 }
284
285
286 pmix_status_t component_close(void)
287 {
288 if (NULL != mca_ptl_tcp_component.system_filename) {
289 unlink(mca_ptl_tcp_component.system_filename);
290 free(mca_ptl_tcp_component.system_filename);
291 }
292 if (NULL != mca_ptl_tcp_component.session_filename) {
293 unlink(mca_ptl_tcp_component.session_filename);
294 free(mca_ptl_tcp_component.session_filename);
295 }
296 if (NULL != mca_ptl_tcp_component.nspace_filename) {
297 unlink(mca_ptl_tcp_component.nspace_filename);
298 free(mca_ptl_tcp_component.nspace_filename);
299 }
300 if (NULL != mca_ptl_tcp_component.rendezvous_filename) {
301 unlink(mca_ptl_tcp_component.rendezvous_filename);
302 free(mca_ptl_tcp_component.rendezvous_filename);
303 }
304 if (NULL != urifile) {
305
306 unlink(urifile);
307 free(urifile);
308 urifile = NULL;
309 }
310 if (NULL != mca_ptl_tcp_component.session_tmpdir) {
311 free(mca_ptl_tcp_component.session_tmpdir);
312 }
313 if (NULL != mca_ptl_tcp_component.system_tmpdir) {
314 free(mca_ptl_tcp_component.system_tmpdir);
315 }
316 return PMIX_SUCCESS;
317 }
318
319 static int component_query(pmix_mca_base_module_t **module, int *priority)
320 {
321 *module = (pmix_mca_base_module_t*)&pmix_ptl_tcp_module;
322 return PMIX_SUCCESS;
323 }
324
325 static pmix_status_t setup_fork(const pmix_proc_t *proc, char ***env)
326 {
327 pmix_setenv("PMIX_SERVER_TMPDIR", mca_ptl_tcp_component.session_tmpdir, true, env);
328 pmix_setenv("PMIX_SYSTEM_TMPDIR", mca_ptl_tcp_component.system_tmpdir, true, env);
329
330 return PMIX_SUCCESS;
331 }
332
333
334
335
336
337
338
339
340
341
342 static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
343 bool *need_listener)
344 {
345 int flags = 0;
346 pmix_listener_t *lt;
347 int i, rc, saveindex = -1;
348 char **interfaces = NULL;
349 bool including = false;
350 char name[32];
351 struct sockaddr_storage my_ss;
352 int kindex;
353 size_t n;
354 bool session_tool = false;
355 bool system_tool = false;
356 pmix_socklen_t addrlen;
357 char *prefix, myhost[PMIX_MAXHOSTNAMELEN];
358 char myconnhost[PMIX_MAXHOSTNAMELEN];
359 int myport;
360 pmix_kval_t *urikv;
361
362 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
363 "ptl:tcp setup_listener");
364
365
366 if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
367 return PMIX_ERR_NOT_SUPPORTED;
368 }
369
370
371 if (NULL != info) {
372 for (n=0; n < ninfo; n++) {
373 if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_IF_INCLUDE)) {
374 if (NULL != mca_ptl_tcp_component.if_include) {
375 free(mca_ptl_tcp_component.if_include);
376 }
377 mca_ptl_tcp_component.if_include = strdup(info[n].value.data.string);
378 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_IF_EXCLUDE)) {
379 if (NULL != mca_ptl_tcp_component.if_exclude) {
380 free(mca_ptl_tcp_component.if_exclude);
381 }
382 mca_ptl_tcp_component.if_exclude = strdup(info[n].value.data.string);
383 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_IPV4_PORT)) {
384 mca_ptl_tcp_component.ipv4_port = info[n].value.data.integer;
385 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_IPV6_PORT)) {
386 mca_ptl_tcp_component.ipv6_port = info[n].value.data.integer;
387 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_DISABLE_IPV4)) {
388 mca_ptl_tcp_component.disable_ipv4_family = PMIX_INFO_TRUE(&info[n]);
389 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_DISABLE_IPV6)) {
390 mca_ptl_tcp_component.disable_ipv6_family = PMIX_INFO_TRUE(&info[n]);
391 } else if (PMIX_CHECK_KEY(&info[n], PMIX_SERVER_REMOTE_CONNECTIONS)) {
392 mca_ptl_tcp_component.remote_connections = PMIX_INFO_TRUE(&info[n]);
393 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_URI)) {
394 if (NULL != mca_ptl_tcp_component.super.uri) {
395 free(mca_ptl_tcp_component.super.uri);
396 }
397 mca_ptl_tcp_component.super.uri = strdup(info[n].value.data.string);
398 } else if (PMIX_CHECK_KEY(&info[n], PMIX_TCP_REPORT_URI)) {
399 if (NULL != mca_ptl_tcp_component.report_uri) {
400 free(mca_ptl_tcp_component.report_uri);
401 }
402 mca_ptl_tcp_component.report_uri = strdup(info[n].value.data.string);
403 } else if (PMIX_CHECK_KEY(&info[n], PMIX_SERVER_TMPDIR)) {
404 if (NULL != mca_ptl_tcp_component.session_tmpdir) {
405 free(mca_ptl_tcp_component.session_tmpdir);
406 }
407 mca_ptl_tcp_component.session_tmpdir = strdup(info[n].value.data.string);
408 } else if (PMIX_CHECK_KEY(&info[n], PMIX_SYSTEM_TMPDIR)) {
409 if (NULL != mca_ptl_tcp_component.system_tmpdir) {
410 free(mca_ptl_tcp_component.system_tmpdir);
411 }
412 mca_ptl_tcp_component.system_tmpdir = strdup(info[n].value.data.string);
413 } else if (0 == strcmp(info[n].key, PMIX_SERVER_TOOL_SUPPORT)) {
414 session_tool = PMIX_INFO_TRUE(&info[n]);
415 } else if (PMIX_CHECK_KEY(&info[n], PMIX_SERVER_SYSTEM_SUPPORT)) {
416 system_tool = PMIX_INFO_TRUE(&info[n]);
417 } else if (PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer) &&
418 PMIX_CHECK_KEY(&info[n], PMIX_LAUNCHER_RENDEZVOUS_FILE)) {
419 mca_ptl_tcp_component.rendezvous_filename = strdup(info[n].value.data.string);
420 }
421 }
422 }
423
424
425
426
427
428
429 if (NULL != mca_ptl_tcp_component.if_include) {
430 interfaces = split_and_resolve(&mca_ptl_tcp_component.if_include,
431 "include");
432 including = true;
433 } else if (NULL != mca_ptl_tcp_component.if_exclude) {
434 interfaces = split_and_resolve(&mca_ptl_tcp_component.if_exclude,
435 "exclude");
436 including = false;
437 }
438
439
440
441
442 for (i = pmix_ifbegin(); i >= 0; i = pmix_ifnext(i)) {
443 if (PMIX_SUCCESS != pmix_ifindextoaddr(i, (struct sockaddr*)&my_ss, sizeof(my_ss))) {
444 pmix_output (0, "ptl_tcp: problems getting address for index %i (kernel index %i)\n",
445 i, pmix_ifindextokindex(i));
446 continue;
447 }
448
449 if (AF_INET != my_ss.ss_family &&
450 AF_INET6 != my_ss.ss_family) {
451 continue;
452 }
453
454 pmix_ifindextoname(i, name, sizeof(name));
455
456
457 if (0 == strncmp(name, "vir", 3)) {
458 continue;
459 }
460
461 if (AF_INET == my_ss.ss_family &&
462 mca_ptl_tcp_component.disable_ipv4_family) {
463 continue;
464 } else if (AF_INET6 == my_ss.ss_family &&
465 mca_ptl_tcp_component.disable_ipv6_family) {
466 continue;
467 }
468
469 kindex = pmix_ifindextokindex(i);
470 if (kindex <= 0) {
471 continue;
472 }
473 pmix_output_verbose(10, pmix_ptl_base_framework.framework_output,
474 "WORKING INTERFACE %d KERNEL INDEX %d FAMILY: %s", i, kindex,
475 (AF_INET == my_ss.ss_family) ? "V4" : "V6");
476
477 if (NULL != interfaces) {
478
479 rc = pmix_ifmatches(kindex, interfaces);
480
481
482
483 if (PMIX_ERR_NETWORK_NOT_PARSEABLE == rc) {
484 pmix_show_help("help-ptl-tcp.txt", "not-parseable", true);
485 pmix_argv_free(interfaces);
486 return PMIX_ERR_BAD_PARAM;
487 }
488
489 if (including) {
490 if (PMIX_SUCCESS != rc) {
491 pmix_output_verbose(10, pmix_ptl_base_framework.framework_output,
492 "ptl:tcp:init rejecting interface %s (not in include list)", name);
493 continue;
494 }
495 } else {
496
497 if (PMIX_SUCCESS == rc) {
498 pmix_output_verbose(10, pmix_ptl_base_framework.framework_output,
499 "ptl:tcp:init rejecting interface %s (in exclude list)", name);
500 continue;
501 }
502 }
503 }
504
505
506
507 if (pmix_ifisloopback(i)) {
508 if (mca_ptl_tcp_component.remote_connections) {
509
510 continue;
511 } else {
512 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
513 "ptl:tcp:init loopback interface %s selected", name);
514 saveindex = i;
515 break;
516 }
517 } else {
518
519
520 if (saveindex < 0) {
521 saveindex = i;
522 }
523 }
524 }
525
526 if (NULL != interfaces) {
527 pmix_argv_free(interfaces);
528 }
529
530
531 if (saveindex < 0) {
532 return PMIX_ERR_NOT_AVAILABLE;
533 }
534
535
536 if (PMIX_SUCCESS != pmix_ifindextoaddr(saveindex,
537 (struct sockaddr*)&mca_ptl_tcp_component.connection,
538 sizeof(struct sockaddr))) {
539 pmix_output (0, "ptl:tcp: problems getting address for kernel index %i\n",
540 pmix_ifindextokindex(saveindex));
541 return PMIX_ERR_NOT_AVAILABLE;
542 }
543
544
545 if (AF_INET == mca_ptl_tcp_component.connection.ss_family) {
546 ((struct sockaddr_in*) &mca_ptl_tcp_component.connection)->sin_port = htons(mca_ptl_tcp_component.ipv4_port);
547 if (0 != mca_ptl_tcp_component.ipv4_port) {
548 flags = 1;
549 }
550 } else if (AF_INET6 == mca_ptl_tcp_component.connection.ss_family) {
551 ((struct sockaddr_in6*) &mca_ptl_tcp_component.connection)->sin6_port = htons(mca_ptl_tcp_component.ipv6_port);
552 if (0 != mca_ptl_tcp_component.ipv6_port) {
553 flags = 1;
554 }
555 }
556
557 lt = PMIX_NEW(pmix_listener_t);
558 lt->varname = strdup("PMIX_SERVER_URI4:PMIX_SERVER_URI3:PMIX_SERVER_URI2:PMIX_SERVER_URI21");
559 lt->protocol = PMIX_PROTOCOL_V2;
560 lt->ptl = (struct pmix_ptl_module_t*)&pmix_ptl_tcp_module;
561 lt->cbfunc = connection_handler;
562
563 addrlen = sizeof(struct sockaddr_storage);
564
565 lt->socket = socket(mca_ptl_tcp_component.connection.ss_family, SOCK_STREAM, 0);
566 if (lt->socket < 0) {
567 printf("%s:%d socket() failed\n", __FILE__, __LINE__);
568 goto sockerror;
569 }
570
571
572 if (setsockopt (lt->socket, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
573 pmix_output(0, "ptl:tcp:create_listen: unable to set the "
574 "SO_REUSEADDR option (%s:%d)\n",
575 strerror(pmix_socket_errno), pmix_socket_errno);
576 CLOSE_THE_SOCKET(lt->socket);
577 goto sockerror;
578 }
579
580
581
582 if (pmix_fd_set_cloexec(lt->socket) != PMIX_SUCCESS) {
583 CLOSE_THE_SOCKET(lt->socket);
584 goto sockerror;
585 }
586
587 if (bind(lt->socket, (struct sockaddr*)&mca_ptl_tcp_component.connection, sizeof(struct sockaddr)) < 0) {
588 printf("%s:%d bind() failed: %s\n", __FILE__, __LINE__, strerror(errno));
589 CLOSE_THE_SOCKET(lt->socket);
590 goto sockerror;
591 }
592
593
594 if (getsockname(lt->socket, (struct sockaddr*)&mca_ptl_tcp_component.connection, &addrlen) < 0) {
595 pmix_output(0, "ptl:tcp:create_listen: getsockname(): %s (%d)",
596 strerror(pmix_socket_errno), pmix_socket_errno);
597 CLOSE_THE_SOCKET(lt->socket);
598 goto sockerror;
599 }
600
601
602 if (listen(lt->socket, SOMAXCONN) < 0) {
603 printf("%s:%d listen() failed\n", __FILE__, __LINE__);
604 CLOSE_THE_SOCKET(lt->socket);
605 goto sockerror;
606 }
607
608
609 if ((flags = fcntl(lt->socket, F_GETFL, 0)) < 0) {
610 printf("%s:%d fcntl(F_GETFL) failed\n", __FILE__, __LINE__);
611 CLOSE_THE_SOCKET(lt->socket);
612 goto sockerror;
613 }
614 flags |= O_NONBLOCK;
615 if (fcntl(lt->socket, F_SETFL, flags) < 0) {
616 printf("%s:%d fcntl(F_SETFL) failed\n", __FILE__, __LINE__);
617 CLOSE_THE_SOCKET(lt->socket);
618 goto sockerror;
619 }
620
621 gethostname(myhost, sizeof(myhost));
622 if (AF_INET == mca_ptl_tcp_component.connection.ss_family) {
623 prefix = "tcp4://";
624 myport = ntohs(((struct sockaddr_in*) &mca_ptl_tcp_component.connection)->sin_port);
625 inet_ntop(AF_INET, &((struct sockaddr_in*) &mca_ptl_tcp_component.connection)->sin_addr,
626 myconnhost, PMIX_MAXHOSTNAMELEN);
627 } else if (AF_INET6 == mca_ptl_tcp_component.connection.ss_family) {
628 prefix = "tcp6://";
629 myport = ntohs(((struct sockaddr_in6*) &mca_ptl_tcp_component.connection)->sin6_port);
630 inet_ntop(AF_INET6, &((struct sockaddr_in6*) &mca_ptl_tcp_component.connection)->sin6_addr,
631 myconnhost, PMIX_MAXHOSTNAMELEN);
632 } else {
633 goto sockerror;
634 }
635
636 rc = asprintf(<->uri, "%s.%d;%s%s:%d", pmix_globals.myid.nspace, pmix_globals.myid.rank, prefix, myconnhost, myport);
637 if (0 > rc || NULL == lt->uri) {
638 CLOSE_THE_SOCKET(lt->socket);
639 goto sockerror;
640 }
641 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
642 "ptl:tcp URI %s", lt->uri);
643
644
645 urikv = PMIX_NEW(pmix_kval_t);
646 urikv->key = strdup(PMIX_SERVER_URI);
647 PMIX_VALUE_CREATE(urikv->value, 1);
648 PMIX_VALUE_LOAD(urikv->value, lt->uri, PMIX_STRING);
649 PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer,
650 &pmix_globals.myid, PMIX_INTERNAL,
651 urikv);
652 PMIX_RELEASE(urikv);
653
654 if (NULL != mca_ptl_tcp_component.report_uri) {
655
656 if (0 == strcmp(mca_ptl_tcp_component.report_uri, "-")) {
657 fprintf(stdout, "%s\n", lt->uri);
658 } else if (0 == strcmp(mca_ptl_tcp_component.report_uri, "+")) {
659
660 fprintf(stderr, "%s\n", lt->uri);
661 } else {
662
663 FILE *fp;
664 fp = fopen(mca_ptl_tcp_component.report_uri, "w");
665 if (NULL == fp) {
666 pmix_output(0, "Impossible to open the file %s in write mode\n", mca_ptl_tcp_component.report_uri);
667 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
668 CLOSE_THE_SOCKET(lt->socket);
669 free(mca_ptl_tcp_component.system_filename);
670 mca_ptl_tcp_component.system_filename = NULL;
671 goto sockerror;
672 }
673
674 fprintf(fp, "%s\n", lt->uri);
675
676 fprintf(fp, "v%s\n", PMIX_VERSION);
677 fclose(fp);
678 }
679 }
680
681
682 if (NULL != mca_ptl_tcp_component.rendezvous_filename) {
683 FILE *fp;
684
685 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
686 "WRITING RENDEZVOUS FILE %s",
687 mca_ptl_tcp_component.rendezvous_filename);
688 fp = fopen(mca_ptl_tcp_component.rendezvous_filename, "w");
689 if (NULL == fp) {
690 pmix_output(0, "Impossible to open the file %s in write mode\n", mca_ptl_tcp_component.rendezvous_filename);
691 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
692 CLOSE_THE_SOCKET(lt->socket);
693 free(mca_ptl_tcp_component.rendezvous_filename);
694 mca_ptl_tcp_component.rendezvous_filename = NULL;
695 goto sockerror;
696 }
697
698
699 fprintf(fp, "%s\n", lt->uri);
700
701 fprintf(fp, "v%s\n", PMIX_VERSION);
702 fclose(fp);
703
704 if (0 != chmod(mca_ptl_tcp_component.rendezvous_filename, S_IRUSR | S_IWUSR | S_IRGRP)) {
705 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
706 CLOSE_THE_SOCKET(lt->socket);
707 free(mca_ptl_tcp_component.rendezvous_filename);
708 mca_ptl_tcp_component.rendezvous_filename = NULL;
709 goto sockerror;
710 }
711 }
712
713
714 if (system_tool) {
715 FILE *fp;
716
717 if (0 > asprintf(&mca_ptl_tcp_component.system_filename, "%s/pmix.sys.%s",
718 mca_ptl_tcp_component.system_tmpdir, myhost)) {
719 CLOSE_THE_SOCKET(lt->socket);
720 goto sockerror;
721 }
722 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
723 "WRITING SYSTEM FILE %s",
724 mca_ptl_tcp_component.system_filename);
725 fp = fopen(mca_ptl_tcp_component.system_filename, "w");
726 if (NULL == fp) {
727 pmix_output(0, "Impossible to open the file %s in write mode\n", mca_ptl_tcp_component.system_filename);
728 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
729 CLOSE_THE_SOCKET(lt->socket);
730 free(mca_ptl_tcp_component.system_filename);
731 mca_ptl_tcp_component.system_filename = NULL;
732 goto sockerror;
733 }
734
735
736 fprintf(fp, "%s\n", lt->uri);
737
738 fprintf(fp, "v%s\n", PMIX_VERSION);
739 fclose(fp);
740
741 if (0 != chmod(mca_ptl_tcp_component.system_filename, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) {
742 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
743 CLOSE_THE_SOCKET(lt->socket);
744 free(mca_ptl_tcp_component.system_filename);
745 mca_ptl_tcp_component.system_filename = NULL;
746 goto sockerror;
747 }
748 }
749 if (session_tool) {
750 FILE *fp;
751 pid_t mypid;
752
753
754 mypid = getpid();
755 if (0 > asprintf(&mca_ptl_tcp_component.session_filename, "%s/pmix.%s.tool.%d",
756 mca_ptl_tcp_component.session_tmpdir, myhost, mypid)) {
757 CLOSE_THE_SOCKET(lt->socket);
758 goto sockerror;
759 }
760 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
761 "WRITING TOOL FILE %s",
762 mca_ptl_tcp_component.session_filename);
763 fp = fopen(mca_ptl_tcp_component.session_filename, "w");
764 if (NULL == fp) {
765 pmix_output(0, "Impossible to open the file %s in write mode\n", mca_ptl_tcp_component.session_filename);
766 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
767 CLOSE_THE_SOCKET(lt->socket);
768 free(mca_ptl_tcp_component.session_filename);
769 mca_ptl_tcp_component.session_filename = NULL;
770 goto sockerror;
771 }
772
773
774 fprintf(fp, "%s\n", lt->uri);
775
776 fprintf(fp, "%s\n", PMIX_VERSION);
777 fclose(fp);
778
779 if (0 != chmod(mca_ptl_tcp_component.session_filename, S_IRUSR | S_IWUSR | S_IRGRP)) {
780 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
781 CLOSE_THE_SOCKET(lt->socket);
782 free(mca_ptl_tcp_component.session_filename);
783 mca_ptl_tcp_component.session_filename = NULL;
784 goto sockerror;
785 }
786
787
788
789 if (0 > asprintf(&mca_ptl_tcp_component.nspace_filename, "%s/pmix.%s.tool.%s",
790 mca_ptl_tcp_component.session_tmpdir, myhost, pmix_globals.myid.nspace)) {
791 CLOSE_THE_SOCKET(lt->socket);
792 goto sockerror;
793 }
794 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
795 "WRITING TOOL FILE %s",
796 mca_ptl_tcp_component.nspace_filename);
797 fp = fopen(mca_ptl_tcp_component.nspace_filename, "w");
798 if (NULL == fp) {
799 pmix_output(0, "Impossible to open the file %s in write mode\n", mca_ptl_tcp_component.nspace_filename);
800 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
801 CLOSE_THE_SOCKET(lt->socket);
802 free(mca_ptl_tcp_component.nspace_filename);
803 mca_ptl_tcp_component.nspace_filename = NULL;
804 goto sockerror;
805 }
806
807
808 fprintf(fp, "%s\n", lt->uri);
809
810 fprintf(fp, "%s\n", PMIX_VERSION);
811 fclose(fp);
812
813 if (0 != chmod(mca_ptl_tcp_component.nspace_filename, S_IRUSR | S_IWUSR | S_IRGRP)) {
814 PMIX_ERROR_LOG(PMIX_ERR_FILE_OPEN_FAILURE);
815 CLOSE_THE_SOCKET(lt->socket);
816 free(mca_ptl_tcp_component.nspace_filename);
817 mca_ptl_tcp_component.nspace_filename = NULL;
818 goto sockerror;
819 }
820 }
821
822 if (PMIX_PROC_IS_TOOL(pmix_globals.mypeer) && pmix_globals.connected) {
823 char **clnup = NULL, *cptr = NULL;
824 pmix_info_t dir;
825 if (NULL != mca_ptl_tcp_component.nspace_filename) {
826 pmix_argv_append_nosize(&clnup, mca_ptl_tcp_component.nspace_filename);
827 }
828 if (NULL != mca_ptl_tcp_component.session_filename) {
829 pmix_argv_append_nosize(&clnup, mca_ptl_tcp_component.session_filename);
830 }
831 if (NULL != clnup) {
832 cptr = pmix_argv_join(clnup, ',');
833 pmix_argv_free(clnup);
834 PMIX_INFO_LOAD(&dir, PMIX_REGISTER_CLEANUP, cptr, PMIX_STRING);
835 free(cptr);
836 PMIx_Job_control_nb(&pmix_globals.myid, 1, &dir, 1, NULL, NULL);
837 PMIX_INFO_DESTRUCT(&dir);
838 }
839 }
840
841
842 *need_listener = true;
843 pmix_list_append(&pmix_ptl_globals.listeners, <->super);
844
845 return PMIX_SUCCESS;
846
847 sockerror:
848 PMIX_RELEASE(lt);
849 return PMIX_ERROR;
850 }
851
852
853
854
855
856
857 static char **split_and_resolve(char **orig_str, char *name)
858 {
859 int i, ret, save, if_index;
860 char **argv, *str, *tmp;
861 char if_name[IF_NAMESIZE];
862 struct sockaddr_storage argv_inaddr, if_inaddr;
863 uint32_t argv_prefix;
864
865
866 if (NULL == orig_str || NULL == *orig_str) {
867 return NULL;
868 }
869
870 argv = pmix_argv_split(*orig_str, ',');
871 if (NULL == argv) {
872 return NULL;
873 }
874 for (save = i = 0; NULL != argv[i]; ++i) {
875 if (isalpha(argv[i][0])) {
876 argv[save++] = argv[i];
877 continue;
878 }
879
880
881
882 argv_prefix = 0;
883 tmp = strdup(argv[i]);
884 str = strchr(argv[i], '/');
885 if (NULL == str) {
886 pmix_show_help("help-ptl-tcp.txt", "invalid if_inexclude",
887 true, name, tmp, "Invalid specification (missing \"/\")");
888 free(argv[i]);
889 free(tmp);
890 continue;
891 }
892 *str = '\0';
893 argv_prefix = atoi(str + 1);
894
895
896 ((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET;
897 ret = inet_pton(AF_INET, argv[i],
898 &((struct sockaddr_in*) &argv_inaddr)->sin_addr);
899 free(argv[i]);
900
901 if (1 != ret) {
902 pmix_show_help("help-ptl-tcp.txt", "invalid if_inexclude",
903 true, name, tmp,
904 "Invalid specification (inet_pton() failed)");
905 free(tmp);
906 continue;
907 }
908 pmix_output_verbose(20, pmix_ptl_base_framework.framework_output,
909 "ptl:tcp: Searching for %s address+prefix: %s / %u",
910 name,
911 pmix_net_get_hostname((struct sockaddr*) &argv_inaddr),
912 argv_prefix);
913
914
915 for (if_index = pmix_ifbegin(); if_index >= 0;
916 if_index = pmix_ifnext(if_index)) {
917 pmix_ifindextoaddr(if_index,
918 (struct sockaddr*) &if_inaddr,
919 sizeof(if_inaddr));
920 if (pmix_net_samenetwork((struct sockaddr*) &argv_inaddr,
921 (struct sockaddr*) &if_inaddr,
922 argv_prefix)) {
923 break;
924 }
925 }
926
927 if (if_index < 0) {
928 pmix_show_help("help-ptl-tcp.txt", "invalid if_inexclude",
929 true, name, tmp,
930 "Did not find interface matching this subnet");
931 free(tmp);
932 continue;
933 }
934
935
936
937 pmix_ifindextoname(if_index, if_name, sizeof(if_name));
938 pmix_output_verbose(20, pmix_ptl_base_framework.framework_output,
939 "ptl:tcp: Found match: %s (%s)",
940 pmix_net_get_hostname((struct sockaddr*) &if_inaddr),
941 if_name);
942 argv[save++] = strdup(if_name);
943 free(tmp);
944 }
945
946
947
948 argv[save] = NULL;
949 free(*orig_str);
950 *orig_str = pmix_argv_join(argv, ',');
951 return argv;
952 }
953
954 static void connection_handler(int sd, short args, void *cbdata)
955 {
956 pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cbdata;
957 pmix_ptl_hdr_t hdr;
958 pmix_peer_t *peer;
959 pmix_rank_t rank=0;
960 pmix_status_t rc;
961 char *msg, *mg, *version;
962 char *sec, *bfrops, *gds;
963 pmix_bfrop_buffer_type_t bftype;
964 char *nspace;
965 uint32_t len, u32;
966 size_t cnt, msglen, n;
967 pmix_namespace_t *nptr, *tmp;
968 bool found;
969 pmix_rank_info_t *info;
970 pmix_proc_t proc;
971 pmix_info_t ginfo;
972 pmix_proc_type_t proc_type;
973 pmix_byte_object_t cred;
974 pmix_buffer_t buf;
975
976
977 PMIX_ACQUIRE_OBJECT(pnd);
978
979 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
980 "ptl:tcp:connection_handler: new connection: %d",
981 pnd->sd);
982
983
984 pmix_ptl_base_set_blocking(pnd->sd);
985
986
987 memset(&hdr, 0, sizeof(pmix_ptl_hdr_t));
988
989
990 if (PMIX_SUCCESS != (rc = pmix_ptl_base_recv_blocking(pnd->sd, (char*)&hdr, sizeof(pmix_ptl_hdr_t)))) {
991 CLOSE_THE_SOCKET(pnd->sd);
992 PMIX_RELEASE(pnd);
993 return;
994 }
995
996
997
998
999 if (PMIX_MAX_CRED_SIZE < hdr.nbytes) {
1000 CLOSE_THE_SOCKET(pnd->sd);
1001 PMIX_RELEASE(pnd);
1002 return;
1003 }
1004 if (NULL == (msg = (char*)malloc(hdr.nbytes))) {
1005 CLOSE_THE_SOCKET(pnd->sd);
1006 PMIX_RELEASE(pnd);
1007 return;
1008 }
1009 if (PMIX_SUCCESS != (rc = pmix_ptl_base_recv_blocking(pnd->sd, msg, hdr.nbytes))) {
1010
1011 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1012 "ptl:tcp:connection_handler unable to complete recv of connect-ack with client ON SOCKET %d",
1013 pnd->sd);
1014 free(msg);
1015 CLOSE_THE_SOCKET(pnd->sd);
1016 PMIX_RELEASE(pnd);
1017 return;
1018 }
1019
1020 cnt = hdr.nbytes;
1021 mg = msg;
1022
1023 PMIX_STRNLEN(msglen, mg, cnt);
1024 if (msglen < cnt) {
1025 sec = mg;
1026 mg += strlen(sec) + 1;
1027 cnt -= strlen(sec) + 1;
1028 } else {
1029 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1030 free(msg);
1031
1032 rc = PMIX_ERR_BAD_PARAM;
1033 goto error;
1034 }
1035
1036
1037
1038 if (sizeof(uint32_t) <= cnt) {
1039 memcpy(&len, mg, sizeof(uint32_t));
1040 mg += sizeof(uint32_t);
1041 cnt -= sizeof(uint32_t);
1042 } else {
1043 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1044 free(msg);
1045
1046 rc = PMIX_ERR_BAD_PARAM;
1047 goto error;
1048 }
1049
1050 pnd->len = ntohl(len);
1051
1052
1053 if (0 < pnd->len) {
1054 pnd->cred = (char*)malloc(pnd->len);
1055 if (NULL == pnd->cred) {
1056
1057 free(msg);
1058 CLOSE_THE_SOCKET(pnd->sd);
1059 PMIX_RELEASE(pnd);
1060 return;
1061 }
1062 memcpy(pnd->cred, mg, pnd->len);
1063 mg += pnd->len;
1064 cnt -= pnd->len;
1065 }
1066
1067
1068 if (1 <= cnt) {
1069 memcpy(&pnd->flag, mg, 1);
1070 ++mg;
1071 --cnt;
1072 } else {
1073 free(msg);
1074
1075 rc = PMIX_ERR_BAD_PARAM;
1076 goto error;
1077 }
1078
1079 if (0 == pnd->flag) {
1080
1081 proc_type = PMIX_PROC_CLIENT;
1082 PMIX_STRNLEN(msglen, mg, cnt);
1083 if (msglen < cnt) {
1084 nspace = mg;
1085 mg += strlen(nspace) + 1;
1086 cnt -= strlen(nspace) + 1;
1087 } else {
1088 free(msg);
1089
1090 rc = PMIX_ERR_BAD_PARAM;
1091 goto error;
1092 }
1093
1094 if (sizeof(pmix_rank_t) <= cnt) {
1095
1096 memcpy(&u32, mg, sizeof(uint32_t));
1097 rank = ntohl(u32);
1098 mg += sizeof(uint32_t);
1099 cnt -= sizeof(uint32_t);
1100 } else {
1101 free(msg);
1102
1103 rc = PMIX_ERR_BAD_PARAM;
1104 goto error;
1105 }
1106 } else if (1 == pnd->flag) {
1107
1108 proc_type = PMIX_PROC_TOOL;
1109
1110 if (sizeof(uint32_t) <= cnt) {
1111 memcpy(&u32, mg, sizeof(uint32_t));
1112 mg += sizeof(uint32_t);
1113 cnt -= sizeof(uint32_t);
1114 pnd->uid = ntohl(u32);
1115 } else {
1116 free(msg);
1117
1118 rc = PMIX_ERR_BAD_PARAM;
1119 goto error;
1120 }
1121 if (sizeof(uint32_t) <= cnt) {
1122 memcpy(&u32, mg, sizeof(uint32_t));
1123 mg += sizeof(uint32_t);
1124 cnt -= sizeof(uint32_t);
1125 pnd->gid = ntohl(u32);
1126 } else {
1127 free(msg);
1128
1129 rc = PMIX_ERR_BAD_PARAM;
1130 goto error;
1131 }
1132 } else if (2 == pnd->flag) {
1133
1134 proc_type = PMIX_PROC_LAUNCHER;
1135
1136 if (sizeof(uint32_t) <= cnt) {
1137 memcpy(&u32, mg, sizeof(uint32_t));
1138 mg += sizeof(uint32_t);
1139 cnt -= sizeof(uint32_t);
1140 pnd->uid = ntohl(u32);
1141 } else {
1142 free(msg);
1143
1144 rc = PMIX_ERR_BAD_PARAM;
1145 goto error;
1146 }
1147 if (sizeof(uint32_t) <= cnt) {
1148 memcpy(&u32, mg, sizeof(uint32_t));
1149 mg += sizeof(uint32_t);
1150 cnt -= sizeof(uint32_t);
1151 pnd->gid = ntohl(u32);
1152 } else {
1153 free(msg);
1154
1155 rc = PMIX_ERR_BAD_PARAM;
1156 goto error;
1157 }
1158 } else if (3 == pnd->flag || 6 == pnd->flag) {
1159
1160 if (3 == pnd->flag) {
1161 proc_type = PMIX_PROC_TOOL;
1162 } else {
1163 proc_type = PMIX_PROC_LAUNCHER;
1164 }
1165
1166 if (sizeof(uint32_t) <= cnt) {
1167 memcpy(&u32, mg, sizeof(uint32_t));
1168 mg += sizeof(uint32_t);
1169 cnt -= sizeof(uint32_t);
1170 pnd->uid = ntohl(u32);
1171 } else {
1172 free(msg);
1173
1174 rc = PMIX_ERR_BAD_PARAM;
1175 goto error;
1176 }
1177 if (sizeof(uint32_t) <= cnt) {
1178 memcpy(&u32, mg, sizeof(uint32_t));
1179 mg += sizeof(uint32_t);
1180 cnt -= sizeof(uint32_t);
1181 pnd->gid = ntohl(u32);
1182 } else {
1183 free(msg);
1184
1185 rc = PMIX_ERR_BAD_PARAM;
1186 goto error;
1187 }
1188
1189 pnd->need_id = true;
1190 } else if (4 == pnd->flag || 5 == pnd->flag || 7 == pnd->flag || 8 == pnd->flag) {
1191
1192 if (4 == pnd->flag || 5 == pnd->flag) {
1193 proc_type = PMIX_PROC_TOOL;
1194 } else {
1195 proc_type = PMIX_PROC_LAUNCHER;
1196 }
1197
1198 if (sizeof(uint32_t) <= cnt) {
1199 memcpy(&u32, mg, sizeof(uint32_t));
1200 mg += sizeof(uint32_t);
1201 cnt -= sizeof(uint32_t);
1202 pnd->uid = ntohl(u32);
1203 } else {
1204 free(msg);
1205
1206 rc = PMIX_ERR_BAD_PARAM;
1207 goto error;
1208 }
1209 if (sizeof(uint32_t) <= cnt) {
1210 memcpy(&u32, mg, sizeof(uint32_t));
1211 mg += sizeof(uint32_t);
1212 cnt -= sizeof(uint32_t);
1213 pnd->gid = ntohl(u32);
1214 } else {
1215 free(msg);
1216
1217 rc = PMIX_ERR_BAD_PARAM;
1218 goto error;
1219 }
1220 PMIX_STRNLEN(msglen, mg, cnt);
1221 if (msglen < cnt) {
1222 nspace = mg;
1223 mg += strlen(nspace) + 1;
1224 cnt -= strlen(nspace) + 1;
1225 } else {
1226 free(msg);
1227
1228 rc = PMIX_ERR_BAD_PARAM;
1229 goto error;
1230 }
1231
1232 if (sizeof(pmix_rank_t) <= cnt) {
1233
1234 memcpy(&u32, mg, sizeof(uint32_t));
1235 rank = ntohl(u32);
1236 mg += sizeof(uint32_t);
1237 cnt -= sizeof(uint32_t);
1238 } else {
1239 free(msg);
1240
1241 rc = PMIX_ERR_BAD_PARAM;
1242 goto error;
1243 }
1244 } else {
1245
1246 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
1247 rc = PMIX_ERR_NOT_SUPPORTED;
1248 free(msg);
1249 goto error;
1250 }
1251
1252
1253 PMIX_STRNLEN(msglen, mg, cnt);
1254 if (msglen < cnt) {
1255 version = mg;
1256 mg += strlen(version) + 1;
1257 cnt -= strlen(version) + 1;
1258 } else {
1259 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1260 free(msg);
1261
1262 rc = PMIX_ERR_BAD_PARAM;
1263 goto error;
1264 }
1265
1266 if (0 == strncmp(version, "2.0", 3)) {
1267
1268 proc_type = proc_type | PMIX_PROC_V20;
1269 bfrops = "v20";
1270 bftype = pmix_bfrops_globals.default_type;
1271 gds = "ds12,hash";
1272 } else {
1273 int major;
1274 major = strtoul(version, NULL, 10);
1275 if (2 == major) {
1276 proc_type = proc_type | PMIX_PROC_V21;
1277 } else if (3 <= major) {
1278 proc_type = proc_type | PMIX_PROC_V3;
1279 } else {
1280 free(msg);
1281 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
1282 rc = PMIX_ERR_NOT_SUPPORTED;
1283 goto error;
1284 }
1285
1286 PMIX_STRNLEN(msglen, mg, cnt);
1287 if (msglen < cnt) {
1288 bfrops = mg;
1289 mg += strlen(bfrops) + 1;
1290 cnt -= strlen(bfrops) + 1;
1291 } else {
1292 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1293 free(msg);
1294
1295 rc = PMIX_ERR_BAD_PARAM;
1296 goto error;
1297 }
1298
1299
1300 if (sizeof(bftype) < cnt) {
1301 memcpy(&bftype, mg, sizeof(bftype));
1302 mg += sizeof(bftype);
1303 cnt -= sizeof(bftype);
1304 } else {
1305 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1306 free(msg);
1307
1308 rc = PMIX_ERR_BAD_PARAM;
1309 goto error;
1310 }
1311
1312
1313 PMIX_STRNLEN(msglen, mg, cnt);
1314 if (msglen < cnt) {
1315 gds = mg;
1316 mg += strlen(gds) + 1;
1317 cnt -= strlen(gds) + 1;
1318 } else {
1319 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1320 free(msg);
1321
1322 rc = PMIX_ERR_BAD_PARAM;
1323 goto error;
1324 }
1325 }
1326
1327
1328 if (0 != pnd->flag) {
1329 peer = PMIX_NEW(pmix_peer_t);
1330 if (NULL == peer) {
1331
1332 free(msg);
1333 CLOSE_THE_SOCKET(pnd->sd);
1334 PMIX_RELEASE(pnd);
1335 return;
1336 }
1337 pnd->peer = peer;
1338
1339
1340
1341 if (5 == pnd->flag || 8 == pnd->flag) {
1342
1343
1344
1345
1346 nptr = NULL;
1347 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
1348 if (0 == strcmp(tmp->nspace, nspace)) {
1349 nptr = tmp;
1350 break;
1351 }
1352 }
1353 if (NULL == nptr) {
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363 nptr = PMIX_NEW(pmix_namespace_t);
1364 if (NULL == nptr) {
1365 rc = PMIX_ERR_NOMEM;
1366 goto error;
1367 }
1368 nptr->nspace = strdup(nspace);
1369 }
1370
1371 info = NULL;
1372 found = false;
1373 PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
1374 if (info->pname.rank == rank) {
1375 found = true;
1376 break;
1377 }
1378 }
1379 if (!found) {
1380
1381 info = PMIX_NEW(pmix_rank_info_t);
1382 info->pname.nspace = strdup(nspace);
1383 info->pname.rank = rank;
1384 info->uid = pnd->uid;
1385 info->gid = pnd->gid;
1386 pmix_list_append(&nptr->ranks, &info->super);
1387 }
1388 PMIX_RETAIN(info);
1389 peer->info = info;
1390 PMIX_RETAIN(nptr);
1391 } else {
1392 nptr = PMIX_NEW(pmix_namespace_t);
1393 if (NULL == nptr) {
1394 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1395 CLOSE_THE_SOCKET(pnd->sd);
1396 PMIX_RELEASE(pnd);
1397 PMIX_RELEASE(peer);
1398 return;
1399 }
1400 }
1401 peer->nptr = nptr;
1402
1403 peer->nptr->compat.bfrops = pmix_bfrops_base_assign_module(bfrops);
1404 if (NULL == peer->nptr->compat.bfrops) {
1405 PMIX_RELEASE(peer);
1406 CLOSE_THE_SOCKET(pnd->sd);
1407 PMIX_RELEASE(pnd);
1408 return;
1409 }
1410
1411 peer->nptr->compat.type = bftype;
1412 n = 0;
1413
1414 if (0 < cnt) {
1415 int32_t foo;
1416 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
1417 PMIX_LOAD_BUFFER(peer, &buf, mg, cnt);
1418 foo = 1;
1419 PMIX_BFROPS_UNPACK(rc, peer, &buf, &pnd->ninfo, &foo, PMIX_SIZE);
1420 foo = (int32_t)pnd->ninfo;
1421
1422 if (!pnd->need_id) {
1423 pnd->ninfo += 5;
1424 } else {
1425 pnd->ninfo += 3;
1426 }
1427 PMIX_INFO_CREATE(pnd->info, pnd->ninfo);
1428 PMIX_BFROPS_UNPACK(rc, peer, &buf, pnd->info, &foo, PMIX_INFO);
1429 n = foo;
1430 } else {
1431 if (!pnd->need_id) {
1432 pnd->ninfo = 5;
1433 } else {
1434 pnd->ninfo = 3;
1435 }
1436 PMIX_INFO_CREATE(pnd->info, pnd->ninfo);
1437 }
1438
1439
1440 pnd->proc_type = proc_type;
1441
1442
1443 pnd->psec = strdup(sec);
1444 if (NULL != gds) {
1445 pnd->gds = strdup(gds);
1446 }
1447
1448
1449 if (NULL == pmix_host_server.tool_connected) {
1450 if (pnd->need_id) {
1451
1452
1453
1454 rc = PMIX_ERR_NOT_SUPPORTED;
1455 PMIX_RELEASE(peer);
1456
1457 free(msg);
1458 goto error;
1459 } else {
1460
1461 PMIX_LOAD_PROCID(&proc, nspace, rank);
1462 cnct_cbfunc(PMIX_SUCCESS, &proc, (void*)pnd);
1463
1464 free(msg);
1465 return;
1466 }
1467 }
1468
1469
1470
1471
1472 PMIX_INFO_LOAD(&pnd->info[n], PMIX_VERSION_INFO, version, PMIX_STRING);
1473 ++n;
1474
1475 PMIX_INFO_LOAD(&pnd->info[n], PMIX_USERID, &pnd->uid, PMIX_UINT32);
1476 ++n;
1477
1478 PMIX_INFO_LOAD(&pnd->info[n], PMIX_GRPID, &pnd->gid, PMIX_UINT32);
1479 ++n;
1480
1481 if (!pnd->need_id) {
1482 PMIX_INFO_LOAD(&pnd->info[n], PMIX_NSPACE, nspace, PMIX_STRING);
1483 ++n;
1484 PMIX_INFO_LOAD(&pnd->info[n], PMIX_RANK, &rank, PMIX_PROC_RANK);
1485 ++n;
1486 }
1487
1488 free(msg);
1489
1490
1491 pmix_host_server.tool_connected(pnd->info, pnd->ninfo, cnct_cbfunc, pnd);
1492 return;
1493 }
1494
1495
1496 nptr = NULL;
1497 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
1498 if (0 == strcmp(tmp->nspace, nspace)) {
1499 nptr = tmp;
1500 break;
1501 }
1502 }
1503 if (NULL == nptr) {
1504
1505 free(msg);
1506
1507 rc = PMIX_ERR_NOT_FOUND;
1508 goto error;
1509 }
1510
1511
1512 info = NULL;
1513 found = false;
1514 PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
1515 if (info->pname.rank == rank) {
1516 found = true;
1517 break;
1518 }
1519 }
1520 if (!found) {
1521
1522 free(msg);
1523
1524 rc = PMIX_ERR_NOT_FOUND;
1525 goto error;
1526 }
1527
1528
1529
1530
1531 peer = PMIX_NEW(pmix_peer_t);
1532 if (NULL == peer) {
1533
1534 free(msg);
1535 CLOSE_THE_SOCKET(pnd->sd);
1536 PMIX_RELEASE(pnd);
1537 return;
1538 }
1539
1540 peer->proc_type = proc_type;
1541
1542 peer->protocol = pnd->protocol;
1543
1544 PMIX_RETAIN(nptr);
1545 peer->nptr = nptr;
1546 PMIX_RETAIN(info);
1547 peer->info = info;
1548
1549 peer->epilog.uid = info->uid;
1550 peer->epilog.gid = info->gid;
1551
1552 nptr->epilog.uid = info->uid;
1553 nptr->epilog.gid = info->gid;
1554 info->proc_cnt++;
1555 peer->sd = pnd->sd;
1556 if (0 > (peer->index = pmix_pointer_array_add(&pmix_server_globals.clients, peer))) {
1557 free(msg);
1558 info->proc_cnt--;
1559 PMIX_RELEASE(peer);
1560
1561 CLOSE_THE_SOCKET(pnd->sd);
1562 PMIX_RELEASE(pnd);
1563 return;
1564 }
1565 info->peerid = peer->index;
1566
1567
1568 peer->nptr->compat.psec = pmix_psec_base_assign_module(sec);
1569 if (NULL == peer->nptr->compat.psec) {
1570 free(msg);
1571 info->proc_cnt--;
1572 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1573 PMIX_RELEASE(peer);
1574
1575 goto error;
1576 }
1577
1578
1579 peer->nptr->compat.bfrops = pmix_bfrops_base_assign_module(bfrops);
1580 if (NULL == peer->nptr->compat.bfrops) {
1581 free(msg);
1582 info->proc_cnt--;
1583 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1584 PMIX_RELEASE(peer);
1585
1586 goto error;
1587 }
1588
1589 peer->nptr->compat.type = bftype;
1590
1591
1592 if (NULL != gds) {
1593 PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, gds, PMIX_STRING);
1594 peer->nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
1595 PMIX_INFO_DESTRUCT(&ginfo);
1596 } else {
1597 peer->nptr->compat.gds = pmix_gds_base_assign_module(NULL, 0);
1598 }
1599 if (NULL == peer->nptr->compat.gds) {
1600 free(msg);
1601 info->proc_cnt--;
1602 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1603 PMIX_RELEASE(peer);
1604
1605 goto error;
1606 }
1607
1608
1609
1610 if (!nptr->version_stored) {
1611 PMIX_INFO_LOAD(&ginfo, PMIX_BFROPS_MODULE, peer->nptr->compat.bfrops->version, PMIX_STRING);
1612 PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, peer->nptr, &ginfo, 1);
1613 PMIX_INFO_DESTRUCT(&ginfo);
1614 nptr->version_stored = true;
1615 }
1616
1617 free(msg);
1618
1619
1620 peer->nptr->compat.ptl = &pmix_ptl_tcp_module;
1621
1622
1623 cred.bytes = pnd->cred;
1624 cred.size = pnd->len;
1625 PMIX_PSEC_VALIDATE_CONNECTION(rc, peer, NULL, 0, NULL, NULL, &cred);
1626 if (PMIX_SUCCESS != rc) {
1627 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1628 "validation of client connection failed");
1629 info->proc_cnt--;
1630 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1631 PMIX_RELEASE(peer);
1632
1633 goto error;
1634 }
1635
1636 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1637 "client connection validated");
1638
1639
1640 u32 = htonl(PMIX_SUCCESS);
1641 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {
1642 PMIX_ERROR_LOG(rc);
1643 info->proc_cnt--;
1644 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1645 PMIX_RELEASE(peer);
1646 CLOSE_THE_SOCKET(pnd->sd);
1647 PMIX_RELEASE(pnd);
1648 return;
1649 }
1650
1651 u32 = htonl(peer->index);
1652 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {
1653 PMIX_ERROR_LOG(rc);
1654 info->proc_cnt--;
1655 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1656 PMIX_RELEASE(peer);
1657 goto error;
1658 }
1659
1660 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1661 "connect-ack from client completed");
1662
1663
1664 if (NULL != pmix_host_server.client_connected) {
1665 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1666 proc.rank = peer->info->pname.rank;
1667 rc = pmix_host_server.client_connected(&proc, peer->info->server_object,
1668 NULL, NULL);
1669 if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
1670 PMIX_ERROR_LOG(rc);
1671 info->proc_cnt--;
1672 pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
1673 PMIX_RELEASE(peer);
1674 goto error;
1675 }
1676 }
1677
1678 pmix_ptl_base_set_nonblocking(pnd->sd);
1679
1680
1681 pmix_event_assign(&peer->recv_event, pmix_globals.evbase, pnd->sd,
1682 EV_READ|EV_PERSIST, pmix_ptl_base_recv_handler, peer);
1683 pmix_event_add(&peer->recv_event, NULL);
1684 peer->recv_ev_active = true;
1685 pmix_event_assign(&peer->send_event, pmix_globals.evbase, pnd->sd,
1686 EV_WRITE|EV_PERSIST, pmix_ptl_base_send_handler, peer);
1687 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1688 "pmix:server client %s:%u has connected on socket %d",
1689 peer->info->pname.nspace, peer->info->pname.rank, peer->sd);
1690 PMIX_RELEASE(pnd);
1691 return;
1692
1693 error:
1694
1695 u32 = htonl(rc);
1696 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(int)))) {
1697 PMIX_ERROR_LOG(rc);
1698 CLOSE_THE_SOCKET(pnd->sd);
1699 }
1700 PMIX_RELEASE(pnd);
1701 return;
1702 }
1703
1704
1705 static void process_cbfunc(int sd, short args, void *cbdata)
1706 {
1707 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1708 pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cd->cbdata;
1709 pmix_namespace_t *nptr;
1710 pmix_rank_info_t *info;
1711 pmix_peer_t *peer;
1712 int rc;
1713 uint32_t u32;
1714 pmix_info_t ginfo;
1715 pmix_byte_object_t cred;
1716 pmix_iof_req_t *req;
1717
1718
1719 PMIX_ACQUIRE_OBJECT(cd);
1720
1721
1722 u32 = ntohl(cd->status);
1723 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {
1724 PMIX_ERROR_LOG(rc);
1725 CLOSE_THE_SOCKET(pnd->sd);
1726 PMIX_RELEASE(pnd->peer);
1727 PMIX_RELEASE(pnd);
1728 PMIX_RELEASE(cd);
1729 return;
1730 }
1731
1732
1733 if (PMIX_SUCCESS != cd->status) {
1734 PMIX_RELEASE(pnd->peer);
1735 PMIX_RELEASE(pnd);
1736 PMIX_RELEASE(cd);
1737 return;
1738 }
1739
1740
1741 if (pnd->need_id) {
1742
1743 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, cd->proc.nspace, PMIX_MAX_NSLEN+1))) {
1744 PMIX_ERROR_LOG(rc);
1745 CLOSE_THE_SOCKET(pnd->sd);
1746 PMIX_RELEASE(pnd->peer);
1747 PMIX_RELEASE(pnd);
1748 PMIX_RELEASE(cd);
1749 return;
1750 }
1751
1752
1753 u32 = ntohl(cd->proc.rank);
1754 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {
1755 PMIX_ERROR_LOG(rc);
1756 CLOSE_THE_SOCKET(pnd->sd);
1757 PMIX_RELEASE(pnd->peer);
1758 PMIX_RELEASE(pnd);
1759 PMIX_RELEASE(cd);
1760 return;
1761 }
1762 }
1763
1764
1765 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, pmix_globals.myid.nspace, PMIX_MAX_NSLEN+1))) {
1766 PMIX_ERROR_LOG(rc);
1767 CLOSE_THE_SOCKET(pnd->sd);
1768 PMIX_RELEASE(pnd->peer);
1769 PMIX_RELEASE(pnd);
1770 PMIX_RELEASE(cd);
1771 return;
1772 }
1773
1774
1775 u32 = ntohl(pmix_globals.myid.rank);
1776 if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {
1777 PMIX_ERROR_LOG(rc);
1778 CLOSE_THE_SOCKET(pnd->sd);
1779 PMIX_RELEASE(pnd->peer);
1780 PMIX_RELEASE(pnd);
1781 PMIX_RELEASE(cd);
1782 return;
1783 }
1784
1785
1786 peer = (pmix_peer_t*)pnd->peer;
1787 nptr = peer->nptr;
1788
1789
1790
1791 if (5 != pnd->flag && 8 != pnd->flag) {
1792 PMIX_RETAIN(nptr);
1793 nptr->nspace = strdup(cd->proc.nspace);
1794 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
1795 info = PMIX_NEW(pmix_rank_info_t);
1796 info->pname.nspace = strdup(nptr->nspace);
1797 info->pname.rank = cd->proc.rank;
1798 info->uid = pnd->uid;
1799 info->gid = pnd->gid;
1800 pmix_list_append(&nptr->ranks, &info->super);
1801 PMIX_RETAIN(info);
1802 peer->info = info;
1803 }
1804
1805
1806 peer->proc_type = pnd->proc_type;
1807
1808 peer->protocol = pnd->protocol;
1809
1810 peer->epilog.uid = peer->info->uid;
1811 peer->epilog.gid = peer->info->gid;
1812 nptr->epilog.uid = peer->info->uid;
1813 nptr->epilog.gid = peer->info->gid;
1814 peer->proc_cnt = 1;
1815 peer->sd = pnd->sd;
1816
1817
1818
1819 peer->nptr->compat.psec = pmix_psec_base_assign_module(pnd->psec);
1820 if (NULL == peer->nptr->compat.psec) {
1821 PMIX_RELEASE(peer);
1822 pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
1823 PMIX_RELEASE(nptr);
1824 CLOSE_THE_SOCKET(pnd->sd);
1825 goto done;
1826 }
1827
1828
1829
1830 peer->nptr->compat.ptl = &pmix_ptl_tcp_module;
1831
1832 PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, pnd->gds, PMIX_STRING);
1833 peer->nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
1834 PMIX_INFO_DESTRUCT(&ginfo);
1835 if (NULL == peer->nptr->compat.gds) {
1836 PMIX_RELEASE(peer);
1837 pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
1838 PMIX_RELEASE(nptr);
1839 CLOSE_THE_SOCKET(pnd->sd);
1840 goto done;
1841 }
1842
1843
1844
1845 if (!peer->nptr->version_stored) {
1846 PMIX_INFO_LOAD(&ginfo, PMIX_BFROPS_MODULE, peer->nptr->compat.bfrops->version, PMIX_STRING);
1847 PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, peer->nptr, &ginfo, 1);
1848 PMIX_INFO_DESTRUCT(&ginfo);
1849 nptr->version_stored = true;
1850 }
1851
1852
1853 req = PMIX_NEW(pmix_iof_req_t);
1854 if (NULL == req) {
1855 PMIX_RELEASE(peer);
1856 pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
1857 PMIX_RELEASE(nptr);
1858 CLOSE_THE_SOCKET(pnd->sd);
1859 goto done;
1860 }
1861 PMIX_RETAIN(peer);
1862 req->peer = peer;
1863 req->pname.nspace = strdup(pmix_globals.myid.nspace);
1864 req->pname.rank = pmix_globals.myid.rank;
1865 req->channels = PMIX_FWD_STDOUT_CHANNEL | PMIX_FWD_STDERR_CHANNEL | PMIX_FWD_STDDIAG_CHANNEL;
1866 pmix_list_append(&pmix_globals.iof_requests, &req->super);
1867
1868
1869 cred.bytes = pnd->cred;
1870 cred.size = pnd->len;
1871 PMIX_PSEC_VALIDATE_CONNECTION(rc, peer, NULL, 0, NULL, NULL, &cred);
1872 if (PMIX_SUCCESS != rc) {
1873 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1874 "validation of tool credentials failed: %s",
1875 PMIx_Error_string(rc));
1876 PMIX_RELEASE(peer);
1877 pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
1878 PMIX_RELEASE(nptr);
1879 CLOSE_THE_SOCKET(pnd->sd);
1880 goto done;
1881 }
1882
1883
1884 pmix_ptl_base_set_nonblocking(pnd->sd);
1885
1886 if (0 > (peer->index = pmix_pointer_array_add(&pmix_server_globals.clients, peer))) {
1887 PMIX_RELEASE(pnd);
1888 PMIX_RELEASE(cd);
1889 PMIX_RELEASE(peer);
1890 pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
1891 PMIX_RELEASE(nptr);
1892
1893 return;
1894 }
1895 peer->info->peerid = peer->index;
1896
1897
1898 pmix_event_assign(&peer->recv_event, pmix_globals.evbase, peer->sd,
1899 EV_READ|EV_PERSIST, pmix_ptl_base_recv_handler, peer);
1900 pmix_event_add(&peer->recv_event, NULL);
1901 peer->recv_ev_active = true;
1902 pmix_event_assign(&peer->send_event, pmix_globals.evbase, peer->sd,
1903 EV_WRITE|EV_PERSIST, pmix_ptl_base_send_handler, peer);
1904 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1905 "pmix:server tool %s:%d has connected on socket %d",
1906 peer->info->pname.nspace, peer->info->pname.rank, peer->sd);
1907
1908 done:
1909 PMIX_RELEASE(pnd);
1910 PMIX_RELEASE(cd);
1911 }
1912
1913
1914
1915 static void cnct_cbfunc(pmix_status_t status,
1916 pmix_proc_t *proc, void *cbdata)
1917 {
1918 pmix_setup_caddy_t *cd;
1919
1920 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
1921 "pmix:tcp:cnct_cbfunc returning %s:%d %s",
1922 proc->nspace, proc->rank, PMIx_Error_string(status));
1923
1924
1925 cd = PMIX_NEW(pmix_setup_caddy_t);
1926 if (NULL == cd) {
1927 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1928 return;
1929 }
1930 cd->status = status;
1931 pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1932 cd->proc.rank = proc->rank;
1933 cd->cbdata = cbdata;
1934 PMIX_THREADSHIFT(cd, process_cbfunc);
1935 }