ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.18
Committed: Thu May 6 15:05:57 2004 UTC (20 years ago) by root
Branch: MAIN
Changes since 1.17: +55 -27 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13    
14 root 1.13 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
15     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.12 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
17    
18 root 1.16 #ifndef __NR_pread64
19     # define __NR_pread64 __NR_pread
20     #endif
21     #ifndef __NR_pwrite64
22     # define __NR_pwrite64 __NR_pwrite
23     #endif
24    
25 root 1.8 #define STACKSIZE 1024 /* yeah */
26 root 1.1
27 root 1.13 enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE, REQ_STAT, REQ_LSTAT, REQ_FSTAT};
28 root 1.1
29     typedef struct {
30     char stack[STACKSIZE];
31     } aio_thread;
32    
33 root 1.18 typedef struct aio_cb {
34     struct aio_cb *next;
35    
36 root 1.1 int type;
37     aio_thread *thread;
38    
39     int fd;
40     off_t offset;
41     size_t length;
42 root 1.2 ssize_t result;
43 root 1.8 mode_t mode; /* open */
44 root 1.1 int errorno;
45 root 1.5 SV *data, *callback;
46 root 1.1 void *dataptr;
47 root 1.2 STRLEN dataoffset;
48 root 1.13
49     struct stat64 *statdata;
50 root 1.1 } aio_cb;
51    
52     typedef aio_cb *aio_req;
53    
54     static int started;
55 root 1.5 static int nreqs;
56 root 1.1 static int reqpipe[2], respipe[2];
57    
58 root 1.18 static aio_req qs, qe; /* queue start, queue end */
59    
60 root 1.2 static int aio_proc(void *arg);
61    
62 root 1.1 static void
63 root 1.18 start_thread (void)
64 root 1.1 {
65 root 1.2 aio_thread *thr;
66    
67     New (0, thr, 1, aio_thread);
68    
69     if (clone (aio_proc,
70     &(thr->stack[STACKSIZE]),
71     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
72     thr) >= 0)
73     started++;
74     else
75     Safefree (thr);
76 root 1.1 }
77    
78     static void
79 root 1.18 send_reqs (void)
80 root 1.1 {
81 root 1.18 /* this write is atomic */
82     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
83     {
84     qs = qs->next;
85     if (!qs) qe = 0;
86     }
87 root 1.2 }
88    
89     static void
90 root 1.8 send_req (aio_req req)
91     {
92     nreqs++;
93 root 1.18 req->next = 0;
94    
95     if (qe)
96     qe->next = req;
97     else
98     qe = qs = req;
99    
100     send_reqs ();
101     }
102    
103     static void
104     end_thread (void)
105     {
106     aio_req req;
107     New (0, req, 1, aio_cb);
108     req->type = REQ_QUIT;
109    
110     send_req (req);
111 root 1.8 }
112    
113     static void
114 root 1.17 read_write (pTHX_
115     int dowrite, int fd, off_t offset, size_t length,
116 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
117 root 1.2 {
118 root 1.5 aio_req req;
119     STRLEN svlen;
120     char *svptr = SvPV (data, svlen);
121    
122 root 1.13 SvUPGRADE (data, SVt_PV);
123     SvPOK_on (data);
124    
125 root 1.5 if (dataoffset < 0)
126     dataoffset += svlen;
127    
128     if (dataoffset < 0 || dataoffset > svlen)
129     croak ("data offset outside of string");
130    
131     if (dowrite)
132     {
133     /* write: check length and adjust. */
134     if (length < 0 || length + dataoffset > svlen)
135     length = svlen - dataoffset;
136     }
137     else
138     {
139     /* read: grow scalar as necessary */
140     svptr = SvGROW (data, length + dataoffset);
141     }
142    
143     if (length < 0)
144     croak ("length must not be negative");
145    
146 root 1.14 Newz (0, req, 1, aio_cb);
147 root 1.5
148     if (!req)
149     croak ("out of memory during aio_req allocation");
150    
151     req->type = dowrite ? REQ_WRITE : REQ_READ;
152     req->fd = fd;
153     req->offset = offset;
154     req->length = length;
155     req->data = SvREFCNT_inc (data);
156 root 1.6 req->dataptr = (char *)svptr + dataoffset;
157 root 1.5 req->callback = SvREFCNT_inc (callback);
158    
159 root 1.8 send_req (req);
160 root 1.5 }
161    
162     static int
163     poll_cb (pTHX)
164     {
165     dSP;
166     int count = 0;
167     aio_req req;
168    
169     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
170     {
171     if (req->type == REQ_QUIT)
172     {
173     Safefree (req->thread);
174     started--;
175     }
176     else
177     {
178     int errorno = errno;
179     errno = req->errorno;
180    
181     if (req->type == REQ_READ)
182 root 1.6 SvCUR_set (req->data, req->dataoffset
183     + req->result > 0 ? req->result : 0);
184 root 1.5
185 root 1.14 if (req->data)
186     SvREFCNT_dec (req->data);
187    
188 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
189     {
190     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
191     PL_laststatval = req->result;
192     PL_statcache.st_dev = req->statdata->st_dev;
193     PL_statcache.st_ino = req->statdata->st_ino;
194     PL_statcache.st_mode = req->statdata->st_mode;
195     PL_statcache.st_nlink = req->statdata->st_nlink;
196     PL_statcache.st_uid = req->statdata->st_uid;
197     PL_statcache.st_gid = req->statdata->st_gid;
198     PL_statcache.st_rdev = req->statdata->st_rdev;
199     PL_statcache.st_size = req->statdata->st_size;
200     PL_statcache.st_atime = req->statdata->st_atime;
201     PL_statcache.st_mtime = req->statdata->st_mtime;
202     PL_statcache.st_ctime = req->statdata->st_ctime;
203     PL_statcache.st_blksize = req->statdata->st_blksize;
204     PL_statcache.st_blocks = req->statdata->st_blocks;
205    
206     Safefree (req->statdata);
207     }
208    
209 root 1.5 PUSHMARK (SP);
210     XPUSHs (sv_2mortal (newSViv (req->result)));
211     PUTBACK;
212     call_sv (req->callback, G_VOID);
213     SPAGAIN;
214    
215 root 1.14 if (req->callback)
216     SvREFCNT_dec (req->callback);
217 root 1.5
218     errno = errorno;
219     nreqs--;
220     count++;
221     }
222    
223     Safefree (req);
224     }
225    
226 root 1.18 if (qs)
227     {
228     printf ("outstanding %p %d\n", qs, nreqs);
229     send_reqs ();
230     }
231    
232 root 1.5 return count;
233 root 1.2 }
234    
235 root 1.8 static sigset_t fullsigset;
236    
237 root 1.2 #undef errno
238     #include <asm/unistd.h>
239    
240     static int
241 root 1.18 aio_proc (void *thr_arg)
242 root 1.2 {
243     aio_thread *thr = thr_arg;
244 root 1.9 aio_req req;
245 root 1.2 int errno;
246    
247 root 1.15 /* this is very much x86 and kernel-specific :(:(:( */
248 root 1.11 /* we rely on gcc's ability to create closures. */
249 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
250     _syscall3(int,write,int,fd,char *,buf,size_t,count)
251    
252 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
253     _syscall1(int,close,int,fd)
254    
255 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
256     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
257 root 1.13
258     _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
259     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
260     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
261    
262 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
263 root 1.2
264     /* then loop */
265     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
266     {
267     req->thread = thr;
268 root 1.14 errno = 0; /* strictly unnecessary */
269 root 1.2
270 root 1.18 switch (req->type)
271 root 1.2 {
272 root 1.18 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
273     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
274     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
275     case REQ_CLOSE: req->result = close (req->fd); break;
276     case REQ_STAT: req->result = stat64 (req->dataptr, req->statdata); break;
277     case REQ_LSTAT: req->result = lstat64 (req->dataptr, req->statdata); break;
278     case REQ_FSTAT: req->result = fstat64 (req->fd, req->statdata); break;
279    
280     case REQ_QUIT:
281     default:
282     write (respipe[1], (void *)&req, sizeof (req));
283     return 0;
284 root 1.2 }
285    
286 root 1.8 req->errorno = errno;
287 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
288     }
289    
290     return 0;
291 root 1.1 }
292    
293     MODULE = Linux::AIO PACKAGE = Linux::AIO
294    
295     BOOT:
296     {
297 root 1.8 sigfillset (&fullsigset);
298 root 1.9 sigdelset (&fullsigset, SIGTERM);
299     sigdelset (&fullsigset, SIGQUIT);
300     sigdelset (&fullsigset, SIGABRT);
301     sigdelset (&fullsigset, SIGINT);
302 root 1.8
303 root 1.1 if (pipe (reqpipe) || pipe (respipe))
304     croak ("unable to initialize request or result pipe");
305 root 1.18
306     if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
307     croak ("cannot set result pipe to nonblocking mode");
308 root 1.5
309     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
310     croak ("cannot set result pipe to nonblocking mode");
311 root 1.1 }
312    
313     void
314     min_parallel(nthreads)
315     int nthreads
316 root 1.5 PROTOTYPE: $
317 root 1.1 CODE:
318     while (nthreads > started)
319     start_thread ();
320    
321     void
322     max_parallel(nthreads)
323     int nthreads
324 root 1.5 PROTOTYPE: $
325 root 1.1 CODE:
326 root 1.5 int cur = started;
327     while (cur > nthreads)
328     {
329     end_thread ();
330     cur--;
331     }
332    
333 root 1.1 while (started > nthreads)
334 root 1.5 {
335 root 1.14 fd_set rfd;
336     FD_ZERO(&rfd);
337     FD_SET(respipe[0], &rfd);
338    
339     select (respipe[0] + 1, &rfd, 0, 0, 0);
340 root 1.17 poll_cb (aTHX);
341 root 1.5 }
342 root 1.1
343     void
344 root 1.8 aio_open(pathname,flags,mode,callback)
345 root 1.13 SV * pathname
346 root 1.8 int flags
347     int mode
348     SV * callback
349     PROTOTYPE: $$$$
350     CODE:
351     aio_req req;
352    
353 root 1.14 Newz (0, req, 1, aio_cb);
354 root 1.8
355     if (!req)
356     croak ("out of memory during aio_req allocation");
357    
358     req->type = REQ_OPEN;
359 root 1.14 req->data = newSVsv (pathname);
360     req->dataptr = SvPV_nolen (req->data);
361 root 1.8 req->fd = flags;
362     req->mode = mode;
363 root 1.10 req->callback = SvREFCNT_inc (callback);
364    
365     send_req (req);
366    
367     void
368     aio_close(fh,callback)
369 root 1.13 InputStream fh
370 root 1.10 SV * callback
371 root 1.14 PROTOTYPE: $$
372 root 1.10 CODE:
373     aio_req req;
374    
375 root 1.14 Newz (0, req, 1, aio_cb);
376 root 1.10
377     if (!req)
378     croak ("out of memory during aio_req allocation");
379    
380     req->type = REQ_CLOSE;
381     req->fd = PerlIO_fileno (fh);
382 root 1.13 req->callback = SvREFCNT_inc (callback);
383    
384     send_req (req);
385    
386     void
387     aio_read(fh,offset,length,data,dataoffset,callback)
388     InputStream fh
389     UV offset
390     IV length
391     SV * data
392     IV dataoffset
393     SV * callback
394     PROTOTYPE: $$$$$$
395     CODE:
396     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
397    
398     void
399     aio_write(fh,offset,length,data,dataoffset,callback)
400     OutputStream fh
401     UV offset
402     IV length
403     SV * data
404     IV dataoffset
405     SV * callback
406     PROTOTYPE: $$$$$$
407     CODE:
408     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
409    
410     void
411     aio_stat(fh_or_path,callback)
412     SV * fh_or_path
413     SV * callback
414     PROTOTYPE: $$
415     ALIAS:
416     aio_lstat = 1
417     CODE:
418     aio_req req;
419    
420 root 1.14 Newz (0, req, 1, aio_cb);
421 root 1.13
422     if (!req)
423     croak ("out of memory during aio_req allocation");
424    
425     New (0, req->statdata, 1, struct stat64);
426    
427     if (!req->statdata)
428     croak ("out of memory during aio_req->statdata allocation");
429    
430     if (SvPOK (fh_or_path))
431     {
432     req->type = ix ? REQ_LSTAT : REQ_STAT;
433 root 1.14 req->data = newSVsv (fh_or_path);
434     req->dataptr = SvPV_nolen (req->data);
435 root 1.13 }
436     else
437     {
438     req->type = REQ_FSTAT;
439     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
440     }
441    
442 root 1.8 req->callback = SvREFCNT_inc (callback);
443    
444     send_req (req);
445 root 1.5
446     int
447     poll_fileno()
448     PROTOTYPE:
449     CODE:
450     RETVAL = respipe[0];
451     OUTPUT:
452     RETVAL
453    
454     int
455 root 1.6 poll_cb(...)
456 root 1.5 PROTOTYPE:
457     CODE:
458     RETVAL = poll_cb (aTHX);
459     OUTPUT:
460     RETVAL
461    
462     int
463     nreqs()
464     PROTOTYPE:
465     CODE:
466     RETVAL = nreqs;
467     OUTPUT:
468     RETVAL
469