ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.25
Committed: Wed Aug 17 03:16:56 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.24: +8 -11 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.1 #include <sys/types.h>
11     #include <sys/stat.h>
12 root 1.10
13 root 1.1 #include <unistd.h>
14     #include <fcntl.h>
15     #include <signal.h>
16     #include <sched.h>
17    
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
/*
 * Request types. The worker threads dispatch on these in aio_proc, and the
 * XSUBs use them as ALIAS indices, so the numeric order must stay stable
 * within one build.
 */
enum {
  REQ_QUIT,       /* ask one worker thread to terminate */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_SYMLINK,
};
39    
40     typedef struct aio_cb {
41 root 1.3 struct aio_cb *volatile next;
42 root 1.1
43     int type;
44    
45     int fd;
46     off_t offset;
47     size_t length;
48     ssize_t result;
49     mode_t mode; /* open */
50     int errorno;
51 root 1.13 SV *data, *callback, *fh;
52 root 1.22 void *dataptr, *data2ptr;
53 root 1.1 STRLEN dataoffset;
54    
55     Stat_t *statdata;
56     } aio_cb;
57    
58     typedef aio_cb *aio_req;
59    
60     static int started;
61 root 1.11 static volatile int nreqs;
62 root 1.4 static int max_outstanding = 1<<30;
63 root 1.3 static int respipe [2];
64 root 1.1
65 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67 root 1.22 static pthread_mutex_t frklock = PTHREAD_MUTEX_INITIALIZER;
68 root 1.3 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69    
70     static volatile aio_req reqs, reqe; /* queue start, queue end */
71     static volatile aio_req ress, rese; /* queue start, queue end */
72 root 1.1
73     static void
74     poll_wait ()
75     {
76 root 1.11 if (nreqs && !ress)
77     {
78     fd_set rfd;
79     FD_ZERO(&rfd);
80     FD_SET(respipe [0], &rfd);
81 root 1.3
82 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
83     }
84 root 1.1 }
85    
86     static int
87 root 1.5 poll_cb ()
88 root 1.1 {
89     dSP;
90     int count = 0;
91 root 1.24 int do_croak = 0;
92 root 1.11 aio_req req, prv;
93    
94     pthread_mutex_lock (&reslock);
95    
96 root 1.3 {
97 root 1.11 /* read any signals sent by the worker threads */
98 root 1.3 char buf [32];
99 root 1.22 while (read (respipe [0], buf, 32) == 32)
100 root 1.3 ;
101     }
102 root 1.1
103 root 1.11 req = ress;
104     ress = rese = 0;
105 root 1.3
106 root 1.11 pthread_mutex_unlock (&reslock);
107 root 1.3
108 root 1.11 while (req)
109     {
110 root 1.1 nreqs--;
111    
112     if (req->type == REQ_QUIT)
113     started--;
114     else
115     {
116     int errorno = errno;
117     errno = req->errorno;
118    
119     if (req->type == REQ_READ)
120     SvCUR_set (req->data, req->dataoffset
121     + req->result > 0 ? req->result : 0);
122    
123     if (req->data)
124     SvREFCNT_dec (req->data);
125    
126 root 1.13 if (req->fh)
127     SvREFCNT_dec (req->fh);
128    
129 root 1.1 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
130     {
131     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
132     PL_laststatval = req->result;
133     PL_statcache = *(req->statdata);
134    
135     Safefree (req->statdata);
136     }
137    
138 root 1.13 ENTER;
139 root 1.1 PUSHMARK (SP);
140     XPUSHs (sv_2mortal (newSViv (req->result)));
141 root 1.2
142     if (req->type == REQ_OPEN)
143     {
144     /* convert fd to fh */
145     SV *fh;
146    
147     PUTBACK;
148     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
149     SPAGAIN;
150    
151 root 1.13 fh = SvREFCNT_inc (POPs);
152 root 1.2
153     PUSHMARK (SP);
154 root 1.13 XPUSHs (sv_2mortal (fh));
155 root 1.2 }
156    
157 root 1.8 if (SvOK (req->callback))
158     {
159     PUTBACK;
160     call_sv (req->callback, G_VOID | G_EVAL);
161     SPAGAIN;
162     }
163 root 1.13
164 root 1.24 do_croak = SvTRUE (ERRSV);
165    
166 root 1.13 LEAVE;
167 root 1.1
168     if (req->callback)
169     SvREFCNT_dec (req->callback);
170    
171     errno = errorno;
172     count++;
173     }
174    
175 root 1.11 prv = req;
176     req = req->next;
177     Safefree (prv);
178    
179 root 1.24 if (do_croak)
180     croak (0);
181 root 1.1 }
182    
183     return count;
184     }
185    
186 root 1.4 static void *aio_proc(void *arg);
187    
188     static void
189     start_thread (void)
190     {
191     sigset_t fullsigset, oldsigset;
192     pthread_t tid;
193     pthread_attr_t attr;
194    
195     pthread_attr_init (&attr);
196     pthread_attr_setstacksize (&attr, STACKSIZE);
197     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
198    
199     sigfillset (&fullsigset);
200     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
201    
202     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
203     started++;
204    
205     sigprocmask (SIG_SETMASK, &oldsigset, 0);
206     }
207    
208     static void
209     send_req (aio_req req)
210     {
211     nreqs++;
212    
213     pthread_mutex_lock (&reqlock);
214    
215     req->next = 0;
216    
217     if (reqe)
218     {
219     reqe->next = req;
220     reqe = req;
221     }
222     else
223     reqe = reqs = req;
224    
225     pthread_cond_signal (&reqwait);
226     pthread_mutex_unlock (&reqlock);
227    
228     while (nreqs > max_outstanding)
229     {
230     poll_wait ();
231     poll_cb ();
232     }
233     }
234    
235     static void
236     end_thread (void)
237     {
238     aio_req req;
239     New (0, req, 1, aio_cb);
240     req->type = REQ_QUIT;
241    
242     send_req (req);
243     }
244    
245 root 1.22
246     static void min_parallel (int nthreads)
247     {
248     while (nthreads > started)
249     start_thread ();
250     }
251    
252     static void max_parallel (int nthreads)
253     {
254     int cur = started;
255     while (cur > nthreads)
256     {
257     end_thread ();
258     cur--;
259     }
260    
261     while (started > nthreads)
262     {
263     poll_wait ();
264     poll_cb ();
265     }
266     }
267    
268     static int fork_started;
269    
270     static void atfork_prepare (void)
271     {
272 root 1.25 int nstarted;
273 root 1.22
274     for (;;) {
275     while (nreqs)
276     {
277     poll_wait ();
278     poll_cb ();
279     }
280    
281 root 1.25 nstarted = started;
282 root 1.22 max_parallel (0);
283    
284     pthread_mutex_lock (&reqlock);
285    
286     if (!nreqs && !started)
287     break;
288    
289     pthread_mutex_unlock (&reqlock);
290    
291     min_parallel (fork_started);
292     }
293    
294 root 1.25 pthread_mutex_lock (&frklock);
295     fork_started = nstarted;
296 root 1.22 pthread_mutex_lock (&reslock);
297    
298     assert (!started);
299     assert (!nreqs);
300     assert (!reqs && !reqe);
301     assert (!ress && !rese);
302     }
303    
304     static void atfork_parent (void)
305     {
306     pthread_mutex_unlock (&reslock);
307 root 1.25 pthread_mutex_unlock (&frklock);
308     pthread_mutex_unlock (&reqlock);
309    
310 root 1.22 min_parallel (fork_started);
311     }
312    
313 root 1.25 #define atfork_child atfork_parent
314 root 1.22
315     /*****************************************************************************/
316 root 1.17 /* work around various missing functions */
317    
#if !HAVE_PREADWRITE
# define pread  aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated pread: remember the current file position, seek to `offset`,
 * read, then restore the remembered position. Returns what read returned.
 */
