ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.106
Committed: Mon Sep 24 18:14:00 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-2_41
Changes since 1.105: +30 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.98 #include "xthread.h"
2 root 1.63
3 root 1.19 #include <errno.h>
4    
5 root 1.15 #include "EXTERN.h"
6 root 1.1 #include "perl.h"
7     #include "XSUB.h"
8    
9 root 1.37 #include <stddef.h>
10 root 1.94 #include <stdlib.h>
11 root 1.41 #include <errno.h>
12 root 1.1 #include <sys/types.h>
13     #include <sys/stat.h>
14 root 1.37 #include <limits.h>
15 root 1.1 #include <fcntl.h>
16     #include <sched.h>
17 root 1.103
18     #ifdef _WIN32
19    
20     # define SIGIO 0
21     typedef Direntry_t X_DIRENT;
22     #undef malloc
23     #undef free
24    
25     // perl overrides all those nice win32 functions
26     # undef open
27     # undef read
28     # undef write
29 root 1.104 # undef send
30     # undef recv
31 root 1.103 # undef stat
32     # undef fstat
33     # define lstat stat
34     # undef truncate
35     # undef ftruncate
36     # undef open
37     # undef close
38     # undef unlink
39     # undef rmdir
40     # undef rename
41     # undef lseek
42    
43     # define chown(a,b,c) (errno = ENOSYS, -1)
44     # define fchown(a,b,c) (errno = ENOSYS, -1)
45     # define fchmod(a,b) (errno = ENOSYS, -1)
46     # define symlink(a,b) (errno = ENOSYS, -1)
47     # define readlink(a,b,c) (errno = ENOSYS, -1)
48     # define mknod(a,b,c) (errno = ENOSYS, -1)
49     # define truncate(a,b) (errno = ENOSYS, -1)
50     # define ftruncate(fd,o) chsize ((fd), (o))
51     # define fsync(fd) _commit (fd)
52     # define opendir(fd) (errno = ENOSYS, 0)
53     # define readdir(fd) (errno = ENOSYS, -1)
54     # define closedir(fd) (errno = ENOSYS, -1)
55     # define mkdir(a,b) mkdir (a)
56    
57     #else
58    
59     # include "autoconf/config.h"
60     # include <sys/time.h>
61     # include <sys/select.h>
62     # include <unistd.h>
63     # include <utime.h>
64     # include <signal.h>
65     typedef struct dirent X_DIRENT;
66    
67     #endif
68 root 1.1
69 root 1.32 #if HAVE_SENDFILE
70     # if __linux
71     # include <sys/sendfile.h>
72     # elif __freebsd
73     # include <sys/socket.h>
74     # include <sys/uio.h>
75     # elif __hpux
76     # include <sys/socket.h>
77 root 1.35 # elif __solaris /* not yet */
78     # include <sys/sendfile.h>
79 root 1.34 # else
80     # error sendfile support requested but not available
81 root 1.32 # endif
82     #endif
83 root 1.1
84 root 1.84 /* number of seconds after which idle threads exit */
85     #define IDLE_TIMEOUT 10
86    
87 root 1.39 /* used for struct dirent, AIX doesn't provide it */
88     #ifndef NAME_MAX
89     # define NAME_MAX 4096
90     #endif
91    
92 root 1.65 /* buffer size for various temporary buffers */
93     #define AIO_BUFSIZE 65536
94    
95 root 1.101 /* use NV for 32 bit perls as it allows larger offsets */
96     #if IVSIZE >= 8
97     # define SvVAL64 SvIV
98     #else
99     # define SvVAL64 SvNV
100     #endif
101    
102 root 1.65 #define dBUF \
103 root 1.71 char *aio_buf; \
104 root 1.103 X_LOCK (wrklock); \
105 root 1.71 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \
106 root 1.103 X_UNLOCK (wrklock); \
107 root 1.65 if (!aio_buf) \
108     return -1;
109    
110 root 1.90 typedef SV SV8; /* byte-sv, used for argument-checking */
111    
112 root 1.1 enum {
113     REQ_QUIT,
114     REQ_OPEN, REQ_CLOSE,
115 root 1.101 REQ_READ, REQ_WRITE,
116     REQ_READAHEAD, REQ_SENDFILE,
117 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
118 root 1.103 REQ_TRUNCATE, REQ_FTRUNCATE,
119 root 1.101 REQ_UTIME, REQ_FUTIME,
120     REQ_CHMOD, REQ_FCHMOD,
121     REQ_CHOWN, REQ_FCHOWN,
122 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
123 root 1.97 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME,
124 root 1.81 REQ_MKNOD, REQ_READDIR,
125 root 1.86 REQ_LINK, REQ_SYMLINK, REQ_READLINK,
126 root 1.54 REQ_GROUP, REQ_NOP,
127 root 1.69 REQ_BUSY,
128 root 1.1 };
129    
130 root 1.44 #define AIO_REQ_KLASS "IO::AIO::REQ"
131     #define AIO_GRP_KLASS "IO::AIO::GRP"
132 root 1.43
/*
 * One asynchronous request. The generic members (sv1/sv2, ptr1/ptr2,
 * int1..int3, offs, size) are reused with a different meaning per request
 * type; the notes below only list the uses visible in this part of the file.
 */
133     typedef struct aio_cb
134     {
/* link to the next request of the same priority inside a reqq */
135 root 1.49 struct aio_cb *volatile next;
136 root 1.43
/* perl callback invoked by req_invoke once the request has finished */
137 root 1.99 SV *callback;
/* REQ_GROUP: sv1 is an AV of values passed to the callback, sv2 the feeder
 * callback; REQ_READ/REQ_READLINK: sv2 is the scalar receiving the data */
138 root 1.86 SV *sv1, *sv2;
/* ptr1: path name or I/O buffer; ptr2: stat buffer or readdir name list
 * (freed by req_destroy when FLAG_PTR2_FREE is set) */
139     void *ptr1, *ptr2;
/* file offset and byte count; for REQ_GROUP, size counts group members */
140     off_t offs;
141     size_t size;
/* result value reported to the callback */
142 root 1.1 ssize_t result;
143 root 1.99 double nv1, nv2;
144 root 1.43
/* REQ_READ: offset inside sv2 at which the read data starts */
145 root 1.86 STRLEN stroffset;
/* one of the REQ_* constants */
146 root 1.43 int type;
/* int1: file descriptor, or group state (1 = delayed, 2 = finished);
 * int2/int3: second fd (sendfile), chown ids, or group feed limit (int2) */
147 root 1.99 int int1, int2, int3;
/* errno value restored just before the callback runs */
148 root 1.1 int errorno;
149 root 1.43 mode_t mode; /* open */
150 root 1.60
/* FLAG_* bits */
151     unsigned char flags;
/* priority, used directly as index into the reqq arrays */
152 root 1.58 unsigned char pri;
153 root 1.60
154     SV *self; /* the perl counterpart of this request, if any */
/* owning group, neighbours in the group's member list, and (for a group)
 * the first member */
155     struct aio_cb *grp, *grp_prev, *grp_next, *grp_first;
156 root 1.1 } aio_cb;
157    
158 root 1.58 enum {
159 root 1.89 FLAG_CANCELLED = 0x01, /* request was cancelled */
160 root 1.100 FLAG_SV2_RO_OFF = 0x40, /* data was set readonly */
161 root 1.89 FLAG_PTR2_FREE = 0x80, /* need to free(ptr2) */
162 root 1.58 };
163    
164 root 1.1 typedef aio_cb *aio_req;
165 root 1.43 typedef aio_cb *aio_req_ornot;
166 root 1.1
167 root 1.60 enum {
168 root 1.61 PRI_MIN = -4,
169     PRI_MAX = 4,
170 root 1.60
171     DEFAULT_PRI = 0,
172 root 1.61 PRI_BIAS = -PRI_MIN,
173 root 1.67 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
174 root 1.60 };
175    
/* ticks per second: one million microseconds shifted right by 10, rounded up */
#define AIO_TICKS ((1000000 + 1023) >> 10)

static unsigned int max_poll_time = 0;
static unsigned int max_poll_reqs = 0;

