ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.24
Committed: Sat Jul 2 13:16:33 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-1_5
Changes since 1.23: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13    
14 root 1.21 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
15 root 1.13 typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.21 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
17 root 1.16
18 root 1.22 #define STACKSIZE (128 * sizeof (long)) /* yeah */
19 root 1.1
20 root 1.20 enum {
21     REQ_QUIT,
22     REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE,
23     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK
24     };
25 root 1.1
26     typedef struct {
27     char stack[STACKSIZE];
28     } aio_thread;
29    
30 root 1.18 typedef struct aio_cb {
31     struct aio_cb *next;
32    
33 root 1.1 int type;
34     aio_thread *thread;
35    
36     int fd;
37     off_t offset;
38     size_t length;
39 root 1.2 ssize_t result;
40 root 1.8 mode_t mode; /* open */
41 root 1.1 int errorno;
42 root 1.5 SV *data, *callback;
43 root 1.1 void *dataptr;
44 root 1.2 STRLEN dataoffset;
45 root 1.13
46 root 1.21 Stat_t *statdata;
47 root 1.1 } aio_cb;
48    
49     typedef aio_cb *aio_req;
50    
51     static int started;
52 root 1.5 static int nreqs;
53 root 1.1 static int reqpipe[2], respipe[2];
54    
55 root 1.18 static aio_req qs, qe; /* queue start, queue end */
56    
57 root 1.2 static int aio_proc(void *arg);
58    
59 root 1.1 static void
60 root 1.18 start_thread (void)
61 root 1.1 {
62 root 1.2 aio_thread *thr;
63    
64     New (0, thr, 1, aio_thread);
65    
66     if (clone (aio_proc,
67 root 1.22 &(thr->stack[STACKSIZE - sizeof (long)]),
68 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
69 root 1.2 thr) >= 0)
70     started++;
71     else
72     Safefree (thr);
73 root 1.1 }
74    
75     static void
76 root 1.18 send_reqs (void)
77 root 1.1 {
78 root 1.18 /* this write is atomic */
79     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
80     {
81     qs = qs->next;
82     if (!qs) qe = 0;
83     }
84 root 1.2 }
85    
86     static void
87 root 1.8 send_req (aio_req req)
88     {
89     nreqs++;
90 root 1.18 req->next = 0;
91    
92     if (qe)
93 root 1.22 {
94     qe->next = req;
95     qe = req;
96     }
97 root 1.18 else
98     qe = qs = req;
99    
100     send_reqs ();
101     }
102    
103     static void
104     end_thread (void)
105     {
106     aio_req req;
107     New (0, req, 1, aio_cb);
108     req->type = REQ_QUIT;
109    
110     send_req (req);
111 root 1.8 }
112    
113     static void
114 root 1.17 read_write (pTHX_
115     int dowrite, int fd, off_t offset, size_t length,
116 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
117 root 1.2 {
118 root 1.5 aio_req req;
119     STRLEN svlen;
120     char *svptr = SvPV (data, svlen);
121    
122 root 1.13 SvUPGRADE (data, SVt_PV);
123     SvPOK_on (data);
124    
125 root 1.5 if (dataoffset < 0)
126     dataoffset += svlen;
127    
128     if (dataoffset < 0 || dataoffset > svlen)
129     croak ("data offset outside of string");
130    
131     if (dowrite)
132     {
133     /* write: check length and adjust. */
134     if (length < 0 || length + dataoffset > svlen)
135     length = svlen - dataoffset;
136     }
137     else
138     {
139     /* read: grow scalar as necessary */
140     svptr = SvGROW (data, length + dataoffset);
141     }
142    
143     if (length < 0)
144     croak ("length must not be negative");
145    
146 root 1.14 Newz (0, req, 1, aio_cb);
147 root 1.5
148     if (!req)
149     croak ("out of memory during aio_req allocation");
150    
151     req->type = dowrite ? REQ_WRITE : REQ_READ;
152     req->fd = fd;
153     req->offset = offset;
154     req->length = length;
155     req->data = SvREFCNT_inc (data);
156 root 1.6 req->dataptr = (char *)svptr + dataoffset;
157 root 1.5 req->callback = SvREFCNT_inc (callback);
158    
159 root 1.8 send_req (req);
160 root 1.5 }
161    
162 root 1.22 static void
163     poll_wait ()
164     {
165     fd_set rfd;
166     FD_ZERO(&rfd);
167     FD_SET(respipe[0], &rfd);
168    
169     select (respipe[0] + 1, &rfd, 0, 0, 0);
170     }
171    
172 root 1.5 static int
173     poll_cb (pTHX)
174     {
175     dSP;
176     int count = 0;
177     aio_req req;
178    
179     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
180     {
181 root 1.22 nreqs--;
182    
183 root 1.5 if (req->type == REQ_QUIT)
184     {
185     Safefree (req->thread);
186     started--;
187     }
188     else
189     {
190     int errorno = errno;
191     errno = req->errorno;
192    
193     if (req->type == REQ_READ)
194 root 1.6 SvCUR_set (req->data, req->dataoffset
195     + req->result > 0 ? req->result : 0);
196 root 1.5
197 root 1.14 if (req->data)
198     SvREFCNT_dec (req->data);
199    
200 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
201     {
202 root 1.21 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
203     PL_laststatval = req->result;
204     PL_statcache = *(req->statdata);
205 root 1.13
206     Safefree (req->statdata);
207     }
208    
209 root 1.5 PUSHMARK (SP);
210     XPUSHs (sv_2mortal (newSViv (req->result)));
211     PUTBACK;
212     call_sv (req->callback, G_VOID);
213     SPAGAIN;
214    
215 root 1.14 if (req->callback)
216     SvREFCNT_dec (req->callback);
217 root 1.5
218     errno = errorno;
219     count++;
220     }
221    
222     Safefree (req);
223     }
224    
225 root 1.18 if (qs)
226     send_reqs ();
227    
228 root 1.5 return count;
229 root 1.2 }
230    
231 root 1.8 static sigset_t fullsigset;
232    
233 root 1.2 #undef errno
234     #include <asm/unistd.h>
235 root 1.20 #include <sys/prctl.h>
236 root 1.2
237 root 1.21 #define COPY_STATDATA \
238     req->statdata->st_dev = statdata.st_dev; \
239     req->statdata->st_ino = statdata.st_ino; \
240     req->statdata->st_mode = statdata.st_mode; \
241     req->statdata->st_nlink = statdata.st_nlink; \
242     req->statdata->st_uid = statdata.st_uid; \
243     req->statdata->st_gid = statdata.st_gid; \
244     req->statdata->st_rdev = statdata.st_rdev; \
245     req->statdata->st_size = statdata.st_size; \
246     req->statdata->st_atime = statdata.st_atime; \
247     req->statdata->st_mtime = statdata.st_mtime; \
248     req->statdata->st_ctime = statdata.st_ctime; \
249     req->statdata->st_blksize = statdata.st_blksize; \
250     req->statdata->st_blocks = statdata.st_blocks; \
251    
252 root 1.2 static int
253 root 1.18 aio_proc (void *thr_arg)
254 root 1.2 {
255     aio_thread *thr = thr_arg;
256 root 1.9 aio_req req;
257 root 1.2 int errno;
258    
259 root 1.21 /* this is very much kernel-specific :(:(:( */
260 root 1.11 /* we rely on gcc's ability to create closures. */
261 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
262     _syscall3(int,write,int,fd,char *,buf,size_t,count)
263    
264 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
265     _syscall1(int,close,int,fd)
266    
267 root 1.24 #define arch64 (__ia64 || __alpha)
268    
269     #ifdef __NR_pread64 && !arch64
270 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
271     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
272 root 1.21 #elif __NR_pread
273     _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
274     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
275     #else
276     # error "neither pread nor pread64 defined"
277     #endif
278 root 1.13
279 root 1.21
280 root 1.24 #ifdef __NR_stat64 && !arch64
281 root 1.13 _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
282     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
283     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
284 root 1.21 #elif __NR_stat
285     _syscall2(int,stat, const char *, filename, struct stat *, buf)
286     _syscall2(int,lstat, const char *, filename, struct stat *, buf)
287     _syscall2(int,fstat, int, fd, struct stat *, buf)
288     #else
289     # error "neither stat64 nor stat defined"
290     #endif
291 root 1.13
292 root 1.20 _syscall1(int,unlink, char *, filename);
293    
294 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
295 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
296 root 1.2
297     /* then loop */
298     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
299     {
300     req->thread = thr;
301 root 1.14 errno = 0; /* strictly unnecessary */
302 root 1.2
303 root 1.18 switch (req->type)
304 root 1.2 {
305 root 1.21 #ifdef __NR_pread64
306 root 1.20 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
307     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
308 root 1.21 #else
309     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
310     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
311     #endif
312     #ifdef __NR_stat64
313     struct stat64 statdata;
314     case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
315     case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
316     case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
317     #else
318     struct stat statdata;
319     case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
320     case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
321     case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
322     #endif
323 root 1.20 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
324     case REQ_CLOSE: req->result = close (req->fd); break;
325     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
326 root 1.18
327     case REQ_QUIT:
328     default:
329     write (respipe[1], (void *)&req, sizeof (req));
330     return 0;
331 root 1.2 }
332    
333 root 1.8 req->errorno = errno;
334 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
335     }
336    
337     return 0;
338 root 1.1 }
339    
340     MODULE = Linux::AIO PACKAGE = Linux::AIO
341    
342     BOOT:
343     {
344 root 1.8 sigfillset (&fullsigset);
345 root 1.9 sigdelset (&fullsigset, SIGTERM);
346     sigdelset (&fullsigset, SIGQUIT);
347     sigdelset (&fullsigset, SIGABRT);
348     sigdelset (&fullsigset, SIGINT);
349 root 1.8
350 root 1.1 if (pipe (reqpipe) || pipe (respipe))
351     croak ("unable to initialize request or result pipe");
352 root 1.18
353     if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
354     croak ("cannot set result pipe to nonblocking mode");
355 root 1.5
356     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
357     croak ("cannot set result pipe to nonblocking mode");
358 root 1.1 }
359    
360     void
361     min_parallel(nthreads)
362     int nthreads
363 root 1.5 PROTOTYPE: $
364 root 1.1 CODE:
365     while (nthreads > started)
366     start_thread ();
367    
368     void
369     max_parallel(nthreads)
370     int nthreads
371 root 1.5 PROTOTYPE: $
372 root 1.1 CODE:
373 root 1.5 int cur = started;
374     while (cur > nthreads)
375     {
376     end_thread ();
377     cur--;
378     }
379    
380 root 1.1 while (started > nthreads)
381 root 1.5 {
382 root 1.22 poll_wait ();
383 root 1.17 poll_cb (aTHX);
384 root 1.5 }
385 root 1.1
386     void
387 root 1.8 aio_open(pathname,flags,mode,callback)
388 root 1.13 SV * pathname
389 root 1.8 int flags
390     int mode
391     SV * callback
392     PROTOTYPE: $$$$
393     CODE:
394     aio_req req;
395    
396 root 1.14 Newz (0, req, 1, aio_cb);
397 root 1.8
398     if (!req)
399     croak ("out of memory during aio_req allocation");
400    
401     req->type = REQ_OPEN;
402 root 1.14 req->data = newSVsv (pathname);
403     req->dataptr = SvPV_nolen (req->data);
404 root 1.8 req->fd = flags;
405     req->mode = mode;
406 root 1.10 req->callback = SvREFCNT_inc (callback);
407    
408     send_req (req);
409    
410     void
411     aio_close(fh,callback)
412 root 1.13 InputStream fh
413 root 1.10 SV * callback
414 root 1.14 PROTOTYPE: $$
415 root 1.10 CODE:
416     aio_req req;
417    
418 root 1.14 Newz (0, req, 1, aio_cb);
419 root 1.10
420     if (!req)
421     croak ("out of memory during aio_req allocation");
422    
423     req->type = REQ_CLOSE;
424     req->fd = PerlIO_fileno (fh);
425 root 1.13 req->callback = SvREFCNT_inc (callback);
426    
427     send_req (req);
428    
429     void
430     aio_read(fh,offset,length,data,dataoffset,callback)
431     InputStream fh
432     UV offset
433     IV length
434     SV * data
435     IV dataoffset
436     SV * callback
437     PROTOTYPE: $$$$$$
438     CODE:
439     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
440    
441     void
442     aio_write(fh,offset,length,data,dataoffset,callback)
443     OutputStream fh
444     UV offset
445     IV length
446     SV * data
447     IV dataoffset
448     SV * callback
449     PROTOTYPE: $$$$$$
450     CODE:
451     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
452    
453     void
454     aio_stat(fh_or_path,callback)
455     SV * fh_or_path
456     SV * callback
457     PROTOTYPE: $$
458     ALIAS:
459     aio_lstat = 1
460     CODE:
461     aio_req req;
462    
463 root 1.14 Newz (0, req, 1, aio_cb);
464 root 1.13
465     if (!req)
466     croak ("out of memory during aio_req allocation");
467    
468 root 1.21 New (0, req->statdata, 1, Stat_t);
469 root 1.13
470     if (!req->statdata)
471     croak ("out of memory during aio_req->statdata allocation");
472    
473     if (SvPOK (fh_or_path))
474     {
475     req->type = ix ? REQ_LSTAT : REQ_STAT;
476 root 1.14 req->data = newSVsv (fh_or_path);
477     req->dataptr = SvPV_nolen (req->data);
478 root 1.13 }
479     else
480     {
481     req->type = REQ_FSTAT;
482     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
483     }
484    
485 root 1.8 req->callback = SvREFCNT_inc (callback);
486    
487     send_req (req);
488 root 1.20
489     void
490     aio_unlink(pathname,callback)
491     SV * pathname
492     SV * callback
493     PROTOTYPE: $$
494     CODE:
495     aio_req req;
496    
497     Newz (0, req, 1, aio_cb);
498    
499     if (!req)
500     croak ("out of memory during aio_req allocation");
501    
502     req->type = REQ_UNLINK;
503     req->data = newSVsv (pathname);
504     req->dataptr = SvPV_nolen (req->data);
505     req->callback = SvREFCNT_inc (callback);
506    
507     send_req (req);
508 root 1.5
509     int
510     poll_fileno()
511     PROTOTYPE:
512     CODE:
513     RETVAL = respipe[0];
514     OUTPUT:
515     RETVAL
516    
517     int
518 root 1.6 poll_cb(...)
519 root 1.5 PROTOTYPE:
520     CODE:
521     RETVAL = poll_cb (aTHX);
522     OUTPUT:
523     RETVAL
524    
525 root 1.23 void
526     poll_wait()
527     PROTOTYPE:
528     CODE:
529     poll_wait ();
530    
531 root 1.5 int
532     nreqs()
533     PROTOTYPE:
534     CODE:
535     RETVAL = nreqs;
536     OUTPUT:
537     RETVAL
538