ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.27
Committed: Fri Jul 8 03:10:00 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.26: +8 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13 root 1.26 #include <endian.h>
14 root 1.1
15 root 1.21 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.13 typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
17 root 1.21 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
18 root 1.16
19 root 1.27 #if __i386 || __amd64
20     # define STACKSIZE ( 256 * sizeof (long))
21     #elif __ia64
22     # define STACKSIZE (8192 * sizeof (long))
23     #else
24     # define STACKSIZE ( 512 * sizeof (long))
25     #endif
26 root 1.1
27 root 1.20 enum {
28     REQ_QUIT,
29     REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE,
30     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK
31     };
32 root 1.1
33     typedef struct {
34     char stack[STACKSIZE];
35     } aio_thread;
36    
37 root 1.18 typedef struct aio_cb {
38     struct aio_cb *next;
39    
40 root 1.1 int type;
41     aio_thread *thread;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46 root 1.2 ssize_t result;
47 root 1.8 mode_t mode; /* open */
48 root 1.1 int errorno;
49 root 1.5 SV *data, *callback;
50 root 1.1 void *dataptr;
51 root 1.2 STRLEN dataoffset;
52 root 1.13
53 root 1.21 Stat_t *statdata;
54 root 1.1 } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59 root 1.5 static int nreqs;
60 root 1.1 static int reqpipe[2], respipe[2];
61    
62 root 1.18 static aio_req qs, qe; /* queue start, queue end */
63    
64 root 1.2 static int aio_proc(void *arg);
65    
66 root 1.1 static void
67 root 1.18 start_thread (void)
68 root 1.1 {
69 root 1.2 aio_thread *thr;
70    
71     New (0, thr, 1, aio_thread);
72    
73     if (clone (aio_proc,
74 root 1.27 &(thr->stack[STACKSIZE - 16]),
75 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
76 root 1.2 thr) >= 0)
77     started++;
78     else
79     Safefree (thr);
80 root 1.1 }
81    
82     static void
83 root 1.18 send_reqs (void)
84 root 1.1 {
85 root 1.18 /* this write is atomic */
86     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
87     {
88     qs = qs->next;
89     if (!qs) qe = 0;
90     }
91 root 1.2 }
92    
93     static void
94 root 1.8 send_req (aio_req req)
95     {
96     nreqs++;
97 root 1.18 req->next = 0;
98    
99     if (qe)
100 root 1.22 {
101     qe->next = req;
102     qe = req;
103     }
104 root 1.18 else
105     qe = qs = req;
106    
107     send_reqs ();
108     }
109    
110     static void
111     end_thread (void)
112     {
113     aio_req req;
114     New (0, req, 1, aio_cb);
115     req->type = REQ_QUIT;
116    
117     send_req (req);
118 root 1.8 }
119    
120     static void
121 root 1.17 read_write (pTHX_
122     int dowrite, int fd, off_t offset, size_t length,
123 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
124 root 1.2 {
125 root 1.5 aio_req req;
126     STRLEN svlen;
127     char *svptr = SvPV (data, svlen);
128    
129 root 1.13 SvUPGRADE (data, SVt_PV);
130     SvPOK_on (data);
131    
132 root 1.5 if (dataoffset < 0)
133     dataoffset += svlen;
134    
135     if (dataoffset < 0 || dataoffset > svlen)
136     croak ("data offset outside of string");
137    
138     if (dowrite)
139     {
140     /* write: check length and adjust. */
141     if (length < 0 || length + dataoffset > svlen)
142     length = svlen - dataoffset;
143     }
144     else
145     {
146     /* read: grow scalar as necessary */
147     svptr = SvGROW (data, length + dataoffset);
148     }
149    
150     if (length < 0)
151     croak ("length must not be negative");
152    
153 root 1.14 Newz (0, req, 1, aio_cb);
154 root 1.5
155     if (!req)
156     croak ("out of memory during aio_req allocation");
157    
158     req->type = dowrite ? REQ_WRITE : REQ_READ;
159     req->fd = fd;
160     req->offset = offset;
161     req->length = length;
162     req->data = SvREFCNT_inc (data);
163 root 1.6 req->dataptr = (char *)svptr + dataoffset;
164 root 1.5 req->callback = SvREFCNT_inc (callback);
165    
166 root 1.8 send_req (req);
167 root 1.5 }
168    
169 root 1.22 static void
170     poll_wait ()
171     {
172     fd_set rfd;
173     FD_ZERO(&rfd);
174     FD_SET(respipe[0], &rfd);
175    
176     select (respipe[0] + 1, &rfd, 0, 0, 0);
177     }
178    
179 root 1.5 static int
180     poll_cb (pTHX)
181     {
182     dSP;
183     int count = 0;
184     aio_req req;
185    
186     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
187     {
188 root 1.22 nreqs--;
189    
190 root 1.5 if (req->type == REQ_QUIT)
191     {
192     Safefree (req->thread);
193     started--;
194     }
195     else
196     {
197     int errorno = errno;
198     errno = req->errorno;
199    
200     if (req->type == REQ_READ)
201 root 1.6 SvCUR_set (req->data, req->dataoffset
202     + req->result > 0 ? req->result : 0);
203 root 1.5
204 root 1.14 if (req->data)
205     SvREFCNT_dec (req->data);
206    
207 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
208     {
209 root 1.21 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
210     PL_laststatval = req->result;
211     PL_statcache = *(req->statdata);
212 root 1.13
213     Safefree (req->statdata);
214     }
215    
216 root 1.5 PUSHMARK (SP);
217     XPUSHs (sv_2mortal (newSViv (req->result)));
218     PUTBACK;
219     call_sv (req->callback, G_VOID);
220     SPAGAIN;
221    
222 root 1.14 if (req->callback)
223     SvREFCNT_dec (req->callback);
224 root 1.5
225     errno = errorno;
226     count++;
227     }
228    
229     Safefree (req);
230     }
231    
232 root 1.18 if (qs)
233     send_reqs ();
234    
235 root 1.5 return count;
236 root 1.2 }
237    
238 root 1.8 static sigset_t fullsigset;
239    
240 root 1.2 #undef errno
241     #include <asm/unistd.h>
242 root 1.20 #include <sys/prctl.h>
243 root 1.2
244 root 1.26 #if BYTE_ORDER == LITTLE_ENDIAN
245     # define LONG_LONG_PAIR(HI, LO) LO, HI
246     #elif BYTE_ORDER == BIG_ENDIAN
247     # define LONG_LONG_PAIR(HI, LO) HI, LO
248     #endif
249    
250     #if __alpha || __ia64 || __hppa || __v850__
251 root 1.25 # define stat kernelstat
252     # define stat64 kernelstat64
253     # include <asm/stat.h>
254     # undef stat
255     # undef stat64
256     #else
257     # define kernelstat stat
258     # define kernelstat64 stat64
259     #endif
260    
261 root 1.21 #define COPY_STATDATA \
262     req->statdata->st_dev = statdata.st_dev; \
263     req->statdata->st_ino = statdata.st_ino; \
264     req->statdata->st_mode = statdata.st_mode; \
265     req->statdata->st_nlink = statdata.st_nlink; \
266     req->statdata->st_uid = statdata.st_uid; \
267     req->statdata->st_gid = statdata.st_gid; \
268     req->statdata->st_rdev = statdata.st_rdev; \
269     req->statdata->st_size = statdata.st_size; \
270     req->statdata->st_atime = statdata.st_atime; \
271     req->statdata->st_mtime = statdata.st_mtime; \
272     req->statdata->st_ctime = statdata.st_ctime; \
273     req->statdata->st_blksize = statdata.st_blksize; \
274     req->statdata->st_blocks = statdata.st_blocks; \
275    
276 root 1.2 static int
277 root 1.18 aio_proc (void *thr_arg)
278 root 1.2 {
279     aio_thread *thr = thr_arg;
280 root 1.9 aio_req req;
281 root 1.2 int errno;
282    
283 root 1.21 /* this is very much kernel-specific :(:(:( */
284 root 1.11 /* we rely on gcc's ability to create closures. */
285 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
286     _syscall3(int,write,int,fd,char *,buf,size_t,count)
287    
288 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
289     _syscall1(int,close,int,fd)
290    
291 root 1.25 #if __NR_pread64
292 root 1.26 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lh,unsigned int,offset_hl)
293     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lh,unsigned int,offset_hl)
294 root 1.21 #elif __NR_pread
295     _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
296     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
297     #else
298     # error "neither pread nor pread64 defined"
299     #endif
300 root 1.13
301 root 1.21
302 root 1.25 #if __NR_stat64
303     _syscall2(int,stat64, const char *, filename, struct kernelstat64 *, buf)
304     _syscall2(int,lstat64, const char *, filename, struct kernelstat64 *, buf)
305     _syscall2(int,fstat64, int, fd, struct kernelstat64 *, buf)
306 root 1.21 #elif __NR_stat
307 root 1.25 _syscall2(int,stat, const char *, filename, struct kernelstat *, buf)
308     _syscall2(int,lstat, const char *, filename, struct kernelstat *, buf)
309     _syscall2(int,fstat, int, fd, struct kernelstat *, buf)
310 root 1.21 #else
311     # error "neither stat64 nor stat defined"
312     #endif
313 root 1.13
314 root 1.20 _syscall1(int,unlink, char *, filename);
315    
316 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
317 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
318 root 1.2
319     /* then loop */
320     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
321     {
322     req->thread = thr;
323 root 1.14 errno = 0; /* strictly unnecessary */
324 root 1.2
325 root 1.18 switch (req->type)
326 root 1.2 {
327 root 1.25 #if __NR_pread64
328 root 1.26 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length,
329     LONG_LONG_PAIR (req->offset >> 32, req->offset & 0xffffffff)); break;
330     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length,
331     LONG_LONG_PAIR (req->offset >> 32, req->offset & 0xffffffff)); break;
332 root 1.21 #else
333     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
334     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
335     #endif
336 root 1.25 #if __NR_stat64
337     struct kernelstat64 statdata;
338 root 1.21 case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
339     case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
340     case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
341     #else
342 root 1.25 struct kernelstat statdata;
343 root 1.21 case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
344     case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
345     case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
346     #endif
347 root 1.20 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
348     case REQ_CLOSE: req->result = close (req->fd); break;
349     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
350 root 1.18
351     case REQ_QUIT:
352     default:
353     write (respipe[1], (void *)&req, sizeof (req));
354     return 0;
355 root 1.2 }
356    
357 root 1.8 req->errorno = errno;
358 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
359     }
360    
361     return 0;
362 root 1.1 }
363    
364     MODULE = Linux::AIO PACKAGE = Linux::AIO
365    
366     BOOT:
367     {
368 root 1.8 sigfillset (&fullsigset);
369 root 1.9 sigdelset (&fullsigset, SIGTERM);
370     sigdelset (&fullsigset, SIGQUIT);
371     sigdelset (&fullsigset, SIGABRT);
372     sigdelset (&fullsigset, SIGINT);
373 root 1.8
374 root 1.1 if (pipe (reqpipe) || pipe (respipe))
375     croak ("unable to initialize request or result pipe");
376 root 1.18
377     if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
378     croak ("cannot set result pipe to nonblocking mode");
379 root 1.5
380     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
381     croak ("cannot set result pipe to nonblocking mode");
382 root 1.1 }
383    
384     void
385     min_parallel(nthreads)
386     int nthreads
387 root 1.5 PROTOTYPE: $
388 root 1.1 CODE:
389     while (nthreads > started)
390     start_thread ();
391    
392     void
393     max_parallel(nthreads)
394     int nthreads
395 root 1.5 PROTOTYPE: $
396 root 1.1 CODE:
397 root 1.5 int cur = started;
398     while (cur > nthreads)
399     {
400     end_thread ();
401     cur--;
402     }
403    
404 root 1.1 while (started > nthreads)
405 root 1.5 {
406 root 1.22 poll_wait ();
407 root 1.17 poll_cb (aTHX);
408 root 1.5 }
409 root 1.1
410     void
411 root 1.8 aio_open(pathname,flags,mode,callback)
412 root 1.13 SV * pathname
413 root 1.8 int flags
414     int mode
415     SV * callback
416     PROTOTYPE: $$$$
417     CODE:
418     aio_req req;
419    
420 root 1.14 Newz (0, req, 1, aio_cb);
421 root 1.8
422     if (!req)
423     croak ("out of memory during aio_req allocation");
424    
425     req->type = REQ_OPEN;
426 root 1.14 req->data = newSVsv (pathname);
427     req->dataptr = SvPV_nolen (req->data);
428 root 1.8 req->fd = flags;
429     req->mode = mode;
430 root 1.10 req->callback = SvREFCNT_inc (callback);
431    
432     send_req (req);
433    
434     void
435     aio_close(fh,callback)
436 root 1.13 InputStream fh
437 root 1.10 SV * callback
438 root 1.14 PROTOTYPE: $$
439 root 1.10 CODE:
440     aio_req req;
441    
442 root 1.14 Newz (0, req, 1, aio_cb);
443 root 1.10
444     if (!req)
445     croak ("out of memory during aio_req allocation");
446    
447     req->type = REQ_CLOSE;
448     req->fd = PerlIO_fileno (fh);
449 root 1.13 req->callback = SvREFCNT_inc (callback);
450    
451     send_req (req);
452    
453     void
454     aio_read(fh,offset,length,data,dataoffset,callback)
455     InputStream fh
456     UV offset
457     IV length
458     SV * data
459     IV dataoffset
460     SV * callback
461     PROTOTYPE: $$$$$$
462     CODE:
463     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
464    
465     void
466     aio_write(fh,offset,length,data,dataoffset,callback)
467     OutputStream fh
468     UV offset
469     IV length
470     SV * data
471     IV dataoffset
472     SV * callback
473     PROTOTYPE: $$$$$$
474     CODE:
475     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
476    
477     void
478     aio_stat(fh_or_path,callback)
479     SV * fh_or_path
480     SV * callback
481     PROTOTYPE: $$
482     ALIAS:
483     aio_lstat = 1
484     CODE:
485     aio_req req;
486    
487 root 1.14 Newz (0, req, 1, aio_cb);
488 root 1.13
489     if (!req)
490     croak ("out of memory during aio_req allocation");
491    
492 root 1.21 New (0, req->statdata, 1, Stat_t);
493 root 1.13
494     if (!req->statdata)
495     croak ("out of memory during aio_req->statdata allocation");
496    
497     if (SvPOK (fh_or_path))
498     {
499     req->type = ix ? REQ_LSTAT : REQ_STAT;
500 root 1.14 req->data = newSVsv (fh_or_path);
501     req->dataptr = SvPV_nolen (req->data);
502 root 1.13 }
503     else
504     {
505     req->type = REQ_FSTAT;
506     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
507     }
508    
509 root 1.8 req->callback = SvREFCNT_inc (callback);
510    
511     send_req (req);
512 root 1.20
513     void
514     aio_unlink(pathname,callback)
515     SV * pathname
516     SV * callback
517     PROTOTYPE: $$
518     CODE:
519     aio_req req;
520    
521     Newz (0, req, 1, aio_cb);
522    
523     if (!req)
524     croak ("out of memory during aio_req allocation");
525    
526     req->type = REQ_UNLINK;
527     req->data = newSVsv (pathname);
528     req->dataptr = SvPV_nolen (req->data);
529     req->callback = SvREFCNT_inc (callback);
530    
531     send_req (req);
532 root 1.5
533     int
534     poll_fileno()
535     PROTOTYPE:
536     CODE:
537     RETVAL = respipe[0];
538     OUTPUT:
539     RETVAL
540    
541     int
542 root 1.6 poll_cb(...)
543 root 1.5 PROTOTYPE:
544     CODE:
545     RETVAL = poll_cb (aTHX);
546     OUTPUT:
547     RETVAL
548    
549 root 1.23 void
550     poll_wait()
551     PROTOTYPE:
552     CODE:
553     poll_wait ();
554    
555 root 1.5 int
556     nreqs()
557     PROTOTYPE:
558     CODE:
559     RETVAL = nreqs;
560     OUTPUT:
561     RETVAL
562