/*
 * Calculate the time that passed between *tv1 and *tv2, expressed in
 * units of roughly 1/AIO_TICKS of a second.
 */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
    return ((tv2->tv_usec - tv1->tv_usec) >> 10)
           + (tv2->tv_sec - tv1->tv_sec) * AIO_TICKS;
}
187    
188 root 1.98 static thread_t main_tid;
189 root 1.94 static int main_sig;
190     static int block_sig_level;
191    
192     void block_sig ()
193     {
194     sigset_t ss;
195    
196     if (block_sig_level++)
197     return;
198    
199     if (!main_sig)
200     return;
201    
202     sigemptyset (&ss);
203     sigaddset (&ss, main_sig);
204     pthread_sigmask (SIG_BLOCK, &ss, 0);
205     }
206    
207     void unblock_sig ()
208     {
209     sigset_t ss;
210    
211     if (--block_sig_level)
212     return;
213    
214     if (!main_sig)
215     return;
216    
217     sigemptyset (&ss);
218     sigaddset (&ss, main_sig);
219     pthread_sigmask (SIG_UNBLOCK, &ss, 0);
220     }
221    
222 root 1.60 static int next_pri = DEFAULT_PRI + PRI_BIAS;
223    
224 root 1.84 static unsigned int started, idle, wanted;
225 root 1.1
226 root 1.76 /* worker threads management */
227 root 1.103 static mutex_t wrklock = X_MUTEX_INIT;
228 root 1.71
229     typedef struct worker {
230     /* locked by wrklock */
231     struct worker *prev, *next;
232    
233 root 1.98 thread_t tid;
234 root 1.71
235     /* locked by reslock, reqlock or wrklock */
236     aio_req req; /* currently processed request */
237     void *dbuf;
238     DIR *dirp;
239     } worker;
240    
241     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
242    
243     static void worker_clear (worker *wrk)
244     {
245     if (wrk->dirp)
246     {
247     closedir (wrk->dirp);
248     wrk->dirp = 0;
249     }
250    
251     if (wrk->dbuf)
252     {
253     free (wrk->dbuf);
254     wrk->dbuf = 0;
255     }
256     }
257    
258     static void worker_free (worker *wrk)
259     {
260     wrk->next->prev = wrk->prev;
261     wrk->prev->next = wrk->next;
262    
263     free (wrk);
264     }
265    
266 root 1.80 static volatile unsigned int nreqs, nready, npending;
267 root 1.84 static volatile unsigned int max_idle = 4;
268 root 1.80 static volatile unsigned int max_outstanding = 0xffffffff;
269 root 1.106 static int respipe_osf [2], respipe [2] = { -1, -1 };
270 root 1.80
271 root 1.103 static mutex_t reslock = X_MUTEX_INIT;
272     static mutex_t reqlock = X_MUTEX_INIT;
273     static cond_t reqwait = X_COND_INIT;
274 root 1.3
#if WORDACCESS_UNSAFE

/* read nready while holding reqlock */
static unsigned int get_nready ()
{
    unsigned int snapshot;

    X_LOCK(reqlock);
    snapshot = nready;
    X_UNLOCK(reqlock);

    return snapshot;
}

/* read npending while holding reslock */
static unsigned int get_npending ()
{
    unsigned int snapshot;

    X_LOCK(reslock);
    snapshot = npending;
    X_UNLOCK(reslock);

    return snapshot;
}

/* read the number of started threads while holding wrklock */
static unsigned int get_nthreads ()
{
    unsigned int snapshot;

    X_LOCK(wrklock);
    snapshot = started;
    X_UNLOCK(wrklock);

    return snapshot;
}

#else

/* plain reads of the counters, without taking any lock */
# define get_nready() nready
# define get_npending() npending
# define get_nthreads() started

#endif
317    
318 root 1.67 /*
319     * a somewhat faster data structure might be nice, but
320     * with 8 priorities this actually needs <20 insns
321     * per shift, the most expensive operation.
322     */
323     typedef struct {
324     aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
325     int size;
326     } reqq;
327    
328     static reqq req_queue;
329     static reqq res_queue;
330    
331     int reqq_push (reqq *q, aio_req req)
332     {
333     int pri = req->pri;
334     req->next = 0;
335    
336     if (q->qe[pri])
337     {
338     q->qe[pri]->next = req;
339     q->qe[pri] = req;
340     }
341     else
342     q->qe[pri] = q->qs[pri] = req;
343    
344     return q->size++;
345     }
346    
347     aio_req reqq_shift (reqq *q)
348     {
349     int pri;
350    
351     if (!q->size)
352     return 0;
353    
354     --q->size;
355    
356     for (pri = NUM_PRI; pri--; )
357     {
358     aio_req req = q->qs[pri];
359    
360     if (req)
361     {
362     if (!(q->qs[pri] = req->next))
363     q->qe[pri] = 0;
364    
365     return req;
366     }
367     }
368    
369     abort ();
370     }
371 root 1.1
372 root 1.85 static int poll_cb ();
373 root 1.94 static int req_invoke (aio_req req);
374 root 1.101 static void req_destroy (aio_req req);
375 root 1.72 static void req_cancel (aio_req req);
376 root 1.45
377 root 1.43 /* must be called at most once */
378 root 1.44 static SV *req_sv (aio_req req, const char *klass)
379 root 1.43 {
380 root 1.49 if (!req->self)
381     {
382     req->self = (SV *)newHV ();
383     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
384     }
385 root 1.43
386 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
387 root 1.43 }
388    
389 root 1.45 static aio_req SvAIO_REQ (SV *sv)
390 root 1.26 {
391 root 1.53 MAGIC *mg;
392    
393 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
394     croak ("object of class " AIO_REQ_KLASS " expected");
395 root 1.43
396 root 1.53 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
397 root 1.43
398     return mg ? (aio_req)mg->mg_ptr : 0;
399     }
400    
401 root 1.49 static void aio_grp_feed (aio_req grp)
402     {
403 root 1.94 block_sig ();
404    
405 root 1.86 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED))
406 root 1.49 {
407 root 1.86 int old_len = grp->size;
408 root 1.49
409 root 1.86 if (grp->sv2 && SvOK (grp->sv2))
410 root 1.49 {
411     dSP;
412    
413     ENTER;
414     SAVETMPS;
415     PUSHMARK (SP);
416     XPUSHs (req_sv (grp, AIO_GRP_KLASS));
417     PUTBACK;
418 root 1.86 call_sv (grp->sv2, G_VOID | G_EVAL | G_KEEPERR);
419 root 1.49 SPAGAIN;
420     FREETMPS;
421     LEAVE;
422     }
423    
424     /* stop if no progress has been made */
425 root 1.86 if (old_len == grp->size)
426 root 1.49 {
427 root 1.86 SvREFCNT_dec (grp->sv2);
428     grp->sv2 = 0;
429 root 1.49 break;
430     }
431     }
432 root 1.94
433     unblock_sig ();
434 root 1.49 }
435    
436 root 1.50 static void aio_grp_dec (aio_req grp)
437     {
438 root 1.86 --grp->size;
439 root 1.50
440     /* call feeder, if applicable */
441     aio_grp_feed (grp);
442    
443     /* finish, if done */
444 root 1.86 if (!grp->size && grp->int1)
445 root 1.50 {
446 root 1.94 block_sig ();
447    
448     if (!req_invoke (grp))
449     {
450 root 1.101 req_destroy (grp);
451 root 1.94 unblock_sig ();
452     croak (0);
453     }
454    
455 root 1.101 req_destroy (grp);
456 root 1.94 unblock_sig ();
457 root 1.50 }
458     }
459    
/*
 * Deliver a finished request to perl: build the callback arguments that
 * belong to the request type, restore errno from the request and call the
 * callback. The callback is skipped when the request was cancelled or the
 * callback is not defined. Afterwards a request that is a group member is
 * unlinked from its group and the group is updated through aio_grp_dec.
 *
 * Returns true when ERRSV is false on exit, false otherwise. The request
 * itself is not freed here; the callers in this file call req_destroy.
 */
