ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.17
Committed: Thu May 6 12:16:48 2004 UTC (20 years ago) by root
Branch: MAIN
Changes since 1.16: +5 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13    
14 root 1.13 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
15     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.12 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
17    
18 root 1.16 #ifndef __NR_pread64
19     # define __NR_pread64 __NR_pread
20     #endif
21     #ifndef __NR_pwrite64
22     # define __NR_pwrite64 __NR_pwrite
23     #endif
24    
25 root 1.8 #define STACKSIZE 1024 /* yeah */
26 root 1.1
27 root 1.13 enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE, REQ_STAT, REQ_LSTAT, REQ_FSTAT};
28 root 1.1
29     typedef struct {
30     char stack[STACKSIZE];
31     } aio_thread;
32    
33     typedef struct {
34     int type;
35     aio_thread *thread;
36    
37     int fd;
38     off_t offset;
39     size_t length;
40 root 1.2 ssize_t result;
41 root 1.8 mode_t mode; /* open */
42 root 1.1 int errorno;
43 root 1.5 SV *data, *callback;
44 root 1.1 void *dataptr;
45 root 1.2 STRLEN dataoffset;
46 root 1.13
47     struct stat64 *statdata;
48 root 1.1 } aio_cb;
49    
50     typedef aio_cb *aio_req;
51    
52     static int started;
53 root 1.5 static int nreqs;
54 root 1.1 static int reqpipe[2], respipe[2];
55    
56 root 1.2 static int aio_proc(void *arg);
57    
58 root 1.1 static void
59     start_thread(void)
60     {
61 root 1.2 aio_thread *thr;
62    
63     New (0, thr, 1, aio_thread);
64    
65     if (clone (aio_proc,
66     &(thr->stack[STACKSIZE]),
67     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
68     thr) >= 0)
69     started++;
70     else
71     Safefree (thr);
72 root 1.1 }
73    
74     static void
75     end_thread(void)
76     {
77 root 1.5 aio_req req;
78     New (0, req, 1, aio_cb);
79     req->type = REQ_QUIT;
80 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
81 root 1.2 }
82    
83     static void
84 root 1.8 send_req (aio_req req)
85     {
86     nreqs++;
87     write (reqpipe[1], &req, sizeof (aio_req));
88     }
89    
90     static void
91 root 1.17 read_write (pTHX_
92     int dowrite, int fd, off_t offset, size_t length,
93 root 1.5 SV *data, STRLEN dataoffset, SV*callback)
94 root 1.2 {
95 root 1.5 aio_req req;
96     STRLEN svlen;
97     char *svptr = SvPV (data, svlen);
98    
99 root 1.13 SvUPGRADE (data, SVt_PV);
100     SvPOK_on (data);
101    
102 root 1.5 if (dataoffset < 0)
103     dataoffset += svlen;
104    
105     if (dataoffset < 0 || dataoffset > svlen)
106     croak ("data offset outside of string");
107    
108     if (dowrite)
109     {
110     /* write: check length and adjust. */
111     if (length < 0 || length + dataoffset > svlen)
112     length = svlen - dataoffset;
113     }
114     else
115     {
116     /* read: grow scalar as necessary */
117     svptr = SvGROW (data, length + dataoffset);
118     }
119    
120     if (length < 0)
121     croak ("length must not be negative");
122    
123 root 1.14 Newz (0, req, 1, aio_cb);
124 root 1.5
125     if (!req)
126     croak ("out of memory during aio_req allocation");
127    
128     req->type = dowrite ? REQ_WRITE : REQ_READ;
129     req->fd = fd;
130     req->offset = offset;
131     req->length = length;
132     req->data = SvREFCNT_inc (data);
133 root 1.6 req->dataptr = (char *)svptr + dataoffset;
134 root 1.5 req->callback = SvREFCNT_inc (callback);
135    
136 root 1.8 send_req (req);
137 root 1.5 }
138    
139     static int
140     poll_cb (pTHX)
141     {
142     dSP;
143     int count = 0;
144     aio_req req;
145    
146     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
147     {
148     if (req->type == REQ_QUIT)
149     {
150     Safefree (req->thread);
151     started--;
152     }
153     else
154     {
155     int errorno = errno;
156     errno = req->errorno;
157    
158     if (req->type == REQ_READ)
159 root 1.6 SvCUR_set (req->data, req->dataoffset
160     + req->result > 0 ? req->result : 0);
161 root 1.5
162 root 1.14 if (req->data)
163     SvREFCNT_dec (req->data);
164    
165 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
166     {
167     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
168     PL_laststatval = req->result;
169     PL_statcache.st_dev = req->statdata->st_dev;
170     PL_statcache.st_ino = req->statdata->st_ino;
171     PL_statcache.st_mode = req->statdata->st_mode;
172     PL_statcache.st_nlink = req->statdata->st_nlink;
173     PL_statcache.st_uid = req->statdata->st_uid;
174     PL_statcache.st_gid = req->statdata->st_gid;
175     PL_statcache.st_rdev = req->statdata->st_rdev;
176     PL_statcache.st_size = req->statdata->st_size;
177     PL_statcache.st_atime = req->statdata->st_atime;
178     PL_statcache.st_mtime = req->statdata->st_mtime;
179     PL_statcache.st_ctime = req->statdata->st_ctime;
180     PL_statcache.st_blksize = req->statdata->st_blksize;
181     PL_statcache.st_blocks = req->statdata->st_blocks;
182    
183     Safefree (req->statdata);
184     }
185    
186 root 1.5 PUSHMARK (SP);
187     XPUSHs (sv_2mortal (newSViv (req->result)));
188     PUTBACK;
189     call_sv (req->callback, G_VOID);
190     SPAGAIN;
191    
192 root 1.14 if (req->callback)
193     SvREFCNT_dec (req->callback);
194 root 1.5
195     errno = errorno;
196     nreqs--;
197     count++;
198     }
199    
200     Safefree (req);
201     }
202    
203     return count;
204 root 1.2 }
205    
206 root 1.8 static sigset_t fullsigset;
207    
208 root 1.2 #undef errno
209     #include <asm/unistd.h>
210    
211     static int
212     aio_proc(void *thr_arg)
213     {
214     aio_thread *thr = thr_arg;
215 root 1.9 aio_req req;
216 root 1.2 int errno;
217    
218 root 1.15 /* this is very much x86 and kernel-specific :(:(:( */
219 root 1.11 /* we rely on gcc's ability to create closures. */
220 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
221     _syscall3(int,write,int,fd,char *,buf,size_t,count)
222    
223 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
224     _syscall1(int,close,int,fd)
225    
226 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
227     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
228 root 1.13
229     _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
230     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
231     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
232    
233 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
234 root 1.2
235     /* then loop */
236     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
237     {
238     req->thread = thr;
239 root 1.14 errno = 0; /* strictly unnecessary */
240 root 1.2
241 root 1.13 if (req->type == REQ_READ)
242 root 1.16 req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32);
243 root 1.13 else if (req->type == REQ_WRITE)
244 root 1.16 req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32);
245 root 1.8 else if (req->type == REQ_OPEN)
246 root 1.16 req->result = open (req->dataptr, req->fd, req->mode);
247 root 1.10 else if (req->type == REQ_CLOSE)
248 root 1.16 req->result = close (req->fd);
249 root 1.13 else if (req->type == REQ_STAT)
250 root 1.16 req->result = stat64 (req->dataptr, req->statdata);
251 root 1.13 else if (req->type == REQ_LSTAT)
252 root 1.16 req->result = lstat64 (req->dataptr, req->statdata);
253 root 1.13 else if (req->type == REQ_FSTAT)
254 root 1.16 req->result = fstat64 (req->fd, req->statdata);
255 root 1.2 else
256     {
257     write (respipe[1], (void *)&req, sizeof (req));
258     break;
259     }
260    
261 root 1.8 req->errorno = errno;
262 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
263     }
264    
265     return 0;
266 root 1.1 }
267    
268     MODULE = Linux::AIO PACKAGE = Linux::AIO
269    
270     BOOT:
271     {
272 root 1.8 sigfillset (&fullsigset);
273 root 1.9 sigdelset (&fullsigset, SIGTERM);
274     sigdelset (&fullsigset, SIGQUIT);
275     sigdelset (&fullsigset, SIGABRT);
276     sigdelset (&fullsigset, SIGINT);
277 root 1.8
278 root 1.1 if (pipe (reqpipe) || pipe (respipe))
279     croak ("unable to initialize request or result pipe");
280 root 1.5
281     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
282     croak ("cannot set result pipe to nonblocking mode");
283 root 1.1 }
284    
285     void
286     min_parallel(nthreads)
287     int nthreads
288 root 1.5 PROTOTYPE: $
289 root 1.1 CODE:
290     while (nthreads > started)
291     start_thread ();
292    
293     void
294     max_parallel(nthreads)
295     int nthreads
296 root 1.5 PROTOTYPE: $
297 root 1.1 CODE:
298 root 1.5 int cur = started;
299     while (cur > nthreads)
300     {
301     end_thread ();
302     cur--;
303     }
304    
305 root 1.1 while (started > nthreads)
306 root 1.5 {
307 root 1.14 fd_set rfd;
308     FD_ZERO(&rfd);
309     FD_SET(respipe[0], &rfd);
310    
311     select (respipe[0] + 1, &rfd, 0, 0, 0);
312 root 1.17 poll_cb (aTHX);
313 root 1.5 }
314 root 1.1
315     void
316 root 1.8 aio_open(pathname,flags,mode,callback)
317 root 1.13 SV * pathname
318 root 1.8 int flags
319     int mode
320     SV * callback
321     PROTOTYPE: $$$$
322     CODE:
323     aio_req req;
324    
325 root 1.14 Newz (0, req, 1, aio_cb);
326 root 1.8
327     if (!req)
328     croak ("out of memory during aio_req allocation");
329    
330     req->type = REQ_OPEN;
331 root 1.14 req->data = newSVsv (pathname);
332     req->dataptr = SvPV_nolen (req->data);
333 root 1.8 req->fd = flags;
334     req->mode = mode;
335 root 1.10 req->callback = SvREFCNT_inc (callback);
336    
337     send_req (req);
338    
339     void
340     aio_close(fh,callback)
341 root 1.13 InputStream fh
342 root 1.10 SV * callback
343 root 1.14 PROTOTYPE: $$
344 root 1.10 CODE:
345     aio_req req;
346    
347 root 1.14 Newz (0, req, 1, aio_cb);
348 root 1.10
349     if (!req)
350     croak ("out of memory during aio_req allocation");
351    
352     req->type = REQ_CLOSE;
353     req->fd = PerlIO_fileno (fh);
354 root 1.13 req->callback = SvREFCNT_inc (callback);
355    
356     send_req (req);
357    
358     void
359     aio_read(fh,offset,length,data,dataoffset,callback)
360     InputStream fh
361     UV offset
362     IV length
363     SV * data
364     IV dataoffset
365     SV * callback
366     PROTOTYPE: $$$$$$
367     CODE:
368     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
369    
370     void
371     aio_write(fh,offset,length,data,dataoffset,callback)
372     OutputStream fh
373     UV offset
374     IV length
375     SV * data
376     IV dataoffset
377     SV * callback
378     PROTOTYPE: $$$$$$
379     CODE:
380     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
381    
382     void
383     aio_stat(fh_or_path,callback)
384     SV * fh_or_path
385     SV * callback
386     PROTOTYPE: $$
387     ALIAS:
388     aio_lstat = 1
389     CODE:
390     aio_req req;
391    
392 root 1.14 Newz (0, req, 1, aio_cb);
393 root 1.13
394     if (!req)
395     croak ("out of memory during aio_req allocation");
396    
397     New (0, req->statdata, 1, struct stat64);
398    
399     if (!req->statdata)
400     croak ("out of memory during aio_req->statdata allocation");
401    
402     if (SvPOK (fh_or_path))
403     {
404     req->type = ix ? REQ_LSTAT : REQ_STAT;
405 root 1.14 req->data = newSVsv (fh_or_path);
406     req->dataptr = SvPV_nolen (req->data);
407 root 1.13 }
408     else
409     {
410     req->type = REQ_FSTAT;
411     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
412     }
413    
414 root 1.8 req->callback = SvREFCNT_inc (callback);
415    
416     send_req (req);
417 root 1.5
418     int
419     poll_fileno()
420     PROTOTYPE:
421     CODE:
422     RETVAL = respipe[0];
423     OUTPUT:
424     RETVAL
425    
426     int
427 root 1.6 poll_cb(...)
428 root 1.5 PROTOTYPE:
429     CODE:
430     RETVAL = poll_cb (aTHX);
431     OUTPUT:
432     RETVAL
433    
434     int
435     nreqs()
436     PROTOTYPE:
437     CODE:
438     RETVAL = nreqs;
439     OUTPUT:
440     RETVAL
441