1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2011 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
13 * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
14 * All rights reserved.
15 * Copyright (c) 2015-2019 Intel, Inc. All rights reserved.
16 * Copyright (c) 2017 IBM Corporation. All rights reserved.
17 * Copyright (c) 2017 Mellanox Technologies. All rights reserved.
18 * Copyright (c) 2018 Research Organization for Information Science
19 * and Technology (RIST). All rights reserved.
20 * $COPYRIGHT$
21 *
22 * Additional copyrights may follow
23 *
24 * $HEADER$
25 */
26 /**
27 * @file
28 *
29 * I/O Forwarding Service
30 */
31
32 #ifndef PMIX_IOF_H
33 #define PMIX_IOF_H
34
35 #include <src/include/pmix_config.h>
36
37 #ifdef HAVE_SYS_TYPES_H
38 #include <sys/types.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_NET_UIO_H
44 #include <net/uio.h>
45 #endif
46 #ifdef HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
49 #include <signal.h>
50
51 #include "src/class/pmix_list.h"
52 #include "src/include/pmix_globals.h"
53 #include "src/util/fd.h"
54
55 BEGIN_C_DECLS
56
/*
 * Sizing limits used by the I/O forwarding code
 */

/* Maximum size of a single msg */
#define PMIX_IOF_BASE_MSG_MAX           4096

/* NOTE(review): presumably the maximum length of the tag prepended to
 * forwarded output - not referenced in this header, confirm at the users */
#define PMIX_IOF_BASE_TAG_MAX             50

/* Size, in chars, of the data buffer held by each pmix_iof_write_output_t */
#define PMIX_IOF_BASE_TAGGED_OUT_MAX    8192

/* NOTE(review): presumably an upper bound on queued input buffers -
 * not referenced in this header, confirm at the users */
#define PMIX_IOF_MAX_INPUT_BUFFERS        50
64
65 typedef struct {
66 pmix_list_item_t super;
67 bool pending;
68 bool always_writable;
69 pmix_event_t ev;
70 struct timeval tv;
71 int fd;
72 pmix_list_t outputs;
73 } pmix_iof_write_event_t;
74 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_event_t);
75
76 typedef struct {
77 pmix_list_item_t super;
78 pmix_proc_t name;
79 pmix_iof_channel_t tag;
80 pmix_iof_write_event_t wev;
81 bool xoff;
82 bool exclusive;
83 bool closed;
84 } pmix_iof_sink_t;
85 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_sink_t);
86
87 typedef struct {
88 pmix_list_item_t super;
89 char data[PMIX_IOF_BASE_TAGGED_OUT_MAX];
90 int numbytes;
91 } pmix_iof_write_output_t;
92 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_output_t);
93
94 typedef struct {
95 pmix_object_t super;
96 pmix_event_t ev;
97 struct timeval tv;
98 int fd;
99 bool active;
100 bool always_readable;
101 pmix_proc_t *targets;
102 size_t ntargets;
103 pmix_info_t *directives;
104 size_t ndirs;
105 } pmix_iof_read_event_t;
106 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_read_event_t);
107
108
/* define a struct to hold the settings controlling the
 * format/contents of the output. Note that "timestamp"
 * is a time_t value rather than a simple on/off flag */
typedef struct {
    /* NOTE(review): presumably selects XML-formatted output - confirm */
    bool xml;
    /* NOTE(review): presumably the time to stamp on the output, with
     * zero meaning "no timestamp" - confirm against pmix_iof_write_output */
    time_t timestamp;
    /* NOTE(review): presumably requests that output be tagged with its
     * source - confirm */
    bool tag;
} pmix_iof_flags_t;
116
117
/* Write event macros */
119
/**
 * Decide whether a descriptor is to be treated as "always ready".
 *
 * The IOF macros use the answer to choose between a timer event (always
 * ready) and a normal readiness event on the descriptor.
 *
 * @param fd  descriptor to classify
 * @return true when pmix_fd_is_regular() accepts fd, when
 *         pmix_fd_is_chardev() accepts it and it is not a tty, or when
 *         pmix_fd_is_blkdev() accepts it; false otherwise
 */
static inline bool
pmix_iof_fd_always_ready(int fd)
{
    if (pmix_fd_is_regular(fd)) {
        return true;
    }
    /* character devices qualify only when they are not terminals */
    if (pmix_fd_is_chardev(fd) && !isatty(fd)) {
        return true;
    }
    return pmix_fd_is_blkdev(fd);
}
127
/* NOTE(review): not referenced anywhere in this header; presumably the
 * chunk size used when writing to a sink - confirm at the users */
