ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.20
Committed: Sun Jul 18 10:55:34 2004 UTC (19 years, 10 months ago) by root
Branch: MAIN
Changes since 1.19: +38 -9 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13    
14 root 1.13 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
15     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.12 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
17    
18 root 1.16 #ifndef __NR_pread64
19     # define __NR_pread64 __NR_pread
20     #endif
21     #ifndef __NR_pwrite64
22     # define __NR_pwrite64 __NR_pwrite
23     #endif
24    
25 root 1.8 #define STACKSIZE 1024 /* yeah */
26 root 1.1
27 root 1.20 enum {
28     REQ_QUIT,
29     REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE,
30     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK
31     };
32 root 1.1
33     typedef struct {
34     char stack[STACKSIZE];
35     } aio_thread;
36    
37 root 1.18 typedef struct aio_cb {
38     struct aio_cb *next;
39    
40 root 1.1 int type;
41     aio_thread *thread;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46 root 1.2 ssize_t result;
47 root 1.8 mode_t mode; /* open */
48 root 1.1 int errorno;
49 root 1.5 SV *data, *callback;
50 root 1.1 void *dataptr;
51 root 1.2 STRLEN dataoffset;
52 root 1.13
53     struct stat64 *statdata;
54 root 1.1 } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59 root 1.5 static int nreqs;
60 root 1.1 static int reqpipe[2], respipe[2];
61    
62 root 1.18 static aio_req qs, qe; /* queue start, queue end */
63    
64 root 1.2 static int aio_proc(void *arg);
65    
66 root 1.1 static void
67 root 1.18 start_thread (void)
68 root 1.1 {
69 root 1.2 aio_thread *thr;
70    
71     New (0, thr, 1, aio_thread);
72    
73     if (clone (aio_proc,
74     &(thr->stack[STACKSIZE]),
75 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
76 root 1.2 thr) >= 0)
77     started++;
78     else
79     Safefree (thr);
80 root 1.1 }
81    
82     static void
83 root 1.18 send_reqs (void)
84 root 1.1 {
85 root 1.18 /* this write is atomic */
86     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
87     {
88     qs = qs->next;
89     if (!qs) qe = 0;
90     }
91 root 1.2 }
92    
93     static void
94 root 1.8 send_req (aio_req req)
95     {
96     nreqs++;
97 root 1.18 req->next = 0;
98    
99     if (qe)
100     qe->next = req;
101     else
102     qe = qs = req;
103    
104     send_reqs ();
105     }
106    
107     static void
108     end_thread (void)
109     {
110     aio_req req;
111     New (0, req, 1, aio_cb);
112     req->type = REQ_QUIT;
113    
114     send_req (req);
115 root 1.8 }
116    
117     static void
118 root 1.17 read_write (pTHX_
119     int dowrite, int fd, off_t offset, size_t length,
120 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
121 root 1.2 {
122 root 1.5 aio_req req;
123     STRLEN svlen;
124     char *svptr = SvPV (data, svlen);
125    
126 root 1.13 SvUPGRADE (data, SVt_PV);
127     SvPOK_on (data);
128    
129 root 1.5 if (dataoffset < 0)
130     dataoffset += svlen;
131    
132     if (dataoffset < 0 || dataoffset > svlen)
133     croak ("data offset outside of string");
134    
135     if (dowrite)
136     {
137     /* write: check length and adjust. */
138     if (length < 0 || length + dataoffset > svlen)
139     length = svlen - dataoffset;
140     }
141     else
142     {
143     /* read: grow scalar as necessary */
144     svptr = SvGROW (data, length + dataoffset);
145     }
146    
147     if (length < 0)
148     croak ("length must not be negative");
149    
150 root 1.14 Newz (0, req, 1, aio_cb);
151 root 1.5
152     if (!req)
153     croak ("out of memory during aio_req allocation");
154    
155     req->type = dowrite ? REQ_WRITE : REQ_READ;
156     req->fd = fd;
157     req->offset = offset;
158     req->length = length;
159     req->data = SvREFCNT_inc (data);
160 root 1.6 req->dataptr = (char *)svptr + dataoffset;
161 root 1.5 req->callback = SvREFCNT_inc (callback);
162    
163 root 1.8 send_req (req);
164 root 1.5 }
165    
166     static int
167     poll_cb (pTHX)
168     {
169     dSP;
170     int count = 0;
171     aio_req req;
172    
173     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
174     {
175     if (req->type == REQ_QUIT)
176     {
177     Safefree (req->thread);
178     started--;
179     }
180     else
181     {
182     int errorno = errno;
183     errno = req->errorno;
184    
185     if (req->type == REQ_READ)
186 root 1.6 SvCUR_set (req->data, req->dataoffset
187     + req->result > 0 ? req->result : 0);
188 root 1.5
189 root 1.14 if (req->data)
190     SvREFCNT_dec (req->data);
191    
192 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
193     {
194     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
195     PL_laststatval = req->result;
196     PL_statcache.st_dev = req->statdata->st_dev;
197     PL_statcache.st_ino = req->statdata->st_ino;
198     PL_statcache.st_mode = req->statdata->st_mode;
199     PL_statcache.st_nlink = req->statdata->st_nlink;
200     PL_statcache.st_uid = req->statdata->st_uid;
201     PL_statcache.st_gid = req->statdata->st_gid;
202     PL_statcache.st_rdev = req->statdata->st_rdev;
203     PL_statcache.st_size = req->statdata->st_size;
204     PL_statcache.st_atime = req->statdata->st_atime;
205     PL_statcache.st_mtime = req->statdata->st_mtime;
206     PL_statcache.st_ctime = req->statdata->st_ctime;
207     PL_statcache.st_blksize = req->statdata->st_blksize;
208     PL_statcache.st_blocks = req->statdata->st_blocks;
209    
210     Safefree (req->statdata);
211     }
212    
213 root 1.5 PUSHMARK (SP);
214     XPUSHs (sv_2mortal (newSViv (req->result)));
215     PUTBACK;
216     call_sv (req->callback, G_VOID);
217     SPAGAIN;
218    
219 root 1.14 if (req->callback)
220     SvREFCNT_dec (req->callback);
221 root 1.5
222     errno = errorno;
223     nreqs--;
224     count++;
225     }
226    
227     Safefree (req);
228     }
229    
230 root 1.18 if (qs)
231     send_reqs ();
232    
233 root 1.5 return count;
234 root 1.2 }
235    
236 root 1.8 static sigset_t fullsigset;
237    
238 root 1.2 #undef errno
239     #include <asm/unistd.h>
240 root 1.20 #include <sys/prctl.h>
241 root 1.2
242     static int
243 root 1.18 aio_proc (void *thr_arg)
244 root 1.2 {
245     aio_thread *thr = thr_arg;
246 root 1.9 aio_req req;
247 root 1.2 int errno;
248    
249 root 1.15 /* this is very much x86 and kernel-specific :(:(:( */
250 root 1.11 /* we rely on gcc's ability to create closures. */
251 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
252     _syscall3(int,write,int,fd,char *,buf,size_t,count)
253    
254 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
255     _syscall1(int,close,int,fd)
256    
257 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
258     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
259 root 1.13
260     _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
261     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
262     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
263    
264 root 1.20 _syscall1(int,unlink, char *, filename);
265    
266 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
267 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
268 root 1.2
269     /* then loop */
270     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
271     {
272     req->thread = thr;
273 root 1.14 errno = 0; /* strictly unnecessary */
274 root 1.2
275 root 1.18 switch (req->type)
276 root 1.2 {
277 root 1.20 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
278     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
279     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
280     case REQ_CLOSE: req->result = close (req->fd); break;
281     case REQ_STAT: req->result = stat64 (req->dataptr, req->statdata); break;
282     case REQ_LSTAT: req->result = lstat64 (req->dataptr, req->statdata); break;
283     case REQ_FSTAT: req->result = fstat64 (req->fd, req->statdata); break;
284     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
285 root 1.18
286     case REQ_QUIT:
287     default:
288     write (respipe[1], (void *)&req, sizeof (req));
289     return 0;
290 root 1.2 }
291    
292 root 1.8 req->errorno = errno;
293 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
294     }
295    
296     return 0;
297 root 1.1 }
298    
299     MODULE = Linux::AIO PACKAGE = Linux::AIO
300    
301     BOOT:
302     {
303 root 1.8 sigfillset (&fullsigset);
304 root 1.9 sigdelset (&fullsigset, SIGTERM);
305     sigdelset (&fullsigset, SIGQUIT);
306     sigdelset (&fullsigset, SIGABRT);
307     sigdelset (&fullsigset, SIGINT);
308 root 1.8
309 root 1.1 if (pipe (reqpipe) || pipe (respipe))
310     croak ("unable to initialize request or result pipe");
311 root 1.18
312     if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
313     croak ("cannot set result pipe to nonblocking mode");
314 root 1.5
315     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
316     croak ("cannot set result pipe to nonblocking mode");
317 root 1.1 }
318    
319     void
320     min_parallel(nthreads)
321     int nthreads
322 root 1.5 PROTOTYPE: $
323 root 1.1 CODE:
324     while (nthreads > started)
325     start_thread ();
326    
327     void
328     max_parallel(nthreads)
329     int nthreads
330 root 1.5 PROTOTYPE: $
331 root 1.1 CODE:
332 root 1.5 int cur = started;
333     while (cur > nthreads)
334     {
335     end_thread ();
336     cur--;
337     }
338    
339 root 1.1 while (started > nthreads)
340 root 1.5 {
341 root 1.14 fd_set rfd;
342     FD_ZERO(&rfd);
343     FD_SET(respipe[0], &rfd);
344    
345     select (respipe[0] + 1, &rfd, 0, 0, 0);
346 root 1.17 poll_cb (aTHX);
347 root 1.5 }
348 root 1.1
349     void
350 root 1.8 aio_open(pathname,flags,mode,callback)
351 root 1.13 SV * pathname
352 root 1.8 int flags
353     int mode
354     SV * callback
355     PROTOTYPE: $$$$
356     CODE:
357     aio_req req;
358    
359 root 1.14 Newz (0, req, 1, aio_cb);
360 root 1.8
361     if (!req)
362     croak ("out of memory during aio_req allocation");
363    
364     req->type = REQ_OPEN;
365 root 1.14 req->data = newSVsv (pathname);
366     req->dataptr = SvPV_nolen (req->data);
367 root 1.8 req->fd = flags;
368     req->mode = mode;
369 root 1.10 req->callback = SvREFCNT_inc (callback);
370    
371     send_req (req);
372    
373     void
374     aio_close(fh,callback)
375 root 1.13 InputStream fh
376 root 1.10 SV * callback
377 root 1.14 PROTOTYPE: $$
378 root 1.10 CODE:
379     aio_req req;
380    
381 root 1.14 Newz (0, req, 1, aio_cb);
382 root 1.10
383     if (!req)
384     croak ("out of memory during aio_req allocation");
385    
386     req->type = REQ_CLOSE;
387     req->fd = PerlIO_fileno (fh);
388 root 1.13 req->callback = SvREFCNT_inc (callback);
389    
390     send_req (req);
391    
392     void
393     aio_read(fh,offset,length,data,dataoffset,callback)
394     InputStream fh
395     UV offset
396     IV length
397     SV * data
398     IV dataoffset
399     SV * callback
400     PROTOTYPE: $$$$$$
401     CODE:
402     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
403    
404     void
405     aio_write(fh,offset,length,data,dataoffset,callback)
406     OutputStream fh
407     UV offset
408     IV length
409     SV * data
410     IV dataoffset
411     SV * callback
412     PROTOTYPE: $$$$$$
413     CODE:
414     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
415    
416     void
417     aio_stat(fh_or_path,callback)
418     SV * fh_or_path
419     SV * callback
420     PROTOTYPE: $$
421     ALIAS:
422     aio_lstat = 1
423     CODE:
424     aio_req req;
425    
426 root 1.14 Newz (0, req, 1, aio_cb);
427 root 1.13
428     if (!req)
429     croak ("out of memory during aio_req allocation");
430    
431     New (0, req->statdata, 1, struct stat64);
432    
433     if (!req->statdata)
434     croak ("out of memory during aio_req->statdata allocation");
435    
436     if (SvPOK (fh_or_path))
437     {
438     req->type = ix ? REQ_LSTAT : REQ_STAT;
439 root 1.14 req->data = newSVsv (fh_or_path);
440     req->dataptr = SvPV_nolen (req->data);
441 root 1.13 }
442     else
443     {
444     req->type = REQ_FSTAT;
445     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
446     }
447    
448 root 1.8 req->callback = SvREFCNT_inc (callback);
449    
450     send_req (req);
451 root 1.20
452     void
453     aio_unlink(pathname,callback)
454     SV * pathname
455     SV * callback
456     PROTOTYPE: $$
457     CODE:
458     aio_req req;
459    
460     Newz (0, req, 1, aio_cb);
461    
462     if (!req)
463     croak ("out of memory during aio_req allocation");
464    
465     req->type = REQ_UNLINK;
466     req->data = newSVsv (pathname);
467     req->dataptr = SvPV_nolen (req->data);
468     req->callback = SvREFCNT_inc (callback);
469    
470     send_req (req);
471 root 1.5
472     int
473     poll_fileno()
474     PROTOTYPE:
475     CODE:
476     RETVAL = respipe[0];
477     OUTPUT:
478     RETVAL
479    
480     int
481 root 1.6 poll_cb(...)
482 root 1.5 PROTOTYPE:
483     CODE:
484     RETVAL = poll_cb (aTHX);
485     OUTPUT:
486     RETVAL
487    
488     int
489     nreqs()
490     PROTOTYPE:
491     CODE:
492     RETVAL = nreqs;
493     OUTPUT:
494     RETVAL
495