ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.25
Committed: Thu Jul 7 22:24:09 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.24: +25 -15 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #include <sys/types.h>
8 #include <sys/stat.h>
9 #include <unistd.h>
10 #include <fcntl.h>
11 #include <signal.h>
12 #include <sched.h>
13
/*
 * Stream handle aliases (InputStream and OutputStream appear as XSUB
 * argument types).  Plain void * is a hack, but perl 5.6.1 is simply
 * too old to do better.
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;

/*
 * Size in bytes of the stack buffer embedded in each aio_thread.
 * 128 longs seems to be enough most everywhere; alpha needs 256.
 */
#define STACKSIZE (256 * sizeof (long))
20
/* Request type codes stored in aio_cb.type and dispatched on in aio_proc. */
enum {
  REQ_QUIT,     /* makes the receiving worker return from aio_proc */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK
};
26
27 typedef struct {
28 char stack[STACKSIZE];
29 } aio_thread;
30
31 typedef struct aio_cb {
32 struct aio_cb *next;
33
34 int type;
35 aio_thread *thread;
36
37 int fd;
38 off_t offset;
39 size_t length;
40 ssize_t result;
41 mode_t mode; /* open */
42 int errorno;
43 SV *data, *callback;
44 void *dataptr;
45 STRLEN dataoffset;
46
47 Stat_t *statdata;
48 } aio_cb;
49
50 typedef aio_cb *aio_req;
51
52 static int started;
53 static int nreqs;
54 static int reqpipe[2], respipe[2];
55
56 static aio_req qs, qe; /* queue start, queue end */
57
58 static int aio_proc(void *arg);
59
60 static void
61 start_thread (void)
62 {
63 aio_thread *thr;
64
65 New (0, thr, 1, aio_thread);
66
67 if (clone (aio_proc,
68 &(thr->stack[STACKSIZE - sizeof (long)]),
69 CLONE_VM|CLONE_FS|CLONE_FILES,
70 thr) >= 0)
71 started++;
72 else
73 Safefree (thr);
74 }
75
76 static void
77 send_reqs (void)
78 {
79 /* this write is atomic */
80 while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
81 {
82 qs = qs->next;
83 if (!qs) qe = 0;
84 }
85 }
86
87 static void
88 send_req (aio_req req)
89 {
90 nreqs++;
91 req->next = 0;
92
93 if (qe)
94 {
95 qe->next = req;
96 qe = req;
97 }
98 else
99 qe = qs = req;
100
101 send_reqs ();
102 }
103
104 static void
105 end_thread (void)
106 {
107 aio_req req;
108 New (0, req, 1, aio_cb);
109 req->type = REQ_QUIT;
110
111 send_req (req);
112 }
113
114 static void
115 read_write (pTHX_
116 int dowrite, int fd, off_t offset, size_t length,
117 SV *data, STRLEN dataoffset, SV *callback)
118 {
119 aio_req req;
120 STRLEN svlen;
121 char *svptr = SvPV (data, svlen);
122
123 SvUPGRADE (data, SVt_PV);
124 SvPOK_on (data);
125
126 if (dataoffset < 0)
127 dataoffset += svlen;
128
129 if (dataoffset < 0 || dataoffset > svlen)
130 croak ("data offset outside of string");
131
132 if (dowrite)
133 {
134 /* write: check length and adjust. */
135 if (length < 0 || length + dataoffset > svlen)
136 length = svlen - dataoffset;
137 }
138 else
139 {
140 /* read: grow scalar as necessary */
141 svptr = SvGROW (data, length + dataoffset);
142 }
143
144 if (length < 0)
145 croak ("length must not be negative");
146
147 Newz (0, req, 1, aio_cb);
148
149 if (!req)
150 croak ("out of memory during aio_req allocation");
151
152 req->type = dowrite ? REQ_WRITE : REQ_READ;
153 req->fd = fd;
154 req->offset = offset;
155 req->length = length;
156 req->data = SvREFCNT_inc (data);
157 req->dataptr = (char *)svptr + dataoffset;
158 req->callback = SvREFCNT_inc (callback);
159
160 send_req (req);
161 }
162
163 static void
164 poll_wait ()
165 {
166 fd_set rfd;
167 FD_ZERO(&rfd);
168 FD_SET(respipe[0], &rfd);
169
170 select (respipe[0] + 1, &rfd, 0, 0, 0);
171 }
172
173 static int
174 poll_cb (pTHX)
175 {
176 dSP;
177 int count = 0;
178 aio_req req;
179
180 while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
181 {
182 nreqs--;
183
184 if (req->type == REQ_QUIT)
185 {
186 Safefree (req->thread);
187 started--;
188 }
189 else
190 {
191 int errorno = errno;
192 errno = req->errorno;
193
194 if (req->type == REQ_READ)
195 SvCUR_set (req->data, req->dataoffset
196 + req->result > 0 ? req->result : 0);
197
198 if (req->data)
199 SvREFCNT_dec (req->data);
200
201 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
202 {
203 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
204 PL_laststatval = req->result;
205 PL_statcache = *(req->statdata);
206
207 Safefree (req->statdata);
208 }
209
210 PUSHMARK (SP);
211 XPUSHs (sv_2mortal (newSViv (req->result)));
212 PUTBACK;
213 call_sv (req->callback, G_VOID);
214 SPAGAIN;
215
216 if (req->callback)
217 SvREFCNT_dec (req->callback);
218
219 errno = errorno;
220 count++;
221 }
222
223 Safefree (req);
224 }
225
226 if (qs)
227 send_reqs ();
228
229 return count;
230 }
231
232 static sigset_t fullsigset;
233
234 #undef errno
235 #include <asm/unistd.h>
236 #include <sys/prctl.h>
237
238 #if __alpha || __ia64 || __hppa || __sparc64__ || __v850__
239 # define stat kernelstat
240 # define stat64 kernelstat64
241 # include <asm/stat.h>
242 # undef stat
243 # undef stat64
244 #else
245 # define kernelstat stat
246 # define kernelstat64 stat64
247 #endif
248
/*
 * Copy the fields of the local kernel stat structure `statdata' one by
 * one into the request's Stat_t buffer (req->statdata).  Expects `req'
 * and `statdata' to be in scope at the point of use.  Wrapped in
 * do { } while (0) so that "COPY_STATDATA;" behaves as one statement.
 */