static ssize_t
pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&iolock);

  return res;
}

/*
 * Emulated pwrite: remember the current file position, seek to `offset`,
 * write, then restore the remembered position. Returns what write returned.
 */
static ssize_t
pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  /* restore the caller's position (ooffset), not the write offset */
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&iolock);

  return res;
}
#endif
361    
362     #if !HAVE_FDATASYNC
363     # define fdatasync fsync
364     #endif
365    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/*
 * Scratch buffer shared by all workers; the data read into it is discarded.
 * NOTE(review): presumably static rather than on the stack because worker
 * stacks are created with only STACKSIZE bytes - confirm before moving it.
 */
static char readahead_buf[4096];

/*
 * Emulated readahead: read `count` bytes starting at `offset` in chunks of
 * at most sizeof (readahead_buf) and throw the data away. Errors from the
 * individual reads are ignored and errno is cleared.
 * Always returns 0; the original fell off the end of a non-void function
 * whose value the caller stores in req->result.
 */
static ssize_t
readahead (int fd, off_t offset, size_t count)
{
  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      pread (fd, readahead_buf, len, offset);
      offset += len;
      count  -= len;
    }

  errno = 0;
  return 0;
}
#endif
386    
387 root 1.22 /*****************************************************************************/
388    
389 root 1.1 static void *
390     aio_proc (void *thr_arg)
391     {
392     aio_req req;
393 root 1.3 int type;
394 root 1.1
395 root 1.3 do
396 root 1.1 {
397 root 1.3 pthread_mutex_lock (&reqlock);
398    
399     for (;;)
400     {
401     req = reqs;
402    
403     if (reqs)
404     {
405     reqs = reqs->next;
406     if (!reqs) reqe = 0;
407     }
408    
409     if (req)
410     break;
411    
412     pthread_cond_wait (&reqwait, &reqlock);
413     }
414    
415     pthread_mutex_unlock (&reqlock);
416    
417 root 1.1 errno = 0; /* strictly unnecessary */
418    
419 root 1.3 type = req->type;
420    
421     switch (type)
422 root 1.1 {
423 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
424     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
425 root 1.16
426 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
427    
428     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
429     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
430     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
431    
432     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
433     case REQ_CLOSE: req->result = close (req->fd); break;
434     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
435 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
436     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
437 root 1.1
438 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
439 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
440    
441     case REQ_QUIT:
442 root 1.3 break;
443 root 1.1
444     default:
445     req->result = ENOSYS;
446     break;
447     }
448    
449     req->errorno = errno;
450 root 1.3
451     pthread_mutex_lock (&reslock);
452    
453     req->next = 0;
454    
455     if (rese)
456     {
457     rese->next = req;
458     rese = req;
459     }
460     else
461     {
462     rese = ress = req;
463    
464     /* write a dummy byte to the pipe so fh becomes ready */
465     write (respipe [1], &respipe, 1);
466     }
467    
468     pthread_mutex_unlock (&reslock);
469 root 1.1 }
470 root 1.3 while (type != REQ_QUIT);
471 root 1.1
472     return 0;
473     }
474    
/*
 * Declares `aio_req req`, validates the XSUB's `callback` argument (undef
 * or a reference), allocates a zeroed request and stores a new reference
 * to the callback in it. Must appear where a declaration is allowed.
 */