460 root 1.94 static int req_invoke (aio_req req)
461 root 1.45 {
462     dSP;
463    
/* undo the read-only marking that was put on sv2 */
464 root 1.100 if (req->flags & FLAG_SV2_RO_OFF)
465     SvREADONLY_off (req->sv2);
466 root 1.86
467 root 1.67 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback))
468     {
469     ENTER;
470     SAVETMPS;
471     PUSHMARK (SP);
472     EXTEND (SP, 1);
473 root 1.45
474 root 1.67 switch (req->type)
475 root 1.45 {
/* callback gets a reference to an array of names, or undef on error */
476 root 1.67 case REQ_READDIR:
477 root 1.45 {
478 root 1.67 SV *rv = &PL_sv_undef;
479 root 1.45
480 root 1.67 if (req->result >= 0)
481 root 1.45 {
482 root 1.71 int i;
/* ptr2 holds result many NUL-terminated names back to back */
483 root 1.86 char *buf = req->ptr2;
484 root 1.67 AV *av = newAV ();
485    
486 root 1.71 av_extend (av, req->result - 1);
487    
488     for (i = 0; i < req->result; ++i)
489 root 1.67 {
490     SV *sv = newSVpv (buf, 0);
491    
492 root 1.71 av_store (av, i, sv);
/* step over this name and its terminating NUL */
493 root 1.67 buf += SvCUR (sv) + 1;
494     }
495 root 1.45
496 root 1.67 rv = sv_2mortal (newRV_noinc ((SV *)av));
497 root 1.45 }
498    
499 root 1.67 PUSHs (rv);
500 root 1.45 }
501 root 1.67 break;
502 root 1.45
503 root 1.67 case REQ_OPEN:
504     {
505     /* convert fd to fh */
506     SV *fh;
507 root 1.45
508 root 1.67 PUSHs (sv_2mortal (newSViv (req->result)));
509     PUTBACK;
510     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
511     SPAGAIN;
512 root 1.45
/* take the value returned by _fd2fh and start a fresh argument list with it */
513 root 1.93 fh = POPs;
514 root 1.67 PUSHMARK (SP);
515 root 1.93 XPUSHs (fh);
516 root 1.67 }
517     break;
518 root 1.45
519 root 1.67 case REQ_GROUP:
520 root 1.86 req->int1 = 2; /* mark group as finished */
521 root 1.45
/* pass every element of the sv1 array as a callback argument */
522 root 1.86 if (req->sv1)
523 root 1.67 {
524     int i;
525 root 1.86 AV *av = (AV *)req->sv1;
526 root 1.49
527 root 1.67 EXTEND (SP, AvFILL (av) + 1);
528     for (i = 0; i <= AvFILL (av); ++i)
529     PUSHs (*av_fetch (av, i, 0));
530     }
531     break;
532 root 1.48
/* these request types call the callback without arguments */
533 root 1.67 case REQ_NOP:
534 root 1.69 case REQ_BUSY:
535 root 1.67 break;
536 root 1.48
/* on success sv2 is trimmed to result bytes and passed; otherwise no argument */
537 root 1.86 case REQ_READLINK:
538     if (req->result > 0)
539     {
540 root 1.99 SvCUR_set (req->sv2, req->result);
541     *SvEND (req->sv2) = 0;
542     PUSHs (req->sv2);
543 root 1.86 }
544     break;
545    
/* copy the stat result into perl's stat cache variables, then pass result */
546 root 1.87 case REQ_STAT:
547     case REQ_LSTAT:
548     case REQ_FSTAT:
549     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
550     PL_laststatval = req->result;
551     PL_statcache = *(Stat_t *)(req->ptr2);
552     PUSHs (sv_2mortal (newSViv (req->result)));
553     break;
554    
/* set the length of sv2 to stroffset plus the bytes read (0 on error) */
555 root 1.86 case REQ_READ:
556 root 1.99 SvCUR_set (req->sv2, req->stroffset + (req->result > 0 ? req->result : 0));
557     *SvEND (req->sv2) = 0;
558 root 1.87 PUSHs (sv_2mortal (newSViv (req->result)));
559     break;
560    
/* every other type passes the numeric result */
561 root 1.67 default:
562     PUSHs (sv_2mortal (newSViv (req->result)));
563     break;
564     }
565 root 1.45
/* restore the errno recorded in the request before the callback runs */
566 root 1.79 errno = req->errorno;
567 root 1.51
568 root 1.67 PUTBACK;
569     call_sv (req->callback, G_VOID | G_EVAL);
570     SPAGAIN;
571 root 1.51
572 root 1.67 FREETMPS;
573     LEAVE;
574 root 1.45 }
575    
576     if (req->grp)
577     {
578     aio_req grp = req->grp;
579    
580     /* unlink request */
581 root 1.49 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
582     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
583    
584     if (grp->grp_first == req)
585     grp->grp_first = req->grp_next;
586    
/* may invoke and destroy the group when this was its last member */
587 root 1.50 aio_grp_dec (grp);
588 root 1.45 }
589    
590 root 1.94 return !SvTRUE (ERRSV);
591 root 1.67 }
592    
593 root 1.101 static void req_destroy (aio_req req)
594 root 1.67 {
595 root 1.43 if (req->self)
596     {
597     sv_unmagic (req->self, PERL_MAGIC_ext);
598     SvREFCNT_dec (req->self);
599     }
600    
601 root 1.86 SvREFCNT_dec (req->sv1);
602     SvREFCNT_dec (req->sv2);
603 root 1.49 SvREFCNT_dec (req->callback);
604 root 1.26
605 root 1.87 if (req->flags & FLAG_PTR2_FREE)
606 root 1.86 free (req->ptr2);
607 root 1.37
608 root 1.26 Safefree (req);
609     }
610    
611 root 1.72 static void req_cancel_subs (aio_req grp)
612     {
613     aio_req sub;
614    
615     if (grp->type != REQ_GROUP)
616     return;
617    
618 root 1.86 SvREFCNT_dec (grp->sv2);
619     grp->sv2 = 0;
620 root 1.72
621     for (sub = grp->grp_first; sub; sub = sub->grp_next)
622     req_cancel (sub);
623     }
624    
625 root 1.45 static void req_cancel (aio_req req)
626 root 1.1 {
627 root 1.58 req->flags |= FLAG_CANCELLED;
628 root 1.45
629 root 1.72 req_cancel_subs (req);
630 root 1.1 }
631    
632 root 1.104 #ifdef USE_SOCKETS_AS_HANDLES
633     # define TO_SOCKET(x) (win32_get_osfhandle (x))
634     #else
635     # define TO_SOCKET(x) (x)
636     #endif
637    
638     static void
639 root 1.106 create_respipe ()
640 root 1.104 {
641 root 1.106 int old_readfd = respipe [0];
642    
643     if (respipe [1] >= 0)
644     respipe_close (TO_SOCKET (respipe [1]));
645    
646     #ifdef _WIN32
647     if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
648     #else
649     if (pipe (respipe))
650     #endif
651     croak ("unable to initialize result pipe");
652    
653     if (old_readfd >= 0)
654     {
655     if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
656     croak ("unable to initialize result pipe(2)");
657    
658     respipe_close (respipe [0]);
659     respipe [0] = old_readfd;
660     }
661    
662 root 1.104 #ifdef _WIN32
663     int arg = 1;
664 root 1.106 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
665     || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
666 root 1.104 #else
667 root 1.106 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
668     || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
669 root 1.104 #endif
670 root 1.106 croak ("unable to initialize result pipe(3)");
671 root 1.104
672     respipe_osf [0] = TO_SOCKET (respipe [0]);
673     respipe_osf [1] = TO_SOCKET (respipe [1]);
674     }
675    
676 root 1.103 X_THREAD_PROC (aio_proc);
677 root 1.4
678 root 1.45 static void start_thread (void)
679 root 1.4 {
680 root 1.71 worker *wrk = calloc (1, sizeof (worker));
681    
682     if (!wrk)
683     croak ("unable to allocate worker thread data");
684    
685 root 1.103 X_LOCK (wrklock);
686 root 1.4
687 root 1.98 if (thread_create (&wrk->tid, aio_proc, (void *)wrk))
688 root 1.71 {
689     wrk->prev = &wrk_first;
690     wrk->next = wrk_first.next;
691     wrk_first.next->prev = wrk;
692     wrk_first.next = wrk;
693 root 1.78 ++started;
694 root 1.71 }
695     else
696     free (wrk);
697 root 1.4
698 root 1.103 X_UNLOCK (wrklock);
699 root 1.4 }
700    
701 root 1.80 static void maybe_start_thread ()
702     {
703 root 1.85 if (get_nthreads () >= wanted)
704 root 1.80 return;
705    
706 root 1.84 /* todo: maybe use idle here, but might be less exact */
707 root 1.85 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
708 root 1.80 return;
709    
710     start_thread ();
711     }
712    
713 root 1.45 static void req_send (aio_req req)
714 root 1.4 {
715 root 1.94 block_sig ();
716    
717 root 1.50 ++nreqs;
718 root 1.4
719 root 1.103 X_LOCK (reqlock);
720 root 1.79 ++nready;
721 root 1.67 reqq_push (&req_queue, req);
722 root 1.103 X_COND_SIGNAL (reqwait);
723     X_UNLOCK (reqlock);
724 root 1.80
725 root 1.94 unblock_sig ();
726    
727 root 1.80 maybe_start_thread ();
728 root 1.4 }
729    
730 root 1.45 static void end_thread (void)
731 root 1.4 {
732     aio_req req;
733 root 1.67
734 root 1.26 Newz (0, req, 1, aio_cb);
735 root 1.67
736 root 1.4 req->type = REQ_QUIT;
737 root 1.67 req->pri = PRI_MAX + PRI_BIAS;
738 root 1.4
739 root 1.103 X_LOCK (reqlock);
740 root 1.83 reqq_push (&req_queue, req);
741 root 1.103 X_COND_SIGNAL (reqwait);
742     X_UNLOCK (reqlock);
743 root 1.80
744 root 1.103 X_LOCK (wrklock);
745 root 1.80 --started;
746 root 1.103 X_UNLOCK (wrklock);
747 root 1.4 }
748    
749 root 1.85 static void set_max_idle (int nthreads)
750     {
751 root 1.103 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
752 root 1.85 max_idle = nthreads <= 0 ? 1 : nthreads;
753 root 1.103 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
754 root 1.85 }
755    
756 root 1.22 static void min_parallel (int nthreads)
757     {
758 root 1.30 if (wanted < nthreads)
759     wanted = nthreads;
760 root 1.22 }
761    
762     static void max_parallel (int nthreads)
763     {
764 root 1.30 if (wanted > nthreads)
765     wanted = nthreads;
766    
767 root 1.80 while (started > wanted)
768     end_thread ();
769     }
770    
771     static void poll_wait ()
772     {
773     fd_set rfd;
774    
775     while (nreqs)
776 root 1.30 {
777 root 1.80 int size;
778 root 1.103 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
779 root 1.80 size = res_queue.size;
780 root 1.103 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
781 root 1.80
782     if (size)
783     return;
784    
785     maybe_start_thread ();
786    
787 root 1.104 FD_ZERO (&rfd);
788     FD_SET (respipe [0], &rfd);
789 root 1.80
790 root 1.104 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
791 root 1.22 }
792 root 1.80 }
793    
/*
 * Process finished requests from res_queue.
 *
 * The inner loop takes results one at a time and runs req_invoke and
 * req_destroy on them; it stops when the queue is empty, after
 * max_poll_reqs requests (when non-zero) or once max_poll_time ticks have
 * passed (when non-zero). A group that still has members is only marked as
 * delayed and kept alive. The outer loop repeats, waiting in poll_wait, for
 * as long as more than max_outstanding requests are in flight.
 *
 * Returns the number of requests passed to req_invoke. Croaks, after
 * destroying the request and unblocking the signal, when req_invoke
 * reports failure.
 */
