ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.28
Committed: Sat Jul 9 04:02:54 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.27: +10 -18 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13 root 1.26 #include <endian.h>
14 root 1.1
15 root 1.21 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.13 typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
17 root 1.21 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
18 root 1.16
19 root 1.27 #if __i386 || __amd64
20     # define STACKSIZE ( 256 * sizeof (long))
21     #elif __ia64
22     # define STACKSIZE (8192 * sizeof (long))
23     #else
24     # define STACKSIZE ( 512 * sizeof (long))
25     #endif
26 root 1.1
/* Request opcodes stored in aio_cb.type and dispatched on in aio_proc. */
enum
{
  REQ_QUIT,   /* ask a worker to exit */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK
};
32 root 1.1
33     typedef struct {
34     char stack[STACKSIZE];
35     } aio_thread;
36    
37 root 1.18 typedef struct aio_cb {
38     struct aio_cb *next;
39    
40 root 1.1 int type;
41     aio_thread *thread;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46 root 1.2 ssize_t result;
47 root 1.8 mode_t mode; /* open */
48 root 1.1 int errorno;
49 root 1.5 SV *data, *callback;
50 root 1.1 void *dataptr;
51 root 1.2 STRLEN dataoffset;
52 root 1.13
53 root 1.21 Stat_t *statdata;
54 root 1.1 } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59 root 1.5 static int nreqs;
60 root 1.1 static int reqpipe[2], respipe[2];
61    
62 root 1.18 static aio_req qs, qe; /* queue start, queue end */
63    
64 root 1.2 static int aio_proc(void *arg);
65    
66 root 1.1 static void
67 root 1.18 start_thread (void)
68 root 1.1 {
69 root 1.2 aio_thread *thr;
70    
71     New (0, thr, 1, aio_thread);
72    
73     if (clone (aio_proc,
74 root 1.27 &(thr->stack[STACKSIZE - 16]),
75 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
76 root 1.2 thr) >= 0)
77     started++;
78     else
79     Safefree (thr);
80 root 1.1 }
81    
82     static void
83 root 1.18 send_reqs (void)
84 root 1.1 {
85 root 1.18 /* this write is atomic */
86     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
87     {
88     qs = qs->next;
89     if (!qs) qe = 0;
90     }
91 root 1.2 }
92    
93     static void
94 root 1.8 send_req (aio_req req)
95     {
96     nreqs++;
97 root 1.18 req->next = 0;
98    
99     if (qe)
100 root 1.22 {
101     qe->next = req;
102     qe = req;
103     }
104 root 1.18 else
105     qe = qs = req;
106    
107     send_reqs ();
108     }
109    
110     static void
111     end_thread (void)
112     {
113     aio_req req;
114     New (0, req, 1, aio_cb);
115     req->type = REQ_QUIT;
116    
117     send_req (req);
118 root 1.8 }
119    
120     static void
121 root 1.17 read_write (pTHX_
122     int dowrite, int fd, off_t offset, size_t length,
123 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
124 root 1.2 {
125 root 1.5 aio_req req;
126     STRLEN svlen;
127     char *svptr = SvPV (data, svlen);
128    
129 root 1.13 SvUPGRADE (data, SVt_PV);
130     SvPOK_on (data);
131    
132 root 1.5 if (dataoffset < 0)
133     dataoffset += svlen;
134    
135     if (dataoffset < 0 || dataoffset > svlen)
136     croak ("data offset outside of string");
137    
138     if (dowrite)
139     {
140     /* write: check length and adjust. */
141     if (length < 0 || length + dataoffset > svlen)
142     length = svlen - dataoffset;
143     }
144     else
145     {
146     /* read: grow scalar as necessary */
147     svptr = SvGROW (data, length + dataoffset);
148     }
149    
150     if (length < 0)
151     croak ("length must not be negative");
152    
153 root 1.14 Newz (0, req, 1, aio_cb);
154 root 1.5
155     if (!req)
156     croak ("out of memory during aio_req allocation");
157    
158     req->type = dowrite ? REQ_WRITE : REQ_READ;
159     req->fd = fd;
160     req->offset = offset;
161     req->length = length;
162     req->data = SvREFCNT_inc (data);
163 root 1.6 req->dataptr = (char *)svptr + dataoffset;
164 root 1.5 req->callback = SvREFCNT_inc (callback);
165    
166 root 1.8 send_req (req);
167 root 1.5 }
168    
169 root 1.22 static void
170     poll_wait ()
171     {
172     fd_set rfd;
173     FD_ZERO(&rfd);
174     FD_SET(respipe[0], &rfd);
175    
176     select (respipe[0] + 1, &rfd, 0, 0, 0);
177     }
178    
179 root 1.5 static int
180     poll_cb (pTHX)
181     {
182     dSP;
183     int count = 0;
184     aio_req req;
185    
186     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
187     {
188 root 1.22 nreqs--;
189    
190 root 1.5 if (req->type == REQ_QUIT)
191     {
192     Safefree (req->thread);
193     started--;
194     }
195     else
196     {
197     int errorno = errno;
198     errno = req->errorno;
199    
200     if (req->type == REQ_READ)
201 root 1.6 SvCUR_set (req->data, req->dataoffset
202     + req->result > 0 ? req->result : 0);
203 root 1.5
204 root 1.14 if (req->data)
205     SvREFCNT_dec (req->data);
206    
207 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
208     {
209 root 1.21 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
210     PL_laststatval = req->result;
211     PL_statcache = *(req->statdata);
212 root 1.13
213     Safefree (req->statdata);
214     }
215    
216 root 1.5 PUSHMARK (SP);
217     XPUSHs (sv_2mortal (newSViv (req->result)));
218     PUTBACK;
219     call_sv (req->callback, G_VOID);
220     SPAGAIN;
221    
222 root 1.14 if (req->callback)
223     SvREFCNT_dec (req->callback);
224 root 1.5
225     errno = errorno;
226     count++;
227     }
228    
229     Safefree (req);
230     }
231    
232 root 1.18 if (qs)
233     send_reqs ();
234    
235 root 1.5 return count;
236 root 1.2 }
237    
238 root 1.8 static sigset_t fullsigset;
239    
240 root 1.2 #undef errno
241     #include <asm/unistd.h>
242 root 1.28 #include <linux/types.h>
243 root 1.20 #include <sys/prctl.h>
244 root 1.2
/* Order the two 32-bit halves of a 64-bit value as the two consecutive
   arguments expected for the host byte order. */
#if BYTE_ORDER == LITTLE_ENDIAN
# define LONG_LONG_PAIR(HI, LO) LO, HI
#elif BYTE_ORDER == BIG_ENDIAN
# define LONG_LONG_PAIR(HI, LO) HI, LO
#else
# error "unsupported BYTE_ORDER: cannot define LONG_LONG_PAIR"
#endif
250    
251     #if __alpha || __ia64 || __hppa || __v850__
252 root 1.25 # define stat kernelstat
253     # define stat64 kernelstat64
254     # include <asm/stat.h>
255     # undef stat
256     # undef stat64
257     #else
258     # define kernelstat stat
259     # define kernelstat64 stat64
260     #endif
261    
/*
 * Copy the fields of the local kernel stat buffer `statdata` into the
 * request's Stat_t (`req->statdata`), member by member.  Expects `req`
 * and `statdata` to be in scope at the point of use.  Wrapped in
 * do { } while (0) so it behaves as a single statement.
 */
#define COPY_STATDATA \
  do { \
    req->statdata->st_dev     = statdata.st_dev;     \
    req->statdata->st_ino     = statdata.st_ino;     \
    req->statdata->st_mode    = statdata.st_mode;    \
    req->statdata->st_nlink   = statdata.st_nlink;   \
    req->statdata->st_uid     = statdata.st_uid;     \
    req->statdata->st_gid     = statdata.st_gid;     \
    req->statdata->st_rdev    = statdata.st_rdev;    \
    req->statdata->st_size    = statdata.st_size;    \
    req->statdata->st_atime   = statdata.st_atime;   \
    req->statdata->st_mtime   = statdata.st_mtime;   \
    req->statdata->st_ctime   = statdata.st_ctime;   \
    req->statdata->st_blksize = statdata.st_blksize; \
    req->statdata->st_blocks  = statdata.st_blocks;  \
  } while (0)
276    
277 root 1.2 static int
278 root 1.18 aio_proc (void *thr_arg)
279 root 1.2 {
280     aio_thread *thr = thr_arg;
281 root 1.9 aio_req req;
282 root 1.2 int errno;
283    
284 root 1.21 /* this is very much kernel-specific :(:(:( */
285 root 1.11 /* we rely on gcc's ability to create closures. */
286 root 1.28 _syscall3(__kernel_size_t,read,unsigned int,fd,char *,buf,__kernel_size_t,count)
287     _syscall3(__kernel_size_t,write,unsigned int,fd,char *,buf,__kernel_size_t,count)
288 root 1.13
289 root 1.28 _syscall3(int,open,char *,pathname,int,flags,int,mode)
290     _syscall1(int,close,unsigned int,fd)
291 root 1.11
292 root 1.28 #ifndef __NR_pread64
293     # define __NR_pread64 __NR_pread
294     # define __NR_pwrite64 __NR_write
295 root 1.21 #endif
296 root 1.28 _syscall5(__kernel_ssize_t,pread64,unsigned int,fd,char *,buf,__kernel_size_t,count,unsigned int,offset_lh,unsigned int,offset_hl)
297     _syscall5(__kernel_ssize_t,pwrite64,unsigned int,fd,char *,buf,__kernel_size_t,count,unsigned int,offset_lh,unsigned int,offset_hl)
298 root 1.21
299 root 1.25 #if __NR_stat64
300     _syscall2(int,stat64, const char *, filename, struct kernelstat64 *, buf)
301     _syscall2(int,lstat64, const char *, filename, struct kernelstat64 *, buf)
302     _syscall2(int,fstat64, int, fd, struct kernelstat64 *, buf)
303 root 1.21 #elif __NR_stat
304 root 1.25 _syscall2(int,stat, const char *, filename, struct kernelstat *, buf)
305     _syscall2(int,lstat, const char *, filename, struct kernelstat *, buf)
306     _syscall2(int,fstat, int, fd, struct kernelstat *, buf)
307 root 1.21 #else
308     # error "neither stat64 nor stat defined"
309     #endif
310 root 1.13
311 root 1.20 _syscall1(int,unlink, char *, filename);
312    
313 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
314 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
315 root 1.2
316     /* then loop */
317     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
318     {
319     req->thread = thr;
320 root 1.14 errno = 0; /* strictly unnecessary */
321 root 1.2
322 root 1.18 switch (req->type)
323 root 1.2 {
324 root 1.26 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length,
325     LONG_LONG_PAIR (req->offset >> 32, req->offset & 0xffffffff)); break;
326     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length,
327     LONG_LONG_PAIR (req->offset >> 32, req->offset & 0xffffffff)); break;
328 root 1.25 #if __NR_stat64
329     struct kernelstat64 statdata;
330 root 1.21 case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
331     case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
332     case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
333     #else
334 root 1.25 struct kernelstat statdata;
335 root 1.21 case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
336     case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
337     case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
338     #endif
339 root 1.20 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
340     case REQ_CLOSE: req->result = close (req->fd); break;
341     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
342 root 1.18
343     case REQ_QUIT:
344     default:
345     write (respipe[1], (void *)&req, sizeof (req));
346     return 0;
347 root 1.2 }
348    
349 root 1.8 req->errorno = errno;
350 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
351     }
352    
353     return 0;
354 root 1.1 }
355    
356     MODULE = Linux::AIO PACKAGE = Linux::AIO
357    
BOOT:
{
        /* signal mask for the workers: block everything except
           SIGTERM, SIGQUIT, SIGABRT and SIGINT */
        sigfillset (&fullsigset);
        sigdelset (&fullsigset, SIGTERM);
        sigdelset (&fullsigset, SIGQUIT);
        sigdelset (&fullsigset, SIGABRT);
        sigdelset (&fullsigset, SIGINT);

        if (pipe (reqpipe) || pipe (respipe))
          croak ("unable to initialize request or result pipe");

        /* the main program must never block when submitting ... */
        if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
          croak ("cannot set request pipe to nonblocking mode");

        /* ... or when collecting results */
        if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
          croak ("cannot set result pipe to nonblocking mode");
}
375    
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* start workers until at least nthreads are running */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            /* start_thread leaves `started` untouched when clone() fails;
               bail out instead of spinning forever */
            if (started == before)
              croak ("unable to start aio thread");
          }
