--- IO-AIO/AIO.xs 2008/05/10 18:06:41 1.115 +++ IO-AIO/AIO.xs 2008/05/10 20:02:09 1.117 @@ -103,9 +103,9 @@ typedef eio_req *aio_req; typedef eio_req *aio_req_ornot; -static unsigned int max_outstanding = 0x7fffffff; - +static SV *on_next_submit; static int next_pri = EIO_DEFAULT_PRI + EIO_PRI_BIAS; +static int max_outstanding; static int respipe_osf [2], respipe [2] = { -1, -1 }; @@ -168,6 +168,23 @@ } } +static void req_submit (eio_req *req) +{ + eio_submit (req); + + if (on_next_submit) + { + dSP; + SV *cb = sv_2mortal (on_next_submit); + + on_next_submit = 0; + + PUSHMARK (SP); + PUTBACK; + call_sv (cb, G_DISCARD | G_EVAL); + } +} + static int req_invoke (eio_req *req) { dSP; @@ -407,14 +424,25 @@ static int poll_cb (void) { - int res = eio_poll (); + int res; - if (res > 0) - croak (0); + do + { + res = eio_poll (); + + if (res > 0) + croak (0); + } + while (max_outstanding && max_outstanding <= eio_nreqs ()); return res; } +static void atfork_child (void) +{ + create_respipe (); +} + #define dREQ \ aio_req req; \ int req_pri = next_pri; \ @@ -431,7 +459,7 @@ req->pri = req_pri #define REQ_SEND \ - eio_submit (req); \ + req_submit (req); \ \ if (GIMME_V != G_VOID) \ XPUSHs (req_sv (req, AIO_REQ_KLASS)); @@ -466,6 +494,9 @@ if (eio_init (want_poll, done_poll) < 0) croak ("IO::AIO: unable to initialise eio library"); + + /* atfork child called in fifo order, so before eio's handler */ + X_THREAD_ATFORK (0, 0, atfork_child); } void @@ -920,7 +951,7 @@ flush () PROTOTYPE: CODE: - while (nreqs) + while (eio_nreqs ()) { poll_wait (); poll_cb (); @@ -991,6 +1022,11 @@ OUTPUT: RETVAL +void _on_next_submit (SV *cb) + CODE: + SvREFCNT_dec (on_next_submit); + on_next_submit = SvOK (cb) ? newSVsv (cb) : 0; + PROTOTYPES: DISABLE MODULE = IO::AIO PACKAGE = IO::AIO::REQ