794 root 1.85 static int poll_cb ()
795 root 1.80 {
796     dSP;
797     int count = 0;
798 root 1.85 int maxreqs = max_poll_reqs;
799 root 1.80 int do_croak = 0;
800 root 1.85 struct timeval tv_start, tv_now;
801 root 1.80 aio_req req;
802 root 1.22
/* the start time is only needed when a time limit is configured */
803 root 1.85 if (max_poll_time)
804     gettimeofday (&tv_start, 0);
805    
806 root 1.94 block_sig ();
807    
808 root 1.80 for (;;)
809 root 1.22 {
810 root 1.85 for (;;)
811 root 1.80 {
812     maybe_start_thread ();
813    
814 root 1.103 X_LOCK (reslock);
815 root 1.80 req = reqq_shift (&res_queue);
816    
817     if (req)
818     {
819     --npending;
820    
/* the queue just became empty: drain the result pipe */
821     if (!res_queue.size)
822     {
823     /* read any signals sent by the worker threads */
824 root 1.95 char buf [4];
825 root 1.105 while (respipe_read (respipe [0], buf, 4) == 4)
826 root 1.80 ;
827     }
828     }
829    
830 root 1.103 X_UNLOCK (reslock);
831 root 1.80
832     if (!req)
833     break;
834    
835     --nreqs;
836    
/* a group with remaining members is not invoked or destroyed yet */
837 root 1.86 if (req->type == REQ_GROUP && req->size)
838 root 1.80 {
839 root 1.86 req->int1 = 1; /* mark request as delayed */
840 root 1.80 continue;
841     }
842     else
843     {
844 root 1.94 if (!req_invoke (req))
845     {
846 root 1.101 req_destroy (req);
847 root 1.94 unblock_sig ();
848     croak (0);
849     }
850 root 1.80
851     count++;
852     }
853    
854 root 1.101 req_destroy (req);
855 root 1.85
/* request budget: a start value of 0 means no limit */
856     if (maxreqs && !--maxreqs)
857     break;
858    
859     if (max_poll_time)
860     {
861     gettimeofday (&tv_now, 0);
862    
863     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
864     break;
865     }
866 root 1.80 }
867    
/* done unless more requests are in flight than max_outstanding allows */
868     if (nreqs <= max_outstanding)
869     break;
870    
871 root 1.22 poll_wait ();
872 root 1.80
/* NOTE(review): presumably lets the next round handle a request even when
 * the budget had just reached zero - confirm the intent for maxreqs == 0 */
873 root 1.85 ++maxreqs;
874 root 1.22 }
875 root 1.80
876 root 1.94 unblock_sig ();
877 root 1.80 return count;
878 root 1.22 }
879    
880     /*****************************************************************************/
881 root 1.17 /* work around various missing functions */
882    
883     #if !HAVE_PREADWRITE
884     # define pread aio_pread
885     # define pwrite aio_pwrite
886    
887     /*
888     * make our pread/pwrite safe against themselves, but not against
889     * normal read/write by using a mutex. slows down execution a lot,
890     * but that's your problem, not mine.
891     */
892 root 1.103 static mutex_t preadwritelock = X_MUTEX_INIT;
893 root 1.17
894 root 1.45 static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
895 root 1.17 {
896     ssize_t res;
897     off_t ooffset;
898    
899 root 1.103 X_LOCK (preadwritelock);
900 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
901     lseek (fd, offset, SEEK_SET);
902     res = read (fd, buf, count);
903     lseek (fd, ooffset, SEEK_SET);
904 root 1.103 X_UNLOCK (preadwritelock);
905 root 1.17
906     return res;
907     }
908    
909 root 1.45 static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
910 root 1.17 {
911     ssize_t res;
912     off_t ooffset;
913    
914 root 1.103 X_LOCK (preadwritelock);
915 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
916     lseek (fd, offset, SEEK_SET);
917     res = write (fd, buf, count);
918     lseek (fd, offset, SEEK_SET);
919 root 1.103 X_UNLOCK (preadwritelock);
920 root 1.17
921     return res;
922     }
923     #endif
924    
925 root 1.99 #ifndef HAVE_FUTIMES
926    
927     # define utimes(path,times) aio_utimes (path, times)
928     # define futimes(fd,times) aio_futimes (fd, times)
929    
/*
 * utimes() replacement built on utime(): only the whole seconds of the
 * two timestamps are used. A null times pointer is passed on to utime()
 * as a null pointer.
 */
int aio_utimes (const char *filename, const struct timeval times[2])
{
    struct utimbuf stamp;

    if (!times)
        return utime(filename, 0);

    stamp.actime = times[0].tv_sec;
    stamp.modtime = times[1].tv_sec;

    return utime(filename, &stamp);
}
944    
/*
 * futimes() stand-in: always fails with errno set to ENOSYS.
 */
