This source file includes the following definitions.
- pmix_iof_fd_always_ready
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 #ifndef PMIX_IOF_H
33 #define PMIX_IOF_H
34
35 #include <src/include/pmix_config.h>
36
37 #ifdef HAVE_SYS_TYPES_H
38 #include <sys/types.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_NET_UIO_H
44 #include <net/uio.h>
45 #endif
46 #ifdef HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
49 #include <signal.h>
50
51 #include "src/class/pmix_list.h"
52 #include "src/include/pmix_globals.h"
53 #include "src/util/fd.h"
54
55 BEGIN_C_DECLS
56
57
58
59
/*
 * Size limits used by the IOF subsystem.
 *
 * Only PMIX_IOF_BASE_TAGGED_OUT_MAX is referenced inside this header, where
 * it sizes pmix_iof_write_output_t.data.  The roles noted for the other
 * three are inferred from their names.
 * NOTE(review): confirm those against the IOF implementation.
 */
#define PMIX_IOF_BASE_MSG_MAX           4096    /* presumably max bytes in one IOF message */
#define PMIX_IOF_BASE_TAG_MAX           50      /* presumably max length of an output tag */
#define PMIX_IOF_BASE_TAGGED_OUT_MAX    8192    /* byte capacity of one tagged output buffer */
#define PMIX_IOF_MAX_INPUT_BUFFERS      50      /* presumably cap on queued input buffers */
64
65 typedef struct {
66 pmix_list_item_t super;
67 bool pending;
68 bool always_writable;
69 pmix_event_t ev;
70 struct timeval tv;
71 int fd;
72 pmix_list_t outputs;
73 } pmix_iof_write_event_t;
74 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_event_t);
75
76 typedef struct {
77 pmix_list_item_t super;
78 pmix_proc_t name;
79 pmix_iof_channel_t tag;
80 pmix_iof_write_event_t wev;
81 bool xoff;
82 bool exclusive;
83 bool closed;
84 } pmix_iof_sink_t;
85 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_sink_t);
86
87 typedef struct {
88 pmix_list_item_t super;
89 char data[PMIX_IOF_BASE_TAGGED_OUT_MAX];
90 int numbytes;
91 } pmix_iof_write_output_t;
92 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_output_t);
93
94 typedef struct {
95 pmix_object_t super;
96 pmix_event_t ev;
97 struct timeval tv;
98 int fd;
99 bool active;
100 bool always_readable;
101 pmix_proc_t *targets;
102 size_t ntargets;
103 pmix_info_t *directives;
104 size_t ndirs;
105 } pmix_iof_read_event_t;
106 PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_read_event_t);
107
108
109
110
/*
 * Output-formatting options; a pointer to one of these is the `flags`
 * argument of pmix_iof_write_output().
 * NOTE(review): field meanings below are inferred from the names - confirm
 * against the implementation.
 */
typedef struct {
    bool xml;           /* presumably: emit XML-formatted output */
    time_t timestamp;   /* presumably: timestamp to attach to output */
    bool tag;           /* presumably: prefix output with a source tag */
} pmix_iof_flags_t;
116
117
118
119
/**
 * Classify a descriptor as "always ready" for I/O.
 *
 * A descriptor qualifies when pmix_fd_is_regular() accepts it, when
 * pmix_fd_is_chardev() accepts it and isatty() does not, or when
 * pmix_fd_is_blkdev() accepts it.  The checks run in that order and stop
 * at the first match.  The macros in this header use the result to choose
 * a timer event instead of an fd-readiness event.
 *
 * @param fd  descriptor to classify
 * @return true if fd is treated as always ready, false otherwise
 */
static inline bool
pmix_iof_fd_always_ready(int fd)
{
    if (pmix_fd_is_regular(fd)) {
        return true;
    }
    if (pmix_fd_is_chardev(fd) && !isatty(fd)) {
        return true;
    }
    return pmix_fd_is_blkdev(fd);
}
127
/* NOTE(review): not referenced in this header; presumably the chunk size
 * used when writing to a sink - confirm against the implementation. */
#define PMIX_IOF_SINK_BLOCKSIZE (1024)