#define PMIX_IOF_SINK_BLOCKSIZE (1024)
129
/**
 * Arm the write event described by a pmix_iof_write_event_t.
 *
 * Marks the event as pending, posts the object, and adds the event.
 * When the descriptor is always writable the event was set up as a
 * timer, so the stored timeout is supplied to pmix_event_add();
 * otherwise no timeout is used. A failure to add the event is
 * reported through PMIX_ERROR_LOG.
 *
 * @param wev  pointer to the pmix_iof_write_event_t to arm. Any pointer
 *             expression is accepted (e.g. &sink->wev); it is evaluated
 *             more than once, so it must be free of side effects.
 *
 * The expansion deliberately keeps its trailing semicolon because
 * existing call sites may omit their own; do not use this macro as the
 * unbraced body of an if that has an else.
 */
#define PMIX_IOF_SINK_ACTIVATE(wev)                                     \
    do {                                                                \
        struct timeval *_wev_tv = NULL;                                 \
        (wev)->pending = true;                                          \
        PMIX_POST_OBJECT(wev);                                          \
        if ((wev)->always_writable) {                                   \
            /* Regular is always write ready. Use timer to activate */  \
            _wev_tv = &(wev)->tv;                                       \
        }                                                               \
        if (pmix_event_add(&(wev)->ev, _wev_tv)) {                      \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
143
144
/**
 * Define an output "sink" in caller-provided storage.
 *
 * Constructs the sink, records the owning process and channel and, when
 * a valid descriptor is given, prepares (but does not add) its write
 * event: a timer event if pmix_iof_fd_always_ready() says the descriptor
 * is always writable, a PMIX_EV_WRITE event on the descriptor otherwise.
 * The macro does not place the sink on any list and does not arm the
 * event - see PMIX_IOF_SINK_ACTIVATE for the latter.
 *
 * @param snk       pointer to the pmix_iof_sink_t storage to construct
 * @param nm        pointer to the pmix_proc_t naming the owner
 * @param fid       descriptor to write to; a negative value skips the
 *                  event setup. Evaluated exactly once.
 * @param tg        channel to store in the sink
 * @param wrthndlr  callback given to the event; receives snk as cbdata
 *
 * snk and nm are evaluated more than once and must be free of side
 * effects. The expansion keeps its trailing semicolon because existing
 * call sites may omit their own.
 */
#define PMIX_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                \
    do {                                                                \
        /* capture the descriptor once so (fid) is not re-evaluated */  \
        int _sink_fd = (fid);                                           \
        PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,         \
                            "defining endpt: file %s line %d fd %d",    \
                            __FILE__, __LINE__, _sink_fd));             \
        PMIX_CONSTRUCT((snk), pmix_iof_sink_t);                         \
        pmix_strncpy((snk)->name.nspace, (nm)->nspace, PMIX_MAX_NSLEN); \
        (snk)->name.rank = (nm)->rank;                                  \
        (snk)->tag = (tg);                                              \
        if (0 <= _sink_fd) {                                            \
            (snk)->wev.fd = _sink_fd;                                   \
            (snk)->wev.always_writable =                                \
                pmix_iof_fd_always_ready(_sink_fd);                     \
            if ((snk)->wev.always_writable) {                           \
                pmix_event_evtimer_set(pmix_globals.evbase,             \
                                       &(snk)->wev.ev,                  \
                                       (wrthndlr), (snk));              \
            } else {                                                    \
                pmix_event_set(pmix_globals.evbase,                     \
                               &(snk)->wev.ev, (snk)->wev.fd,           \
                               PMIX_EV_WRITE,                           \
                               (wrthndlr), (snk));                      \
            }                                                           \
        }                                                               \
        PMIX_POST_OBJECT(snk);                                          \
    } while(0);
172
/* Read event macros */
/**
 * Add the event of a pmix_iof_read_event_t to its event base.
 *
 * The stored timeout is supplied only when the descriptor is always
 * readable (the event is then a timer); otherwise the event is added
 * without a timeout. A failure to add the event is reported through
 * PMIX_ERROR_LOG.
 *
 * @param rev  pointer to the pmix_iof_read_event_t; evaluated more than
 *             once, so it must be free of side effects
 */