int aio_futimes (int fd, const struct timeval tv[2])
{
    return (errno = ENOSYS, -1);
}
950    
951     #endif
952    
953 root 1.17 #if !HAVE_FDATASYNC
954     # define fdatasync fsync
955     #endif
956    
957     #if !HAVE_READAHEAD
958 root 1.75 # define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
959 root 1.17
960 root 1.75 static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
961 root 1.17 {
962 root 1.103 size_t todo = count;
963 root 1.65 dBUF;
964 root 1.37
965 root 1.103 while (todo > 0)
966 root 1.17 {
967 root 1.103 size_t len = todo < AIO_BUFSIZE ? todo : AIO_BUFSIZE;
968 root 1.17
969 root 1.65 pread (fd, aio_buf, len, offset);
970 root 1.17 offset += len;
971 root 1.103 todo -= len;
972 root 1.17 }
973    
974     errno = 0;
975 root 1.103 return count;
976 root 1.17 }
977 root 1.75
978 root 1.17 #endif
979    
980 root 1.37 #if !HAVE_READDIR_R
981     # define readdir_r aio_readdir_r
982    
983 root 1.103 static mutex_t readdirlock = X_MUTEX_INIT;
984 root 1.37
985 root 1.103 static int readdir_r (DIR *dirp, X_DIRENT *ent, X_DIRENT **res)
986 root 1.37 {
987 root 1.103 X_DIRENT *e;
988 root 1.37 int errorno;
989    
990 root 1.103 X_LOCK (readdirlock);
991 root 1.37
992     e = readdir (dirp);
993     errorno = errno;
994    
995     if (e)
996     {
997     *res = ent;
998     strcpy (ent->d_name, e->d_name);
999     }
1000     else
1001     *res = 0;
1002    
1003 root 1.103 X_UNLOCK (readdirlock);
1004 root 1.37
1005     errno = errorno;
1006     return e ? 0 : -1;
1007     }
1008     #endif
1009    
1010 root 1.32 /* sendfile always needs emulation */
1011 root 1.71 static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count, worker *self)
1012 root 1.32 {
1013 root 1.37 ssize_t res;
1014 root 1.32
1015 root 1.37 if (!count)
1016     return 0;
1017 root 1.32
1018 root 1.35 #if HAVE_SENDFILE
1019     # if __linux
1020 root 1.37 res = sendfile (ofd, ifd, &offset, count);
1021 root 1.32
1022 root 1.35 # elif __freebsd
1023 root 1.37 /*
1024     * Of course, the freebsd sendfile is a dire hack with no thoughts
1025     * wasted on making it similar to other I/O functions.
1026     */
1027     {
1028     off_t sbytes;
1029     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
1030    
1031     if (res < 0 && sbytes)
1032 root 1.77 /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
1033 root 1.37 res = sbytes;
1034     }
1035 root 1.32
1036 root 1.35 # elif __hpux
1037 root 1.37 res = sendfile (ofd, ifd, offset, count, 0, 0);
1038 root 1.32
1039 root 1.35 # elif __solaris
1040 root 1.37 {
1041     struct sendfilevec vec;
1042     size_t sbytes;
1043    
1044     vec.sfv_fd = ifd;
1045     vec.sfv_flag = 0;
1046     vec.sfv_off = offset;
1047     vec.sfv_len = count;
1048    
1049     res = sendfilev (ofd, &vec, 1, &sbytes);
1050    
1051     if (res < 0 && sbytes)
1052     res = sbytes;
1053     }
1054 root 1.35
1055 root 1.38 # endif
1056     #else
1057 root 1.37 res = -1;
1058     errno = ENOSYS;
1059 root 1.32 #endif
1060    
1061 root 1.37 if (res < 0
1062     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
1063 root 1.35 #if __solaris
1064 root 1.37 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
1065 root 1.35 #endif
1066 root 1.37 )
1067     )
1068     {
1069     /* emulate sendfile. this is a major pain in the ass */
1070 root 1.65 dBUF;
1071    
1072 root 1.37 res = 0;
1073    
1074 root 1.38 while (count)
1075 root 1.37 {
1076     ssize_t cnt;
1077    
1078 root 1.65 cnt = pread (ifd, aio_buf, count > AIO_BUFSIZE ? AIO_BUFSIZE : count, offset);
1079 root 1.32
1080 root 1.37 if (cnt <= 0)
1081     {
1082     if (cnt && !res) res = -1;
1083     break;
1084     }
1085    
1086 root 1.65 cnt = write (ofd, aio_buf, cnt);
1087 root 1.37
1088     if (cnt <= 0)
1089     {
1090     if (cnt && !res) res = -1;
1091     break;
1092     }
1093    
1094     offset += cnt;
1095     res += cnt;
1096 root 1.38 count -= cnt;
1097 root 1.37 }
1098     }
1099    
1100     return res;
1101     }
1102    
1103     /* read a full directory */
1104 root 1.71 static void scandir_ (aio_req req, worker *self)
1105 root 1.37 {
1106 root 1.65 DIR *dirp;
1107 root 1.37 union
1108     {
1109 root 1.103 X_DIRENT d;
1110     char b [offsetof (X_DIRENT, d_name) + NAME_MAX + 1];
1111 root 1.65 } *u;
1112 root 1.103 X_DIRENT *entp;
1113 root 1.37 char *name, *names;
1114     int memlen = 4096;
1115     int memofs = 0;
1116     int res = 0;
1117    
1118 root 1.103 X_LOCK (wrklock);
1119 root 1.86 self->dirp = dirp = opendir (req->ptr1);
1120 root 1.71 self->dbuf = u = malloc (sizeof (*u));
1121 root 1.87 req->flags |= FLAG_PTR2_FREE;
1122 root 1.86 req->ptr2 = names = malloc (memlen);
1123 root 1.103 X_UNLOCK (wrklock);
1124 root 1.37
1125 root 1.71 if (dirp && u && names)
1126 root 1.65 for (;;)
1127     {
1128     errno = 0;
1129     readdir_r (dirp, &u->d, &entp);
1130 root 1.37
1131 root 1.65 if (!entp)
1132     break;
1133 root 1.37
1134 root 1.65 name = entp->d_name;
1135 root 1.37
1136 root 1.65 if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
1137     {
1138     int len = strlen (name) + 1;
1139 root 1.37
1140 root 1.65 res++;
1141 root 1.37
1142 root 1.65 while (memofs + len > memlen)
1143     {
1144     memlen *= 2;
1145 root 1.103 X_LOCK (wrklock);
1146 root 1.86 req->ptr2 = names = realloc (names, memlen);
1147 root 1.103 X_UNLOCK (wrklock);
1148 root 1.71
1149 root 1.65 if (!names)
1150     break;
1151     }
1152 root 1.37
1153 root 1.65 memcpy (names + memofs, name, len);
1154     memofs += len;
1155     }
1156     }
1157 root 1.37
1158 root 1.71 if (errno)
1159     res = -1;
1160    
1161     req->result = res;
1162 root 1.32 }
1163    
1164 root 1.22 /*****************************************************************************/
1165    
1166 root 1.103 X_THREAD_PROC (aio_proc)
1167 root 1.1 {
1168 root 1.103 {//D
1169 root 1.1 aio_req req;
1170 root 1.84 struct timespec ts;
1171 root 1.71 worker *self = (worker *)thr_arg;
1172 root 1.1
1173 root 1.103 /* try to distribute timeouts somewhat randomly */
1174     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
1175 root 1.84
1176 root 1.80 for (;;)
1177 root 1.1 {
1178 root 1.84 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1179    
1180 root 1.103 X_LOCK (reqlock);
1181 root 1.3
1182     for (;;)
1183     {
1184 root 1.71 self->req = req = reqq_shift (&req_queue);
1185 root 1.3
1186     if (req)
1187     break;
1188    
1189 root 1.84 ++idle;
1190    
1191 root 1.103 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts)
1192 root 1.84 == ETIMEDOUT)
1193     {
1194     if (idle > max_idle)
1195     {
1196     --idle;
1197 root 1.103 X_UNLOCK (reqlock);
1198     X_LOCK (wrklock);
1199 root 1.84 --started;
1200 root 1.103 X_UNLOCK (wrklock);
1201 root 1.84 goto quit;
1202     }
1203    
1204     /* we are allowed to idle, so do so without any timeout */
1205 root 1.103 X_COND_WAIT (reqwait, reqlock);
1206 root 1.84 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1207     }
1208    
1209     --idle;
1210 root 1.3 }
1211    
1212 root 1.79 --nready;
1213    
1214 root 1.103 X_UNLOCK (reqlock);
1215 root 1.3
1216 root 1.1 errno = 0; /* strictly unnecessary */
1217    
1218 root 1.58 if (!(req->flags & FLAG_CANCELLED))
1219 root 1.80 switch (req->type)
1220 root 1.43 {
1221 root 1.101 case REQ_READ: req->result = req->offs >= 0
1222     ? pread (req->int1, req->ptr1, req->size, req->offs)
1223     : read (req->int1, req->ptr1, req->size); break;
1224     case REQ_WRITE: req->result = req->offs >= 0
1225     ? pwrite (req->int1, req->ptr1, req->size, req->offs)
1226     : write (req->int1, req->ptr1, req->size); break;
1227 root 1.3
1228 root 1.86 case REQ_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
1229     case REQ_SENDFILE: req->result = sendfile_ (req->int1, req->int2, req->offs, req->size, self); break;
1230 root 1.16
1231 root 1.87 case REQ_STAT: req->result = stat (req->ptr1, (Stat_t *)req->ptr2); break;
1232     case REQ_LSTAT: req->result = lstat (req->ptr1, (Stat_t *)req->ptr2); break;
1233     case REQ_FSTAT: req->result = fstat (req->int1, (Stat_t *)req->ptr2); break;
1234 root 1.86
1235 root 1.99 case REQ_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
1236     case REQ_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
1237     case REQ_CHMOD: req->result = chmod (req->ptr1, req->mode); break;
1238     case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break;
1239 root 1.103 case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
1240     case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
1241 root 1.99
1242 root 1.86 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break;
1243     case REQ_CLOSE: req->result = close (req->int1); break;
1244     case REQ_UNLINK: req->result = unlink (req->ptr1); break;
1245     case REQ_RMDIR: req->result = rmdir (req->ptr1); break;
1246 root 1.97 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break;
1247 root 1.86 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break;
1248     case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break;
1249     case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break;
1250     case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break;
1251     case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break;
1252 root 1.43
1253 root 1.86 case REQ_FDATASYNC: req->result = fdatasync (req->int1); break;
1254     case REQ_FSYNC: req->result = fsync (req->int1); break;
1255 root 1.71 case REQ_READDIR: scandir_ (req, self); break;
1256 root 1.1
1257 root 1.69 case REQ_BUSY:
1258 root 1.103 #ifdef _WIN32
1259     Sleep (req->nv1 * 1000.);
1260     #else
1261 root 1.45 {
1262     struct timeval tv;
1263    
1264 root 1.99 tv.tv_sec = req->nv1;
1265 root 1.103 tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
1266 root 1.45
1267     req->result = select (0, 0, 0, 0, &tv);
1268     }
1269 root 1.103 #endif
1270     break;
1271 root 1.45
1272 root 1.99 case REQ_UTIME:
1273     case REQ_FUTIME:
1274     {
1275     struct timeval tv[2];
1276     struct timeval *times;
1277    
1278     if (req->nv1 != -1. || req->nv2 != -1.)
1279     {
1280     tv[0].tv_sec = req->nv1;
1281     tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
1282     tv[1].tv_sec = req->nv2;
1283     tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
1284    
1285     times = tv;
1286     }
1287     else
1288     times = 0;
1289    
1290    
1291     req->result = req->type == REQ_FUTIME
1292     ? futimes (req->int1, times)
1293     : utimes (req->ptr1, times);
1294     }
1295    
1296 root 1.55 case REQ_GROUP:
1297     case REQ_NOP:
1298 root 1.80 break;
1299    
1300 root 1.43 case REQ_QUIT:
1301 root 1.84 goto quit;
1302 root 1.1
1303 root 1.43 default:
1304 root 1.99 req->result = -1;
1305 root 1.43 break;
1306     }
1307 root 1.1
1308     req->errorno = errno;
1309 root 1.3
1310 root 1.103 X_LOCK (reslock);
1311 root 1.3
1312 root 1.79 ++npending;
1313    
1314 root 1.67 if (!reqq_push (&res_queue, req))
1315 root 1.94 {
1316     /* write a dummy byte to the pipe so fh becomes ready */
1317 root 1.105 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
1318 root 1.94
1319     /* optionally signal the main thread asynchronously */
1320     if (main_sig)
1321     pthread_kill (main_tid, main_sig);
1322     }
1323 root 1.3
1324 root 1.71 self->req = 0;
1325     worker_clear (self);
1326    
1327 root 1.103 X_UNLOCK (reslock);
1328 root 1.1 }
1329 root 1.84
1330     quit:
1331 root 1.103 X_LOCK (wrklock);
1332 root 1.84 worker_free (self);
1333 root 1.103 X_UNLOCK (wrklock);
1334 root 1.84
1335     return 0;
1336 root 1.103 }//D
1337 root 1.1 }
1338    
1339 root 1.37 /*****************************************************************************/
1340    
1341     static void atfork_prepare (void)
1342     {
1343 root 1.103 X_LOCK (wrklock);
1344     X_LOCK (reqlock);
1345     X_LOCK (reslock);
1346 root 1.37 #if !HAVE_PREADWRITE
1347 root 1.103 X_LOCK (preadwritelock);
1348 root 1.37 #endif
1349     #if !HAVE_READDIR_R
1350 root 1.103 X_LOCK (readdirlock);
1351 root 1.37 #endif
1352     }
1353    
1354     static void atfork_parent (void)
1355     {
1356     #if !HAVE_READDIR_R
1357 root 1.103 X_UNLOCK (readdirlock);
1358 root 1.37 #endif
1359     #if !HAVE_PREADWRITE
1360 root 1.103 X_UNLOCK (preadwritelock);
1361 root 1.37 #endif
1362 root 1.103 X_UNLOCK (reslock);
1363     X_UNLOCK (reqlock);
1364     X_UNLOCK (wrklock);
1365 root 1.37 }
1366    
1367     static void atfork_child (void)
1368     {
1369     aio_req prv;
1370    
1371 root 1.67 while (prv = reqq_shift (&req_queue))
1372 root 1.101 req_destroy (prv);
1373 root 1.37
1374 root 1.67 while (prv = reqq_shift (&res_queue))
1375 root 1.101 req_destroy (prv);
1376 root 1.71
1377     while (wrk_first.next != &wrk_first)
1378     {
1379     worker *wrk = wrk_first.next;
1380    
1381     if (wrk->req)
1382 root 1.101 req_destroy (wrk->req);
1383 root 1.71
1384     worker_clear (wrk);
1385     worker_free (wrk);
1386     }
1387    
1388 root 1.84 started = 0;
1389     idle = 0;
1390     nreqs = 0;
1391     nready = 0;
1392     npending = 0;
1393 root 1.71
1394 root 1.106 create_respipe ();
1395 root 1.37
1396     atfork_parent ();
1397     }
1398    
/*
 * dREQ - declare and prepare a new request in an XSUB.
 *
 * Expects an SV *callback in scope.  Declares "aio_req req" and
 * "int req_pri", consumes the pending priority (next_pri is reset to
 * DEFAULT_PRI + PRI_BIAS), croaks unless callback is undef or a reference,
 * allocates a zeroed aio_cb and stores a copy of the callback and the
 * priority in it.
 *
 * The expansion starts with declarations and has no trailing semicolon,
 * so it is written as "dREQ;" where declarations are allowed.
 */
