ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.26
Committed: Thu Jul 7 23:17:23 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-1_6
Changes since 1.25: +14 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13 root 1.26 #include <endian.h>
14 root 1.1
15 root 1.21 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.13 typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
17 root 1.21 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
18 root 1.16
19 root 1.25 // 128 seems to be enough most everywhere. alpha needs 256.
20     #define STACKSIZE (256 * sizeof (long))
21 root 1.1
/* Request types carried in aio_cb.type. */
enum
{
  REQ_QUIT,    /* tells the worker that reads it to exit */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK
};
27 root 1.1
28     typedef struct {
29     char stack[STACKSIZE];
30     } aio_thread;
31    
32 root 1.18 typedef struct aio_cb {
33     struct aio_cb *next;
34    
35 root 1.1 int type;
36     aio_thread *thread;
37    
38     int fd;
39     off_t offset;
40     size_t length;
41 root 1.2 ssize_t result;
42 root 1.8 mode_t mode; /* open */
43 root 1.1 int errorno;
44 root 1.5 SV *data, *callback;
45 root 1.1 void *dataptr;
46 root 1.2 STRLEN dataoffset;
47 root 1.13
48 root 1.21 Stat_t *statdata;
49 root 1.1 } aio_cb;
50    
51     typedef aio_cb *aio_req;
52    
53     static int started;
54 root 1.5 static int nreqs;
55 root 1.1 static int reqpipe[2], respipe[2];
56    
57 root 1.18 static aio_req qs, qe; /* queue start, queue end */
58    
59 root 1.2 static int aio_proc(void *arg);
60    
61 root 1.1 static void
62 root 1.18 start_thread (void)
63 root 1.1 {
64 root 1.2 aio_thread *thr;
65    
66     New (0, thr, 1, aio_thread);
67    
68     if (clone (aio_proc,
69 root 1.22 &(thr->stack[STACKSIZE - sizeof (long)]),
70 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
71 root 1.2 thr) >= 0)
72     started++;
73     else
74     Safefree (thr);
75 root 1.1 }
76    
77     static void
78 root 1.18 send_reqs (void)
79 root 1.1 {
80 root 1.18 /* this write is atomic */
81     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
82     {
83     qs = qs->next;
84     if (!qs) qe = 0;
85     }
86 root 1.2 }
87    
88     static void
89 root 1.8 send_req (aio_req req)
90     {
91     nreqs++;
92 root 1.18 req->next = 0;
93    
94     if (qe)
95 root 1.22 {
96     qe->next = req;
97     qe = req;
98     }
99 root 1.18 else
100     qe = qs = req;
101    
102     send_reqs ();
103     }
104    
105     static void
106     end_thread (void)
107     {
108     aio_req req;
109     New (0, req, 1, aio_cb);
110     req->type = REQ_QUIT;
111    
112     send_req (req);
113 root 1.8 }
114    
115     static void
116 root 1.17 read_write (pTHX_
117     int dowrite, int fd, off_t offset, size_t length,
118 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
119 root 1.2 {
120 root 1.5 aio_req req;
121     STRLEN svlen;
122     char *svptr = SvPV (data, svlen);
123    
124 root 1.13 SvUPGRADE (data, SVt_PV);
125     SvPOK_on (data);
126    
127 root 1.5 if (dataoffset < 0)
128     dataoffset += svlen;
129    
130     if (dataoffset < 0 || dataoffset > svlen)
131     croak ("data offset outside of string");
132    
133     if (dowrite)
134     {
135     /* write: check length and adjust. */
136     if (length < 0 || length + dataoffset > svlen)
137     length = svlen - dataoffset;
138     }
139     else
140     {
141     /* read: grow scalar as necessary */
142     svptr = SvGROW (data, length + dataoffset);
143     }
144    
145     if (length < 0)
146     croak ("length must not be negative");
147    
148 root 1.14 Newz (0, req, 1, aio_cb);
149 root 1.5
150     if (!req)
151     croak ("out of memory during aio_req allocation");
152    
153     req->type = dowrite ? REQ_WRITE : REQ_READ;
154     req->fd = fd;
155     req->offset = offset;
156     req->length = length;
157     req->data = SvREFCNT_inc (data);
158 root 1.6 req->dataptr = (char *)svptr + dataoffset;
159 root 1.5 req->callback = SvREFCNT_inc (callback);
160    
161 root 1.8 send_req (req);
162 root 1.5 }
163    
164 root 1.22 static void
165     poll_wait ()
166     {
167     fd_set rfd;
168     FD_ZERO(&rfd);
169     FD_SET(respipe[0], &rfd);
170    
171     select (respipe[0] + 1, &rfd, 0, 0, 0);
172     }
173    
174 root 1.5 static int
175     poll_cb (pTHX)
176     {
177     dSP;
178     int count = 0;
179     aio_req req;
180    
181     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
182     {
183 root 1.22 nreqs--;
184    
185 root 1.5 if (req->type == REQ_QUIT)
186     {
187     Safefree (req->thread);
188     started--;
189     }
190     else
191     {
192     int errorno = errno;
193     errno = req->errorno;
194    
195     if (req->type == REQ_READ)
196 root 1.6 SvCUR_set (req->data, req->dataoffset
197     + req->result > 0 ? req->result : 0);
198 root 1.5
199 root 1.14 if (req->data)
200     SvREFCNT_dec (req->data);
201    
202 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
203     {
204 root 1.21 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
205     PL_laststatval = req->result;
206     PL_statcache = *(req->statdata);
207 root 1.13
208     Safefree (req->statdata);
209     }
210    
211 root 1.5 PUSHMARK (SP);
212     XPUSHs (sv_2mortal (newSViv (req->result)));
213     PUTBACK;
214     call_sv (req->callback, G_VOID);
215     SPAGAIN;
216    
217 root 1.14 if (req->callback)
218     SvREFCNT_dec (req->callback);
219 root 1.5
220     errno = errorno;
221     count++;
222     }
223    
224     Safefree (req);
225     }
226    
227 root 1.18 if (qs)
228     send_reqs ();
229    
230 root 1.5 return count;
231 root 1.2 }
232    
233 root 1.8 static sigset_t fullsigset;
234    
235 root 1.2 #undef errno
236     #include <asm/unistd.h>
237 root 1.20 #include <sys/prctl.h>
238 root 1.2
239 root 1.26 #if BYTE_ORDER == LITTLE_ENDIAN
240     # define LONG_LONG_PAIR(HI, LO) LO, HI
241     #elif BYTE_ORDER == BIG_ENDIAN
242     # define LONG_LONG_PAIR(HI, LO) HI, LO
243     #endif
244    
245     #if __alpha || __ia64 || __hppa || __v850__
246 root 1.25 # define stat kernelstat
247     # define stat64 kernelstat64
248     # include <asm/stat.h>
249     # undef stat
250     # undef stat64
251     #else
252     # define kernelstat stat
253     # define kernelstat64 stat64
254     #endif
255    
/*
 * Copy the fields of the local kernel stat buffer "statdata" one by one
 * into the Stat_t that req->statdata points to.  Expects "req" and
 * "statdata" to be in scope at the point of use.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement,
 * and without a line continuation on the last line.
 */
#define COPY_STATDATA                                   \
  do {                                                  \
    req->statdata->st_dev     = statdata.st_dev;        \
    req->statdata->st_ino     = statdata.st_ino;        \
    req->statdata->st_mode    = statdata.st_mode;       \
    req->statdata->st_nlink   = statdata.st_nlink;      \
    req->statdata->st_uid     = statdata.st_uid;        \
    req->statdata->st_gid     = statdata.st_gid;        \
    req->statdata->st_rdev    = statdata.st_rdev;       \
    req->statdata->st_size    = statdata.st_size;       \
    req->statdata->st_atime   = statdata.st_atime;      \
    req->statdata->st_mtime   = statdata.st_mtime;      \
    req->statdata->st_ctime   = statdata.st_ctime;      \
    req->statdata->st_blksize = statdata.st_blksize;    \
    req->statdata->st_blocks  = statdata.st_blocks;     \
  } while (0)
270    
271 root 1.2 static int
272 root 1.18 aio_proc (void *thr_arg)
273 root 1.2 {
274     aio_thread *thr = thr_arg;
275 root 1.9 aio_req req;
276 root 1.2 int errno;
277    
278 root 1.21 /* this is very much kernel-specific :(:(:( */
279 root 1.11 /* we rely on gcc's ability to create closures. */
280 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
281     _syscall3(int,write,int,fd,char *,buf,size_t,count)
282    
283 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
284     _syscall1(int,close,int,fd)
285    
286 root 1.25 #if __NR_pread64
287 root 1.26 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lh,unsigned int,offset_hl)
288     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lh,unsigned int,offset_hl)
289 root 1.21 #elif __NR_pread
290     _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
291     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
292     #else
293     # error "neither pread nor pread64 defined"
294     #endif
295 root 1.13
296 root 1.21
297 root 1.25 #if __NR_stat64
298     _syscall2(int,stat64, const char *, filename, struct kernelstat64 *, buf)
299     _syscall2(int,lstat64, const char *, filename, struct kernelstat64 *, buf)
300     _syscall2(int,fstat64, int, fd, struct kernelstat64 *, buf)
301 root 1.21 #elif __NR_stat
302 root 1.25 _syscall2(int,stat, const char *, filename, struct kernelstat *, buf)
303     _syscall2(int,lstat, const char *, filename, struct kernelstat *, buf)
304     _syscall2(int,fstat, int, fd, struct kernelstat *, buf)
305 root 1.21 #else
306     # error "neither stat64 nor stat defined"
307     #endif
308 root 1.13
309 root 1.20 _syscall1(int,unlink, char *, filename);
310    
311 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
312 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
313 root 1.2
314     /* then loop */
315     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
316     {
317     req->thread = thr;
318 root 1.14 errno = 0; /* strictly unnecessary */
319 root 1.2
320 root 1.18 switch (req->type)
321 root 1.2 {
322 root 1.25 #if __NR_pread64
323 root 1.26 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length,
324     LONG_LONG_PAIR (req->offset >> 32, req->offset & 0xffffffff)); break;
325     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length,
326     LONG_LONG_PAIR (req->offset >> 32, req->offset & 0xffffffff)); break;
327 root 1.21 #else
328     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
329     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
330     #endif
331 root 1.25 #if __NR_stat64
332     struct kernelstat64 statdata;
333 root 1.21 case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
334     case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
335     case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
336     #else
337 root 1.25 struct kernelstat statdata;
338 root 1.21 case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
339     case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
340     case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
341     #endif
342 root 1.20 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
343     case REQ_CLOSE: req->result = close (req->fd); break;
344     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
345 root 1.18
346     case REQ_QUIT:
347     default:
348     write (respipe[1], (void *)&req, sizeof (req));
349     return 0;
350 root 1.2 }
351    
352 root 1.8 req->errorno = errno;
353 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
354     }
355    
356     return 0;
357 root 1.1 }
358    
359     MODULE = Linux::AIO PACKAGE = Linux::AIO
360    
BOOT:
{
	/* signal mask for the workers: everything except TERM, QUIT, ABRT and INT */
	sigfillset (&fullsigset);
	sigdelset (&fullsigset, SIGTERM);
	sigdelset (&fullsigset, SIGQUIT);
	sigdelset (&fullsigset, SIGABRT);
	sigdelset (&fullsigset, SIGINT);

	if (pipe (reqpipe) || pipe (respipe))
	  croak ("unable to initialize request or result pipe");

	/* write end of the request pipe: send_reqs must not block */
	if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
	  croak ("cannot set request pipe to nonblocking mode");

	/* read end of the result pipe: poll_cb must not block */
	if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
	  croak ("cannot set result pipe to nonblocking mode");
}
378    
void
min_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
	/* start workers until at least nthreads are running; start_thread
	   leaves "started" unchanged when clone () fails, so bail out instead
	   of retrying forever */
	while (nthreads > started)
	  {
	    int before = started;

	    start_thread ();

	    if (started == before)
	      croak ("unable to start aio thread");
	  }
