ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.4
Committed: Sun Jul 10 20:57:00 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-0_2
Changes since 1.3: +120 -107 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8     #include <sys/stat.h>
9     #include <unistd.h>
10     #include <fcntl.h>
11     #include <signal.h>
12     #include <sched.h>
13     #include <endian.h>
14    
15     #include <pthread.h>
16     #include <sys/syscall.h>
17    
18     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
19     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
20     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
21    
22 root 1.4 #if __ia64
23     # define STACKSIZE 65536
24 root 1.1 #else
25 root 1.4 # define STACKSIZE 4096
26 root 1.1 #endif
27    
/* Request types carried in aio_cb.type; the numeric values are also
 * used as XS ALIAS indices (see aio_close), so the order is significant. */
enum {
  REQ_QUIT,       /* ask one worker thread to exit */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK,
  REQ_FSYNC,
  REQ_FDATASYNC,
};
35    
36     typedef struct aio_cb {
37 root 1.3 struct aio_cb *volatile next;
38 root 1.1
39     int type;
40    
41     int fd;
42     off_t offset;
43     size_t length;
44     ssize_t result;
45     mode_t mode; /* open */
46     int errorno;
47     SV *data, *callback;
48     void *dataptr;
49     STRLEN dataoffset;
50    
51     Stat_t *statdata;
52     } aio_cb;
53    
54     typedef aio_cb *aio_req;
55    
56     static int started;
57     static int nreqs;
58 root 1.4 static int max_outstanding = 1<<30;
59 root 1.3 static int respipe [2];
60 root 1.1
61 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
62     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
63     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
64    
65     static volatile aio_req reqs, reqe; /* queue start, queue end */
66     static volatile aio_req ress, rese; /* queue start, queue end */
67 root 1.1
68     static void
69     poll_wait ()
70     {
71 root 1.3 if (!nreqs)
72     return;
73    
74 root 1.1 fd_set rfd;
75     FD_ZERO(&rfd);
76 root 1.3 FD_SET(respipe [0], &rfd);
77 root 1.1
78 root 1.3 select (respipe [0] + 1, &rfd, 0, 0, 0);
79 root 1.1 }
80    
81     static int
82     poll_cb (pTHX)
83     {
84     dSP;
85     int count = 0;
86     aio_req req;
87 root 1.3
88     {
89     /* read and signals sent by the worker threads */
90     char buf [32];
91     while (read (respipe [0], buf, 32) > 0)
92     ;
93     }
94 root 1.1
95 root 1.3 for (;;)
96 root 1.1 {
97 root 1.3 pthread_mutex_lock (&reslock);
98    
99     req = ress;
100    
101     if (ress)
102     {
103     ress = ress->next;
104     if (!ress) rese = 0;
105     }
106    
107     pthread_mutex_unlock (&reslock);
108    
109     if (!req)
110     break;
111    
112 root 1.1 nreqs--;
113    
114     if (req->type == REQ_QUIT)
115     started--;
116     else
117     {
118     int errorno = errno;
119     errno = req->errorno;
120    
121     if (req->type == REQ_READ)
122     SvCUR_set (req->data, req->dataoffset
123     + req->result > 0 ? req->result : 0);
124    
125     if (req->data)
126     SvREFCNT_dec (req->data);
127    
128     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
129     {
130     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
131     PL_laststatval = req->result;
132     PL_statcache = *(req->statdata);
133    
134     Safefree (req->statdata);
135     }
136    
137     PUSHMARK (SP);
138     XPUSHs (sv_2mortal (newSViv (req->result)));
139 root 1.2
140     if (req->type == REQ_OPEN)
141     {
142     /* convert fd to fh */
143     SV *fh;
144    
145     PUTBACK;
146     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
147     SPAGAIN;
148    
149     fh = POPs;
150    
151     PUSHMARK (SP);
152     XPUSHs (fh);
153     }
154    
155 root 1.1 PUTBACK;
156 root 1.2 call_sv (req->callback, G_VOID | G_EVAL);
157 root 1.1 SPAGAIN;
158    
159     if (req->callback)
160     SvREFCNT_dec (req->callback);
161    
162     errno = errorno;
163     count++;
164     }
165    
166     Safefree (req);
167     }
168    
169     return count;
170     }
171    
172 root 1.4 static void *aio_proc(void *arg);
173    
174     static void
175     start_thread (void)
176     {
177     sigset_t fullsigset, oldsigset;
178     pthread_t tid;
179     pthread_attr_t attr;
180    
181     pthread_attr_init (&attr);
182     pthread_attr_setstacksize (&attr, STACKSIZE);
183     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
184    
185     sigfillset (&fullsigset);
186     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
187    
188     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
189     started++;
190    
191     sigprocmask (SIG_SETMASK, &oldsigset, 0);
192     }
193    
194     static void
195     send_req (aio_req req)
196     {
197     nreqs++;
198    
199     pthread_mutex_lock (&reqlock);
200    
201     req->next = 0;
202    
203     if (reqe)
204     {
205     reqe->next = req;
206     reqe = req;
207     }
208     else
209     reqe = reqs = req;
210    
211     pthread_cond_signal (&reqwait);
212     pthread_mutex_unlock (&reqlock);
213    
214     while (nreqs > max_outstanding)
215     {
216     poll_wait ();
217     poll_cb ();
218     }
219     }
220    
221     static void
222     end_thread (void)
223     {
224     aio_req req;
225     New (0, req, 1, aio_cb);
226     req->type = REQ_QUIT;
227    
228     send_req (req);
229     }
230    
231     static void
232     read_write (pTHX_
233     int dowrite, int fd, off_t offset, size_t length,
234     SV *data, STRLEN dataoffset, SV *callback)
235     {
236     aio_req req;
237     STRLEN svlen;
238     char *svptr = SvPV (data, svlen);
239    
240     SvUPGRADE (data, SVt_PV);
241     SvPOK_on (data);
242    
243     if (dataoffset < 0)
244     dataoffset += svlen;
245    
246     if (dataoffset < 0 || dataoffset > svlen)
247     croak ("data offset outside of string");
248    
249     if (dowrite)
250     {
251     /* write: check length and adjust. */
252     if (length < 0 || length + dataoffset > svlen)
253     length = svlen - dataoffset;
254     }
255     else
256     {
257     /* read: grow scalar as necessary */
258     svptr = SvGROW (data, length + dataoffset);
259     }
260    
261     if (length < 0)
262     croak ("length must not be negative");
263    
264     Newz (0, req, 1, aio_cb);
265    
266     if (!req)
267     croak ("out of memory during aio_req allocation");
268    
269     req->type = dowrite ? REQ_WRITE : REQ_READ;
270     req->fd = fd;
271     req->offset = offset;
272     req->length = length;
273     req->data = SvREFCNT_inc (data);
274     req->dataptr = (char *)svptr + dataoffset;
275     req->callback = SvREFCNT_inc (callback);
276    
277     send_req (req);
278     }
279    
280 root 1.1 static void *
281     aio_proc (void *thr_arg)
282     {
283     aio_req req;
284 root 1.3 int type;
285 root 1.1
286 root 1.3 do
287 root 1.1 {
288 root 1.3 pthread_mutex_lock (&reqlock);
289    
290     for (;;)
291     {
292     req = reqs;
293    
294     if (reqs)
295     {
296     reqs = reqs->next;
297     if (!reqs) reqe = 0;
298     }
299    
300     if (req)
301     break;
302    
303     pthread_cond_wait (&reqwait, &reqlock);
304     }
305    
306     pthread_mutex_unlock (&reqlock);
307    
308 root 1.1 errno = 0; /* strictly unnecessary */
309    
310 root 1.3 type = req->type;
311    
312     switch (type)
313 root 1.1 {
314     case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break;
315     case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break;
316     #if SYS_readahead
317     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
318     #else
319     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
320     #endif
321    
322     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
323     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
324     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
325    
326     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
327     case REQ_CLOSE: req->result = close (req->fd); break;
328     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
329    
330     case REQ_FSYNC: req->result = fsync (req->fd); break;
331     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
332    
333     case REQ_QUIT:
334 root 1.3 break;
335 root 1.1
336     default:
337     req->result = ENOSYS;
338     break;
339     }
340    
341     req->errorno = errno;
342 root 1.3
343     pthread_mutex_lock (&reslock);
344    
345     req->next = 0;
346    
347     if (rese)
348     {
349     rese->next = req;
350     rese = req;
351     }
352     else
353     {
354     rese = ress = req;
355    
356     /* write a dummy byte to the pipe so fh becomes ready */
357     write (respipe [1], &respipe, 1);
358     }
359    
360     pthread_mutex_unlock (&reslock);
361 root 1.1 }
362 root 1.3 while (type != REQ_QUIT);
363 root 1.1
364     return 0;
365     }
366    
367     MODULE = IO::AIO PACKAGE = IO::AIO
368    
BOOT:
{
        /* the result pipe is how worker threads wake up the poller */
        if (pipe (respipe) != 0)
          croak ("unable to initialize result pipe");

        /* both ends are non-blocking: the poller drains without
           stalling and workers never block on a full pipe */
        if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) != 0
            || fcntl (respipe [1], F_SETFL, O_NONBLOCK) != 0)
          croak ("cannot set result pipe to nonblocking mode");
}
380    
void
min_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
{
        /* start workers until at least nthreads are running */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            /* start_thread only bumps `started` on success; without this
               check a persistent pthread_create failure would spin forever */
            if (started == before)
              croak ("unable to start additional aio thread");
          }
}
388    
void
max_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
{
        int cur = started;

        /* there can never be fewer than zero threads; a negative limit
           would queue more QUIT requests than workers exist and the wait
           loop below would then never terminate */
        if (nthreads < 0)
          nthreads = 0;

        /* ask the surplus workers to exit ... */
        while (cur > nthreads)
          {
            end_thread ();
            cur--;
          }

        /* ... and wait until they have acknowledged (poll_cb decrements
           `started` for every QUIT it sees) */
        while (started > nthreads)
          {
            poll_wait ();
            poll_cb (aTHX);
          }
}
408    
int
max_outstanding(nreqs)
	int	nreqs
	PROTOTYPE: $
	CODE:
        /* install the new limit and hand back the previous one; note that
           the parameter shadows the file-level nreqs counter here */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
	OUTPUT:
	RETVAL