#define dREQ                                                          \
  aio_req req;                                                        \
  int req_pri = next_pri;                                             \
  next_pri = DEFAULT_PRI + PRI_BIAS;                                  \
                                                                      \
  if (SvOK (callback) && !SvROK (callback))                           \
    croak ("callback must be undef or of reference type");            \
                                                                      \
  Newz (0, req, 1, aio_cb);                                           \
  if (!req)                                                           \
    croak ("out of memory during aio_req allocation");                \
                                                                      \
  req->callback = newSVsv (callback);                                 \
  req->pri      = req_pri
1413 root 1.43
/*
 * REQ_SEND - submit the request "req" prepared with dREQ and, unless the
 * XSUB was called in void context, push its request object (blessed into
 * AIO_REQ_KLASS) onto the Perl stack.
 *
 * This is an object-like macro that expands to two statements, so it must
 * be used as a statement of its own inside a braced block.
 */
#define REQ_SEND                                                      \
  req_send (req);                                                     \
                                                                      \
  if (GIMME_V != G_VOID)                                              \
    XPUSHs (req_sv (req, AIO_REQ_KLASS));
1419 root 1.22
1420 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
1421    
1422 root 1.8 PROTOTYPES: ENABLE
1423    
BOOT:
{
	/* module bootstrap: export constants, set up the result pipe, hook fork */
	HV *stash = gv_stashpv ("IO::AIO", 1);

	newCONSTSUB (stash, "EXDEV",    newSViv (EXDEV));
	newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
	newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
	newCONSTSUB (stash, "O_CREAT",  newSViv (O_CREAT));
	newCONSTSUB (stash, "O_TRUNC",  newSViv (O_TRUNC));
#ifdef _WIN32
	/* NOTE(review): reqwait is used as a condition variable elsewhere in */
	/* this file (X_COND_TIMEDWAIT / X_COND_WAIT) - confirm that running  */
	/* X_MUTEX_CHECK on it before X_COND_CHECK is intended.               */
	X_MUTEX_CHECK (wrklock);
	X_MUTEX_CHECK (reslock);
	X_MUTEX_CHECK (reqlock);
	X_MUTEX_CHECK (reqwait);
	X_MUTEX_CHECK (preadwritelock);
	X_MUTEX_CHECK (readdirlock);

	X_COND_CHECK  (reqwait);
#else
	/* these two constants are only exported on non-Windows builds */
	newCONSTSUB (stash, "S_IFIFO",  newSViv (S_IFIFO));
	newCONSTSUB (stash, "SIGIO",    newSViv (SIGIO));
#endif

	create_respipe ();

	X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
}
1451    
void
max_poll_reqs (int nreqs)
	PROTOTYPE: $
	CODE:
	{
	  /* store the new limit; the parameter shadows the file-level nreqs counter here */
	  max_poll_reqs = nreqs;
	}
1457    
void
max_poll_time (double nseconds)
	PROTOTYPE: $
	CODE:
	{
	  /* the limit is stored in units of 1/AIO_TICKS seconds */
	  max_poll_time = nseconds * AIO_TICKS;
	}
1463    
void
min_parallel (int nthreads)
	PROTOTYPE: $
	CODE:
	  /* explicit form of the call xsubpp generates for a body-less XSUB */
	  min_parallel (nthreads);
1467    
void
max_parallel (int nthreads)
	PROTOTYPE: $
	CODE:
	  /* explicit form of the call xsubpp generates for a body-less XSUB */
	  max_parallel (nthreads);
1471    
void
max_idle (int nthreads)
	PROTOTYPE: $
	CODE:
	{
	  /* thin wrapper: forwards the value to set_max_idle */
	  set_max_idle (nthreads);
	}
1477    
int
max_outstanding (int maxreqs)
	PROTOTYPE: $
	CODE:
	{
	  /* install the new limit and hand back the one it replaces */
	  RETVAL          = max_outstanding;
	  max_outstanding = maxreqs;
	}
	OUTPUT:
	RETVAL
1486 root 1.1
void
aio_open (SV8 *pathname, int flags, int mode, SV *callback=&PL_sv_undef)
	PROTOTYPE: $$$;$
	PPCODE:
	{
	  /* queue a REQ_OPEN request; the worker calls open (ptr1, int1, mode) */
	  dREQ;

	  req->type = REQ_OPEN;
	  req->int1 = flags;
	  req->mode = mode;

	  /* the request keeps its own copy of the path; ptr1 points into it */
	  req->sv1  = newSVsv (pathname);
	  req->ptr1 = SvPVbyte_nolen (req->sv1);

	  REQ_SEND;
	}
1502    
void
aio_close (SV *fh, SV *callback=&PL_sv_undef)
	PROTOTYPE: $;$
	ALIAS:
	   aio_close     = REQ_CLOSE
	   aio_fsync     = REQ_FSYNC
	   aio_fdatasync = REQ_FDATASYNC
	PPCODE:
	{
	  /* shared body of aio_close/aio_fsync/aio_fdatasync: ix is the request type */
	  dREQ;

	  req->type = ix;
	  /* the request keeps a copy of the handle SV next to the raw descriptor */
	  req->sv1  = newSVsv (fh);
	  req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));

	  /* REQ_SEND is an object-like macro and takes no argument list */
	  REQ_SEND;
	}
1520    
void
aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef)
	ALIAS:
	   aio_read  = REQ_READ
	   aio_write = REQ_WRITE
	PROTOTYPE: $$$$$;$
	PPCODE:
	{
	  /*
	   * Shared body of aio_read and aio_write (ix is the request type).
	   *
	   * offset     - file offset, undef requests read/write at the current position
	   * length     - number of bytes; for writes undef means "rest of data"
	   * data       - buffer scalar, marked read-only while the request is pending
	   * dataoffset - offset into data, negative values count from its end
	   *
	   * Croaks when dataoffset lies outside data or length is negative.
	   */
	  STRLEN svlen;
	  char *svptr = SvPVbyte (data, svlen);
	  UV len = SvUV (length);

	  SvUPGRADE (data, SVt_PV);
	  SvPOK_on (data);

	  if (dataoffset < 0)
	    dataoffset += svlen;

	  if (dataoffset < 0 || dataoffset > svlen)
	    croak ("dataoffset outside of data scalar");

	  /* len is unsigned, so a negative length has to be detected on the SV   */
	  /* itself - and before len is used to grow the buffer or size a write. */
	  if (SvOK (length) && !SvIsUV (length) && SvIV (length) < 0)
	    croak ("length must not be negative");

	  if (ix == REQ_WRITE)
	    {
	      /* write: clamp to the available data; 0 <= dataoffset <= svlen */
	      /* holds here, so the subtraction cannot wrap around            */
	      if (!SvOK (length) || len > svlen - dataoffset)
	        len = svlen - dataoffset;
	    }
	  else
	    {
	      /* read: grow scalar as necessary */
	      svptr = SvGROW (data, len + dataoffset + 1);
	    }

	  {
	    dREQ;

	    req->type = ix;
	    req->sv1  = newSVsv (fh);
	    req->int1 = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
	                                              : IoOFP (sv_2io (fh)));
	    req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
	    req->size = len;
	    req->sv2  = SvREFCNT_inc (data);
	    req->ptr1 = (char *)svptr + dataoffset;
	    req->stroffset = dataoffset;

	    /* remember that the read-only flag was set here and not by the caller */
	    if (!SvREADONLY (data))
	      {
	        SvREADONLY_on (data);
	        req->flags |= FLAG_SV2_RO_OFF;
	      }

	    REQ_SEND;
	  }
	}