#define dREQ								\
  aio_req req;								\
									\
  if (SvOK (callback) && !SvROK (callback))				\
    croak ("callback must be undef or of reference type");		\
									\
  Newz (0, req, 1, aio_cb);						\
  if (!req)								\
    croak ("out of memory during aio_req allocation");			\
									\
  req->callback = SvREFCNT_inc (callback);
486    
487 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
488    
489 root 1.8 PROTOTYPES: ENABLE
490    
BOOT:
{
        /* the result pipe wakes up the perl side when results are queued */
        if (pipe (respipe))
          croak ("unable to initialize result pipe");

        /* both ends are non-blocking: poll_cb drains [0], workers write to [1] */
        if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
            || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
          croak ("cannot set result pipe to nonblocking mode");

        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
504    
# Perl binding without a CODE: section, so xsubpp calls the C function
# min_parallel directly: start workers until at least nthreads are running.
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
509    
# Perl binding without a CODE: section, so xsubpp calls the C function
# max_parallel directly: stop workers until at most nthreads are running.
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
514    
int
max_outstanding(nreqs)
        int     nreqs
        PROTOTYPE: $
        CODE:
        /* set the new limit and hand back the previous one; the parameter
           shadows the global request counter of the same name */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
        OUTPUT:
        RETVAL
522    
void
aio_open(pathname,flags,mode,callback=&PL_sv_undef)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
        {
                dREQ;

                req->type    = REQ_OPEN;
                /* private copy of the path, released by poll_cb */
                req->data    = newSVsv (pathname);
                req->dataptr = SvPVbyte_nolen (req->data);
                /* the open flags travel in the fd field */
                req->fd      = flags;
                req->mode    = mode;

                send_req (req);
        }
542    
void
aio_close(fh,callback=&PL_sv_undef)
        SV *    fh
        SV *    callback
        PROTOTYPE: $;$
        ALIAS:
           aio_close     = REQ_CLOSE
           aio_fsync     = REQ_FSYNC
           aio_fdatasync = REQ_FDATASYNC
        CODE:
        {
                dREQ;

                /* ix is the request type of the alias that was called */
                req->type = ix;
                /* keep a copy of the handle until poll_cb releases it */
                req->fh   = newSVsv (fh);
                req->fd   = PerlIO_fileno (IoIFP (sv_2io (fh)));

                send_req (req);
        }
562    
void
aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    data
        IV      dataoffset
        SV *    callback
        ALIAS:
           aio_read  = REQ_READ
           aio_write = REQ_WRITE
        PROTOTYPE: $$$$$;$
        CODE:
        {
                STRLEN svlen;
                char *svptr = SvPVbyte (data, svlen);

                SvUPGRADE (data, SVt_PV);
                SvPOK_on (data);

                /* a negative data offset counts from the end of the string */
                if (dataoffset < 0)
                  dataoffset += svlen;

                if (dataoffset < 0 || dataoffset > svlen)
                  croak ("data offset outside of string");

                if (ix == REQ_WRITE)
                  {
                    /* write: a negative or too large length means "up to
                       the end of the string", so it is never negative
                       afterwards */
                    if (length < 0 || length + dataoffset > svlen)
                      length = svlen - dataoffset;
                  }
                else
                  {
                    /* read: reject a negative length before it can reach
                       SvGROW as a bogus size */
                    if (length < 0)
                      croak ("length must not be negative");

                    /* grow scalar as necessary */
                    svptr = SvGROW (data, length + dataoffset);
                  }

                {
                  /* dREQ validates callback and already takes the one
                     reference to it that poll_cb releases */
                  dREQ;

                  req->type    = ix;
                  req->fh      = newSVsv (fh);
                  req->fd      = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
                                                               : IoOFP (sv_2io (fh)));
                  req->offset  = offset;
                  req->length  = length;
                  req->data    = SvREFCNT_inc (data);
                  req->dataptr = (char *)svptr + dataoffset;
                  /* TODO(review): req->dataoffset stays zero, so after a
                     read the string length becomes the byte count only.
                     Storing dataoffset here is only safe once the length
                     computation in poll_cb adds it to a clamped result. */

                  send_req (req);
                }
        }