383    
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        int remaining;

        /* queue one quit request per surplus worker ... */
        for (remaining = started; remaining > nthreads; remaining--)
          end_thread ();

        /* ... then process results until enough workers have gone away */
        while (started > nthreads)
          {
            poll_wait ();
            poll_cb (aTHX);
          }
401 root 1.1
void
aio_open(pathname,flags,mode,callback)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$$
        CODE:
        aio_req req;
        SV *path_copy;

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        /* private copy of the path, owned by the request */
        path_copy = newSVsv (pathname);

        req->type     = REQ_OPEN;
        req->data     = path_copy;
        req->dataptr  = SvPV_nolen (path_copy);
        req->fd       = flags;   /* the fd slot carries the open flags */
        req->mode     = mode;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
425    
void
aio_close(fh,callback)
        InputStream     fh
        SV *            callback
        PROTOTYPE: $$
        CODE:
        aio_req req;
        int fileno_of_fh;

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        fileno_of_fh = PerlIO_fileno (fh);

        req->type     = REQ_CLOSE;
        req->fd       = fileno_of_fh;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
444    
void
aio_read(fh,offset,length,data,dataoffset,callback)
        InputStream     fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        CODE:
        int fd = PerlIO_fileno (fh);

        /* first argument 0 selects the read path of read_write */
        read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
