ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.21
Committed: Fri Aug 6 17:18:08 2004 UTC (19 years, 9 months ago) by root
Branch: MAIN
Changes since 1.20: +55 -30 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.17 #define PERL_NO_GET_CONTEXT
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8 root 1.13 #include <sys/stat.h>
9 root 1.1 #include <unistd.h>
10 root 1.5 #include <fcntl.h>
11 root 1.8 #include <signal.h>
12 root 1.1 #include <sched.h>
13    
/*
 * hack, but 5.6.1 is simply toooo old ;)
 * The stream types named in the XSUB argument lists below are plain
 * void pointers here.
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;
17 root 1.16
18 root 1.8 #define STACKSIZE 1024 /* yeah */
19 root 1.1
/* Request types stored in aio_cb.type; aio_proc dispatches on them. */
enum {
  REQ_QUIT,     /* makes the worker that reads it return */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK
};
25 root 1.1
26     typedef struct {
27     char stack[STACKSIZE];
28     } aio_thread;
29    
30 root 1.18 typedef struct aio_cb {
31     struct aio_cb *next;
32    
33 root 1.1 int type;
34     aio_thread *thread;
35    
36     int fd;
37     off_t offset;
38     size_t length;
39 root 1.2 ssize_t result;
40 root 1.8 mode_t mode; /* open */
41 root 1.1 int errorno;
42 root 1.5 SV *data, *callback;
43 root 1.1 void *dataptr;
44 root 1.2 STRLEN dataoffset;
45 root 1.13
46 root 1.21 Stat_t *statdata;
47 root 1.1 } aio_cb;
48    
49     typedef aio_cb *aio_req;
50    
51     static int started;
52 root 1.5 static int nreqs;
53 root 1.1 static int reqpipe[2], respipe[2];
54    
55 root 1.18 static aio_req qs, qe; /* queue start, queue end */
56    
57 root 1.2 static int aio_proc(void *arg);
58    
59 root 1.1 static void
60 root 1.18 start_thread (void)
61 root 1.1 {
62 root 1.2 aio_thread *thr;
63    
64     New (0, thr, 1, aio_thread);
65    
66     if (clone (aio_proc,
67     &(thr->stack[STACKSIZE]),
68 root 1.20 CLONE_VM|CLONE_FS|CLONE_FILES,
69 root 1.2 thr) >= 0)
70     started++;
71     else
72     Safefree (thr);
73 root 1.1 }
74    
75     static void
76 root 1.18 send_reqs (void)
77 root 1.1 {
78 root 1.18 /* this write is atomic */
79     while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
80     {
81     qs = qs->next;
82     if (!qs) qe = 0;
83     }
84 root 1.2 }
85    
86     static void
87 root 1.8 send_req (aio_req req)
88     {
89     nreqs++;
90 root 1.18 req->next = 0;
91    
92     if (qe)
93     qe->next = req;
94     else
95     qe = qs = req;
96    
97     send_reqs ();
98     }
99    
100     static void
101     end_thread (void)
102     {
103     aio_req req;
104     New (0, req, 1, aio_cb);
105     req->type = REQ_QUIT;
106    
107     send_req (req);
108 root 1.8 }
109    
110     static void
111 root 1.17 read_write (pTHX_
112     int dowrite, int fd, off_t offset, size_t length,
113 root 1.18 SV *data, STRLEN dataoffset, SV *callback)
114 root 1.2 {
115 root 1.5 aio_req req;
116     STRLEN svlen;
117     char *svptr = SvPV (data, svlen);
118    
119 root 1.13 SvUPGRADE (data, SVt_PV);
120     SvPOK_on (data);
121    
122 root 1.5 if (dataoffset < 0)
123     dataoffset += svlen;
124    
125     if (dataoffset < 0 || dataoffset > svlen)
126     croak ("data offset outside of string");
127    
128     if (dowrite)
129     {
130     /* write: check length and adjust. */
131     if (length < 0 || length + dataoffset > svlen)
132     length = svlen - dataoffset;
133     }
134     else
135     {
136     /* read: grow scalar as necessary */
137     svptr = SvGROW (data, length + dataoffset);
138     }
139    
140     if (length < 0)
141     croak ("length must not be negative");
142    
143 root 1.14 Newz (0, req, 1, aio_cb);
144 root 1.5
145     if (!req)
146     croak ("out of memory during aio_req allocation");
147    
148     req->type = dowrite ? REQ_WRITE : REQ_READ;
149     req->fd = fd;
150     req->offset = offset;
151     req->length = length;
152     req->data = SvREFCNT_inc (data);
153 root 1.6 req->dataptr = (char *)svptr + dataoffset;
154 root 1.5 req->callback = SvREFCNT_inc (callback);
155    
156 root 1.8 send_req (req);
157 root 1.5 }
158    
159     static int
160     poll_cb (pTHX)
161     {
162     dSP;
163     int count = 0;
164     aio_req req;
165    
166     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
167     {
168     if (req->type == REQ_QUIT)
169     {
170     Safefree (req->thread);
171     started--;
172     }
173     else
174     {
175     int errorno = errno;
176     errno = req->errorno;
177    
178     if (req->type == REQ_READ)
179 root 1.6 SvCUR_set (req->data, req->dataoffset
180     + req->result > 0 ? req->result : 0);
181 root 1.5
182 root 1.14 if (req->data)
183     SvREFCNT_dec (req->data);
184    
185 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
186     {
187 root 1.21 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
188     PL_laststatval = req->result;
189     PL_statcache = *(req->statdata);
190 root 1.13
191     Safefree (req->statdata);
192     }
193    
194 root 1.5 PUSHMARK (SP);
195     XPUSHs (sv_2mortal (newSViv (req->result)));
196     PUTBACK;
197     call_sv (req->callback, G_VOID);
198     SPAGAIN;
199    
200 root 1.14 if (req->callback)
201     SvREFCNT_dec (req->callback);
202 root 1.5
203     errno = errorno;
204     nreqs--;
205     count++;
206     }
207    
208     Safefree (req);
209     }
210    
211 root 1.18 if (qs)
212     send_reqs ();
213    
214 root 1.5 return count;
215 root 1.2 }
216    
217 root 1.8 static sigset_t fullsigset;
218    
219 root 1.2 #undef errno
220     #include <asm/unistd.h>
221 root 1.20 #include <sys/prctl.h>
222 root 1.2
/*
 * Copy the fields of the local `statdata` into req->statdata one by
 * one.  Expects `req` and `statdata` to be in scope at the point of
 * use.  Wrapped in do/while(0) so it acts as one statement, also in an
 * unbraced if/else.
 */