/*
 * Mark a write event as pending and register it with the event library.
 *
 * wev: pointer to a pmix_iof_write_event_t.  Any pointer-valued expression
 *      is accepted; every use of the parameter is parenthesized.  It is
 *      evaluated more than once, so it must be free of side effects.
 *
 * When wev->always_writable is set, &wev->tv is passed to pmix_event_add()
 * as the timeout; otherwise the timeout is NULL.  A non-zero return from
 * pmix_event_add() is logged as PMIX_ERR_BAD_PARAM.
 *
 * The expansion ends in "while(0);" with its own semicolon.  That is kept
 * for compatibility with existing call sites, so do not use this macro as
 * the unbraced body of an if/else.
 */
#define PMIX_IOF_SINK_ACTIVATE(wev)                                     \
    do {                                                                \
        struct timeval *tv = NULL;                                      \
        (wev)->pending = true;                                          \
        PMIX_POST_OBJECT(wev);                                          \
        if ((wev)->always_writable) {                                   \
            /* hand the event its stored timeout */                     \
            tv = &(wev)->tv;                                            \
        }                                                               \
        if (pmix_event_add(&(wev)->ev, tv)) {                           \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
143
144
145
146
/*
 * Construct and initialize an output sink in caller-provided storage.
 *
 * snk:      pointer to the pmix_iof_sink_t to construct
 * nm:       pointer to the process name whose nspace/rank are copied in
 * fid:      descriptor to write to; a negative value skips all event setup
 * tg:       channel stored in snk->tag
 * wrthndlr: callback installed on the sink's write event, with snk as its
 *           callback argument
 *
 * For a non-negative fid the descriptor is recorded and classified with
 * pmix_iof_fd_always_ready(): always-ready descriptors get a timer event,
 * all others get a PMIX_EV_WRITE event on the descriptor.  The event is
 * only set up here; nothing in this macro adds it to the event base.
 *
 * Arguments are evaluated more than once, and the expansion carries its
 * own trailing semicolon.
 */
#define PMIX_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr)                \
    do {                                                                \
        PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,         \
                            "defining endpt: file %s line %d fd %d",    \
                            __FILE__, __LINE__, (fid)));                \
        PMIX_CONSTRUCT((snk), pmix_iof_sink_t);                         \
        pmix_strncpy((snk)->name.nspace, (nm)->nspace, PMIX_MAX_NSLEN); \
        (snk)->name.rank = (nm)->rank;                                  \
        (snk)->tag = (tg);                                              \
        if ((fid) >= 0) {                                               \
            (snk)->wev.fd = (fid);                                      \
            (snk)->wev.always_writable =                                \
                    pmix_iof_fd_always_ready(fid);                      \
            if (!(snk)->wev.always_writable) {                          \
                pmix_event_set(pmix_globals.evbase,                     \
                               &(snk)->wev.ev, (snk)->wev.fd,           \
                               PMIX_EV_WRITE,                           \
                               wrthndlr, (snk));                        \
            } else {                                                    \
                pmix_event_evtimer_set(pmix_globals.evbase,             \
                                       &(snk)->wev.ev, wrthndlr, (snk)); \
            }                                                           \
        }                                                               \
        PMIX_POST_OBJECT(snk);                                          \
    } while(0);
172
173
/*
 * Register a read event with the event library.
 *
 * rev: pointer to a pmix_iof_read_event_t (evaluated more than once).
 *
 * The timeout given to pmix_event_add() is &rev->tv when
 * rev->always_readable is set and NULL otherwise.  A non-zero return from
 * pmix_event_add() is logged as PMIX_ERR_BAD_PARAM.  The expansion carries
 * its own trailing semicolon.
 */
#define PMIX_IOF_READ_ADDEV(rev)                                        \
    do {                                                                \
        struct timeval *tv =                                            \
            (rev)->always_readable ? &(rev)->tv : NULL;                 \
        if (0 != pmix_event_add(&(rev)->ev, tv)) {                      \
            PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);                         \
        }                                                               \
    } while(0);
184
/*
 * Flag a read event as active, then register it.
 *
 * rev: pointer to a pmix_iof_read_event_t (evaluated more than once).
 *
 * Order matters: `active` is stored first, PMIX_POST_OBJECT runs next, and
 * only then is the event added through PMIX_IOF_READ_ADDEV.  The expansion
 * carries its own trailing semicolon.
 */