386    
void
max_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
	int remaining;

	/* queue one quit request per surplus worker ... */
	for (remaining = started; remaining > nthreads; remaining--)
	  end_thread ();

	/* ... then reap results until "started" has dropped far enough */
	while (started > nthreads)
	  {
	    poll_wait ();
	    poll_cb (aTHX);
	  }
404 root 1.1
void
aio_open(pathname,flags,mode,callback)
	SV *	pathname
	int	flags
	int	mode
	SV *	callback
	PROTOTYPE: $$$$
	CODE:
	aio_req req;

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	/* the request keeps its own copy of the path scalar */
	req->data     = newSVsv (pathname);
	req->dataptr  = SvPV_nolen (req->data);
	req->callback = SvREFCNT_inc (callback);
	req->mode     = mode;
	req->fd       = flags;	/* the worker passes fd as the flags argument of open */
	req->type     = REQ_OPEN;

	send_req (req);
428    
void
aio_close(fh,callback)
	InputStream	fh
	SV *	callback
	PROTOTYPE: $$
	CODE:
	aio_req req;

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	/* only the descriptor number is handed to the worker */
	req->callback = SvREFCNT_inc (callback);
	req->fd       = PerlIO_fileno (fh);
	req->type     = REQ_CLOSE;

	send_req (req);