#define COPY_STATDATA                                   \
  do {                                                  \
    req->statdata->st_dev     = statdata.st_dev;        \
    req->statdata->st_ino     = statdata.st_ino;        \
    req->statdata->st_mode    = statdata.st_mode;       \
    req->statdata->st_nlink   = statdata.st_nlink;      \
    req->statdata->st_uid     = statdata.st_uid;        \
    req->statdata->st_gid     = statdata.st_gid;        \
    req->statdata->st_rdev    = statdata.st_rdev;       \
    req->statdata->st_size    = statdata.st_size;       \
    req->statdata->st_atime   = statdata.st_atime;      \
    req->statdata->st_mtime   = statdata.st_mtime;      \
    req->statdata->st_ctime   = statdata.st_ctime;      \
    req->statdata->st_blksize = statdata.st_blksize;    \
    req->statdata->st_blocks  = statdata.st_blocks;     \
  } while (0)
263
264 static int
265 aio_proc (void *thr_arg)
266 {
267 aio_thread *thr = thr_arg;
268 aio_req req;
269 int errno;
270
271 /* this is very much kernel-specific :(:(:( */
272 /* we rely on gcc's ability to create closures. */
273 _syscall3(int,read,int,fd,char *,buf,size_t,count)
274 _syscall3(int,write,int,fd,char *,buf,size_t,count)
275
276 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
277 _syscall1(int,close,int,fd)
278
279 #if __NR_pread64
280 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
281 _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
282 #elif __NR_pread
283 _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
284 _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
285 #else
286 # error "neither pread nor pread64 defined"
287 #endif
288
289
290 #if __NR_stat64
291 _syscall2(int,stat64, const char *, filename, struct kernelstat64 *, buf)
292 _syscall2(int,lstat64, const char *, filename, struct kernelstat64 *, buf)
293 _syscall2(int,fstat64, int, fd, struct kernelstat64 *, buf)
294 #elif __NR_stat
295 _syscall2(int,stat, const char *, filename, struct kernelstat *, buf)
296 _syscall2(int,lstat, const char *, filename, struct kernelstat *, buf)
297 _syscall2(int,fstat, int, fd, struct kernelstat *, buf)
298 #else
299 # error "neither stat64 nor stat defined"
300 #endif
301
302 _syscall1(int,unlink, char *, filename);
303
304 sigprocmask (SIG_SETMASK, &fullsigset, 0);
305 prctl (PR_SET_PDEATHSIG, SIGKILL);
306
307 /* then loop */
308 while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
309 {
310 req->thread = thr;
311 errno = 0; /* strictly unnecessary */
312
313 switch (req->type)
314 {
315 #if __NR_pread64
316 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
317 case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
318 #else
319 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
320 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
321 #endif
322 #if __NR_stat64
323 struct kernelstat64 statdata;
324 case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
325 case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
326 case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
327 #else
328 struct kernelstat statdata;
329 case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
330 case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
331 case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
332 #endif
333 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
334 case REQ_CLOSE: req->result = close (req->fd); break;
335 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
336
337 case REQ_QUIT:
338 default:
339 write (respipe[1], (void *)&req, sizeof (req));
340 return 0;
341 }
342
343 req->errorno = errno;
344 write (respipe[1], (void *)&req, sizeof (req));
345 }
346
347 return 0;
348 }
349
350 MODULE = Linux::AIO PACKAGE = Linux::AIO
351
BOOT:
{
        /* signal mask installed by every worker: everything blocked
           except TERM, QUIT, ABRT and INT */
        sigfillset (&fullsigset);
        sigdelset (&fullsigset, SIGTERM);
        sigdelset (&fullsigset, SIGQUIT);
        sigdelset (&fullsigset, SIGABRT);
        sigdelset (&fullsigset, SIGINT);

        if (pipe (reqpipe) || pipe (respipe))
          croak ("unable to initialize request or result pipe");

        /* the main thread must never block while submitting requests... */
        if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
          croak ("cannot set request pipe to nonblocking mode");

        /* ...nor while reaping results in poll_cb */
        if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
          croak ("cannot set result pipe to nonblocking mode");
}
369
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* start workers until at least nthreads are running */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            /* start_thread only increments `started' when clone ()
               succeeds; give up instead of spinning forever */
            if (started == before)
              croak ("unable to start aio worker thread");
          }