#define PMIX_IOF_READ_ADDEV(rev)                                        \
    do {                                                                \
        struct timeval *tv =                                            \
            (rev)->always_readable ? &(rev)->tv : NULL;                 \
        if (0 != pmix_event_add(&(rev)->ev, tv)) {                      \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
184
/**
 * Mark a read event as active and add it to its event base.
 *
 * The active flag is written and the object posted before the event is
 * added via PMIX_IOF_READ_ADDEV.
 *
 * @param rev  pointer to the pmix_iof_read_event_t; evaluated more than
 *             once, so it must be free of side effects
 */
#define PMIX_IOF_READ_ACTIVATE(rev)                                     \
    do {                                                                \
        /* flag first, then publish, then arm */                        \
        (rev)->active = true;                                           \
        PMIX_POST_OBJECT(rev);                                          \
        PMIX_IOF_READ_ADDEV(rev);                                       \
    } while(0);
191
192
/**
 * Create a read event for a descriptor and optionally activate it.
 *
 * Allocates a pmix_iof_read_event_t, stores private copies of the
 * target procs and directives, and prepares its event: a timer event if
 * pmix_iof_fd_always_ready() says the descriptor is always readable, a
 * PMIX_EV_READ event on the descriptor otherwise. The new object is
 * returned through rv.
 *
 * @param rv      address of a pmix_iof_read_event_t* that receives the
 *                new object, or NULL if the object could not be allocated
 * @param p       array of target procs to copy, may be NULL
 * @param np      number of entries in p
 * @param d       array of directives to copy, may be NULL
 * @param nd      number of entries in d
 * @param fid     descriptor to read from; evaluated exactly once
 * @param cbfunc  callback given to the event; receives the new object
 *                as cbdata
 * @param actv    when true, the event is activated via
 *                PMIX_IOF_READ_ACTIVATE before the macro finishes
 *
 * When p (or d) is NULL or its count is zero nothing is copied and the
 * corresponding fields keep whatever the class constructor gave them
 * (NOTE(review): presumably NULL/0 - constructor not visible here).
 * Allocation failures are reported through PMIX_ERROR_LOG and leave the
 * affected count at zero.
 *
 * The macro's own locals carry a leading underscore so that caller
 * expressions such as "&rev" are not captured. p, np, d and nd may be
 * evaluated more than once. The expansion keeps its trailing semicolon
 * because existing call sites may omit their own.
 */
#define PMIX_IOF_READ_EVENT(rv, p, np, d, nd, fid, cbfunc, actv)        \
    do {                                                                \
        size_t _ii;                                                     \
        pmix_iof_read_event_t *_rev;                                    \
        PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,         \
                            "defining read event at: %s %d",            \
                            __FILE__, __LINE__));                       \
        _rev = PMIX_NEW(pmix_iof_read_event_t);                         \
        if (NULL == _rev) {                                             \
            PMIX_ERROR_LOG(PMIX_ERR_NOMEM);                             \
            *(rv) = NULL;                                               \
            /* leaves the enclosing do/while(0) only */                 \
            break;                                                      \
        }                                                               \
        /* copy the targets only when there is something to copy */     \
        if (NULL != (p) && 0 < (np)) {                                  \
            _rev->ntargets = (np);                                      \
            PMIX_PROC_CREATE(_rev->targets, _rev->ntargets);            \
            if (NULL == _rev->targets) {                                \
                PMIX_ERROR_LOG(PMIX_ERR_NOMEM);                         \
                _rev->ntargets = 0;                                     \
            } else {                                                    \
                memcpy(_rev->targets, (p),                              \
                       _rev->ntargets * sizeof(pmix_proc_t));           \
            }                                                           \
        }                                                               \
        /* likewise for the directives */                               \
        if (NULL != (d) && 0 < (nd)) {                                  \
            _rev->ndirs = (nd);                                         \
            PMIX_INFO_CREATE(_rev->directives, _rev->ndirs);            \
            if (NULL == _rev->directives) {                             \
                PMIX_ERROR_LOG(PMIX_ERR_NOMEM);                         \
                _rev->ndirs = 0;                                        \
            } else {                                                    \
                for (_ii = 0; _ii < _rev->ndirs; _ii++) {               \
                    PMIX_INFO_XFER(&(_rev->directives[_ii]),            \
                                   &((d)[_ii]));                        \
                }                                                       \
            }                                                           \
        }                                                               \
        _rev->fd = (fid);                                               \
        _rev->always_readable = pmix_iof_fd_always_ready(_rev->fd);     \
        *(rv) = _rev;                                                   \
        if (_rev->always_readable) {                                    \
            pmix_event_evtimer_set(pmix_globals.evbase,                 \
                                   &_rev->ev, (cbfunc), _rev);          \
        } else {                                                        \
            pmix_event_set(pmix_globals.evbase,                         \
                           &_rev->ev, _rev->fd,                         \
                           PMIX_EV_READ,                                \
                           (cbfunc), _rev);                             \
        }                                                               \
        if ((actv)) {                                                   \
            PMIX_IOF_READ_ACTIVATE(_rev);                               \
        }                                                               \
    } while(0);
227
228
229 PMIX_EXPORT pmix_status_t pmix_iof_flush(void);
230
231 PMIX_EXPORT pmix_status_t pmix_iof_write_output(const pmix_proc_t *name,
232 pmix_iof_channel_t stream,
233 const pmix_byte_object_t *bo,
234 pmix_iof_flags_t *flags);
235 PMIX_EXPORT void pmix_iof_static_dump_output(pmix_iof_sink_t *sink);
236 PMIX_EXPORT void pmix_iof_write_handler(int fd, short event, void *cbdata);
237 PMIX_EXPORT bool pmix_iof_stdin_check(int fd);
238 PMIX_EXPORT void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata);
239 PMIX_EXPORT void pmix_iof_stdin_cb(int fd, short event, void *cbdata);
240
241 END_C_DECLS
242
243 #endif /* PMIX_IOF_H */