ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.11
Committed: Wed Jul 20 21:55:27 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-0_9
Changes since 1.10: +30 -29 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.11 #define _XOPEN_SOURCE 500
2    
3 root 1.1 #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8     #include <sys/stat.h>
9 root 1.10
10 root 1.1 #include <unistd.h>
11     #include <fcntl.h>
12     #include <signal.h>
13     #include <sched.h>
14 root 1.10 #if __linux
15     #include <sys/syscall.h>
16     #endif
17 root 1.1
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
30     enum {
31     REQ_QUIT,
32     REQ_OPEN, REQ_CLOSE,
33     REQ_READ, REQ_WRITE, REQ_READAHEAD,
34     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
35     REQ_FSYNC, REQ_FDATASYNC,
36     };
37    
38     typedef struct aio_cb {
39 root 1.3 struct aio_cb *volatile next;
40 root 1.1
41     int type;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46     ssize_t result;
47     mode_t mode; /* open */
48     int errorno;
49     SV *data, *callback;
50     void *dataptr;
51     STRLEN dataoffset;
52    
53     Stat_t *statdata;
54     } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59 root 1.11 static volatile int nreqs;
60 root 1.4 static int max_outstanding = 1<<30;
61 root 1.3 static int respipe [2];
62 root 1.1
63 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
64     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
65     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
66    
67     static volatile aio_req reqs, reqe; /* queue start, queue end */
68     static volatile aio_req ress, rese; /* queue start, queue end */
69 root 1.1
70     static void
71     poll_wait ()
72     {
73 root 1.11 if (nreqs && !ress)
74     {
75     fd_set rfd;
76     FD_ZERO(&rfd);
77     FD_SET(respipe [0], &rfd);
78 root 1.3
79 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
80     }
81 root 1.1 }
82    
83     static int
84 root 1.5 poll_cb ()
85 root 1.1 {
86     dSP;
87     int count = 0;
88 root 1.11 aio_req req, prv;
89    
90     static int rl;//D
91     //printf ("%d ENTER\n", ++rl);//D
92    
93     pthread_mutex_lock (&reslock);
94    
95 root 1.3 {
96 root 1.11 /* read any signals sent by the worker threads */
97 root 1.3 char buf [32];
98     while (read (respipe [0], buf, 32) > 0)
99     ;
100     }
101 root 1.1
102 root 1.11 req = ress;
103     ress = rese = 0;
104 root 1.3
105 root 1.11 pthread_mutex_unlock (&reslock);
106 root 1.3
107 root 1.11 while (req)
108     {
109 root 1.1 nreqs--;
110 root 1.11 //printf ("%d count %d %p->%p\n", rl, count, req, req->next);//D
111 root 1.1
112     if (req->type == REQ_QUIT)
113     started--;
114     else
115     {
116     int errorno = errno;
117     errno = req->errorno;
118    
119     if (req->type == REQ_READ)
120     SvCUR_set (req->data, req->dataoffset
121     + req->result > 0 ? req->result : 0);
122    
123     if (req->data)
124     SvREFCNT_dec (req->data);
125    
126     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
127     {
128     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
129     PL_laststatval = req->result;
130     PL_statcache = *(req->statdata);
131    
132     Safefree (req->statdata);
133     }
134    
135     PUSHMARK (SP);
136     XPUSHs (sv_2mortal (newSViv (req->result)));
137 root 1.2
138     if (req->type == REQ_OPEN)
139     {
140     /* convert fd to fh */
141     SV *fh;
142    
143     PUTBACK;
144     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
145     SPAGAIN;
146    
147     fh = POPs;
148    
149     PUSHMARK (SP);
150     XPUSHs (fh);
151     }
152    
153 root 1.8 if (SvOK (req->callback))
154     {
155     PUTBACK;
156     call_sv (req->callback, G_VOID | G_EVAL);
157     SPAGAIN;
158     }
159 root 1.1
160     if (req->callback)
161     SvREFCNT_dec (req->callback);
162    
163     errno = errorno;
164     count++;
165     }
166    
167 root 1.11 prv = req;
168     req = req->next;
169     Safefree (prv);
170    
171     /* TODO: croak on errors? */
172 root 1.1 }
173    
174 root 1.11 //printf ("%d LEAVE %p %p\n", rl--, ress, rese);//D
175 root 1.1 return count;
176     }
177    
178 root 1.4 static void *aio_proc(void *arg);
179    
180     static void
181     start_thread (void)
182     {
183     sigset_t fullsigset, oldsigset;
184     pthread_t tid;
185     pthread_attr_t attr;
186    
187     pthread_attr_init (&attr);
188     pthread_attr_setstacksize (&attr, STACKSIZE);
189     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
190    
191     sigfillset (&fullsigset);
192     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
193    
194     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
195     started++;
196    
197     sigprocmask (SIG_SETMASK, &oldsigset, 0);
198     }
199    
200     static void
201     send_req (aio_req req)
202     {
203     nreqs++;
204    
205     pthread_mutex_lock (&reqlock);
206    
207     req->next = 0;
208    
209     if (reqe)
210     {
211     reqe->next = req;
212     reqe = req;
213     }
214     else
215     reqe = reqs = req;
216    
217     pthread_cond_signal (&reqwait);
218     pthread_mutex_unlock (&reqlock);
219    
220     while (nreqs > max_outstanding)
221     {
222     poll_wait ();
223     poll_cb ();
224     }
225     }
226    
227     static void
228     end_thread (void)
229     {
230     aio_req req;
231     New (0, req, 1, aio_cb);
232     req->type = REQ_QUIT;
233    
234     send_req (req);
235     }
236    
237     static void
238 root 1.5 read_write (int dowrite, int fd, off_t offset, size_t length,
239 root 1.4 SV *data, STRLEN dataoffset, SV *callback)
240     {
241     aio_req req;
242     STRLEN svlen;
243     char *svptr = SvPV (data, svlen);
244    
245     SvUPGRADE (data, SVt_PV);
246     SvPOK_on (data);
247    
248     if (dataoffset < 0)
249     dataoffset += svlen;
250    
251     if (dataoffset < 0 || dataoffset > svlen)
252     croak ("data offset outside of string");
253    
254     if (dowrite)
255     {
256     /* write: check length and adjust. */
257     if (length < 0 || length + dataoffset > svlen)
258     length = svlen - dataoffset;
259     }
260     else
261     {
262     /* read: grow scalar as necessary */
263     svptr = SvGROW (data, length + dataoffset);
264     }
265    
266     if (length < 0)
267     croak ("length must not be negative");
268    
269     Newz (0, req, 1, aio_cb);
270    
271     if (!req)
272     croak ("out of memory during aio_req allocation");
273    
274     req->type = dowrite ? REQ_WRITE : REQ_READ;
275     req->fd = fd;
276     req->offset = offset;
277     req->length = length;
278     req->data = SvREFCNT_inc (data);
279     req->dataptr = (char *)svptr + dataoffset;
280     req->callback = SvREFCNT_inc (callback);
281    
282     send_req (req);
283     }
284    
285 root 1.1 static void *
286     aio_proc (void *thr_arg)
287     {
288     aio_req req;
289 root 1.3 int type;
290 root 1.1
291 root 1.3 do
292 root 1.1 {
293 root 1.3 pthread_mutex_lock (&reqlock);
294    
295     for (;;)
296     {
297     req = reqs;
298    
299     if (reqs)
300     {
301     reqs = reqs->next;
302     if (!reqs) reqe = 0;
303     }
304    
305     if (req)
306     break;
307    
308     pthread_cond_wait (&reqwait, &reqlock);
309     }
310    
311     pthread_mutex_unlock (&reqlock);
312    
313 root 1.1 errno = 0; /* strictly unnecessary */
314    
315 root 1.3 type = req->type;
316    
317     switch (type)
318 root 1.1 {
319 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
320     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
321 root 1.1 #if SYS_readahead
322     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
323     #else
324     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
325     #endif
326    
327     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
328     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
329     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
330    
331     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
332     case REQ_CLOSE: req->result = close (req->fd); break;
333     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
334    
335     case REQ_FSYNC: req->result = fsync (req->fd); break;
336     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
337    
338     case REQ_QUIT:
339 root 1.3 break;
340 root 1.1
341     default:
342     req->result = ENOSYS;
343     break;
344     }
345    
346     req->errorno = errno;
347 root 1.3
348     pthread_mutex_lock (&reslock);
349    
350     req->next = 0;
351    
352     if (rese)
353     {
354     rese->next = req;
355     rese = req;
356     }
357     else
358     {
359     rese = ress = req;
360    
361     /* write a dummy byte to the pipe so fh becomes ready */
362     write (respipe [1], &respipe, 1);
363     }
364    
365     pthread_mutex_unlock (&reslock);
366 root 1.1 }
367 root 1.3 while (type != REQ_QUIT);
368 root 1.1
369     return 0;
370     }
371    
372     MODULE = IO::AIO PACKAGE = IO::AIO
373    
374 root 1.8 PROTOTYPES: ENABLE
375    
376 root 1.1 BOOT:
377     {
378 root 1.3 if (pipe (respipe))
379     croak ("unable to initialize result pipe");
380 root 1.1
381 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
382 root 1.1 croak ("cannot set result pipe to nonblocking mode");
383    
384 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
385 root 1.1 croak ("cannot set result pipe to nonblocking mode");
386     }
387    
388     void
389     min_parallel(nthreads)
390     int nthreads
391     PROTOTYPE: $
392     CODE:
393     while (nthreads > started)
394     start_thread ();
395    
396     void
397     max_parallel(nthreads)
398     int nthreads
399     PROTOTYPE: $
400     CODE:
401     {
402     int cur = started;
403     while (cur > nthreads)
404     {
405     end_thread ();
406     cur--;
407     }
408    
409     while (started > nthreads)
410     {
411     poll_wait ();
412 root 1.5 poll_cb ();
413 root 1.1 }
414     }
415    
416 root 1.4 int
417     max_outstanding(nreqs)
418     int nreqs
419     PROTOTYPE: $
420     CODE:
421     RETVAL = max_outstanding;
422     max_outstanding = nreqs;
423    
424 root 1.1 void
425 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
426 root 1.1 SV * pathname
427     int flags
428     int mode
429     SV * callback
430 root 1.8 PROTOTYPE: $$$;$
431 root 1.1 CODE:
432     {
433     aio_req req;
434    
435     Newz (0, req, 1, aio_cb);
436    
437     if (!req)
438     croak ("out of memory during aio_req allocation");
439    
440     req->type = REQ_OPEN;
441     req->data = newSVsv (pathname);
442     req->dataptr = SvPV_nolen (req->data);
443     req->fd = flags;
444     req->mode = mode;
445     req->callback = SvREFCNT_inc (callback);
446    
447     send_req (req);
448     }
449    
450     void
451 root 1.8 aio_close(fh,callback=&PL_sv_undef)
452 root 1.1 InputStream fh
453     SV * callback
454 root 1.8 PROTOTYPE: $;$
455 root 1.1 ALIAS:
456     aio_close = REQ_CLOSE
457     aio_fsync = REQ_FSYNC
458     aio_fdatasync = REQ_FDATASYNC
459     CODE:
460     {
461     aio_req req;
462    
463     Newz (0, req, 1, aio_cb);
464    
465     if (!req)
466     croak ("out of memory during aio_req allocation");
467    
468     req->type = ix;
469     req->fd = PerlIO_fileno (fh);
470     req->callback = SvREFCNT_inc (callback);
471    
472     send_req (req);
473     }
474    
475     void
476 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
477 root 1.1 InputStream fh
478     UV offset
479     IV length
480     SV * data
481     IV dataoffset
482     SV * callback
483 root 1.8 PROTOTYPE: $$$$$;$
484 root 1.1 CODE:
485 root 1.5 read_write (0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
486 root 1.1
487     void
488 root 1.8 aio_write(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
489 root 1.1 OutputStream fh
490     UV offset
491     IV length
492     SV * data
493     IV dataoffset
494     SV * callback
495 root 1.8 PROTOTYPE: $$$$$;$
496 root 1.1 CODE:
497 root 1.5 read_write (1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
498 root 1.1
499     void
500 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
501 root 1.1 InputStream fh
502     UV offset
503     IV length
504     SV * callback
505 root 1.8 PROTOTYPE: $$$;$
506 root 1.1 CODE:
507     {
508     aio_req req;
509    
510     if (length < 0)
511     croak ("length must not be negative");
512    
513     Newz (0, req, 1, aio_cb);
514    
515     if (!req)
516     croak ("out of memory during aio_req allocation");
517    
518     req->type = REQ_READAHEAD;
519     req->fd = PerlIO_fileno (fh);
520     req->offset = offset;
521     req->length = length;
522     req->callback = SvREFCNT_inc (callback);
523    
524     send_req (req);
525     }
526    
527     void
528 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
529 root 1.1 SV * fh_or_path
530     SV * callback
531     ALIAS:
532 root 1.8 aio_stat = REQ_STAT
533     aio_lstat = REQ_LSTAT
534 root 1.1 CODE:
535     {
536     aio_req req;
537    
538     Newz (0, req, 1, aio_cb);
539    
540     if (!req)
541     croak ("out of memory during aio_req allocation");
542    
543     New (0, req->statdata, 1, Stat_t);
544    
545     if (!req->statdata)
546     croak ("out of memory during aio_req->statdata allocation");
547    
548     if (SvPOK (fh_or_path))
549     {
550 root 1.8 req->type = ix;
551 root 1.1 req->data = newSVsv (fh_or_path);
552     req->dataptr = SvPV_nolen (req->data);
553     }
554     else
555     {
556     req->type = REQ_FSTAT;
557     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
558     }
559    
560     req->callback = SvREFCNT_inc (callback);
561    
562     send_req (req);
563     }
564    
565     void
566 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
567 root 1.1 SV * pathname
568     SV * callback
569     CODE:
570     {
571     aio_req req;
572    
573     Newz (0, req, 1, aio_cb);
574    
575     if (!req)
576     croak ("out of memory during aio_req allocation");
577    
578     req->type = REQ_UNLINK;
579     req->data = newSVsv (pathname);
580     req->dataptr = SvPV_nolen (req->data);
581     req->callback = SvREFCNT_inc (callback);
582    
583     send_req (req);
584     }
585    
586 root 1.6 void
587     flush()
588     PROTOTYPE:
589     CODE:
590     while (nreqs)
591     {
592     poll_wait ();
593     poll_cb ();
594     }
595    
596 root 1.7 void
597     poll()
598     PROTOTYPE:
599     CODE:
600     if (nreqs)
601     {
602     poll_wait ();
603     poll_cb ();
604     }
605    
606 root 1.1 int
607     poll_fileno()
608     PROTOTYPE:
609     CODE:
610 root 1.3 RETVAL = respipe [0];
611 root 1.1 OUTPUT:
612     RETVAL
613    
614     int
615     poll_cb(...)
616     PROTOTYPE:
617     CODE:
618 root 1.5 RETVAL = poll_cb ();
619 root 1.1 OUTPUT:
620     RETVAL
621    
622     void
623     poll_wait()
624     PROTOTYPE:
625     CODE:
626 root 1.3 if (nreqs)
627     poll_wait ();
628 root 1.1
629     int
630     nreqs()
631     PROTOTYPE:
632     CODE:
633     RETVAL = nreqs;
634     OUTPUT:
635     RETVAL
636