#define COPY_STATDATA \
  do { \
    req->statdata->st_dev     = statdata.st_dev;     \
    req->statdata->st_ino     = statdata.st_ino;     \
    req->statdata->st_mode    = statdata.st_mode;    \
    req->statdata->st_nlink   = statdata.st_nlink;   \
    req->statdata->st_uid     = statdata.st_uid;     \
    req->statdata->st_gid     = statdata.st_gid;     \
    req->statdata->st_rdev    = statdata.st_rdev;    \
    req->statdata->st_size    = statdata.st_size;    \
    req->statdata->st_atime   = statdata.st_atime;   \
    req->statdata->st_mtime   = statdata.st_mtime;   \
    req->statdata->st_ctime   = statdata.st_ctime;   \
    req->statdata->st_blksize = statdata.st_blksize; \
    req->statdata->st_blocks  = statdata.st_blocks;  \
  } while (0)

238 root 1.2 static int
239 root 1.18 aio_proc (void *thr_arg)
240 root 1.2 {
241     aio_thread *thr = thr_arg;
242 root 1.9 aio_req req;
243 root 1.2 int errno;
244    
245 root 1.21 /* this is very much kernel-specific :(:(:( */
246 root 1.11 /* we rely on gcc's ability to create closures. */
247 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
248     _syscall3(int,write,int,fd,char *,buf,size_t,count)
249    
250 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
251     _syscall1(int,close,int,fd)
252    
253 root 1.21 #ifdef __NR_pread64
254 root 1.16 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
255     _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
256 root 1.21 #elif __NR_pread
257     _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
258     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
259     #else
260     # error "neither pread nor pread64 defined"
261     #endif
262 root 1.13
263 root 1.21
264     #ifdef __NR_stat64
265 root 1.13 _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
266     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
267     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
268 root 1.21 #elif __NR_stat
269     _syscall2(int,stat, const char *, filename, struct stat *, buf)
270     _syscall2(int,lstat, const char *, filename, struct stat *, buf)
271     _syscall2(int,fstat, int, fd, struct stat *, buf)
272     #else
273     # error "neither stat64 nor stat defined"
274     #endif
275 root 1.13
276 root 1.20 _syscall1(int,unlink, char *, filename);
277    
278 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
279 root 1.20 prctl (PR_SET_PDEATHSIG, SIGKILL);
280 root 1.2
281     /* then loop */
282     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
283     {
284     req->thread = thr;
285 root 1.14 errno = 0; /* strictly unnecessary */
286 root 1.2
287 root 1.18 switch (req->type)
288 root 1.2 {
289 root 1.21 #ifdef __NR_pread64
290 root 1.20 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
291     case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
292 root 1.21 #else
293     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
294     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
295     #endif
296     #ifdef __NR_stat64
297     struct stat64 statdata;
298     case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
299     case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
300     case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
301     #else
302     struct stat statdata;
303     case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
304     case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
305     case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
306     #endif
307 root 1.20 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
308     case REQ_CLOSE: req->result = close (req->fd); break;
309     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
310 root 1.18
311     case REQ_QUIT:
312     default:
313     write (respipe[1], (void *)&req, sizeof (req));
314     return 0;
315 root 1.2 }
316    
317 root 1.8 req->errorno = errno;
318 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
319     }
320    
321     return 0;
322 root 1.1 }
323    
324     MODULE = Linux::AIO PACKAGE = Linux::AIO
325    
BOOT:
{
	/* signal mask installed by every worker: everything blocked
	   except TERM, QUIT, ABRT and INT */
	sigfillset (&fullsigset);
	sigdelset (&fullsigset, SIGTERM);
	sigdelset (&fullsigset, SIGQUIT);
	sigdelset (&fullsigset, SIGABRT);
	sigdelset (&fullsigset, SIGINT);

	if (pipe (reqpipe) || pipe (respipe))
	  croak ("unable to initialize request or result pipe");

	/* send_reqs relies on a short/failed write to stop feeding the pipe */
	if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
	  croak ("cannot set request pipe to nonblocking mode");

	/* poll_cb reads until the pipe is empty and must not block */
	if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
	  croak ("cannot set result pipe to nonblocking mode");
}
343    
void
min_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
	/* start workers until at least nthreads are running */
	while (nthreads > started)
	  {
	    int before = started;

	    start_thread ();

	    /* start_thread leaves `started` unchanged when clone fails;
	       give up instead of spinning forever */
	    if (started == before)
	      croak ("unable to start aio thread");
	  }