456    
void
aio_write(fh,offset,length,data,dataoffset,callback)
        OutputStream    fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        CODE:
        int fd = PerlIO_fileno (fh);

        /* first argument 1 selects the write path of read_write */
        read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
468    
void
aio_stat(fh_or_path,callback)
        SV *    fh_or_path
        SV *    callback
        PROTOTYPE: $$
        ALIAS:
           aio_lstat = 1
        CODE:
        aio_req req;
        int type;
        int fd = 0;

        /* Work out what is being asked for before allocating anything:
           sv_2io croaks on a non-filehandle, which would otherwise leak
           the request and its stat buffer. */
        if (SvPOK (fh_or_path))
          type = ix ? REQ_LSTAT : REQ_STAT;
        else
          {
            type = REQ_FSTAT;
            fd   = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
          }

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        New (0, req->statdata, 1, Stat_t);

        if (!req->statdata)
          {
            Safefree (req);
            croak ("out of memory during aio_req->statdata allocation");
          }

        req->type = type;

        if (type == REQ_FSTAT)
          req->fd = fd;
        else
          {
            /* private copy of the path, owned by the request */
            req->data    = newSVsv (fh_or_path);
            req->dataptr = SvPV_nolen (req->data);
          }

        req->callback = SvREFCNT_inc (callback);

        send_req (req);
504 root 1.20
void
aio_unlink(pathname,callback)
        SV *    pathname
        SV *    callback
        PROTOTYPE: $$
        CODE:
        aio_req req;
        SV *path_copy;

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        /* private copy of the path, owned by the request */
        path_copy = newSVsv (pathname);

        req->type     = REQ_UNLINK;
        req->data     = path_copy;
        req->dataptr  = SvPV_nolen (path_copy);
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
524 root 1.5
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe */
        RETVAL = respipe[0];
        OUTPUT:
        RETVAL
532    
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* any arguments are ignored; returns the count from the C poll_cb */
        RETVAL = poll_cb (aTHX);
        OUTPUT:
        RETVAL
540    
void
poll_wait()
        PROTOTYPE:
        CODE:
        /* delegates to the C poll_wait, which select()s on the result pipe */
        poll_wait ();
546    
int
nreqs()
        PROTOTYPE:
        CODE:
        /* current value of the file-level request counter */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
554