447    
void
aio_read(fh,offset,length,data,dataoffset,callback)
	InputStream	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	PROTOTYPE: $$$$$$
	CODE:
	/* dowrite == 0 selects a read request */
	int fd = PerlIO_fileno (fh);

	read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
459    
void
aio_write(fh,offset,length,data,dataoffset,callback)
	OutputStream	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	PROTOTYPE: $$$$$$
	CODE:
	/* dowrite == 1 selects a write request */
	int fd = PerlIO_fileno (fh);

	read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
471    
void
aio_stat(fh_or_path,callback)
	SV *	fh_or_path
	SV *	callback
	PROTOTYPE: $$
	ALIAS:
	   aio_lstat = 1
	CODE:
	aio_req req;
	int type;
	int fd = 0;

	/* Work out the request type (and descriptor) before allocating
	   anything: sv_2io may croak on something that is not a filehandle,
	   which would otherwise leak the request and its stat buffer. */
	if (SvPOK (fh_or_path))
	  type = ix ? REQ_LSTAT : REQ_STAT;
	else
	  {
	    type = REQ_FSTAT;
	    fd   = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
	  }

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	New (0, req->statdata, 1, Stat_t);

	if (!req->statdata)
	  {
	    Safefree (req);
	    croak ("out of memory during aio_req->statdata allocation");
	  }

	req->type = type;
	req->fd   = fd;

	if (type != REQ_FSTAT)
	  {
	    /* path variant: the request keeps its own copy of the path scalar */
	    req->data    = newSVsv (fh_or_path);
	    req->dataptr = SvPV_nolen (req->data);
	  }

	req->callback = SvREFCNT_inc (callback);

	send_req (req);
507 root 1.20
void
aio_unlink(pathname,callback)
	SV *	pathname
	SV *	callback
	PROTOTYPE: $$
	CODE:
	aio_req req;

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	/* the request keeps its own copy of the path scalar */
	req->data     = newSVsv (pathname);
	req->dataptr  = SvPV_nolen (req->data);
	req->callback = SvREFCNT_inc (callback);
	req->type     = REQ_UNLINK;

	send_req (req);
527 root 1.5
int
poll_fileno()
	PROTOTYPE:
	CODE:
	/* read end of the result pipe, the descriptor poll_wait selects on */
	RETVAL = respipe[0];
	OUTPUT:
	RETVAL
535    
int
poll_cb(...)
	PROTOTYPE:
	CODE:
	/* returns the number of callbacks the C-level poll_cb invoked */
	RETVAL = poll_cb (aTHX);
	OUTPUT:
	RETVAL
543    
void
poll_wait()
	PROTOTYPE:
	CODE:
	/* thin wrapper around the C-level poll_wait */
	poll_wait ();
549    
int
nreqs()
	PROTOTYPE:
	CODE:
	/* counter raised by send_req and lowered by poll_cb */
	RETVAL = nreqs;
	OUTPUT:
	RETVAL
557