1 |
#include "EXTERN.h" |
2 |
#include "perl.h" |
3 |
#include "XSUB.h" |
4 |
|
5 |
#include <sys/types.h> |
6 |
#include <unistd.h> |
7 |
#include <fcntl.h> |
8 |
#include <signal.h> |
9 |
#include <sched.h> |
10 |
|
11 |
#define STACKSIZE 1024 /* yeah */ |
12 |
|
13 |
enum { REQ_QUIT, REQ_READ, REQ_WRITE, REQ_OPEN, REQ_CLOSE }; |
14 |
|
15 |
typedef struct { |
16 |
char stack[STACKSIZE]; |
17 |
} aio_thread; |
18 |
|
19 |
typedef struct { |
20 |
int type; |
21 |
aio_thread *thread; |
22 |
|
23 |
/* read/write */ |
24 |
int fd; |
25 |
off_t offset; |
26 |
size_t length; |
27 |
ssize_t result; |
28 |
mode_t mode; /* open */ |
29 |
int errorno; |
30 |
SV *data, *callback; |
31 |
void *dataptr; |
32 |
STRLEN dataoffset; |
33 |
} aio_cb; |
34 |
|
35 |
typedef aio_cb *aio_req; |
36 |
|
37 |
static int started; |
38 |
static int nreqs; |
39 |
static int reqpipe[2], respipe[2]; |
40 |
|
41 |
static int aio_proc(void *arg); |
42 |
|
43 |
static void |
44 |
start_thread(void) |
45 |
{ |
46 |
aio_thread *thr; |
47 |
|
48 |
New (0, thr, 1, aio_thread); |
49 |
|
50 |
if (clone (aio_proc, |
51 |
&(thr->stack[STACKSIZE]), |
52 |
CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND, |
53 |
thr) >= 0) |
54 |
started++; |
55 |
else |
56 |
Safefree (thr); |
57 |
} |
58 |
|
59 |
static void |
60 |
end_thread(void) |
61 |
{ |
62 |
aio_req req; |
63 |
New (0, req, 1, aio_cb); |
64 |
req->type = REQ_QUIT; |
65 |
write (reqpipe[1], &req, sizeof (aio_req)); |
66 |
} |
67 |
|
68 |
static void |
69 |
send_req (aio_req req) |
70 |
{ |
71 |
nreqs++; |
72 |
write (reqpipe[1], &req, sizeof (aio_req)); |
73 |
} |
74 |
|
75 |
static void |
76 |
read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length, |
77 |
SV *data, STRLEN dataoffset, SV*callback) |
78 |
{ |
79 |
aio_req req; |
80 |
STRLEN svlen; |
81 |
char *svptr = SvPV (data, svlen); |
82 |
|
83 |
if (dataoffset < 0) |
84 |
dataoffset += svlen; |
85 |
|
86 |
if (dataoffset < 0 || dataoffset > svlen) |
87 |
croak ("data offset outside of string"); |
88 |
|
89 |
if (dowrite) |
90 |
{ |
91 |
/* write: check length and adjust. */ |
92 |
if (length < 0 || length + dataoffset > svlen) |
93 |
length = svlen - dataoffset; |
94 |
} |
95 |
else |
96 |
{ |
97 |
/* read: grow scalar as necessary */ |
98 |
svptr = SvGROW (data, length + dataoffset); |
99 |
} |
100 |
|
101 |
if (length < 0) |
102 |
croak ("length must not be negative"); |
103 |
|
104 |
New (0, req, 1, aio_cb); |
105 |
|
106 |
if (!req) |
107 |
croak ("out of memory during aio_req allocation"); |
108 |
|
109 |
req->type = dowrite ? REQ_WRITE : REQ_READ; |
110 |
req->fd = fd; |
111 |
req->offset = offset; |
112 |
req->length = length; |
113 |
req->data = SvREFCNT_inc (data); |
114 |
req->dataptr = (char *)svptr + dataoffset; |
115 |
req->callback = SvREFCNT_inc (callback); |
116 |
|
117 |
send_req (req); |
118 |
} |
119 |
|
120 |
static int |
121 |
poll_cb (pTHX) |
122 |
{ |
123 |
dSP; |
124 |
int count = 0; |
125 |
aio_req req; |
126 |
|
127 |
while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req)) |
128 |
{ |
129 |
if (req->type == REQ_QUIT) |
130 |
{ |
131 |
Safefree (req->thread); |
132 |
started--; |
133 |
} |
134 |
else |
135 |
{ |
136 |
int errorno = errno; |
137 |
errno = req->errorno; |
138 |
|
139 |
if (req->type == REQ_READ) |
140 |
SvCUR_set (req->data, req->dataoffset |
141 |
+ req->result > 0 ? req->result : 0); |
142 |
|
143 |
PUSHMARK (SP); |
144 |
XPUSHs (sv_2mortal (newSViv (req->result))); |
145 |
PUTBACK; |
146 |
call_sv (req->callback, G_VOID); |
147 |
SPAGAIN; |
148 |
|
149 |
SvREFCNT_dec (req->data); |
150 |
SvREFCNT_dec (req->callback); |
151 |
|
152 |
errno = errorno; |
153 |
nreqs--; |
154 |
count++; |
155 |
} |
156 |
|
157 |
Safefree (req); |
158 |
} |
159 |
|
160 |
return count; |
161 |
} |
162 |
|
163 |
static sigset_t fullsigset; |
164 |
|
165 |
#undef errno |
166 |
#include <asm/unistd.h> |
167 |
|
168 |
static int |
169 |
aio_proc(void *thr_arg) |
170 |
{ |
171 |
aio_thread *thr = thr_arg; |
172 |
aio_req req; |
173 |
int errno; |
174 |
|
175 |
/* we rely on gcc's ability to create closures. */ |
176 |
_syscall3(int,lseek,int,fd,off_t,offset,int,whence) |
177 |
_syscall3(int,read,int,fd,char *,buf,off_t,count) |
178 |
_syscall3(int,write,int,fd,char *,buf,off_t,count) |
179 |
_syscall3(int,open,char *,pathname,int,flags,mode_t,mode) |
180 |
_syscall1(int,close,int,fd) |
181 |
|
182 |
sigprocmask (SIG_SETMASK, &fullsigset, 0); |
183 |
|
184 |
/* then loop */ |
185 |
while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req)) |
186 |
{ |
187 |
req->thread = thr; |
188 |
errno = 0; |
189 |
|
190 |
if (req->type == REQ_READ || req->type == REQ_WRITE) |
191 |
{ |
192 |
if (lseek (req->fd, req->offset, SEEK_SET) == req->offset) |
193 |
{ |
194 |
if (req->type == REQ_READ) |
195 |
req->result = read (req->fd, req->dataptr, req->length); |
196 |
else |
197 |
req->result = write(req->fd, req->dataptr, req->length); |
198 |
} |
199 |
} |
200 |
else if (req->type == REQ_OPEN) |
201 |
{ |
202 |
req->result = open (req->dataptr, req->fd, req->mode); |
203 |
} |
204 |
else if (req->type == REQ_CLOSE) |
205 |
{ |
206 |
req->result = close (req->fd); |
207 |
} |
208 |
else |
209 |
{ |
210 |
write (respipe[1], (void *)&req, sizeof (req)); |
211 |
break; |
212 |
} |
213 |
|
214 |
req->errorno = errno; |
215 |
write (respipe[1], (void *)&req, sizeof (req)); |
216 |
} |
217 |
|
218 |
return 0; |
219 |
} |
220 |
|
221 |
MODULE = Linux::AIO PACKAGE = Linux::AIO |
222 |
|
223 |
BOOT: |
224 |
{ |
225 |
sigfillset (&fullsigset); |
226 |
sigdelset (&fullsigset, SIGTERM); |
227 |
sigdelset (&fullsigset, SIGQUIT); |
228 |
sigdelset (&fullsigset, SIGABRT); |
229 |
sigdelset (&fullsigset, SIGINT); |
230 |
|
231 |
if (pipe (reqpipe) || pipe (respipe)) |
232 |
croak ("unable to initialize request or result pipe"); |
233 |
|
234 |
if (fcntl (respipe[0], F_SETFL, O_NONBLOCK)) |
235 |
croak ("cannot set result pipe to nonblocking mode"); |
236 |
} |
237 |
|
238 |
void |
239 |
min_parallel(nthreads) |
240 |
int nthreads |
241 |
PROTOTYPE: $ |
242 |
CODE: |
243 |
while (nthreads > started) |
244 |
start_thread (); |
245 |
|
246 |
void |
247 |
max_parallel(nthreads) |
248 |
int nthreads |
249 |
PROTOTYPE: $ |
250 |
CODE: |
251 |
int cur = started; |
252 |
while (cur > nthreads) |
253 |
{ |
254 |
end_thread (); |
255 |
cur--; |
256 |
} |
257 |
|
258 |
poll_cb (); |
259 |
while (started > nthreads) |
260 |
{ |
261 |
sched_yield (); |
262 |
poll_cb (); |
263 |
} |
264 |
|
265 |
void |
266 |
aio_read(fh,offset,length,data,dataoffset,callback) |
267 |
PerlIO * fh |
268 |
UV offset |
269 |
STRLEN length |
270 |
SV * data |
271 |
STRLEN dataoffset |
272 |
SV * callback |
273 |
PROTOTYPE: $$$$$$ |
274 |
ALIAS: |
275 |
aio_write = 1 |
276 |
CODE: |
277 |
SvUPGRADE (data, SVt_PV); |
278 |
SvPOK_on (data); |
279 |
read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback); |
280 |
|
281 |
void |
282 |
aio_open(pathname,flags,mode,callback) |
283 |
char * pathname |
284 |
int flags |
285 |
int mode |
286 |
SV * callback |
287 |
PROTOTYPE: $$$$ |
288 |
CODE: |
289 |
aio_req req; |
290 |
|
291 |
New (0, req, 1, aio_cb); |
292 |
|
293 |
if (!req) |
294 |
croak ("out of memory during aio_req allocation"); |
295 |
|
296 |
req->type = REQ_OPEN; |
297 |
req->dataptr = pathname; |
298 |
req->fd = flags; |
299 |
req->mode = mode; |
300 |
req->callback = SvREFCNT_inc (callback); |
301 |
|
302 |
send_req (req); |
303 |
|
304 |
void |
305 |
aio_close(fh,callback) |
306 |
PerlIO * fh |
307 |
SV * callback |
308 |
PROTOTYPE: $ |
309 |
CODE: |
310 |
aio_req req; |
311 |
|
312 |
New (0, req, 1, aio_cb); |
313 |
|
314 |
if (!req) |
315 |
croak ("out of memory during aio_req allocation"); |
316 |
|
317 |
req->type = REQ_CLOSE; |
318 |
req->fd = PerlIO_fileno (fh); |
319 |
req->callback = SvREFCNT_inc (callback); |
320 |
|
321 |
send_req (req); |
322 |
|
323 |
int |
324 |
poll_fileno() |
325 |
PROTOTYPE: |
326 |
CODE: |
327 |
RETVAL = respipe[0]; |
328 |
OUTPUT: |
329 |
RETVAL |
330 |
|
331 |
int |
332 |
poll_cb(...) |
333 |
PROTOTYPE: |
334 |
CODE: |
335 |
RETVAL = poll_cb (aTHX); |
336 |
OUTPUT: |
337 |
RETVAL |
338 |
|
339 |
int |
340 |
nreqs() |
341 |
PROTOTYPE: |
342 |
CODE: |
343 |
RETVAL = nreqs; |
344 |
OUTPUT: |
345 |
RETVAL |
346 |
|