ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.13
Committed: Mon Apr 1 20:30:08 2002 UTC (22 years, 1 month ago) by root
Branch: MAIN
Changes since 1.12: +121 -40 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include <sys/types.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sched.h>
#include <signal.h>
#include <unistd.h>
11    
/*
 * Opaque stand-ins for the stream types named in the XSUB signatures below.
 * This is a hack kept because perl 5.6.1 is too old to do better.
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;

/* Size in bytes of the stack buffer embedded in every aio_thread. */
#define STACKSIZE 1024 /* yeah */

/* Request kinds stored in aio_cb.type. */
enum
{
  REQ_QUIT,
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT
};
19 root 1.1
20     typedef struct {
21     char stack[STACKSIZE];
22     } aio_thread;
23    
24     typedef struct {
25     int type;
26     aio_thread *thread;
27    
28 root 1.13 SV *savesv;
29    
30 root 1.1 int fd;
31     off_t offset;
32     size_t length;
33 root 1.2 ssize_t result;
34 root 1.8 mode_t mode; /* open */
35 root 1.1 int errorno;
36 root 1.5 SV *data, *callback;
37 root 1.1 void *dataptr;
38 root 1.2 STRLEN dataoffset;
39 root 1.13
40     struct stat64 *statdata;
41 root 1.1 } aio_cb;
42    
43     typedef aio_cb *aio_req;
44    
45     static int started;
46 root 1.5 static int nreqs;
47 root 1.1 static int reqpipe[2], respipe[2];
48    
49 root 1.2 static int aio_proc(void *arg);
50    
51 root 1.1 static void
52     start_thread(void)
53     {
54 root 1.2 aio_thread *thr;
55    
56     New (0, thr, 1, aio_thread);
57    
58     if (clone (aio_proc,
59     &(thr->stack[STACKSIZE]),
60     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
61     thr) >= 0)
62     started++;
63     else
64     Safefree (thr);
65 root 1.1 }
66    
67     static void
68     end_thread(void)
69     {
70 root 1.5 aio_req req;
71     New (0, req, 1, aio_cb);
72     req->type = REQ_QUIT;
73 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
74 root 1.2 }
75    
76     static void
77 root 1.8 send_req (aio_req req)
78     {
79     nreqs++;
80     write (reqpipe[1], &req, sizeof (aio_req));
81     }
82    
83     static void
84 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
85     SV *data, STRLEN dataoffset, SV*callback)
86 root 1.2 {
87 root 1.5 aio_req req;
88     STRLEN svlen;
89     char *svptr = SvPV (data, svlen);
90    
91 root 1.13 SvUPGRADE (data, SVt_PV);
92     SvPOK_on (data);
93    
94 root 1.5 if (dataoffset < 0)
95     dataoffset += svlen;
96    
97     if (dataoffset < 0 || dataoffset > svlen)
98     croak ("data offset outside of string");
99    
100     if (dowrite)
101     {
102     /* write: check length and adjust. */
103     if (length < 0 || length + dataoffset > svlen)
104     length = svlen - dataoffset;
105     }
106     else
107     {
108     /* read: grow scalar as necessary */
109     svptr = SvGROW (data, length + dataoffset);
110     }
111    
112     if (length < 0)
113     croak ("length must not be negative");
114    
115     New (0, req, 1, aio_cb);
116    
117     if (!req)
118     croak ("out of memory during aio_req allocation");
119    
120     req->type = dowrite ? REQ_WRITE : REQ_READ;
121     req->fd = fd;
122     req->offset = offset;
123     req->length = length;
124     req->data = SvREFCNT_inc (data);
125 root 1.6 req->dataptr = (char *)svptr + dataoffset;
126 root 1.5 req->callback = SvREFCNT_inc (callback);
127    
128 root 1.8 send_req (req);
129 root 1.5 }
130    
131     static int
132     poll_cb (pTHX)
133     {
134     dSP;
135     int count = 0;
136     aio_req req;
137    
138     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
139     {
140     if (req->type == REQ_QUIT)
141     {
142     Safefree (req->thread);
143     started--;
144     }
145     else
146     {
147     int errorno = errno;
148     errno = req->errorno;
149    
150 root 1.13 if (req->savesv)
151     SvREFCNT_dec (req->savesv);
152    
153 root 1.5 if (req->type == REQ_READ)
154 root 1.6 SvCUR_set (req->data, req->dataoffset
155     + req->result > 0 ? req->result : 0);
156 root 1.5
157 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
158     {
159     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
160     PL_laststatval = req->result;
161     PL_statcache.st_dev = req->statdata->st_dev;
162     PL_statcache.st_ino = req->statdata->st_ino;
163     PL_statcache.st_mode = req->statdata->st_mode;
164     PL_statcache.st_nlink = req->statdata->st_nlink;
165     PL_statcache.st_uid = req->statdata->st_uid;
166     PL_statcache.st_gid = req->statdata->st_gid;
167     PL_statcache.st_rdev = req->statdata->st_rdev;
168     PL_statcache.st_size = req->statdata->st_size;
169     PL_statcache.st_atime = req->statdata->st_atime;
170     PL_statcache.st_mtime = req->statdata->st_mtime;
171     PL_statcache.st_ctime = req->statdata->st_ctime;
172     PL_statcache.st_blksize = req->statdata->st_blksize;
173     PL_statcache.st_blocks = req->statdata->st_blocks;
174    
175     Safefree (req->statdata);
176     }
177    
178 root 1.5 PUSHMARK (SP);
179     XPUSHs (sv_2mortal (newSViv (req->result)));
180     PUTBACK;
181     call_sv (req->callback, G_VOID);
182     SPAGAIN;
183    
184     SvREFCNT_dec (req->data);
185     SvREFCNT_dec (req->callback);
186    
187     errno = errorno;
188     nreqs--;
189     count++;
190     }
191    
192     Safefree (req);
193     }
194    
195     return count;
196 root 1.2 }
197    
198 root 1.8 static sigset_t fullsigset;
199    
200 root 1.2 #undef errno
201     #include <asm/unistd.h>
202    
203     static int
204     aio_proc(void *thr_arg)
205     {
206     aio_thread *thr = thr_arg;
207 root 1.9 aio_req req;
208 root 1.2 int errno;
209    
210 root 1.11 /* we rely on gcc's ability to create closures. */
211 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
212     _syscall3(int,write,int,fd,char *,buf,size_t,count)
213    
214 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
215     _syscall1(int,close,int,fd)
216    
217 root 1.13 _syscall4(int,pread,int,fd,char *,buf,size_t,count,off_t,offset)
218     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,off_t,offset)
219    
220     _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
221     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
222     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
223    
224 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
225 root 1.2
226     /* then loop */
227     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
228     {
229     req->thread = thr;
230 root 1.8 errno = 0;
231 root 1.2
232 root 1.13 if (req->type == REQ_READ)
233     req->result = pread (req->fd, req->dataptr, req->length, req->offset);
234     else if (req->type == REQ_WRITE)
235     req->result = pwrite (req->fd, req->dataptr, req->length, req->offset);
236 root 1.8 else if (req->type == REQ_OPEN)
237 root 1.13 req->result = open (req->dataptr, req->fd, req->mode);
238 root 1.10 else if (req->type == REQ_CLOSE)
239 root 1.13 req->result = close (req->fd);
240     else if (req->type == REQ_STAT)
241     req->result = stat64 (req->dataptr, req->statdata);
242     else if (req->type == REQ_LSTAT)
243     req->result = lstat64 (req->dataptr, req->statdata);
244     else if (req->type == REQ_FSTAT)
245     req->result = fstat64 (req->fd, req->statdata);
246 root 1.2 else
247     {
248     write (respipe[1], (void *)&req, sizeof (req));
249     break;
250     }
251    
252 root 1.8 req->errorno = errno;
253 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
254     }
255    
256     return 0;
257 root 1.1 }
258    
259     MODULE = Linux::AIO PACKAGE = Linux::AIO
260    
BOOT:
{
        /* signals the workers keep unblocked; everything else is masked */
        static const int unblocked[] = { SIGTERM, SIGQUIT, SIGABRT, SIGINT };
        unsigned int i;

        sigfillset (&fullsigset);
        for (i = 0; i < sizeof (unblocked) / sizeof (unblocked[0]); i++)
          sigdelset (&fullsigset, unblocked[i]);

        /* one pipe per direction between the interpreter and the workers */
        if (pipe (reqpipe) != 0 || pipe (respipe) != 0)
          croak ("unable to initialize request or result pipe");

        /* poll_cb must return as soon as no more results are pending */
        if (fcntl (respipe[0], F_SETFL, O_NONBLOCK) != 0)
          croak ("cannot set result pipe to nonblocking mode");
}
275    
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* Start workers until at least nthreads are running.  start_thread
           leaves `started` unchanged when clone() fails, so give up with an
           error instead of spinning forever in that case. */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            if (started == before)
              croak ("unable to start aio worker thread");
          }
