ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.25
Committed: Thu Jul 7 22:24:09 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.24: +25 -15 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13    
14 root 1.21 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
15 root 1.13 typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16 root 1.21 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
17 root 1.16
18 root 1.25 // 128 seems to be enough most everywhere. alpha needs 256.
19     #define STACKSIZE (256 * sizeof (long))
20 root 1.1
21 root 1.20 enum {
22     REQ_QUIT,
23     REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE,
24     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK
25     };
26 root 1.1
27     typedef struct {
28     char stack[STACKSIZE];
29     } aio_thread;
30    
31 root 1.18 typedef struct aio_cb {
32     struct aio_cb *next;
33    
34 root 1.1 int type;
35     aio_thread *thread;
36    
37     int fd;
38     off_t offset;
39     size_t length;
40 root 1.2 ssize_t result;
41 root 1.8 mode_t mode; /* open */
42 root 1.1 int errorno;
43 root 1.5 SV *data, *callback;
44 root 1.1 void *dataptr;
45 root 1.2 STRLEN dataoffset;
46 root 1.13
47 root 1.21 Stat_t *statdata;
48 root 1.1 } aio_cb;
49    
50     typedef aio_cb *aio_req;
51    
52     static int started;
53 root 1.5 static int nreqs;
54 root 1.1 static int reqpipe[2], respipe[2];
55    
56 root 1.18 static aio_req qs, qe; /* queue start, queue end */
57    
58 root 1.2 static int aio_proc(void *arg);
59    
60 root 1.1 static void
61 root 1.18 start_thread (void)
62 root 1.1 {
63 root 1.2 aio_thread *thr;
64    
65     New (0, thr, 1, aio_thread);
66    
67     if (clone (aio_proc,
68 root 1.22 &(thr->stack[STACKSIZE - sizeof (long)]),
69 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
70 root 1.2 thr) >= 0)
71     started++;
72     else
73     Safefree (thr);
74 root 1.1 }
75    
76     static void
77 root 1.18 send_reqs (void)
78 root 1.1 {
79 root 1.18 /* this write is atomic */
80     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
81     {
82     qs = qs->next;
83     if (!qs) qe = 0;
84     }
85 root 1.2 }
86    
87     static void
88 root 1.8 send_req (aio_req req)
89     {
90     nreqs++;
91 root 1.18 req->next = 0;
92    
93     if (qe)
94 root 1.22 {
95     qe->next = req;
96     qe = req;
97     }
98 root 1.18 else
99     qe = qs = req;
100    
101     send_reqs ();
102     }
103    
104     static void
105     end_thread (void)
106     {
107     aio_req req;
108     New (0, req, 1, aio_cb);
109     req->type = REQ_QUIT;
110    
111     send_req (req);
112 root 1.8 }
113    
114     static void
115 root 1.17 read_write (pTHX_
116     int dowrite, int fd, off_t offset, size_t length,
117 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
118 root 1.2 {
119 root 1.5 aio_req req;
120     STRLEN svlen;
121     char *svptr = SvPV (data, svlen);
122    
123 root 1.13 SvUPGRADE (data, SVt_PV);
124     SvPOK_on (data);
125    
126 root 1.5 if (dataoffset < 0)
127     dataoffset += svlen;
128    
129     if (dataoffset < 0 || dataoffset > svlen)
130     croak ("data offset outside of string");
131    
132     if (dowrite)
133     {
134     /* write: check length and adjust. */
135     if (length < 0 || length + dataoffset > svlen)
136     length = svlen - dataoffset;
137     }
138     else
139     {
140     /* read: grow scalar as necessary */
141     svptr = SvGROW (data, length + dataoffset);
142     }
143    
144     if (length < 0)
145     croak ("length must not be negative");
146    
147 root 1.14 Newz (0, req, 1, aio_cb);
148 root 1.5
149     if (!req)
150     croak ("out of memory during aio_req allocation");
151    
152     req->type = dowrite ? REQ_WRITE : REQ_READ;
153     req->fd = fd;
154     req->offset = offset;
155     req->length = length;
156     req->data = SvREFCNT_inc (data);
157 root 1.6 req->dataptr = (char *)svptr + dataoffset;
158 root 1.5 req->callback = SvREFCNT_inc (callback);
159    
160 root 1.8 send_req (req);
161 root 1.5 }
162    
163 root 1.22 static void
164     poll_wait ()
165     {
166     fd_set rfd;
167     FD_ZERO(&rfd);
168     FD_SET(respipe[0], &rfd);
169    
170     select (respipe[0] + 1, &rfd, 0, 0, 0);
171     }
172    
173 root 1.5 static int
174     poll_cb (pTHX)
175     {
176     dSP;
177     int count = 0;
178     aio_req req;
179    
180     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
181     {
182 root 1.22 nreqs--;
183    
184 root 1.5 if (req->type == REQ_QUIT)
185     {
186     Safefree (req->thread);
187     started--;
188     }
189     else
190     {
191     int errorno = errno;
192     errno = req->errorno;
193    
194     if (req->type == REQ_READ)
195 root 1.6 SvCUR_set (req->data, req->dataoffset
196     + req->result > 0 ? req->result : 0);
197 root 1.5
198 root 1.14 if (req->data)
199     SvREFCNT_dec (req->data);
200    
201 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
202     {
203 root 1.21 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
204     PL_laststatval = req->result;
205     PL_statcache = *(req->statdata);
206 root 1.13
207     Safefree (req->statdata);
208     }
209    
210 root 1.5 PUSHMARK (SP);
211     XPUSHs (sv_2mortal (newSViv (req->result)));
212     PUTBACK;
213     call_sv (req->callback, G_VOID);
214     SPAGAIN;
215    
216 root 1.14 if (req->callback)
217     SvREFCNT_dec (req->callback);
218 root 1.5
219     errno = errorno;
220     count++;
221     }
222    
223     Safefree (req);
224     }
225    
226 root 1.18 if (qs)
227     send_reqs ();
228    
229 root 1.5 return count;
230 root 1.2 }
231    
232 root 1.8 static sigset_t fullsigset;
233    
234 root 1.2 #undef errno
235     #include <asm/unistd.h>
236 root 1.20 #include <sys/prctl.h>
237 root 1.2
238 root 1.25 #if __alpha || __ia64 || __hppa || __sparc64__ || __v850__
239     # define stat kernelstat
240     # define stat64 kernelstat64
241     # include <asm/stat.h>
242     # undef stat
243     # undef stat64
244     #else
245     # define kernelstat stat
246     # define kernelstat64 stat64
247     #endif
248    
249 root 1.21 #define COPY_STATDATA \
250     req->statdata->st_dev = statdata.st_dev; \
251     req->statdata->st_ino = statdata.st_ino; \
252     req->statdata->st_mode = statdata.st_mode; \
253     req->statdata->st_nlink = statdata.st_nlink; \
254     req->statdata->st_uid = statdata.st_uid; \
255     req->statdata->st_gid = statdata.st_gid; \
256     req->statdata->st_rdev = statdata.st_rdev; \
257     req->statdata->st_size = statdata.st_size; \
258     req->statdata->st_atime = statdata.st_atime; \
259     req->statdata->st_mtime = statdata.st_mtime; \
260     req->statdata->st_ctime = statdata.st_ctime; \
261     req->statdata->st_blksize = statdata.st_blksize; \
262     req->statdata->st_blocks = statdata.st_blocks; \
263    
264 root 1.2 static int
265 root 1.18 aio_proc (void *thr_arg)
266 root 1.2 {
267     aio_thread *thr = thr_arg;
268 root 1.9 aio_req req;
269 root 1.2 int errno;
270    
271 root 1.21 /* this is very much kernel-specific :(:(:( */
272 root 1.11 /* we rely on gcc's ability to create closures. */
273 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
274     _syscall3(int,write,int,fd,char *,buf,size_t,count)
275    
276 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
277     _syscall1(int,close,int,fd)
278    
279 root 1.25 #if __NR_pread64
280 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
281     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
282 root 1.21 #elif __NR_pread
283     _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
284     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
285     #else
286     # error "neither pread nor pread64 defined"
287     #endif
288 root 1.13
289 root 1.21
290 root 1.25 #if __NR_stat64
291     _syscall2(int,stat64, const char *, filename, struct kernelstat64 *, buf)
292     _syscall2(int,lstat64, const char *, filename, struct kernelstat64 *, buf)
293     _syscall2(int,fstat64, int, fd, struct kernelstat64 *, buf)
294 root 1.21 #elif __NR_stat
295 root 1.25 _syscall2(int,stat, const char *, filename, struct kernelstat *, buf)
296     _syscall2(int,lstat, const char *, filename, struct kernelstat *, buf)
297     _syscall2(int,fstat, int, fd, struct kernelstat *, buf)
298 root 1.21 #else
299     # error "neither stat64 nor stat defined"
300     #endif
301 root 1.13
302 root 1.20 _syscall1(int,unlink, char *, filename);
303    
304 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
305 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
306 root 1.2
307     /* then loop */
308     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
309     {
310     req->thread = thr;
311 root 1.14 errno = 0; /* strictly unnecessary */
312 root 1.2
313 root 1.18 switch (req->type)
314 root 1.2 {
315 root 1.25 #if __NR_pread64
316 root 1.20 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
317     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
318 root 1.21 #else
319     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
320     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
321     #endif
322 root 1.25 #if __NR_stat64
323     struct kernelstat64 statdata;
324 root 1.21 case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
325     case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
326     case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
327     #else
328 root 1.25 struct kernelstat statdata;
329 root 1.21 case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
330     case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
331     case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
332     #endif
333 root 1.20 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
334     case REQ_CLOSE: req->result = close (req->fd); break;
335     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
336 root 1.18
337     case REQ_QUIT:
338     default:
339     write (respipe[1], (void *)&req, sizeof (req));
340     return 0;
341 root 1.2 }
342    
343 root 1.8 req->errorno = errno;
344 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
345     }
346    
347     return 0;
348 root 1.1 }
349    
350     MODULE = Linux::AIO PACKAGE = Linux::AIO
351    
352     BOOT:
353     {
354 root 1.8 sigfillset (&fullsigset);
355 root 1.9 sigdelset (&fullsigset, SIGTERM);
356     sigdelset (&fullsigset, SIGQUIT);
357     sigdelset (&fullsigset, SIGABRT);
358     sigdelset (&fullsigset, SIGINT);
359 root 1.8
360 root 1.1 if (pipe (reqpipe) || pipe (respipe))
361     croak ("unable to initialize request or result pipe");
362 root 1.18
363     if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
364     croak ("cannot set result pipe to nonblocking mode");
365 root 1.5
366     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
367     croak ("cannot set result pipe to nonblocking mode");
368 root 1.1 }
369    
370     void
371     min_parallel(nthreads)
372     int nthreads
373 root 1.5 PROTOTYPE: $
374 root 1.1 CODE:
375     while (nthreads > started)
376     start_thread ();
377    
378     void
379     max_parallel(nthreads)
380     int nthreads
381 root 1.5 PROTOTYPE: $
382 root 1.1 CODE:
383 root 1.5 int cur = started;
384     while (cur > nthreads)
385     {
386     end_thread ();
387     cur--;
388     }
389    
390 root 1.1 while (started > nthreads)
391 root 1.5 {
392 root 1.22 poll_wait ();
393 root 1.17 poll_cb (aTHX);
394 root 1.5 }
395 root 1.1
396     void
397 root 1.8 aio_open(pathname,flags,mode,callback)
398 root 1.13 SV * pathname
399 root 1.8 int flags
400     int mode
401     SV * callback
402     PROTOTYPE: $$$$
403     CODE:
404     aio_req req;
405    
406 root 1.14 Newz (0, req, 1, aio_cb);
407 root 1.8
408     if (!req)
409     croak ("out of memory during aio_req allocation");
410    
411     req->type = REQ_OPEN;
412 root 1.14 req->data = newSVsv (pathname);
413     req->dataptr = SvPV_nolen (req->data);
414 root 1.8 req->fd = flags;
415     req->mode = mode;
416 root 1.10 req->callback = SvREFCNT_inc (callback);
417    
418     send_req (req);
419    
420     void
421     aio_close(fh,callback)
422 root 1.13 InputStream fh
423 root 1.10 SV * callback
424 root 1.14 PROTOTYPE: $$
425 root 1.10 CODE:
426     aio_req req;
427    
428 root 1.14 Newz (0, req, 1, aio_cb);
429 root 1.10
430     if (!req)
431     croak ("out of memory during aio_req allocation");
432    
433     req->type = REQ_CLOSE;
434     req->fd = PerlIO_fileno (fh);
435 root 1.13 req->callback = SvREFCNT_inc (callback);
436    
437     send_req (req);
438    
439     void
440     aio_read(fh,offset,length,data,dataoffset,callback)
441     InputStream fh
442     UV offset
443     IV length
444     SV * data
445     IV dataoffset
446     SV * callback
447     PROTOTYPE: $$$$$$
448     CODE:
449     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
450    
451     void
452     aio_write(fh,offset,length,data,dataoffset,callback)
453     OutputStream fh
454     UV offset
455     IV length
456     SV * data
457     IV dataoffset
458     SV * callback
459     PROTOTYPE: $$$$$$
460     CODE:
461     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
462    
463     void
464     aio_stat(fh_or_path,callback)
465     SV * fh_or_path
466     SV * callback
467     PROTOTYPE: $$
468     ALIAS:
469     aio_lstat = 1
470     CODE:
471     aio_req req;
472    
473 root 1.14 Newz (0, req, 1, aio_cb);
474 root 1.13
475     if (!req)
476     croak ("out of memory during aio_req allocation");
477    
478 root 1.21 New (0, req->statdata, 1, Stat_t);
479 root 1.13
480     if (!req->statdata)
481     croak ("out of memory during aio_req->statdata allocation");
482    
483     if (SvPOK (fh_or_path))
484     {
485     req->type = ix ? REQ_LSTAT : REQ_STAT;
486 root 1.14 req->data = newSVsv (fh_or_path);
487     req->dataptr = SvPV_nolen (req->data);
488 root 1.13 }
489     else
490     {
491     req->type = REQ_FSTAT;
492     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
493     }
494    
495 root 1.8 req->callback = SvREFCNT_inc (callback);
496    
497     send_req (req);
498 root 1.20
499     void
500     aio_unlink(pathname,callback)
501     SV * pathname
502     SV * callback
503     PROTOTYPE: $$
504     CODE:
505     aio_req req;
506    
507     Newz (0, req, 1, aio_cb);
508    
509     if (!req)
510     croak ("out of memory during aio_req allocation");
511    
512     req->type = REQ_UNLINK;
513     req->data = newSVsv (pathname);
514     req->dataptr = SvPV_nolen (req->data);
515     req->callback = SvREFCNT_inc (callback);
516    
517     send_req (req);
518 root 1.5
519     int
520     poll_fileno()
521     PROTOTYPE:
522     CODE:
523     RETVAL = respipe[0];
524     OUTPUT:
525     RETVAL
526    
527     int
528 root 1.6 poll_cb(...)
529 root 1.5 PROTOTYPE:
530     CODE:
531     RETVAL = poll_cb (aTHX);
532     OUTPUT:
533     RETVAL
534    
535 root 1.23 void
536     poll_wait()
537     PROTOTYPE:
538     CODE:
539     poll_wait ();
540    
541 root 1.5 int
542     nreqs()
543     PROTOTYPE:
544     CODE:
545     RETVAL = nreqs;
546     OUTPUT:
547     RETVAL
548