416    
void
aio_open(pathname,flags,mode,callback)
	SV *	pathname
	int	flags
	int	mode
	SV *	callback
	PROTOTYPE: $$$$
	CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* work on a private copy of the path so later changes to the
           caller's scalar cannot affect the string the worker reads */
        request->data     = newSVsv (pathname);
        request->dataptr  = SvPV_nolen (request->data);

        request->type     = REQ_OPEN;
        request->fd       = flags;   /* the worker passes fd to open(2) as flags */
        request->mode     = mode;
        request->callback = SvREFCNT_inc (callback);

        send_req (request);
}
442    
void
aio_close(fh,callback)
	InputStream	fh
	SV *		callback
	PROTOTYPE: $$
	ALIAS:
           aio_close     = REQ_CLOSE
           aio_fsync     = REQ_FSYNC
           aio_fdatasync = REQ_FDATASYNC
	CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* ix is the ALIAS value, i.e. directly the request type */
        request->type     = ix;
        request->callback = SvREFCNT_inc (callback);
        request->fd       = PerlIO_fileno (fh);

        send_req (request);
}
467    
void
aio_read(fh,offset,length,data,dataoffset,callback)
	InputStream	fh
	UV		offset
	IV		length
	SV *		data
	IV		dataoffset
	SV *		callback
	PROTOTYPE: $$$$$$
	CODE:
{
        /* first argument 0 selects the read path of read_write */
        int fd = PerlIO_fileno (fh);

        read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
}
479    
void
aio_write(fh,offset,length,data,dataoffset,callback)
	OutputStream	fh
	UV		offset
	IV		length
	SV *		data
	IV		dataoffset
	SV *		callback
	PROTOTYPE: $$$$$$
	CODE:
{
        /* first argument 1 selects the write path of read_write */
        int fd = PerlIO_fileno (fh);

        read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
}
491    
void
aio_readahead(fh,offset,length,callback)
	InputStream	fh
	UV		offset
	IV		length
	SV *		callback
	PROTOTYPE: $$$$
	CODE:
{
        aio_req request;

        /* reject bad input before anything is allocated */
        if (length < 0)
          croak ("length must not be negative");

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        request->type     = REQ_READAHEAD;
        request->callback = SvREFCNT_inc (callback);
        request->fd       = PerlIO_fileno (fh);
        request->offset   = offset;
        request->length   = length;

        send_req (request);
}
519    
void
aio_stat(fh_or_path,callback)
	SV *		fh_or_path
	SV *		callback
	PROTOTYPE: $$
	ALIAS:
           aio_lstat = 1
	CODE:
{
        aio_req req;
        int is_path = SvPOK (fh_or_path);
        int fd = 0;

        /* resolve the file handle before allocating anything: sv_2io
           croaks on a non-handle, which would otherwise leak both the
           request and its stat buffer */
        if (!is_path)
          fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        New (0, req->statdata, 1, Stat_t);

        if (!req->statdata)
          croak ("out of memory during aio_req->statdata allocation");

        if (is_path)
          {
            /* a string is a path: stat, or lstat when called via the alias */
            req->type = ix ? REQ_LSTAT : REQ_STAT;
            req->data = newSVsv (fh_or_path);
            req->dataptr = SvPV_nolen (req->data);
          }
        else
          {
            /* anything else is treated as a file handle */
            req->type = REQ_FSTAT;
            req->fd = fd;
          }

        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
557    
void
aio_unlink(pathname,callback)
	SV * pathname
	SV * callback
	PROTOTYPE: $$
	CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* the worker reads the path from a private copy of the scalar */
        request->data     = newSVsv (pathname);
        request->dataptr  = SvPV_nolen (request->data);

        request->type     = REQ_UNLINK;
        request->callback = SvREFCNT_inc (callback);

        send_req (request);
}
579    
int
poll_fileno()
	PROTOTYPE:
	CODE:
{
        /* read end of the result pipe; readable when results are pending */
        RETVAL = respipe [0];
}
	OUTPUT:
	RETVAL
587    
int
poll_cb(...)
	PROTOTYPE:
	CODE:
{
        /* returns the number of callbacks the C-level poll_cb invoked */
        RETVAL = poll_cb (aTHX);
}
	OUTPUT:
	RETVAL
595    
void
poll_wait()
	PROTOTYPE:
	CODE:
{
        /* nothing outstanding means there is nothing to wait for */
        if (nreqs != 0)
          poll_wait ();
}
602 root 1.1
int
nreqs()
	PROTOTYPE:
	CODE:
{
        /* requests submitted whose results have not been processed yet */
        RETVAL = nreqs;
}
	OUTPUT:
	RETVAL
610