#define PMIX_IOF_READ_ACTIVATE(rev)         \
    do {                                    \
        (rev)->active = true;               \
        PMIX_POST_OBJECT(rev);              \
        PMIX_IOF_READ_ADDEV(rev);           \
    } while(0);
191
192
/*
 * Allocate a pmix_iof_read_event_t for descriptor `fid`, fill it in, and
 * return it through `rv`.
 *
 * rv:      address of a pmix_iof_read_event_t* that receives the new event
 * p, np:   process array to copy into the event's targets; when p is NULL
 *          nothing is copied and the event gets targets == NULL and
 *          ntargets == 0, whatever np is
 * d, nd:   info array to copy into the event's directives; skipped when d
 *          is NULL
 * fid:     descriptor to read from
 * cbfunc:  callback installed on the event, with the new event as its
 *          callback argument
 * actv:    when true, the event is also activated through
 *          PMIX_IOF_READ_ACTIVATE
 *
 * Descriptors that pmix_iof_fd_always_ready() accepts get a timer event;
 * all others get a PMIX_EV_READ event on the descriptor.
 *
 * The working pointer is named _rev so that a caller variable called `rev`
 * (for example when passing &rev as rv) is not shadowed inside the
 * expansion.  Arguments are evaluated more than once, and the expansion
 * carries its own trailing semicolon.
 *
 * NOTE(review): the result of PMIX_NEW is used without a NULL check.
 */
#define PMIX_IOF_READ_EVENT(rv, p, np, d, nd, fid, cbfunc, actv)        \
    do {                                                                \
        size_t _ii;                                                     \
        pmix_iof_read_event_t *_rev;                                    \
        PMIX_OUTPUT_VERBOSE((1, pmix_client_globals.iof_output,         \
                            "defining read event at: %s %d",            \
                            __FILE__, __LINE__));                       \
        _rev = PMIX_NEW(pmix_iof_read_event_t);                         \
        if (NULL != (p)) {                                              \
            _rev->ntargets = (np);                                      \
            PMIX_PROC_CREATE(_rev->targets, _rev->ntargets);            \
            memcpy(_rev->targets, (p), (np) * sizeof(pmix_proc_t));     \
        } else {                                                        \
            /* memcpy must never be given a NULL source */              \
            _rev->targets = NULL;                                       \
            _rev->ntargets = 0;                                         \
        }                                                               \
        if (NULL != (d)) {                                              \
            PMIX_INFO_CREATE(_rev->directives, (nd));                   \
            _rev->ndirs = (nd);                                         \
            for (_ii=0; _ii < (nd); _ii++) {                            \
                PMIX_INFO_XFER(&(_rev->directives[_ii]), &((d)[_ii]));  \
            }                                                           \
        }                                                               \
        _rev->fd = (fid);                                               \
        _rev->always_readable = pmix_iof_fd_always_ready(fid);          \
        *(rv) = _rev;                                                   \
        if (_rev->always_readable) {                                    \
            pmix_event_evtimer_set(pmix_globals.evbase,                 \
                                   &_rev->ev, (cbfunc), _rev);          \
        } else {                                                        \
            pmix_event_set(pmix_globals.evbase,                         \
                           &_rev->ev, (fid),                            \
                           PMIX_EV_READ,                                \
                           (cbfunc), _rev);                             \
        }                                                               \
        if ((actv)) {                                                   \
            /* explicit ';' so this does not rely on the callee's */    \
            PMIX_IOF_READ_ACTIVATE(_rev);                               \
        }                                                               \
    } while(0);
227
228
229 PMIX_EXPORT pmix_status_t pmix_iof_flush(void);
230
231 PMIX_EXPORT pmix_status_t pmix_iof_write_output(const pmix_proc_t *name,
232 pmix_iof_channel_t stream,
233 const pmix_byte_object_t *bo,
234 pmix_iof_flags_t *flags);
235 PMIX_EXPORT void pmix_iof_static_dump_output(pmix_iof_sink_t *sink);
236 PMIX_EXPORT void pmix_iof_write_handler(int fd, short event, void *cbdata);
237 PMIX_EXPORT bool pmix_iof_stdin_check(int fd);
238 PMIX_EXPORT void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata);
239 PMIX_EXPORT void pmix_iof_stdin_cb(int fd, short event, void *cbdata);
240
241 END_C_DECLS
242
243 #endif