1579 root 1.1
void
aio_readlink (SV8 *path, SV *callback=&PL_sv_undef)
	PROTOTYPE: $;$
	PPCODE:
	{
	  /*
	   * Queue a REQ_READLINK request.  The worker calls
	   * readlink (ptr2, ptr1, NAME_MAX): ptr2 is the link path and ptr1 a
	   * freshly allocated NAME_MAX-sized result scalar.
	   *
	   * The prototype matches the signature: one mandatory argument (path)
	   * followed by the optional callback.
	   */
	  SV *data;
	  dREQ;

	  data = newSV (NAME_MAX);
	  SvPOK_on (data);

	  req->type = REQ_READLINK;
	  req->sv1  = newSVsv (path);
	  req->ptr2 = SvPVbyte_nolen (req->sv1);
	  req->sv2  = data;
	  req->ptr1 = SvPVbyte_nolen (data);

	  REQ_SEND;
	}
1599    
void
aio_sendfile (SV *out_fh, SV *in_fh, SV *in_offset, UV length, SV *callback=&PL_sv_undef)
	PROTOTYPE: $$$$;$
	PPCODE:
	{
	  /* queue a REQ_SENDFILE request: int1/sv1 is the output handle, int2/sv2 the input handle */
	  dREQ;

	  req->type = REQ_SENDFILE;

	  req->sv1  = newSVsv (out_fh);
	  req->int1 = PerlIO_fileno (IoIFP (sv_2io (out_fh)));

	  req->sv2  = newSVsv (in_fh);
	  req->int2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));

	  req->offs = SvVAL64 (in_offset);
	  req->size = length;

	  REQ_SEND;
	}
1617    
void
aio_readahead (SV *fh, SV *offset, IV length, SV *callback=&PL_sv_undef)
	PROTOTYPE: $$$;$
	PPCODE:
	{
	  /* queue a REQ_READAHEAD request; the worker calls readahead (int1, offs, size) */
	  dREQ;

	  req->type = REQ_READAHEAD;

	  req->sv1  = newSVsv (fh);
	  req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));

	  req->offs = SvVAL64 (offset);
	  req->size = length;

	  REQ_SEND;
	}
1633    
void
aio_stat (SV8 *fh_or_path, SV *callback=&PL_sv_undef)
	ALIAS:
	   aio_stat  = REQ_STAT
	   aio_lstat = REQ_LSTAT
	PPCODE:
	{
	  /*
	   * Shared body of aio_stat and aio_lstat.  A string argument is used
	   * as a path with the aliased request type (ix); anything else is
	   * treated as a file handle and always becomes REQ_FSTAT.  The stat
	   * buffer is malloc'ed here and flagged with FLAG_PTR2_FREE.
	   */
	  dREQ;

	  req->ptr2 = malloc (sizeof (Stat_t));

	  if (!req->ptr2)
	    {
	      req_destroy (req);
	      croak ("out of memory during aio_stat statdata allocation");
	    }

	  req->flags |= FLAG_PTR2_FREE;
	  req->sv1    = newSVsv (fh_or_path);

	  if (!SvPOK (fh_or_path))
	    {
	      req->type = REQ_FSTAT;
	      req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
	    }
	  else
	    {
	      req->type = ix;
	      req->ptr1 = SvPVbyte_nolen (req->sv1);
	    }

	  REQ_SEND;
	}
1666    
void
aio_utime (SV8 *fh_or_path, SV *atime, SV *mtime, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /*
	   * Queue REQ_UTIME (string argument = path) or REQ_FUTIME (file
	   * handle).  An undefined time is stored as -1.; the worker passes a
	   * null times pointer when both are -1.
	   */
	  dREQ;

	  if (SvOK (atime))
	    req->nv1 = SvNV (atime);
	  else
	    req->nv1 = -1.;

	  if (SvOK (mtime))
	    req->nv2 = SvNV (mtime);
	  else
	    req->nv2 = -1.;

	  req->sv1 = newSVsv (fh_or_path);

	  if (!SvPOK (fh_or_path))
	    {
	      req->type = REQ_FUTIME;
	      req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
	    }
	  else
	    {
	      req->type = REQ_UTIME;
	      req->ptr1 = SvPVbyte_nolen (req->sv1);
	    }

	  REQ_SEND;
	}
1690    
void
aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /*
	   * Queue REQ_TRUNCATE (string argument = path) or REQ_FTRUNCATE
	   * (file handle).  An undefined offset is stored as -1.
	   */
	  dREQ;

	  req->sv1 = newSVsv (fh_or_path);

	  if (SvOK (offset))
	    req->offs = SvVAL64 (offset);
	  else
	    req->offs = -1;

	  if (!SvPOK (fh_or_path))
	    {
	      req->type = REQ_FTRUNCATE;
	      req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
	    }
	  else
	    {
	      req->type = REQ_TRUNCATE;
	      req->ptr1 = SvPVbyte_nolen (req->sv1);
	    }

	  REQ_SEND;
	}
1713    
void
aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /* queue REQ_CHMOD (string argument = path) or REQ_FCHMOD (file handle) */
	  dREQ;

	  req->mode = mode;
	  req->sv1  = newSVsv (fh_or_path);

	  if (!SvPOK (fh_or_path))
	    {
	      req->type = REQ_FCHMOD;
	      req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
	    }
	  else
	    {
	      req->type = REQ_CHMOD;
	      req->ptr1 = SvPVbyte_nolen (req->sv1);
	    }

	  REQ_SEND;
	}
1736    
void
aio_chown (SV8 *fh_or_path, SV *uid, SV *gid, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /*
	   * Queue REQ_CHOWN (string argument = path) or REQ_FCHOWN (file
	   * handle).  An undefined uid or gid is stored as -1.
	   */
	  dREQ;

	  if (SvOK (uid))
	    req->int2 = SvIV (uid);
	  else
	    req->int2 = -1;

	  if (SvOK (gid))
	    req->int3 = SvIV (gid);
	  else
	    req->int3 = -1;

	  req->sv1 = newSVsv (fh_or_path);

	  if (!SvPOK (fh_or_path))
	    {
	      req->type = REQ_FCHOWN;
	      req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
	    }
	  else
	    {
	      req->type = REQ_CHOWN;
	      req->ptr1 = SvPVbyte_nolen (req->sv1);
	    }

	  REQ_SEND;
	}
1760    
void
aio_unlink (SV8 *pathname, SV *callback=&PL_sv_undef)
	ALIAS:
	   aio_unlink  = REQ_UNLINK
	   aio_rmdir   = REQ_RMDIR
	   aio_readdir = REQ_READDIR
	PPCODE:
	{
	  /* shared body of the single-path requests: ix is the request type */
	  dREQ;

	  req->type = ix;

	  /* the request keeps its own copy of the path; ptr1 points into it */
	  req->sv1  = newSVsv (pathname);
	  req->ptr1 = SvPVbyte_nolen (req->sv1);

	  REQ_SEND;
	}
1777    
void
aio_mkdir (SV8 *pathname, int mode, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /* queue a REQ_MKDIR request; the worker calls mkdir (ptr1, mode) */
	  dREQ;

	  req->type = REQ_MKDIR;
	  req->mode = mode;

	  req->sv1  = newSVsv (pathname);
	  req->ptr1 = SvPVbyte_nolen (req->sv1);

	  REQ_SEND;
	}
1791    
void
aio_link (SV8 *oldpath, SV8 *newpath, SV *callback=&PL_sv_undef)
	ALIAS:
	   aio_link    = REQ_LINK
	   aio_symlink = REQ_SYMLINK
	   aio_rename  = REQ_RENAME
	PPCODE:
	{
	  /*
	   * Shared body of the two-path requests (ix is the request type):
	   * oldpath goes into sv2/ptr2, newpath into sv1/ptr1.
	   */
	  dREQ;

	  req->type = ix;

	  req->sv2  = newSVsv (oldpath);
	  req->ptr2 = SvPVbyte_nolen (req->sv2);

	  req->sv1  = newSVsv (newpath);
	  req->ptr1 = SvPVbyte_nolen (req->sv1);

	  REQ_SEND;
	}
1810    
void
aio_mknod (SV8 *pathname, int mode, UV dev, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /* queue a REQ_MKNOD request: path in sv1/ptr1, device number in offs */
	  dREQ;

	  req->type = REQ_MKNOD;

	  req->sv1  = newSVsv (pathname);
	  req->ptr1 = SvPVbyte_nolen (req->sv1);

	  req->mode = (mode_t)mode;
	  req->offs = dev;

	  REQ_SEND;
	}
1825    
void
aio_busy (double delay, SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /* queue a REQ_BUSY request that makes a worker sleep for delay seconds */
	  dREQ;

	  req->type = REQ_BUSY;

	  /* negative delays are treated as zero */
	  if (delay < 0.)
	    req->nv1 = 0.;
	  else
	    req->nv1 = delay;

	  REQ_SEND;
	}
