ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.16
Committed: Wed May 5 10:13:30 2004 UTC (20 years ago) by root
Branch: MAIN
Changes since 1.15: +16 -9 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6 root 1.13 #include <sys/stat.h>
7 root 1.1 #include <unistd.h>
8 root 1.5 #include <fcntl.h>
9 root 1.8 #include <signal.h>
10 root 1.1 #include <sched.h>
11    
12 root 1.13 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
13     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
14 root 1.12 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
15    
16 root 1.16 #ifndef __NR_pread64
17     # define __NR_pread64 __NR_pread
18     #endif
19     #ifndef __NR_pwrite64
20     # define __NR_pwrite64 __NR_pwrite
21     #endif
22    
23 root 1.8 #define STACKSIZE 1024 /* yeah */
24 root 1.1
25 root 1.13 enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE, REQ_STAT, REQ_LSTAT, REQ_FSTAT};
26 root 1.1
27     typedef struct {
28     char stack[STACKSIZE];
29     } aio_thread;
30    
31     typedef struct {
32     int type;
33     aio_thread *thread;
34    
35     int fd;
36     off_t offset;
37     size_t length;
38 root 1.2 ssize_t result;
39 root 1.8 mode_t mode; /* open */
40 root 1.1 int errorno;
41 root 1.5 SV *data, *callback;
42 root 1.1 void *dataptr;
43 root 1.2 STRLEN dataoffset;
44 root 1.13
45     struct stat64 *statdata;
46 root 1.1 } aio_cb;
47    
48     typedef aio_cb *aio_req;
49    
50     static int started;
51 root 1.5 static int nreqs;
52 root 1.1 static int reqpipe[2], respipe[2];
53    
54 root 1.2 static int aio_proc(void *arg);
55    
56 root 1.1 static void
57     start_thread(void)
58     {
59 root 1.2 aio_thread *thr;
60    
61     New (0, thr, 1, aio_thread);
62    
63     if (clone (aio_proc,
64     &(thr->stack[STACKSIZE]),
65     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
66     thr) >= 0)
67     started++;
68     else
69     Safefree (thr);
70 root 1.1 }
71    
72     static void
73     end_thread(void)
74     {
75 root 1.5 aio_req req;
76     New (0, req, 1, aio_cb);
77     req->type = REQ_QUIT;
78 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
79 root 1.2 }
80    
81     static void
82 root 1.8 send_req (aio_req req)
83     {
84     nreqs++;
85     write (reqpipe[1], &req, sizeof (aio_req));
86     }
87    
88     static void
89 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
90     SV *data, STRLEN dataoffset, SV*callback)
91 root 1.2 {
92 root 1.5 aio_req req;
93     STRLEN svlen;
94     char *svptr = SvPV (data, svlen);
95    
96 root 1.13 SvUPGRADE (data, SVt_PV);
97     SvPOK_on (data);
98    
99 root 1.5 if (dataoffset < 0)
100     dataoffset += svlen;
101    
102     if (dataoffset < 0 || dataoffset > svlen)
103     croak ("data offset outside of string");
104    
105     if (dowrite)
106     {
107     /* write: check length and adjust. */
108     if (length < 0 || length + dataoffset > svlen)
109     length = svlen - dataoffset;
110     }
111     else
112     {
113     /* read: grow scalar as necessary */
114     svptr = SvGROW (data, length + dataoffset);
115     }
116    
117     if (length < 0)
118     croak ("length must not be negative");
119    
120 root 1.14 Newz (0, req, 1, aio_cb);
121 root 1.5
122     if (!req)
123     croak ("out of memory during aio_req allocation");
124    
125     req->type = dowrite ? REQ_WRITE : REQ_READ;
126     req->fd = fd;
127     req->offset = offset;
128     req->length = length;
129     req->data = SvREFCNT_inc (data);
130 root 1.6 req->dataptr = (char *)svptr + dataoffset;
131 root 1.5 req->callback = SvREFCNT_inc (callback);
132    
133 root 1.8 send_req (req);
134 root 1.5 }
135    
136     static int
137     poll_cb (pTHX)
138     {
139     dSP;
140     int count = 0;
141     aio_req req;
142    
143     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
144     {
145     if (req->type == REQ_QUIT)
146     {
147     Safefree (req->thread);
148     started--;
149     }
150     else
151     {
152     int errorno = errno;
153     errno = req->errorno;
154    
155     if (req->type == REQ_READ)
156 root 1.6 SvCUR_set (req->data, req->dataoffset
157     + req->result > 0 ? req->result : 0);
158 root 1.5
159 root 1.14 if (req->data)
160     SvREFCNT_dec (req->data);
161    
162 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
163     {
164     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
165     PL_laststatval = req->result;
166     PL_statcache.st_dev = req->statdata->st_dev;
167     PL_statcache.st_ino = req->statdata->st_ino;
168     PL_statcache.st_mode = req->statdata->st_mode;
169     PL_statcache.st_nlink = req->statdata->st_nlink;
170     PL_statcache.st_uid = req->statdata->st_uid;
171     PL_statcache.st_gid = req->statdata->st_gid;
172     PL_statcache.st_rdev = req->statdata->st_rdev;
173     PL_statcache.st_size = req->statdata->st_size;
174     PL_statcache.st_atime = req->statdata->st_atime;
175     PL_statcache.st_mtime = req->statdata->st_mtime;
176     PL_statcache.st_ctime = req->statdata->st_ctime;
177     PL_statcache.st_blksize = req->statdata->st_blksize;
178     PL_statcache.st_blocks = req->statdata->st_blocks;
179    
180     Safefree (req->statdata);
181     }
182    
183 root 1.5 PUSHMARK (SP);
184     XPUSHs (sv_2mortal (newSViv (req->result)));
185     PUTBACK;
186     call_sv (req->callback, G_VOID);
187     SPAGAIN;
188    
189 root 1.14 if (req->callback)
190     SvREFCNT_dec (req->callback);
191 root 1.5
192     errno = errorno;
193     nreqs--;
194     count++;
195     }
196    
197     Safefree (req);
198     }
199    
200     return count;
201 root 1.2 }
202    
203 root 1.8 static sigset_t fullsigset;
204    
205 root 1.2 #undef errno
206     #include <asm/unistd.h>
207    
208     static int
209     aio_proc(void *thr_arg)
210     {
211     aio_thread *thr = thr_arg;
212 root 1.9 aio_req req;
213 root 1.2 int errno;
214    
215 root 1.15 /* this is very much x86 and kernel-specific :(:(:( */
216 root 1.11 /* we rely on gcc's ability to create closures. */
217 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
218     _syscall3(int,write,int,fd,char *,buf,size_t,count)
219    
220 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
221     _syscall1(int,close,int,fd)
222    
223 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
224     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
225 root 1.13
226     _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
227     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
228     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
229    
230 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
231 root 1.2
232     /* then loop */
233     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
234     {
235     req->thread = thr;
236 root 1.14 errno = 0; /* strictly unnecessary */
237 root 1.2
238 root 1.13 if (req->type == REQ_READ)
239 root 1.16 req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32);
240 root 1.13 else if (req->type == REQ_WRITE)
241 root 1.16 req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32);
242 root 1.8 else if (req->type == REQ_OPEN)
243 root 1.16 req->result = open (req->dataptr, req->fd, req->mode);
244 root 1.10 else if (req->type == REQ_CLOSE)
245 root 1.16 req->result = close (req->fd);
246 root 1.13 else if (req->type == REQ_STAT)
247 root 1.16 req->result = stat64 (req->dataptr, req->statdata);
248 root 1.13 else if (req->type == REQ_LSTAT)
249 root 1.16 req->result = lstat64 (req->dataptr, req->statdata);
250 root 1.13 else if (req->type == REQ_FSTAT)
251 root 1.16 req->result = fstat64 (req->fd, req->statdata);
252 root 1.2 else
253     {
254     write (respipe[1], (void *)&req, sizeof (req));
255     break;
256     }
257    
258 root 1.8 req->errorno = errno;
259 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
260     }
261    
262     return 0;
263 root 1.1 }
264    
265     MODULE = Linux::AIO PACKAGE = Linux::AIO
266    
267     BOOT:
268     {
269 root 1.8 sigfillset (&fullsigset);
270 root 1.9 sigdelset (&fullsigset, SIGTERM);
271     sigdelset (&fullsigset, SIGQUIT);
272     sigdelset (&fullsigset, SIGABRT);
273     sigdelset (&fullsigset, SIGINT);
274 root 1.8
275 root 1.1 if (pipe (reqpipe) || pipe (respipe))
276     croak ("unable to initialize request or result pipe");
277 root 1.5
278     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
279     croak ("cannot set result pipe to nonblocking mode");
280 root 1.1 }
281    
282     void
283     min_parallel(nthreads)
284     int nthreads
285 root 1.5 PROTOTYPE: $
286 root 1.1 CODE:
287     while (nthreads > started)
288     start_thread ();
289    
290     void
291     max_parallel(nthreads)
292     int nthreads
293 root 1.5 PROTOTYPE: $
294 root 1.1 CODE:
295 root 1.5 int cur = started;
296     while (cur > nthreads)
297     {
298     end_thread ();
299     cur--;
300     }
301    
302 root 1.1 while (started > nthreads)
303 root 1.5 {
304 root 1.14 fd_set rfd;
305     FD_ZERO(&rfd);
306     FD_SET(respipe[0], &rfd);
307    
308     select (respipe[0] + 1, &rfd, 0, 0, 0);
309 root 1.5 poll_cb ();
310     }
311 root 1.1
312     void
313 root 1.8 aio_open(pathname,flags,mode,callback)
314 root 1.13 SV * pathname
315 root 1.8 int flags
316     int mode
317     SV * callback
318     PROTOTYPE: $$$$
319     CODE:
320     aio_req req;
321    
322 root 1.14 Newz (0, req, 1, aio_cb);
323 root 1.8
324     if (!req)
325     croak ("out of memory during aio_req allocation");
326    
327     req->type = REQ_OPEN;
328 root 1.14 req->data = newSVsv (pathname);
329     req->dataptr = SvPV_nolen (req->data);
330 root 1.8 req->fd = flags;
331     req->mode = mode;
332 root 1.10 req->callback = SvREFCNT_inc (callback);
333    
334     send_req (req);
335    
336     void
337     aio_close(fh,callback)
338 root 1.13 InputStream fh
339 root 1.10 SV * callback
340 root 1.14 PROTOTYPE: $$
341 root 1.10 CODE:
342     aio_req req;
343    
344 root 1.14 Newz (0, req, 1, aio_cb);
345 root 1.10
346     if (!req)
347     croak ("out of memory during aio_req allocation");
348    
349     req->type = REQ_CLOSE;
350     req->fd = PerlIO_fileno (fh);
351 root 1.13 req->callback = SvREFCNT_inc (callback);
352    
353     send_req (req);
354    
355     void
356     aio_read(fh,offset,length,data,dataoffset,callback)
357     InputStream fh
358     UV offset
359     IV length
360     SV * data
361     IV dataoffset
362     SV * callback
363     PROTOTYPE: $$$$$$
364     CODE:
365     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
366    
367     void
368     aio_write(fh,offset,length,data,dataoffset,callback)
369     OutputStream fh
370     UV offset
371     IV length
372     SV * data
373     IV dataoffset
374     SV * callback
375     PROTOTYPE: $$$$$$
376     CODE:
377     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
378    
379     void
380     aio_stat(fh_or_path,callback)
381     SV * fh_or_path
382     SV * callback
383     PROTOTYPE: $$
384     ALIAS:
385     aio_lstat = 1
386     CODE:
387     aio_req req;
388    
389 root 1.14 Newz (0, req, 1, aio_cb);
390 root 1.13
391     if (!req)
392     croak ("out of memory during aio_req allocation");
393    
394     New (0, req->statdata, 1, struct stat64);
395    
396     if (!req->statdata)
397     croak ("out of memory during aio_req->statdata allocation");
398    
399     if (SvPOK (fh_or_path))
400     {
401     req->type = ix ? REQ_LSTAT : REQ_STAT;
402 root 1.14 req->data = newSVsv (fh_or_path);
403     req->dataptr = SvPV_nolen (req->data);
404 root 1.13 }
405     else
406     {
407     req->type = REQ_FSTAT;
408     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
409     }
410    
411 root 1.8 req->callback = SvREFCNT_inc (callback);
412    
413     send_req (req);
414 root 1.5
415     int
416     poll_fileno()
417     PROTOTYPE:
418     CODE:
419     RETVAL = respipe[0];
420     OUTPUT:
421     RETVAL
422    
423     int
424 root 1.6 poll_cb(...)
425 root 1.5 PROTOTYPE:
426     CODE:
427     RETVAL = poll_cb (aTHX);
428     OUTPUT:
429     RETVAL
430    
431     int
432     nreqs()
433     PROTOTYPE:
434     CODE:
435     RETVAL = nreqs;
436     OUTPUT:
437     RETVAL
438