351    
void
max_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	PREINIT:
	int cur;
	CODE:
	/* a negative limit would queue more quit requests than there are
	   workers and then wait forever for `started` to drop below zero */
	if (nthreads < 0)
	  nthreads = 0;

	/* queue one quit request per surplus worker */
	for (cur = started; cur > nthreads; cur--)
	  end_thread ();

	/* wait on the result pipe until the surplus workers have been reaped */
	while (started > nthreads)
	  {
	    fd_set rfd;
	    FD_ZERO(&rfd);
	    FD_SET(respipe[0], &rfd);

	    select (respipe[0] + 1, &rfd, 0, 0, 0);
	    poll_cb (aTHX);
	  }
373 root 1.1
void
aio_open(pathname,flags,mode,callback)
	SV *	pathname
	int	flags
	int	mode
	SV *	callback
	PROTOTYPE: $$$$
	PREINIT:
	aio_req req;
	CODE:
	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	/* the request owns a copy of the path; dataptr points into it */
	req->data     = newSVsv (pathname);
	req->dataptr  = SvPV_nolen (req->data);
	req->type     = REQ_OPEN;
	req->fd       = flags;	/* the fd field carries the open flags */
	req->mode     = mode;
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
397    
void
aio_close(fh,callback)
	InputStream	fh
	SV *	callback
	PROTOTYPE: $$
	PREINIT:
	aio_req req;
	CODE:
	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	/* queue a close of the descriptor behind fh */
	req->fd       = PerlIO_fileno (fh);
	req->type     = REQ_CLOSE;
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
416    
void
aio_read(fh,offset,length,data,dataoffset,callback)
	InputStream	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	PROTOTYPE: $$$$$$
	PREINIT:
	int fd;
	CODE:
	/* dowrite == 0 selects a read request */
	fd = PerlIO_fileno (fh);
	read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