621 root 1.1
void
aio_readahead(fh,offset,length,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
        {
                dREQ;

                req->type   = REQ_READAHEAD;
                /* keep a copy of the handle until poll_cb releases it */
                req->fh     = newSVsv (fh);
                req->fd     = PerlIO_fileno (IoIFP (sv_2io (fh)));
                req->offset = offset;
                req->length = length;

                send_req (req);
        }
641    
void
aio_stat(fh_or_path,callback=&PL_sv_undef)
        SV *    fh_or_path
        SV *    callback
        ALIAS:
           aio_stat  = REQ_STAT
           aio_lstat = REQ_LSTAT
        CODE:
        {
                dREQ;

                /* result buffer, copied into perl's stat cache and freed by poll_cb */
                New (0, req->statdata, 1, Stat_t);
                if (!req->statdata)
                  croak ("out of memory during aio_req->statdata allocation (sorry, i just leaked memory, too)");

                if (!SvPOK (fh_or_path))
                  {
                    /* not a string: treat the argument as a file handle */
                    req->type = REQ_FSTAT;
                    req->fh   = newSVsv (fh_or_path);
                    req->fd   = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
                  }
                else
                  {
                    /* a path: stat or lstat, depending on the alias called */
                    req->type    = ix;
                    req->data    = newSVsv (fh_or_path);
                    req->dataptr = SvPVbyte_nolen (req->data);
                  }

                send_req (req);
        }
672    
void
aio_unlink(pathname,callback=&PL_sv_undef)
        SV *    pathname
        SV *    callback
        ALIAS:
           aio_unlink = REQ_UNLINK
           aio_rmdir  = REQ_RMDIR
        CODE:
        {
                dREQ;

                /* ix selects unlink or rmdir */
                req->type    = ix;
                /* private copy of the path, released by poll_cb */
                req->data    = newSVsv (pathname);
                req->dataptr = SvPVbyte_nolen (req->data);

                send_req (req);
        }
690    
void
aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
        SV *    oldpath
        SV *    newpath
        SV *    callback
        CODE:
        {
                dREQ;

                req->type     = REQ_SYMLINK;
                /* the fh slot holds the copy of oldpath, so poll_cb releases it too */
                req->fh       = newSVsv (oldpath);
                req->data2ptr = SvPVbyte_nolen (req->fh);
                req->data     = newSVsv (newpath);
                req->dataptr  = SvPVbyte_nolen (req->data);

                send_req (req);
        }
708    
void
flush()
        PROTOTYPE:
        CODE:
        /* wait for and process results until nothing is outstanding */
        for (; nreqs; )
          {
            poll_wait ();
            poll_cb ();
          }
718    
void
poll()
        PROTOTYPE:
        CODE:
        /* one round only: wait for results, then process what has arrived */
        if (nreqs != 0)
          {
            poll_wait ();
            poll_cb ();
          }
728    
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe, i.e. the descriptor poll_wait selects on */
        RETVAL = respipe [0];
        OUTPUT:
        RETVAL
736    
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* any arguments are ignored; returns the count reported by the C poll_cb */
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL
744    
void
poll_wait()
        PROTOTYPE:
        CODE:
        /* only wait when there is something to wait for */
        if (nreqs != 0)
          poll_wait ();
751 root 1.1
int
nreqs()
        PROTOTYPE:
        CODE:
        /* requests submitted but not yet processed by poll_cb */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
759