1 |
#include "EXTERN.h" |
2 |
#include "perl.h" |
3 |
#include "XSUB.h" |
4 |
|
5 |
#include <sys/types.h> |
6 |
#include <unistd.h> |
7 |
#include <fcntl.h> |
8 |
#include <signal.h> |
9 |
#include <sched.h> |
10 |
|
11 |
#define STACKSIZE 1024 /* yeah */ |
12 |
|
13 |
enum { REQ_QUIT, REQ_READ, REQ_WRITE, REQ_OPEN }; |
14 |
|
15 |
typedef struct { |
16 |
char stack[STACKSIZE]; |
17 |
} aio_thread; |
18 |
|
19 |
typedef struct { |
20 |
int type; |
21 |
aio_thread *thread; |
22 |
|
23 |
/* read/write */ |
24 |
int fd; |
25 |
off_t offset; |
26 |
size_t length; |
27 |
ssize_t result; |
28 |
mode_t mode; /* open */ |
29 |
int errorno; |
30 |
SV *data, *callback; |
31 |
void *dataptr; |
32 |
STRLEN dataoffset; |
33 |
} aio_cb; |
34 |
|
35 |
typedef aio_cb *aio_req; |
36 |
|
37 |
static int started; |
38 |
static int nreqs; |
39 |
static int reqpipe[2], respipe[2]; |
40 |
|
41 |
static int aio_proc(void *arg); |
42 |
|
43 |
static void |
44 |
start_thread(void) |
45 |
{ |
46 |
aio_thread *thr; |
47 |
|
48 |
New (0, thr, 1, aio_thread); |
49 |
|
50 |
if (clone (aio_proc, |
51 |
&(thr->stack[STACKSIZE]), |
52 |
CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND, |
53 |
thr) >= 0) |
54 |
started++; |
55 |
else |
56 |
Safefree (thr); |
57 |
} |
58 |
|
59 |
static void |
60 |
end_thread(void) |
61 |
{ |
62 |
aio_req req; |
63 |
New (0, req, 1, aio_cb); |
64 |
req->type = REQ_QUIT; |
65 |
write (reqpipe[1], &req, sizeof (aio_req)); |
66 |
} |
67 |
|
68 |
static void |
69 |
send_req (aio_req req) |
70 |
{ |
71 |
nreqs++; |
72 |
write (reqpipe[1], &req, sizeof (aio_req)); |
73 |
} |
74 |
|
75 |
static void |
76 |
read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length, |
77 |
SV *data, STRLEN dataoffset, SV*callback) |
78 |
{ |
79 |
aio_req req; |
80 |
STRLEN svlen; |
81 |
char *svptr = SvPV (data, svlen); |
82 |
|
83 |
if (dataoffset < 0) |
84 |
dataoffset += svlen; |
85 |
|
86 |
if (dataoffset < 0 || dataoffset > svlen) |
87 |
croak ("data offset outside of string"); |
88 |
|
89 |
if (dowrite) |
90 |
{ |
91 |
/* write: check length and adjust. */ |
92 |
if (length < 0 || length + dataoffset > svlen) |
93 |
length = svlen - dataoffset; |
94 |
} |
95 |
else |
96 |
{ |
97 |
/* read: grow scalar as necessary */ |
98 |
svptr = SvGROW (data, length + dataoffset); |
99 |
} |
100 |
|
101 |
if (length < 0) |
102 |
croak ("length must not be negative"); |
103 |
|
104 |
New (0, req, 1, aio_cb); |
105 |
|
106 |
if (!req) |
107 |
croak ("out of memory during aio_req allocation"); |
108 |
|
109 |
req->type = dowrite ? REQ_WRITE : REQ_READ; |
110 |
req->fd = fd; |
111 |
req->offset = offset; |
112 |
req->length = length; |
113 |
req->data = SvREFCNT_inc (data); |
114 |
req->dataptr = (char *)svptr + dataoffset; |
115 |
req->callback = SvREFCNT_inc (callback); |
116 |
|
117 |
send_req (req); |
118 |
} |
119 |
|
120 |
static int |
121 |
poll_cb (pTHX) |
122 |
{ |
123 |
dSP; |
124 |
int count = 0; |
125 |
aio_req req; |
126 |
|
127 |
while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req)) |
128 |
{ |
129 |
if (req->type == REQ_QUIT) |
130 |
{ |
131 |
Safefree (req->thread); |
132 |
started--; |
133 |
} |
134 |
else |
135 |
{ |
136 |
int errorno = errno; |
137 |
errno = req->errorno; |
138 |
|
139 |
if (req->type == REQ_READ) |
140 |
SvCUR_set (req->data, req->dataoffset |
141 |
+ req->result > 0 ? req->result : 0); |
142 |
|
143 |
PUSHMARK (SP); |
144 |
XPUSHs (sv_2mortal (newSViv (req->result))); |
145 |
PUTBACK; |
146 |
call_sv (req->callback, G_VOID); |
147 |
SPAGAIN; |
148 |
|
149 |
SvREFCNT_dec (req->data); |
150 |
SvREFCNT_dec (req->callback); |
151 |
|
152 |
errno = errorno; |
153 |
nreqs--; |
154 |
count++; |
155 |
} |
156 |
|
157 |
Safefree (req); |
158 |
} |
159 |
|
160 |
return count; |
161 |
} |
162 |
|
163 |
static sigset_t fullsigset; |
164 |
|
165 |
#undef errno |
166 |
#include <asm/unistd.h> |
167 |
|
168 |
static int |
169 |
aio_proc(void *thr_arg) |
170 |
{ |
171 |
aio_thread *thr = thr_arg; |
172 |
int sig; |
173 |
int errno; |
174 |
aio_req req; |
175 |
|
176 |
sigprocmask (SIG_SETMASK, &fullsigset, 0); |
177 |
|
178 |
/* we rely on gcc's ability to create closures. */ |
179 |
_syscall3(int,lseek,int,fd,off_t,offset,int,whence); |
180 |
_syscall3(int,read,int,fd,char *,buf,off_t,count); |
181 |
_syscall3(int,write,int,fd,char *,buf,off_t,count); |
182 |
_syscall3(int,open,char *,pathname,int,flags,mode_t,mode); |
183 |
|
184 |
/* then loop */ |
185 |
while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req)) |
186 |
{ |
187 |
req->thread = thr; |
188 |
errno = 0; |
189 |
|
190 |
if (req->type == REQ_READ || req->type == REQ_WRITE) |
191 |
{ |
192 |
if (lseek (req->fd, req->offset, SEEK_SET) == req->offset) |
193 |
{ |
194 |
if (req->type == REQ_READ) |
195 |
req->result = read (req->fd, req->dataptr, req->length); |
196 |
else |
197 |
req->result = write(req->fd, req->dataptr, req->length); |
198 |
} |
199 |
} |
200 |
else if (req->type == REQ_OPEN) |
201 |
{ |
202 |
req->result = open (req->dataptr, req->fd, req->mode); |
203 |
} |
204 |
else |
205 |
{ |
206 |
write (respipe[1], (void *)&req, sizeof (req)); |
207 |
break; |
208 |
} |
209 |
|
210 |
req->errorno = errno; |
211 |
write (respipe[1], (void *)&req, sizeof (req)); |
212 |
} |
213 |
|
214 |
return 0; |
215 |
} |
216 |
|
217 |
MODULE = Linux::AIO PACKAGE = Linux::AIO |
218 |
|
219 |
BOOT: |
220 |
{ |
221 |
sigfillset (&fullsigset); |
222 |
|
223 |
if (pipe (reqpipe) || pipe (respipe)) |
224 |
croak ("unable to initialize request or result pipe"); |
225 |
|
226 |
if (fcntl (respipe[0], F_SETFL, O_NONBLOCK)) |
227 |
croak ("cannot set result pipe to nonblocking mode"); |
228 |
} |
229 |
|
230 |
void |
231 |
min_parallel(nthreads) |
232 |
int nthreads |
233 |
PROTOTYPE: $ |
234 |
CODE: |
235 |
while (nthreads > started) |
236 |
start_thread (); |
237 |
|
238 |
void |
239 |
max_parallel(nthreads) |
240 |
int nthreads |
241 |
PROTOTYPE: $ |
242 |
CODE: |
243 |
int cur = started; |
244 |
while (cur > nthreads) |
245 |
{ |
246 |
end_thread (); |
247 |
cur--; |
248 |
} |
249 |
|
250 |
poll_cb (); |
251 |
while (started > nthreads) |
252 |
{ |
253 |
sched_yield (); |
254 |
poll_cb (); |
255 |
} |
256 |
|
257 |
void |
258 |
aio_read(fh,offset,length,data,dataoffset,callback) |
259 |
PerlIO * fh |
260 |
UV offset |
261 |
STRLEN length |
262 |
SV * data |
263 |
STRLEN dataoffset |
264 |
SV * callback |
265 |
PROTOTYPE: $$$$$$ |
266 |
ALIAS: |
267 |
aio_write = 1 |
268 |
CODE: |
269 |
SvUPGRADE (data, SVt_PV); |
270 |
SvPOK_on (data); |
271 |
read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback); |
272 |
|
273 |
void |
274 |
aio_open(pathname,flags,mode,callback) |
275 |
char * pathname |
276 |
int flags |
277 |
int mode |
278 |
SV * callback |
279 |
PROTOTYPE: $$$$ |
280 |
CODE: |
281 |
aio_req req; |
282 |
|
283 |
New (0, req, 1, aio_cb); |
284 |
|
285 |
if (!req) |
286 |
croak ("out of memory during aio_req allocation"); |
287 |
|
288 |
req->type = REQ_OPEN; |
289 |
req->dataptr = pathname; |
290 |
req->fd = flags; |
291 |
req->mode = mode; |
292 |
req->callback = SvREFCNT_inc (callback); |
293 |
|
294 |
send_req (req); |
295 |
|
296 |
int |
297 |
poll_fileno() |
298 |
PROTOTYPE: |
299 |
CODE: |
300 |
RETVAL = respipe[0]; |
301 |
OUTPUT: |
302 |
RETVAL |
303 |
|
304 |
int |
305 |
poll_cb(...) |
306 |
PROTOTYPE: |
307 |
CODE: |
308 |
RETVAL = poll_cb (aTHX); |
309 |
OUTPUT: |
310 |
RETVAL |
311 |
|
312 |
int |
313 |
nreqs() |
314 |
PROTOTYPE: |
315 |
CODE: |
316 |
RETVAL = nreqs; |
317 |
OUTPUT: |
318 |
RETVAL |
319 |
|