ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.20
Committed: Sun Jul 31 19:08:39 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.19: +1 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.1 #include <sys/types.h>
11     #include <sys/stat.h>
12 root 1.10
13 root 1.1 #include <unistd.h>
14     #include <fcntl.h>
15     #include <signal.h>
16     #include <sched.h>
17    
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
30     enum {
31     REQ_QUIT,
32     REQ_OPEN, REQ_CLOSE,
33     REQ_READ, REQ_WRITE, REQ_READAHEAD,
34     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
35     REQ_FSYNC, REQ_FDATASYNC,
36     };
37    
38     typedef struct aio_cb {
39 root 1.3 struct aio_cb *volatile next;
40 root 1.1
41     int type;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46     ssize_t result;
47     mode_t mode; /* open */
48     int errorno;
49 root 1.13 SV *data, *callback, *fh;
50 root 1.1 void *dataptr;
51     STRLEN dataoffset;
52    
53     Stat_t *statdata;
54     } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59 root 1.11 static volatile int nreqs;
60 root 1.4 static int max_outstanding = 1<<30;
61 root 1.3 static int respipe [2];
62 root 1.1
63 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
64     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
65     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
66    
67     static volatile aio_req reqs, reqe; /* queue start, queue end */
68     static volatile aio_req ress, rese; /* queue start, queue end */
69 root 1.1
70     static void
71     poll_wait ()
72     {
73 root 1.11 if (nreqs && !ress)
74     {
75     fd_set rfd;
76     FD_ZERO(&rfd);
77     FD_SET(respipe [0], &rfd);
78 root 1.3
79 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
80     }
81 root 1.1 }
82    
83     static int
84 root 1.5 poll_cb ()
85 root 1.1 {
86     dSP;
87     int count = 0;
88 root 1.11 aio_req req, prv;
89    
90     pthread_mutex_lock (&reslock);
91    
92 root 1.3 {
93 root 1.11 /* read any signals sent by the worker threads */
94 root 1.3 char buf [32];
95     while (read (respipe [0], buf, 32) > 0)
96     ;
97     }
98 root 1.1
99 root 1.11 req = ress;
100     ress = rese = 0;
101 root 1.3
102 root 1.11 pthread_mutex_unlock (&reslock);
103 root 1.3
104 root 1.11 while (req)
105     {
106 root 1.1 nreqs--;
107    
108     if (req->type == REQ_QUIT)
109     started--;
110     else
111     {
112     int errorno = errno;
113     errno = req->errorno;
114    
115     if (req->type == REQ_READ)
116     SvCUR_set (req->data, req->dataoffset
117     + req->result > 0 ? req->result : 0);
118    
119     if (req->data)
120     SvREFCNT_dec (req->data);
121    
122 root 1.13 if (req->fh)
123     SvREFCNT_dec (req->fh);
124    
125 root 1.1 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
126     {
127     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
128     PL_laststatval = req->result;
129     PL_statcache = *(req->statdata);
130    
131     Safefree (req->statdata);
132     }
133    
134 root 1.13 ENTER;
135 root 1.1 PUSHMARK (SP);
136     XPUSHs (sv_2mortal (newSViv (req->result)));
137 root 1.2
138     if (req->type == REQ_OPEN)
139     {
140     /* convert fd to fh */
141     SV *fh;
142    
143     PUTBACK;
144     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
145     SPAGAIN;
146    
147 root 1.13 fh = SvREFCNT_inc (POPs);
148 root 1.2
149     PUSHMARK (SP);
150 root 1.13 XPUSHs (sv_2mortal (fh));
151 root 1.2 }
152    
153 root 1.8 if (SvOK (req->callback))
154     {
155     PUTBACK;
156     call_sv (req->callback, G_VOID | G_EVAL);
157     SPAGAIN;
158     }
159 root 1.13
160     LEAVE;
161 root 1.1
162     if (req->callback)
163     SvREFCNT_dec (req->callback);
164    
165     errno = errorno;
166     count++;
167     }
168    
169 root 1.11 prv = req;
170     req = req->next;
171     Safefree (prv);
172    
173     /* TODO: croak on errors? */
174 root 1.1 }
175    
176     return count;
177     }
178    
179 root 1.4 static void *aio_proc(void *arg);
180    
181     static void
182     start_thread (void)
183     {
184     sigset_t fullsigset, oldsigset;
185     pthread_t tid;
186     pthread_attr_t attr;
187    
188     pthread_attr_init (&attr);
189     pthread_attr_setstacksize (&attr, STACKSIZE);
190     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
191    
192     sigfillset (&fullsigset);
193     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
194    
195     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
196     started++;
197    
198     sigprocmask (SIG_SETMASK, &oldsigset, 0);
199     }
200    
201     static void
202     send_req (aio_req req)
203     {
204     nreqs++;
205    
206     pthread_mutex_lock (&reqlock);
207    
208     req->next = 0;
209    
210     if (reqe)
211     {
212     reqe->next = req;
213     reqe = req;
214     }
215     else
216     reqe = reqs = req;
217    
218     pthread_cond_signal (&reqwait);
219     pthread_mutex_unlock (&reqlock);
220    
221     while (nreqs > max_outstanding)
222     {
223     poll_wait ();
224     poll_cb ();
225     }
226     }
227    
228     static void
229     end_thread (void)
230     {
231     aio_req req;
232     New (0, req, 1, aio_cb);
233     req->type = REQ_QUIT;
234    
235     send_req (req);
236     }
237    
238 root 1.17 /* work around various missing functions */
239    
240     #if !HAVE_PREADWRITE
241     # define pread aio_pread
242     # define pwrite aio_pwrite
243    
244     /*
245     * make our pread/pwrite safe against themselves, but not against
246     * normal read/write by using a mutex. slows down execution a lot,
247     * but that's your problem, not mine.
248     */
249     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
250    
251     static ssize_t
252     pread (int fd, void *buf, size_t count, off_t offset)
253     {
254     ssize_t res;
255     off_t ooffset;
256    
257     pthread_mutex_lock (&iolock);
258     ooffset = lseek (fd, 0, SEEK_CUR);
259     lseek (fd, offset, SEEK_SET);
260     res = read (fd, buf, count);
261     lseek (fd, ooffset, SEEK_SET);
262     pthread_mutex_unlock (&iolock);
263    
264     return res;
265     }
266    
267     static ssize_t
268     pwrite (int fd, void *buf, size_t count, off_t offset)
269     {
270     ssize_t res;
271     off_t ooffset;
272    
273     pthread_mutex_lock (&iolock);
274     ooffset = lseek (fd, 0, SEEK_CUR);
275     lseek (fd, offset, SEEK_SET);
276     res = write (fd, buf, count);
277     lseek (fd, offset, SEEK_SET);
278     pthread_mutex_unlock (&iolock);
279    
280     return res;
281     }
282     #endif
283    
284     #if !HAVE_FDATASYNC
285     # define fdatasync fsync
286     #endif
287    
288     #if !HAVE_READAHEAD
289     # define readahead aio_readahead
290    
291     static char readahead_buf[4096];
292    
293     static ssize_t
294     readahead (int fd, off_t offset, size_t count)
295     {
296     while (count > 0)
297     {
298     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
299    
300     pread (fd, readahead_buf, len, offset);
301     offset += len;
302     count -= len;
303     }
304    
305     errno = 0;
306     }
307     #endif
308    
309 root 1.1 static void *
310     aio_proc (void *thr_arg)
311     {
312     aio_req req;
313 root 1.3 int type;
314 root 1.1
315 root 1.3 do
316 root 1.1 {
317 root 1.3 pthread_mutex_lock (&reqlock);
318    
319     for (;;)
320     {
321     req = reqs;
322    
323     if (reqs)
324     {
325     reqs = reqs->next;
326     if (!reqs) reqe = 0;
327     }
328    
329     if (req)
330     break;
331    
332     pthread_cond_wait (&reqwait, &reqlock);
333     }
334    
335     pthread_mutex_unlock (&reqlock);
336    
337 root 1.1 errno = 0; /* strictly unnecessary */
338    
339 root 1.3 type = req->type;
340    
341     switch (type)
342 root 1.1 {
343 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
344     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
345 root 1.16
346 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
347    
348     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
349     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
350     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
351    
352     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
353     case REQ_CLOSE: req->result = close (req->fd); break;
354     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
355    
356 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
357 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
358    
359     case REQ_QUIT:
360 root 1.3 break;
361 root 1.1
362     default:
363     req->result = ENOSYS;
364     break;
365     }
366    
367     req->errorno = errno;
368 root 1.3
369     pthread_mutex_lock (&reslock);
370    
371     req->next = 0;
372    
373     if (rese)
374     {
375     rese->next = req;
376     rese = req;
377     }
378     else
379     {
380     rese = ress = req;
381    
382     /* write a dummy byte to the pipe so fh becomes ready */
383     write (respipe [1], &respipe, 1);
384     }
385    
386     pthread_mutex_unlock (&reslock);
387 root 1.1 }
388 root 1.3 while (type != REQ_QUIT);
389 root 1.1
390     return 0;
391     }
392    
393     MODULE = IO::AIO PACKAGE = IO::AIO
394    
395 root 1.8 PROTOTYPES: ENABLE
396    
397 root 1.1 BOOT:
398     {
399 root 1.3 if (pipe (respipe))
400     croak ("unable to initialize result pipe");
401 root 1.1
402 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
403 root 1.1 croak ("cannot set result pipe to nonblocking mode");
404    
405 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
406 root 1.1 croak ("cannot set result pipe to nonblocking mode");
407     }
408    
409     void
410     min_parallel(nthreads)
411     int nthreads
412     PROTOTYPE: $
413     CODE:
414     while (nthreads > started)
415     start_thread ();
416    
417     void
418     max_parallel(nthreads)
419     int nthreads
420     PROTOTYPE: $
421     CODE:
422     {
423     int cur = started;
424     while (cur > nthreads)
425     {
426     end_thread ();
427     cur--;
428     }
429    
430     while (started > nthreads)
431     {
432     poll_wait ();
433 root 1.5 poll_cb ();
434 root 1.1 }
435     }
436    
437 root 1.4 int
438     max_outstanding(nreqs)
439     int nreqs
440     PROTOTYPE: $
441     CODE:
442     RETVAL = max_outstanding;
443     max_outstanding = nreqs;
444    
445 root 1.1 void
446 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
447 root 1.1 SV * pathname
448     int flags
449     int mode
450     SV * callback
451 root 1.8 PROTOTYPE: $$$;$
452 root 1.1 CODE:
453     {
454     aio_req req;
455    
456     Newz (0, req, 1, aio_cb);
457    
458     if (!req)
459     croak ("out of memory during aio_req allocation");
460    
461     req->type = REQ_OPEN;
462     req->data = newSVsv (pathname);
463     req->dataptr = SvPV_nolen (req->data);
464     req->fd = flags;
465     req->mode = mode;
466     req->callback = SvREFCNT_inc (callback);
467    
468     send_req (req);
469     }
470    
471     void
472 root 1.8 aio_close(fh,callback=&PL_sv_undef)
473 root 1.13 SV * fh
474     SV * callback
475 root 1.8 PROTOTYPE: $;$
476 root 1.1 ALIAS:
477     aio_close = REQ_CLOSE
478     aio_fsync = REQ_FSYNC
479     aio_fdatasync = REQ_FDATASYNC
480     CODE:
481     {
482     aio_req req;
483    
484     Newz (0, req, 1, aio_cb);
485    
486     if (!req)
487     croak ("out of memory during aio_req allocation");
488    
489     req->type = ix;
490 root 1.13 req->fh = newSVsv (fh);
491     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
492 root 1.1 req->callback = SvREFCNT_inc (callback);
493    
494     send_req (req);
495     }
496    
497     void
498 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
499 root 1.13 SV * fh
500     UV offset
501     IV length
502     SV * data
503     IV dataoffset
504     SV * callback
505     ALIAS:
506     aio_read = REQ_READ
507     aio_write = REQ_WRITE
508 root 1.8 PROTOTYPE: $$$$$;$
509 root 1.1 CODE:
510 root 1.13 {
511     aio_req req;
512     STRLEN svlen;
513     char *svptr = SvPV (data, svlen);
514    
515     SvUPGRADE (data, SVt_PV);
516     SvPOK_on (data);
517 root 1.1
518 root 1.13 if (dataoffset < 0)
519     dataoffset += svlen;
520    
521     if (dataoffset < 0 || dataoffset > svlen)
522     croak ("data offset outside of string");
523    
524     if (ix == REQ_WRITE)
525     {
526     /* write: check length and adjust. */
527     if (length < 0 || length + dataoffset > svlen)
528     length = svlen - dataoffset;
529     }
530     else
531     {
532     /* read: grow scalar as necessary */
533     svptr = SvGROW (data, length + dataoffset);
534     }
535    
536     if (length < 0)
537     croak ("length must not be negative");
538    
539     Newz (0, req, 1, aio_cb);
540    
541     if (!req)
542     croak ("out of memory during aio_req allocation");
543    
544     req->type = ix;
545     req->fh = newSVsv (fh);
546     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
547     : IoOFP (sv_2io (fh)));
548     req->offset = offset;
549     req->length = length;
550     req->data = SvREFCNT_inc (data);
551     req->dataptr = (char *)svptr + dataoffset;
552     req->callback = SvREFCNT_inc (callback);
553    
554     send_req (req);
555     }
556 root 1.1
557     void
558 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
559 root 1.13 SV * fh
560     UV offset
561     IV length
562     SV * callback
563 root 1.8 PROTOTYPE: $$$;$
564 root 1.1 CODE:
565     {
566     aio_req req;
567    
568     if (length < 0)
569     croak ("length must not be negative");
570    
571     Newz (0, req, 1, aio_cb);
572    
573     if (!req)
574     croak ("out of memory during aio_req allocation");
575    
576     req->type = REQ_READAHEAD;
577 root 1.13 req->fh = newSVsv (fh);
578     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
579 root 1.1 req->offset = offset;
580     req->length = length;
581     req->callback = SvREFCNT_inc (callback);
582    
583     send_req (req);
584     }
585    
586     void
587 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
588 root 1.1 SV * fh_or_path
589     SV * callback
590     ALIAS:
591 root 1.8 aio_stat = REQ_STAT
592     aio_lstat = REQ_LSTAT
593 root 1.1 CODE:
594     {
595     aio_req req;
596    
597     Newz (0, req, 1, aio_cb);
598    
599     if (!req)
600     croak ("out of memory during aio_req allocation");
601    
602     New (0, req->statdata, 1, Stat_t);
603    
604     if (!req->statdata)
605     croak ("out of memory during aio_req->statdata allocation");
606    
607     if (SvPOK (fh_or_path))
608     {
609 root 1.8 req->type = ix;
610 root 1.1 req->data = newSVsv (fh_or_path);
611     req->dataptr = SvPV_nolen (req->data);
612     }
613     else
614     {
615     req->type = REQ_FSTAT;
616 root 1.13 req->fh = newSVsv (fh_or_path);
617 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
618     }
619    
620     req->callback = SvREFCNT_inc (callback);
621    
622     send_req (req);
623     }
624    
625     void
626 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
627 root 1.1 SV * pathname
628     SV * callback
629     CODE:
630     {
631     aio_req req;
632    
633     Newz (0, req, 1, aio_cb);
634    
635     if (!req)
636     croak ("out of memory during aio_req allocation");
637    
638     req->type = REQ_UNLINK;
639     req->data = newSVsv (pathname);
640     req->dataptr = SvPV_nolen (req->data);
641     req->callback = SvREFCNT_inc (callback);
642    
643     send_req (req);
644     }
645    
646 root 1.6 void
647     flush()
648     PROTOTYPE:
649     CODE:
650     while (nreqs)
651     {
652     poll_wait ();
653     poll_cb ();
654     }
655    
656 root 1.7 void
657     poll()
658     PROTOTYPE:
659     CODE:
660     if (nreqs)
661     {
662     poll_wait ();
663     poll_cb ();
664     }
665    
666 root 1.1 int
667     poll_fileno()
668     PROTOTYPE:
669     CODE:
670 root 1.3 RETVAL = respipe [0];
671 root 1.1 OUTPUT:
672     RETVAL
673    
674     int
675     poll_cb(...)
676     PROTOTYPE:
677     CODE:
678 root 1.5 RETVAL = poll_cb ();
679 root 1.1 OUTPUT:
680     RETVAL
681    
682     void
683     poll_wait()
684     PROTOTYPE:
685     CODE:
686 root 1.3 if (nreqs)
687     poll_wait ();
688 root 1.1
689     int
690     nreqs()
691     PROTOTYPE:
692     CODE:
693     RETVAL = nreqs;
694     OUTPUT:
695     RETVAL
696