1837    
void
aio_group (SV *callback=&PL_sv_undef)
	PROTOTYPE: ;$
	PPCODE:
	{
	  /*
	   * Queue a REQ_GROUP request.  REQ_SEND is not used here: the object
	   * is blessed into AIO_GRP_KLASS and pushed in every calling context.
	   */
	  dREQ;

	  req->type = REQ_GROUP;

	  req_send (req);

	  XPUSHs (req_sv (req, AIO_GRP_KLASS));
	}
1850    
void
aio_nop (SV *callback=&PL_sv_undef)
	PPCODE:
	{
	  /* queue a REQ_NOP request, which the worker completes without doing any work */
	  dREQ;

	  req->type = REQ_NOP;

	  REQ_SEND;
	}
1861    
int
aioreq_pri (int pri = 0)
	PROTOTYPE: ;$
	CODE:
	{
	  /* next_pri is stored with PRI_BIAS added; callers see the unbiased value */
	  RETVAL = next_pri - PRI_BIAS;

	  /* only change the priority when an argument was actually passed */
	  if (items > 0)
	    {
	      if (pri < PRI_MIN)
	        pri = PRI_MIN;

	      if (pri > PRI_MAX)
	        pri = PRI_MAX;

	      next_pri = pri + PRI_BIAS;
	    }
	}
	OUTPUT:
	RETVAL
1875 root 1.68
void
aioreq_nice (int nice = 0)
	CODE:
	{
	  /*
	   * Lower the priority of the next request by "nice" steps.
	   *
	   * next_pri is stored with PRI_BIAS added (see dREQ and aioreq_pri),
	   * so the bias is removed before the subtraction and the clamp to
	   * [PRI_MIN, PRI_MAX], and added back when storing the result.
	   */
	  nice = (next_pri - PRI_BIAS) - nice;

	  if (nice < PRI_MIN)
	    nice = PRI_MIN;

	  if (nice > PRI_MAX)
	    nice = PRI_MAX;

	  next_pri = nice + PRI_BIAS;
	}
1883 root 1.60
void
flush ()
	PROTOTYPE:
	CODE:
	{
	  /* keep waiting for and processing results until the nreqs counter reaches zero */
	  while (nreqs != 0)
	    {
	      poll_wait ();
	      poll_cb ();
	    }
	}
1893    
int
poll()
	PROTOTYPE:
	CODE:
	{
	  /* poll_wait followed by poll_cb; returns what poll_cb returned */
	  poll_wait ();

	  RETVAL = poll_cb ();
	}
	OUTPUT:
	RETVAL
1902 root 1.7
int
poll_fileno()
	PROTOTYPE:
	CODE:
	{
	  /* element 0 of the result pipe pair */
	  RETVAL = respipe [0];
	}
	OUTPUT:
	RETVAL
1910    
int
poll_cb(...)
	PROTOTYPE:
	CODE:
	{
	  /* any arguments are accepted and ignored; forwards to the C-level poll_cb */
	  RETVAL = poll_cb ();
	}
	OUTPUT:
	RETVAL
1918    
void
poll_wait()
	PROTOTYPE:
	CODE:
	{
	  /* thin wrapper around the C-level poll_wait */
	  poll_wait ();
	}
1924 root 1.1
void
setsig (int signum = SIGIO)
	PROTOTYPE: ;$
	CODE:
	{
	  /*
	   * Select the signal that workers send to the calling thread when
	   * they queue a result (see the pthread_kill call in aio_proc).
	   * Croaks while block_sig_level is non-zero.
	   */
	  if (block_sig_level)
	    croak ("cannot call IO::AIO::setsig from within aio_block/callback");

	  /* workers read main_sig/main_tid while holding reslock */
	  X_LOCK (reslock);
	  main_tid = pthread_self ();
	  main_sig = signum;
	  X_UNLOCK (reslock);

	  /* results are already pending: raise the signal once right away */
	  if (main_sig && npending)
	    pthread_kill (main_tid, main_sig);
	}
1941    
void
aio_block (SV *cb)
	PROTOTYPE: &
	PPCODE:
	{
	  /*
	   * Call cb without arguments, in the caller's context, between
	   * block_sig () and unblock_sig ().  The call is wrapped in G_EVAL so
	   * that unblock_sig always runs; an exception is rethrown afterwards.
	   * The values returned by cb are returned to the caller.
	   */
	  int nret;

	  block_sig ();

	  PUSHMARK (SP);
	  PUTBACK;
	  nret = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
	  SPAGAIN;

	  unblock_sig ();

	  /* propagate the error caught by G_EVAL */
	  if (SvTRUE (ERRSV))
	    croak (0);

	  XSRETURN (nret);
	}
1961    
int
nreqs()
	PROTOTYPE:
	CODE:
	{
	  /* current value of the file-level nreqs counter */
	  RETVAL = nreqs;
	}
	OUTPUT:
	RETVAL
1969    
int
nready()
	PROTOTYPE:
	CODE:
	{
	  /* read through the get_nready accessor rather than the variable itself */
	  RETVAL = get_nready ();
	}
	OUTPUT:
	RETVAL
1977    
int
npending()
	PROTOTYPE:
	CODE:
	{
	  /* read through the get_npending accessor rather than the variable itself */
	  RETVAL = get_npending ();
	}
	OUTPUT:
	RETVAL
1985    
int
nthreads()
	PROTOTYPE:
	CODE:
	{
	  /* "started" is guarded by wrklock; the lock is only taken when WORDACCESS_UNSAFE is true */
	  if (WORDACCESS_UNSAFE)
	    X_LOCK (wrklock);

	  RETVAL = started;

	  if (WORDACCESS_UNSAFE)
	    X_UNLOCK (wrklock);
	}
	OUTPUT:
	RETVAL
1995    
1996 root 1.48 PROTOTYPES: DISABLE
1997    
1998 root 1.44 MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1999 root 1.43
void
cancel (aio_req_ornot req)
	CODE:
	{
	  /* thin wrapper: forwards the request to req_cancel */
	  req_cancel (req);
	}
2004    
void
cb (aio_req_ornot req, SV *callback=&PL_sv_undef)
	CODE:
	{
	  /* drop the reference to the old callback and store a copy of the new one */
	  SvREFCNT_dec (req->callback);

	  req->callback = newSVsv (callback);
	}
2010    
2011 root 1.45 MODULE = IO::AIO PACKAGE = IO::AIO::GRP
2012    
void
add (aio_req grp, ...)
	PPCODE:
	{
	  /*
	   * Add the requests given after grp to the group.  Arguments for
	   * which SvAIO_REQ yields no request are skipped.  Outside void
	   * context a mortal copy of every argument is returned.
	   *
	   * Croaks when a signal is configured (main_sig) and the call is made
	   * outside aio_block/callback context, or when grp->int1 == 2.
	   */
	  int idx;
	  aio_req member;

	  if (main_sig && !block_sig_level)
	    croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use");

	  if (grp->int1 == 2)
	    croak ("cannot add requests to IO::AIO::GRP after the group finished");

	  for (idx = 1; idx < items; ++idx)
	    {
	      if (GIMME_V != G_VOID)
	        XPUSHs (sv_2mortal (newSVsv (ST (idx))));

	      member = SvAIO_REQ (ST (idx));

	      if (!member)
	        continue;

	      ++grp->size;
	      member->grp = grp;

	      /* link the request in at the head of the group's member list */
	      member->grp_prev = 0;
	      member->grp_next = grp->grp_first;

	      if (grp->grp_first)
	        grp->grp_first->grp_prev = member;

	      grp->grp_first = member;
	    }
	}
2048 root 1.43
void
cancel_subs (aio_req_ornot req)
	CODE:
	{
	  /* thin wrapper: forwards the request to req_cancel_subs */
	  req_cancel_subs (req);
	}
2053    
void
result (aio_req grp, ...)
	CODE:
	{
	  /*
	   * Record the current errno in the group and replace grp->sv1 with a
	   * new array holding copies of all arguments that follow grp.
	   */
	  int idx;
	  AV *values;

	  /* capture errno first, before any further calls are made */
	  grp->errorno = errno;

	  values = newAV ();

	  for (idx = 1; idx < items; ++idx)
	    av_push (values, newSVsv (ST (idx)));

	  SvREFCNT_dec (grp->sv1);
	  grp->sv1 = (SV *)values;
	}
2071    
void
errno (aio_req grp, int errorno = errno)
	CODE:
	{
	  /* store the given error number in the group; it defaults to the current C errno */
	  grp->errorno = errorno;
	}
2076    
void
limit (aio_req grp, int limit)
	CODE:
	{
	  /* the limit is kept in grp->int2; aio_grp_feed is then run on the group */
	  grp->int2 = limit;

	  aio_grp_feed (grp);
	}
2082    
void
feed (aio_req grp, SV *callback=&PL_sv_undef)
	CODE:
	{
	  /* replace the callback kept in grp->sv2 with a copy of the new one */
	  SvREFCNT_dec (grp->sv2);
	  grp->sv2 = newSVsv (callback);

	  /* a non-positive grp->int2 (the value set by limit) is replaced by 2 */
	  if (grp->int2 <= 0)
	    grp->int2 = 2;

	  aio_grp_feed (grp);
	}
2095