283    
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* Reduce the number of workers to at most nthreads: queue one quit
           request per surplus worker, then process results until poll_cb
           has reaped all of them. */
        int cur = started;

        while (cur > nthreads)
          {
            end_thread ();
            cur--;
          }

        poll_cb (aTHX);

        while (started > nthreads)
          {
            /* Wait for the result pipe to become readable instead of
               switching it to blocking mode: poll_cb reads until a read
               fails, so on a blocking pipe it would never return once the
               last reply had been consumed. */
            fd_set rfd;

            FD_ZERO (&rfd);
            FD_SET (respipe[0], &rfd);
            select (respipe[0] + 1, &rfd, 0, 0, 0);

            poll_cb (aTHX);
          }
304 root 1.1
void
aio_open(pathname,flags,mode,callback)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$$
        CODE:
        /* Queue an open(pathname, flags, mode); callback receives the
           result.  The path is copied so the worker never looks at a scalar
           the caller may modify in the meantime. */
        aio_req req;

        /* Zero the request: poll_cb releases req->data unconditionally, so
           the fields not set here must be NULL rather than garbage. */
        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        req->type = REQ_OPEN;
        req->savesv = newSVsv (pathname);
        req->dataptr = SvPV_nolen (req->savesv);
        req->fd = flags; /* the fd field carries the open flags */
        req->mode = mode;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
