ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.8
Committed: Mon Jul 11 02:53:59 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-0_4
Changes since 1.7: +23 -19 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <sys/stat.h>
7     #include <unistd.h>
8     #include <fcntl.h>
9     #include <signal.h>
10     #include <sched.h>
11     #include <endian.h>
12    
13     #include <pthread.h>
14     #include <sys/syscall.h>
15    
16     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
17     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
18     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
19    
20 root 1.4 #if __ia64
21     # define STACKSIZE 65536
22 root 1.1 #else
23 root 1.4 # define STACKSIZE 4096
24 root 1.1 #endif
25    
26     enum {
27     REQ_QUIT,
28     REQ_OPEN, REQ_CLOSE,
29     REQ_READ, REQ_WRITE, REQ_READAHEAD,
30     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
31     REQ_FSYNC, REQ_FDATASYNC,
32     };
33    
34     typedef struct aio_cb {
35 root 1.3 struct aio_cb *volatile next;
36 root 1.1
37     int type;
38    
39     int fd;
40     off_t offset;
41     size_t length;
42     ssize_t result;
43     mode_t mode; /* open */
44     int errorno;
45     SV *data, *callback;
46     void *dataptr;
47     STRLEN dataoffset;
48    
49     Stat_t *statdata;
50     } aio_cb;
51    
52     typedef aio_cb *aio_req;
53    
54     static int started;
55     static int nreqs;
56 root 1.4 static int max_outstanding = 1<<30;
57 root 1.3 static int respipe [2];
58 root 1.1
59 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
60     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
61     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
62    
63     static volatile aio_req reqs, reqe; /* queue start, queue end */
64     static volatile aio_req ress, rese; /* queue start, queue end */
65 root 1.1
66     static void
67     poll_wait ()
68     {
69 root 1.3 if (!nreqs)
70     return;
71    
72 root 1.1 fd_set rfd;
73     FD_ZERO(&rfd);
74 root 1.3 FD_SET(respipe [0], &rfd);
75 root 1.1
76 root 1.3 select (respipe [0] + 1, &rfd, 0, 0, 0);
77 root 1.1 }
78    
79     static int
80 root 1.5 poll_cb ()
81 root 1.1 {
82     dSP;
83     int count = 0;
84     aio_req req;
85 root 1.3
86     {
87     /* read and signals sent by the worker threads */
88     char buf [32];
89     while (read (respipe [0], buf, 32) > 0)
90     ;
91     }
92 root 1.1
93 root 1.3 for (;;)
94 root 1.1 {
95 root 1.3 pthread_mutex_lock (&reslock);
96    
97     req = ress;
98    
99     if (ress)
100     {
101     ress = ress->next;
102     if (!ress) rese = 0;
103     }
104    
105     pthread_mutex_unlock (&reslock);
106    
107     if (!req)
108     break;
109    
110 root 1.1 nreqs--;
111    
112     if (req->type == REQ_QUIT)
113     started--;
114     else
115     {
116     int errorno = errno;
117     errno = req->errorno;
118    
119     if (req->type == REQ_READ)
120     SvCUR_set (req->data, req->dataoffset
121     + req->result > 0 ? req->result : 0);
122    
123     if (req->data)
124     SvREFCNT_dec (req->data);
125    
126     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
127     {
128     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
129     PL_laststatval = req->result;
130     PL_statcache = *(req->statdata);
131    
132     Safefree (req->statdata);
133     }
134    
135     PUSHMARK (SP);
136     XPUSHs (sv_2mortal (newSViv (req->result)));
137 root 1.2
138     if (req->type == REQ_OPEN)
139     {
140     /* convert fd to fh */
141     SV *fh;
142    
143     PUTBACK;
144     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
145     SPAGAIN;
146    
147     fh = POPs;
148    
149     PUSHMARK (SP);
150     XPUSHs (fh);
151     }
152    
153 root 1.8 if (SvOK (req->callback))
154     {
155     PUTBACK;
156     call_sv (req->callback, G_VOID | G_EVAL);
157     SPAGAIN;
158     }
159 root 1.1
160     if (req->callback)
161     SvREFCNT_dec (req->callback);
162    
163     errno = errorno;
164     count++;
165     }
166    
167     Safefree (req);
168     }
169    
170     return count;
171     }
172    
173 root 1.4 static void *aio_proc(void *arg);
174    
175     static void
176     start_thread (void)
177     {
178     sigset_t fullsigset, oldsigset;
179     pthread_t tid;
180     pthread_attr_t attr;
181    
182     pthread_attr_init (&attr);
183     pthread_attr_setstacksize (&attr, STACKSIZE);
184     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
185    
186     sigfillset (&fullsigset);
187     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
188    
189     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
190     started++;
191    
192     sigprocmask (SIG_SETMASK, &oldsigset, 0);
193     }
194    
195     static void
196     send_req (aio_req req)
197     {
198     nreqs++;
199    
200     pthread_mutex_lock (&reqlock);
201    
202     req->next = 0;
203    
204     if (reqe)
205     {
206     reqe->next = req;
207     reqe = req;
208     }
209     else
210     reqe = reqs = req;
211    
212     pthread_cond_signal (&reqwait);
213     pthread_mutex_unlock (&reqlock);
214    
215     while (nreqs > max_outstanding)
216     {
217     poll_wait ();
218     poll_cb ();
219     }
220     }
221    
222     static void
223     end_thread (void)
224     {
225     aio_req req;
226     New (0, req, 1, aio_cb);
227     req->type = REQ_QUIT;
228    
229     send_req (req);
230     }
231    
232     static void
233 root 1.5 read_write (int dowrite, int fd, off_t offset, size_t length,
234 root 1.4 SV *data, STRLEN dataoffset, SV *callback)
235     {
236     aio_req req;
237     STRLEN svlen;
238     char *svptr = SvPV (data, svlen);
239    
240     SvUPGRADE (data, SVt_PV);
241     SvPOK_on (data);
242    
243     if (dataoffset < 0)
244     dataoffset += svlen;
245    
246     if (dataoffset < 0 || dataoffset > svlen)
247     croak ("data offset outside of string");
248    
249     if (dowrite)
250     {
251     /* write: check length and adjust. */
252     if (length < 0 || length + dataoffset > svlen)
253     length = svlen - dataoffset;
254     }
255     else
256     {
257     /* read: grow scalar as necessary */
258     svptr = SvGROW (data, length + dataoffset);
259     }
260    
261     if (length < 0)
262     croak ("length must not be negative");
263    
264     Newz (0, req, 1, aio_cb);
265    
266     if (!req)
267     croak ("out of memory during aio_req allocation");
268    
269     req->type = dowrite ? REQ_WRITE : REQ_READ;
270     req->fd = fd;
271     req->offset = offset;
272     req->length = length;
273     req->data = SvREFCNT_inc (data);
274     req->dataptr = (char *)svptr + dataoffset;
275     req->callback = SvREFCNT_inc (callback);
276    
277     send_req (req);
278     }
279    
280 root 1.1 static void *
281     aio_proc (void *thr_arg)
282     {
283     aio_req req;
284 root 1.3 int type;
285 root 1.1
286 root 1.3 do
287 root 1.1 {
288 root 1.3 pthread_mutex_lock (&reqlock);
289    
290     for (;;)
291     {
292     req = reqs;
293    
294     if (reqs)
295     {
296     reqs = reqs->next;
297     if (!reqs) reqe = 0;
298     }
299    
300     if (req)
301     break;
302    
303     pthread_cond_wait (&reqwait, &reqlock);
304     }
305    
306     pthread_mutex_unlock (&reqlock);
307    
308 root 1.1 errno = 0; /* strictly unnecessary */
309    
310 root 1.3 type = req->type;
311    
312     switch (type)
313 root 1.1 {
314     case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break;
315     case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break;
316     #if SYS_readahead
317     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
318     #else
319     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
320     #endif
321    
322     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
323     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
324     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
325    
326     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
327     case REQ_CLOSE: req->result = close (req->fd); break;
328     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
329    
330     case REQ_FSYNC: req->result = fsync (req->fd); break;
331     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
332    
333     case REQ_QUIT:
334 root 1.3 break;
335 root 1.1
336     default:
337     req->result = ENOSYS;
338     break;
339     }
340    
341     req->errorno = errno;
342 root 1.3
343     pthread_mutex_lock (&reslock);
344    
345     req->next = 0;
346    
347     if (rese)
348     {
349     rese->next = req;
350     rese = req;
351     }
352     else
353     {
354     rese = ress = req;
355    
356     /* write a dummy byte to the pipe so fh becomes ready */
357     write (respipe [1], &respipe, 1);
358     }
359    
360     pthread_mutex_unlock (&reslock);
361 root 1.1 }
362 root 1.3 while (type != REQ_QUIT);
363 root 1.1
364     return 0;
365     }
366    
367     MODULE = IO::AIO PACKAGE = IO::AIO
368    
369 root 1.8 PROTOTYPES: ENABLE
370    
371 root 1.1 BOOT:
372     {
373 root 1.3 if (pipe (respipe))
374     croak ("unable to initialize result pipe");
375 root 1.1
376 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
377 root 1.1 croak ("cannot set result pipe to nonblocking mode");
378    
379 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
380 root 1.1 croak ("cannot set result pipe to nonblocking mode");
381     }
382    
383     void
384     min_parallel(nthreads)
385     int nthreads
386     PROTOTYPE: $
387     CODE:
388     while (nthreads > started)
389     start_thread ();
390    
391     void
392     max_parallel(nthreads)
393     int nthreads
394     PROTOTYPE: $
395     CODE:
396     {
397     int cur = started;
398     while (cur > nthreads)
399     {
400     end_thread ();
401     cur--;
402     }
403    
404     while (started > nthreads)
405     {
406     poll_wait ();
407 root 1.5 poll_cb ();
408 root 1.1 }
409     }
410    
411 root 1.4 int
412     max_outstanding(nreqs)
413     int nreqs
414     PROTOTYPE: $
415     CODE:
416     RETVAL = max_outstanding;
417     max_outstanding = nreqs;
418    
419 root 1.1 void
420 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
421 root 1.1 SV * pathname
422     int flags
423     int mode
424     SV * callback
425 root 1.8 PROTOTYPE: $$$;$
426 root 1.1 CODE:
427     {
428     aio_req req;
429    
430     Newz (0, req, 1, aio_cb);
431    
432     if (!req)
433     croak ("out of memory during aio_req allocation");
434    
435     req->type = REQ_OPEN;
436     req->data = newSVsv (pathname);
437     req->dataptr = SvPV_nolen (req->data);
438     req->fd = flags;
439     req->mode = mode;
440     req->callback = SvREFCNT_inc (callback);
441    
442     send_req (req);
443     }
444    
445     void
446 root 1.8 aio_close(fh,callback=&PL_sv_undef)
447 root 1.1 InputStream fh
448     SV * callback
449 root 1.8 PROTOTYPE: $;$
450 root 1.1 ALIAS:
451     aio_close = REQ_CLOSE
452     aio_fsync = REQ_FSYNC
453     aio_fdatasync = REQ_FDATASYNC
454     CODE:
455     {
456     aio_req req;
457    
458     Newz (0, req, 1, aio_cb);
459    
460     if (!req)
461     croak ("out of memory during aio_req allocation");
462    
463     req->type = ix;
464     req->fd = PerlIO_fileno (fh);
465     req->callback = SvREFCNT_inc (callback);
466    
467     send_req (req);
468     }
469    
470     void
471 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
472 root 1.1 InputStream fh
473     UV offset
474     IV length
475     SV * data
476     IV dataoffset
477     SV * callback
478 root 1.8 PROTOTYPE: $$$$$;$
479 root 1.1 CODE:
480 root 1.5 read_write (0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
481 root 1.1
482     void
483 root 1.8 aio_write(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
484 root 1.1 OutputStream fh
485     UV offset
486     IV length
487     SV * data
488     IV dataoffset
489     SV * callback
490 root 1.8 PROTOTYPE: $$$$$;$
491 root 1.1 CODE:
492 root 1.5 read_write (1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
493 root 1.1
494     void
495 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
496 root 1.1 InputStream fh
497     UV offset
498     IV length
499     SV * callback
500 root 1.8 PROTOTYPE: $$$;$
501 root 1.1 CODE:
502     {
503     aio_req req;
504    
505     if (length < 0)
506     croak ("length must not be negative");
507    
508     Newz (0, req, 1, aio_cb);
509    
510     if (!req)
511     croak ("out of memory during aio_req allocation");
512    
513     req->type = REQ_READAHEAD;
514     req->fd = PerlIO_fileno (fh);
515     req->offset = offset;
516     req->length = length;
517     req->callback = SvREFCNT_inc (callback);
518    
519     send_req (req);
520     }
521    
522     void
523 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
524 root 1.1 SV * fh_or_path
525     SV * callback
526     ALIAS:
527 root 1.8 aio_stat = REQ_STAT
528     aio_lstat = REQ_LSTAT
529 root 1.1 CODE:
530     {
531     aio_req req;
532    
533     Newz (0, req, 1, aio_cb);
534    
535     if (!req)
536     croak ("out of memory during aio_req allocation");
537    
538     New (0, req->statdata, 1, Stat_t);
539    
540     if (!req->statdata)
541     croak ("out of memory during aio_req->statdata allocation");
542    
543     if (SvPOK (fh_or_path))
544     {
545 root 1.8 req->type = ix;
546 root 1.1 req->data = newSVsv (fh_or_path);
547     req->dataptr = SvPV_nolen (req->data);
548     }
549     else
550     {
551     req->type = REQ_FSTAT;
552     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
553     }
554    
555     req->callback = SvREFCNT_inc (callback);
556    
557     send_req (req);
558     }
559    
560     void
561 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
562 root 1.1 SV * pathname
563     SV * callback
564     CODE:
565     {
566     aio_req req;
567    
568     Newz (0, req, 1, aio_cb);
569    
570     if (!req)
571     croak ("out of memory during aio_req allocation");
572    
573     req->type = REQ_UNLINK;
574     req->data = newSVsv (pathname);
575     req->dataptr = SvPV_nolen (req->data);
576     req->callback = SvREFCNT_inc (callback);
577    
578     send_req (req);
579     }
580    
581 root 1.6 void
582     flush()
583     PROTOTYPE:
584     CODE:
585     while (nreqs)
586     {
587     poll_wait ();
588     poll_cb ();
589     }
590    
591 root 1.7 void
592     poll()
593     PROTOTYPE:
594     CODE:
595     if (nreqs)
596     {
597     poll_wait ();
598     poll_cb ();
599     }
600    
601 root 1.1 int
602     poll_fileno()
603     PROTOTYPE:
604     CODE:
605 root 1.3 RETVAL = respipe [0];
606 root 1.1 OUTPUT:
607     RETVAL
608    
609     int
610     poll_cb(...)
611     PROTOTYPE:
612     CODE:
613 root 1.5 RETVAL = poll_cb ();
614 root 1.1 OUTPUT:
615     RETVAL
616    
617     void
618     poll_wait()
619     PROTOTYPE:
620     CODE:
621 root 1.3 if (nreqs)
622     poll_wait ();
623 root 1.1
624     int
625     nreqs()
626     PROTOTYPE:
627     CODE:
628     RETVAL = nreqs;
629     OUTPUT:
630     RETVAL
631