ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.17
Committed: Sun Jul 31 18:45:48 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.16: +72 -13 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.15 #include "EXTERN.h"
2 root 1.1 #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.16 #include "autoconf/config.h"
6    
7 root 1.1 #include <sys/types.h>
8     #include <sys/stat.h>
9 root 1.10
10 root 1.1 #include <unistd.h>
11     #include <fcntl.h>
12     #include <signal.h>
13     #include <sched.h>
14    
15     #include <pthread.h>
16    
17     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
18     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
19     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
20    
21 root 1.4 #if __ia64
22     # define STACKSIZE 65536
23 root 1.1 #else
24 root 1.4 # define STACKSIZE 4096
25 root 1.1 #endif
26    
27     enum {
28     REQ_QUIT,
29     REQ_OPEN, REQ_CLOSE,
30     REQ_READ, REQ_WRITE, REQ_READAHEAD,
31     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
32     REQ_FSYNC, REQ_FDATASYNC,
33     };
34    
35     typedef struct aio_cb {
36 root 1.3 struct aio_cb *volatile next;
37 root 1.1
38     int type;
39    
40     int fd;
41     off_t offset;
42     size_t length;
43     ssize_t result;
44     mode_t mode; /* open */
45     int errorno;
46 root 1.13 SV *data, *callback, *fh;
47 root 1.1 void *dataptr;
48     STRLEN dataoffset;
49    
50     Stat_t *statdata;
51     } aio_cb;
52    
53     typedef aio_cb *aio_req;
54    
55     static int started;
56 root 1.11 static volatile int nreqs;
57 root 1.4 static int max_outstanding = 1<<30;
58 root 1.3 static int respipe [2];
59 root 1.1
60 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
61     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
62     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
63    
64     static volatile aio_req reqs, reqe; /* queue start, queue end */
65     static volatile aio_req ress, rese; /* queue start, queue end */
66 root 1.1
67     static void
68     poll_wait ()
69     {
70 root 1.11 if (nreqs && !ress)
71     {
72     fd_set rfd;
73     FD_ZERO(&rfd);
74     FD_SET(respipe [0], &rfd);
75 root 1.3
76 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
77     }
78 root 1.1 }
79    
80     static int
81 root 1.5 poll_cb ()
82 root 1.1 {
83     dSP;
84     int count = 0;
85 root 1.11 aio_req req, prv;
86    
87     pthread_mutex_lock (&reslock);
88    
89 root 1.3 {
90 root 1.11 /* read any signals sent by the worker threads */
91 root 1.3 char buf [32];
92     while (read (respipe [0], buf, 32) > 0)
93     ;
94     }
95 root 1.1
96 root 1.11 req = ress;
97     ress = rese = 0;
98 root 1.3
99 root 1.11 pthread_mutex_unlock (&reslock);
100 root 1.3
101 root 1.11 while (req)
102     {
103 root 1.1 nreqs--;
104    
105     if (req->type == REQ_QUIT)
106     started--;
107     else
108     {
109     int errorno = errno;
110     errno = req->errorno;
111    
112     if (req->type == REQ_READ)
113     SvCUR_set (req->data, req->dataoffset
114     + req->result > 0 ? req->result : 0);
115    
116     if (req->data)
117     SvREFCNT_dec (req->data);
118    
119 root 1.13 if (req->fh)
120     SvREFCNT_dec (req->fh);
121    
122 root 1.1 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
123     {
124     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
125     PL_laststatval = req->result;
126     PL_statcache = *(req->statdata);
127    
128     Safefree (req->statdata);
129     }
130    
131 root 1.13 ENTER;
132 root 1.1 PUSHMARK (SP);
133     XPUSHs (sv_2mortal (newSViv (req->result)));
134 root 1.2
135     if (req->type == REQ_OPEN)
136     {
137     /* convert fd to fh */
138     SV *fh;
139    
140     PUTBACK;
141     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
142     SPAGAIN;
143    
144 root 1.13 fh = SvREFCNT_inc (POPs);
145 root 1.2
146     PUSHMARK (SP);
147 root 1.13 XPUSHs (sv_2mortal (fh));
148 root 1.2 }
149    
150 root 1.8 if (SvOK (req->callback))
151     {
152     PUTBACK;
153     call_sv (req->callback, G_VOID | G_EVAL);
154     SPAGAIN;
155     }
156 root 1.13
157     LEAVE;
158 root 1.1
159     if (req->callback)
160     SvREFCNT_dec (req->callback);
161    
162     errno = errorno;
163     count++;
164     }
165    
166 root 1.11 prv = req;
167     req = req->next;
168     Safefree (prv);
169    
170     /* TODO: croak on errors? */
171 root 1.1 }
172    
173     return count;
174     }
175    
176 root 1.4 static void *aio_proc(void *arg);
177    
178     static void
179     start_thread (void)
180     {
181     sigset_t fullsigset, oldsigset;
182     pthread_t tid;
183     pthread_attr_t attr;
184    
185     pthread_attr_init (&attr);
186     pthread_attr_setstacksize (&attr, STACKSIZE);
187     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
188    
189     sigfillset (&fullsigset);
190     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
191    
192     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
193     started++;
194    
195     sigprocmask (SIG_SETMASK, &oldsigset, 0);
196     }
197    
198     static void
199     send_req (aio_req req)
200     {
201     nreqs++;
202    
203     pthread_mutex_lock (&reqlock);
204    
205     req->next = 0;
206    
207     if (reqe)
208     {
209     reqe->next = req;
210     reqe = req;
211     }
212     else
213     reqe = reqs = req;
214    
215     pthread_cond_signal (&reqwait);
216     pthread_mutex_unlock (&reqlock);
217    
218     while (nreqs > max_outstanding)
219     {
220     poll_wait ();
221     poll_cb ();
222     }
223     }
224    
225     static void
226     end_thread (void)
227     {
228     aio_req req;
229     New (0, req, 1, aio_cb);
230     req->type = REQ_QUIT;
231    
232     send_req (req);
233     }
234    
235 root 1.17 /* work around various missing functions */
236    
237     #if !HAVE_PREADWRITE
238     # define pread aio_pread
239     # define pwrite aio_pwrite
240    
241     /*
242     * make our pread/pwrite safe against themselves, but not against
243     * normal read/write by using a mutex. slows down execution a lot,
244     * but that's your problem, not mine.
245     */
246     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
247    
248     static ssize_t
249     pread (int fd, void *buf, size_t count, off_t offset)
250     {
251     ssize_t res;
252     off_t ooffset;
253    
254     pthread_mutex_lock (&iolock);
255     ooffset = lseek (fd, 0, SEEK_CUR);
256     lseek (fd, offset, SEEK_SET);
257     res = read (fd, buf, count);
258     lseek (fd, ooffset, SEEK_SET);
259     pthread_mutex_unlock (&iolock);
260    
261     return res;
262     }
263    
264     static ssize_t
265     pwrite (int fd, void *buf, size_t count, off_t offset)
266     {
267     ssize_t res;
268     off_t ooffset;
269    
270     pthread_mutex_lock (&iolock);
271     ooffset = lseek (fd, 0, SEEK_CUR);
272     lseek (fd, offset, SEEK_SET);
273     res = write (fd, buf, count);
274     lseek (fd, offset, SEEK_SET);
275     pthread_mutex_unlock (&iolock);
276    
277     return res;
278     }
279     #endif
280    
281     #if !HAVE_FDATASYNC
282     # define fdatasync fsync
283     #endif
284    
285     #if !HAVE_READAHEAD
286     # define readahead aio_readahead
287    
288     static char readahead_buf[4096];
289    
290     static ssize_t
291     readahead (int fd, off_t offset, size_t count)
292     {
293     while (count > 0)
294     {
295     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
296    
297     pread (fd, readahead_buf, len, offset);
298     offset += len;
299     count -= len;
300     }
301    
302     errno = 0;
303     }
304     #endif
305    
306 root 1.1 static void *
307     aio_proc (void *thr_arg)
308     {
309     aio_req req;
310 root 1.3 int type;
311 root 1.1
312 root 1.3 do
313 root 1.1 {
314 root 1.3 pthread_mutex_lock (&reqlock);
315    
316     for (;;)
317     {
318     req = reqs;
319    
320     if (reqs)
321     {
322     reqs = reqs->next;
323     if (!reqs) reqe = 0;
324     }
325    
326     if (req)
327     break;
328    
329     pthread_cond_wait (&reqwait, &reqlock);
330     }
331    
332     pthread_mutex_unlock (&reqlock);
333    
334 root 1.1 errno = 0; /* strictly unnecessary */
335    
336 root 1.3 type = req->type;
337    
338     switch (type)
339 root 1.1 {
340 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
341     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
342 root 1.16
343 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
344    
345     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
346     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
347     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
348    
349     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
350     case REQ_CLOSE: req->result = close (req->fd); break;
351     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
352    
353 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
354 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
355    
356     case REQ_QUIT:
357 root 1.3 break;
358 root 1.1
359     default:
360     req->result = ENOSYS;
361     break;
362     }
363    
364     req->errorno = errno;
365 root 1.3
366     pthread_mutex_lock (&reslock);
367    
368     req->next = 0;
369    
370     if (rese)
371     {
372     rese->next = req;
373     rese = req;
374     }
375     else
376     {
377     rese = ress = req;
378    
379     /* write a dummy byte to the pipe so fh becomes ready */
380     write (respipe [1], &respipe, 1);
381     }
382    
383     pthread_mutex_unlock (&reslock);
384 root 1.1 }
385 root 1.3 while (type != REQ_QUIT);
386 root 1.1
387     return 0;
388     }
389    
390     MODULE = IO::AIO PACKAGE = IO::AIO
391    
392 root 1.8 PROTOTYPES: ENABLE
393    
394 root 1.1 BOOT:
395     {
396 root 1.3 if (pipe (respipe))
397     croak ("unable to initialize result pipe");
398 root 1.1
399 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
400 root 1.1 croak ("cannot set result pipe to nonblocking mode");
401    
402 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
403 root 1.1 croak ("cannot set result pipe to nonblocking mode");
404     }
405    
406     void
407     min_parallel(nthreads)
408     int nthreads
409     PROTOTYPE: $
410     CODE:
411     while (nthreads > started)
412     start_thread ();
413    
414     void
415     max_parallel(nthreads)
416     int nthreads
417     PROTOTYPE: $
418     CODE:
419     {
420     int cur = started;
421     while (cur > nthreads)
422     {
423     end_thread ();
424     cur--;
425     }
426    
427     while (started > nthreads)
428     {
429     poll_wait ();
430 root 1.5 poll_cb ();
431 root 1.1 }
432     }
433    
434 root 1.4 int
435     max_outstanding(nreqs)
436     int nreqs
437     PROTOTYPE: $
438     CODE:
439     RETVAL = max_outstanding;
440     max_outstanding = nreqs;
441    
442 root 1.1 void
443 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
444 root 1.1 SV * pathname
445     int flags
446     int mode
447     SV * callback
448 root 1.8 PROTOTYPE: $$$;$
449 root 1.1 CODE:
450     {
451     aio_req req;
452    
453     Newz (0, req, 1, aio_cb);
454    
455     if (!req)
456     croak ("out of memory during aio_req allocation");
457    
458     req->type = REQ_OPEN;
459     req->data = newSVsv (pathname);
460     req->dataptr = SvPV_nolen (req->data);
461     req->fd = flags;
462     req->mode = mode;
463     req->callback = SvREFCNT_inc (callback);
464    
465     send_req (req);
466     }
467    
468     void
469 root 1.8 aio_close(fh,callback=&PL_sv_undef)
470 root 1.13 SV * fh
471     SV * callback
472 root 1.8 PROTOTYPE: $;$
473 root 1.1 ALIAS:
474     aio_close = REQ_CLOSE
475     aio_fsync = REQ_FSYNC
476     aio_fdatasync = REQ_FDATASYNC
477     CODE:
478     {
479     aio_req req;
480    
481     Newz (0, req, 1, aio_cb);
482    
483     if (!req)
484     croak ("out of memory during aio_req allocation");
485    
486     req->type = ix;
487 root 1.13 req->fh = newSVsv (fh);
488     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
489 root 1.1 req->callback = SvREFCNT_inc (callback);
490    
491     send_req (req);
492     }
493    
494     void
495 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
496 root 1.13 SV * fh
497     UV offset
498     IV length
499     SV * data
500     IV dataoffset
501     SV * callback
502     ALIAS:
503     aio_read = REQ_READ
504     aio_write = REQ_WRITE
505 root 1.8 PROTOTYPE: $$$$$;$
506 root 1.1 CODE:
507 root 1.13 {
508     aio_req req;
509     STRLEN svlen;
510     char *svptr = SvPV (data, svlen);
511    
512     SvUPGRADE (data, SVt_PV);
513     SvPOK_on (data);
514 root 1.1
515 root 1.13 if (dataoffset < 0)
516     dataoffset += svlen;
517    
518     if (dataoffset < 0 || dataoffset > svlen)
519     croak ("data offset outside of string");
520    
521     if (ix == REQ_WRITE)
522     {
523     /* write: check length and adjust. */
524     if (length < 0 || length + dataoffset > svlen)
525     length = svlen - dataoffset;
526     }
527     else
528     {
529     /* read: grow scalar as necessary */
530     svptr = SvGROW (data, length + dataoffset);
531     }
532    
533     if (length < 0)
534     croak ("length must not be negative");
535    
536     Newz (0, req, 1, aio_cb);
537    
538     if (!req)
539     croak ("out of memory during aio_req allocation");
540    
541     req->type = ix;
542     req->fh = newSVsv (fh);
543     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
544     : IoOFP (sv_2io (fh)));
545     req->offset = offset;
546     req->length = length;
547     req->data = SvREFCNT_inc (data);
548     req->dataptr = (char *)svptr + dataoffset;
549     req->callback = SvREFCNT_inc (callback);
550    
551     send_req (req);
552     }
553 root 1.1
554     void
555 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
556 root 1.13 SV * fh
557     UV offset
558     IV length
559     SV * callback
560 root 1.8 PROTOTYPE: $$$;$
561 root 1.1 CODE:
562     {
563     aio_req req;
564    
565     if (length < 0)
566     croak ("length must not be negative");
567    
568     Newz (0, req, 1, aio_cb);
569    
570     if (!req)
571     croak ("out of memory during aio_req allocation");
572    
573     req->type = REQ_READAHEAD;
574 root 1.13 req->fh = newSVsv (fh);
575     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
576 root 1.1 req->offset = offset;
577     req->length = length;
578     req->callback = SvREFCNT_inc (callback);
579    
580     send_req (req);
581     }
582    
583     void
584 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
585 root 1.1 SV * fh_or_path
586     SV * callback
587     ALIAS:
588 root 1.8 aio_stat = REQ_STAT
589     aio_lstat = REQ_LSTAT
590 root 1.1 CODE:
591     {
592     aio_req req;
593    
594     Newz (0, req, 1, aio_cb);
595    
596     if (!req)
597     croak ("out of memory during aio_req allocation");
598    
599     New (0, req->statdata, 1, Stat_t);
600    
601     if (!req->statdata)
602     croak ("out of memory during aio_req->statdata allocation");
603    
604     if (SvPOK (fh_or_path))
605     {
606 root 1.8 req->type = ix;
607 root 1.1 req->data = newSVsv (fh_or_path);
608     req->dataptr = SvPV_nolen (req->data);
609     }
610     else
611     {
612     req->type = REQ_FSTAT;
613 root 1.13 req->fh = newSVsv (fh_or_path);
614 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
615     }
616    
617     req->callback = SvREFCNT_inc (callback);
618    
619     send_req (req);
620     }
621    
622     void
623 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
624 root 1.1 SV * pathname
625     SV * callback
626     CODE:
627     {
628     aio_req req;
629    
630     Newz (0, req, 1, aio_cb);
631    
632     if (!req)
633     croak ("out of memory during aio_req allocation");
634    
635     req->type = REQ_UNLINK;
636     req->data = newSVsv (pathname);
637     req->dataptr = SvPV_nolen (req->data);
638     req->callback = SvREFCNT_inc (callback);
639    
640     send_req (req);
641     }
642    
643 root 1.6 void
644     flush()
645     PROTOTYPE:
646     CODE:
647     while (nreqs)
648     {
649     poll_wait ();
650     poll_cb ();
651     }
652    
653 root 1.7 void
654     poll()
655     PROTOTYPE:
656     CODE:
657     if (nreqs)
658     {
659     poll_wait ();
660     poll_cb ();
661     }
662    
663 root 1.1 int
664     poll_fileno()
665     PROTOTYPE:
666     CODE:
667 root 1.3 RETVAL = respipe [0];
668 root 1.1 OUTPUT:
669     RETVAL
670    
671     int
672     poll_cb(...)
673     PROTOTYPE:
674     CODE:
675 root 1.5 RETVAL = poll_cb ();
676 root 1.1 OUTPUT:
677     RETVAL
678    
679     void
680     poll_wait()
681     PROTOTYPE:
682     CODE:
683 root 1.3 if (nreqs)
684     poll_wait ();
685 root 1.1
686     int
687     nreqs()
688     PROTOTYPE:
689     CODE:
690     RETVAL = nreqs;
691     OUTPUT:
692     RETVAL
693