428    
void
aio_write(fh,offset,length,data,dataoffset,callback)
	OutputStream	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	PROTOTYPE: $$$$$$
	PREINIT:
	int fd;
	CODE:
	/* dowrite == 1 selects a write request */
	fd = PerlIO_fileno (fh);
	read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
440    
void
aio_stat(fh_or_path,callback)
	SV *	fh_or_path
	SV *	callback
	PROTOTYPE: $$
	ALIAS:
	   aio_lstat = 1
	PREINIT:
	aio_req req;
	int is_path;
	int fd = 0;
	CODE:
	/* A string argument is a path (stat, or lstat via the alias);
	   anything else is treated as a file handle (fstat). */
	is_path = SvPOK (fh_or_path);

	/* Resolve the handle before allocating anything: sv_2io croaks on
	   a bad handle, which used to leak req and req->statdata. */
	if (!is_path)
	  fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	New (0, req->statdata, 1, Stat_t);

	if (!req->statdata)
	  croak ("out of memory during aio_req->statdata allocation");

	if (is_path)
	  {
	    req->type = ix ? REQ_LSTAT : REQ_STAT;
	    req->data = newSVsv (fh_or_path);
	    req->dataptr = SvPV_nolen (req->data);
	  }
	else
	  {
	    req->type = REQ_FSTAT;
	    req->fd = fd;
	  }

	req->callback = SvREFCNT_inc (callback);

	send_req (req);
476 root 1.20
void
aio_unlink(pathname,callback)
	SV *	pathname
	SV *	callback
	PROTOTYPE: $$
	PREINIT:
	aio_req req;
	CODE:
	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	/* the request owns a copy of the path; dataptr points into it */
	req->data     = newSVsv (pathname);
	req->dataptr  = SvPV_nolen (req->data);
	req->type     = REQ_UNLINK;
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
496 root 1.5
int
poll_fileno()
	PROTOTYPE:
	CODE:
	/* read end of the result pipe, the descriptor poll_cb reads from */
	RETVAL = respipe[0];
	OUTPUT:
	RETVAL
504    
int
poll_cb(...)
	PROTOTYPE:
	CODE:
	/* calls the C-level poll_cb and returns its callback count */
	RETVAL = poll_cb (aTHX);
	OUTPUT:
	RETVAL
512    
int
nreqs()
	PROTOTYPE:
	CODE:
	/* current value of the request counter kept by send_req/poll_cb */
	RETVAL = nreqs;
	OUTPUT:
	RETVAL
520