377
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        int cur = started;

        /* fewer than zero workers cannot be reached: without this clamp
           surplus quit requests would be queued and the wait loop below
           would never terminate */
        if (nthreads < 0)
          nthreads = 0;

        /* ask the surplus workers to quit... */
        while (cur > nthreads)
          {
            end_thread ();
            cur--;
          }

        /* ...and process results until they have actually gone */
        while (started > nthreads)
          {
            poll_wait ();
            poll_cb (aTHX);
          }
395
void
aio_open(pathname,flags,mode,callback)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$$
        CODE:
        aio_req open_req;

        Newz (0, open_req, 1, aio_cb);

        if (!open_req)
          croak ("out of memory during aio_req allocation");

        /* work on a private copy of the path so the request does not
           depend on the caller's scalar */
        open_req->data     = newSVsv (pathname);
        open_req->dataptr  = SvPV_nolen (open_req->data);
        open_req->callback = SvREFCNT_inc (callback);
        open_req->type     = REQ_OPEN;
        open_req->mode     = mode;
        /* REQ_OPEN carries the open flags in the fd field */
        open_req->fd       = flags;

        send_req (open_req);
419
void
aio_close(fh,callback)
        InputStream     fh
        SV *            callback
        PROTOTYPE: $$
        CODE:
        aio_req close_req;

        Newz (0, close_req, 1, aio_cb);

        if (!close_req)
          croak ("out of memory during aio_req allocation");

        /* the worker closes the descriptor underlying the handle */
        close_req->fd       = PerlIO_fileno (fh);
        close_req->type     = REQ_CLOSE;
        close_req->callback = SvREFCNT_inc (callback);

        send_req (close_req);
438
void
aio_read(fh,offset,length,data,dataoffset,callback)
        InputStream     fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        CODE:
        /* dowrite == 0: queue a read on the handle's descriptor */
        int fd = PerlIO_fileno (fh);

        read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
450
void
aio_write(fh,offset,length,data,dataoffset,callback)
        OutputStream    fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        CODE:
        /* dowrite == 1: queue a write on the handle's descriptor */
        int fd = PerlIO_fileno (fh);

        read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
462
void
aio_stat(fh_or_path,callback)
        SV *    fh_or_path
        SV *    callback
        PROTOTYPE: $$
        ALIAS:
           aio_lstat = 1
        CODE:
        aio_req stat_req;

        Newz (0, stat_req, 1, aio_cb);

        if (!stat_req)
          croak ("out of memory during aio_req allocation");

        /* result buffer; poll_cb copies it to PL_statcache and frees it */
        New (0, stat_req->statdata, 1, Stat_t);

        if (!stat_req->statdata)
          croak ("out of memory during aio_req->statdata allocation");

        if (!SvPOK (fh_or_path))
          {
            /* not a string: treat the argument as a file handle */
            stat_req->type = REQ_FSTAT;
            stat_req->fd   = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
          }
        else
          {
            /* a path: ix is 1 when called through the aio_lstat alias */
            if (ix)
              stat_req->type = REQ_LSTAT;
            else
              stat_req->type = REQ_STAT;

            stat_req->data    = newSVsv (fh_or_path);
            stat_req->dataptr = SvPV_nolen (stat_req->data);
          }

        stat_req->callback = SvREFCNT_inc (callback);

        send_req (stat_req);
498
void
aio_unlink(pathname,callback)
        SV *    pathname
        SV *    callback
        PROTOTYPE: $$
        CODE:
        aio_req unlink_req;

        Newz (0, unlink_req, 1, aio_cb);

        if (!unlink_req)
          croak ("out of memory during aio_req allocation");

        /* the request keeps its own copy of the path */
        unlink_req->data     = newSVsv (pathname);
        unlink_req->dataptr  = SvPV_nolen (unlink_req->data);
        unlink_req->callback = SvREFCNT_inc (callback);
        unlink_req->type     = REQ_UNLINK;

        send_req (unlink_req);
518
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe, the descriptor poll_wait selects on */
        RETVAL = respipe[0];
        OUTPUT:
        RETVAL
526
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* reap finished requests; yields the number of callbacks invoked */
        RETVAL = poll_cb (aTHX);
        OUTPUT:
        RETVAL
534
void
poll_wait()
        PROTOTYPE:
        CODE:
        /* block in select () on the result pipe */
        poll_wait ();
540
int
nreqs()
        PROTOTYPE:
        CODE:
        /* requests submitted but not yet reaped by poll_cb */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
548