328    
void
aio_close(fh,callback)
        InputStream     fh
        SV *            callback
        PROTOTYPE: $$
        CODE:
        /* Queue a close() of the descriptor behind fh; callback receives
           the result. */
        aio_req req;

        /* Zero the request: poll_cb tests req->savesv and releases
           req->data, neither of which is set for a close request. */
        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        req->type = REQ_CLOSE;
        req->fd = PerlIO_fileno (fh);
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
347    
void
aio_read(fh,offset,length,data,dataoffset,callback)
        InputStream     fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        CODE:
        /* Queue a read of length bytes at file position offset into data,
           starting at dataoffset within the scalar. */
        int fd = PerlIO_fileno (fh);

        read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
359    
void
aio_write(fh,offset,length,data,dataoffset,callback)
        OutputStream    fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        CODE:
        /* Queue a write of length bytes taken from data, starting at
           dataoffset within the scalar, to file position offset. */
        int fd = PerlIO_fileno (fh);

        read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
371    
void
aio_stat(fh_or_path,callback)
        SV *    fh_or_path
        SV *    callback
        PROTOTYPE: $$
        ALIAS:
           aio_lstat = 1
        CODE:
        /* Queue a stat (or lstat when called as aio_lstat) of a path, or an
           fstat when the argument is not a string.  callback receives the
           result; poll_cb copies the stat data into perl's stat cache. */
        aio_req req;
        int type;
        int fd = 0;

        /* Resolve the argument before allocating anything: sv_2io croaks
           on a non-handle, and nothing must be leaked when it does. */
        if (SvPOK (fh_or_path))
          type = ix ? REQ_LSTAT : REQ_STAT;
        else
          {
            type = REQ_FSTAT;
            fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
          }

        /* Zero the request: poll_cb tests req->savesv and releases
           req->data, which are not set on every path below. */
        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        New (0, req->statdata, 1, struct stat64);

        if (!req->statdata)
          {
            Safefree (req);
            croak ("out of memory during aio_req->statdata allocation");
          }

        req->type = type;

        if (type == REQ_FSTAT)
          req->fd = fd;
        else
          {
            req->savesv = newSVsv (fh_or_path);
            req->dataptr = SvPV_nolen (req->savesv);
          }

        req->callback = SvREFCNT_inc (callback);

        send_req (req);
407 root 1.5
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe */
        RETVAL = respipe[0];
        OUTPUT:
        RETVAL
415    
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* any arguments are ignored; returns what the C-level poll_cb returns */
        RETVAL = poll_cb (aTHX);
        OUTPUT:
        RETVAL
423    
int
nreqs()
        PROTOTYPE:
        CODE:
        /* current value of the outstanding-request counter */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
431