--- IO-AIO/AIO.xs 2007/07/08 11:05:36 1.104 +++ IO-AIO/AIO.xs 2007/10/03 21:27:51 1.107 @@ -99,6 +99,8 @@ # define SvVAL64 SvNV #endif +static HV *stash; + #define dBUF \ char *aio_buf; \ X_LOCK (wrklock); \ @@ -266,7 +268,7 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2], respipe_osf [2]; +static int respipe_osf [2], respipe [2] = { -1, -1 }; static mutex_t reslock = X_MUTEX_INIT; static mutex_t reqlock = X_MUTEX_INIT; @@ -503,15 +505,30 @@ case REQ_OPEN: { /* convert fd to fh */ - SV *fh; + SV *fh = &PL_sv_undef; - PUSHs (sv_2mortal (newSViv (req->result))); - PUTBACK; - call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); - SPAGAIN; + if (req->result >= 0) + { + GV *gv = (GV *)sv_newmortal (); + int flags = req->int1 & (O_RDONLY | O_WRONLY | O_RDWR); + char sym [64]; + int symlen; + + symlen = snprintf (sym, sizeof (sym), "fd#%d", req->result); + gv_init (gv, stash, sym, symlen, 0); + + symlen = snprintf ( + sym, + sizeof (sym), + "%s&=%d", + flags == O_RDONLY ? "<" : flags == O_WRONLY ? ">" : "+<", + req->result + ); + + if (do_open (gv, sym, symlen, 0, 0, 0, 0)) + fh = gv; + } - fh = POPs; - PUSHMARK (SP); XPUSHs (fh); } break; @@ -636,20 +653,39 @@ #endif static void -create_pipe (int fd[2]) +create_respipe () { + int old_readfd = respipe [0]; + + if (respipe [1] >= 0) + respipe_close (TO_SOCKET (respipe [1])); + #ifdef _WIN32 - int arg = 1; - if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd) - || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) - || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe)) #else - if (pipe (fd) - || fcntl (fd [0], F_SETFL, O_NONBLOCK) - || fcntl (fd [1], F_SETFL, O_NONBLOCK)) + if (pipe (respipe)) #endif croak ("unable to initialize result pipe"); + if (old_readfd >= 0) + { + if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0) + croak ("unable to initialize result pipe(2)"); + + respipe_close (respipe [0]); + respipe [0] = old_readfd; + } + +#ifdef _WIN32 + int arg = 1; + if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) +#else + if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) + || fcntl (respipe [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe(3)"); + respipe_osf [0] = TO_SOCKET (respipe [0]); respipe_osf [1] = TO_SOCKET (respipe [1]); } @@ -803,7 +839,7 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (PerlSock_recv (respipe [0], buf, 4, 0) == 4) + while (respipe_read (respipe [0], buf, 4) == 4) ; } } @@ -1221,7 +1257,7 @@ case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; - case REQ_CLOSE: req->result = close (req->int1); break; + case REQ_CLOSE: req->result = PerlIO_close ((PerlIO *)req->ptr1); break; case REQ_UNLINK: req->result = unlink (req->ptr1); break; case REQ_RMDIR: req->result = rmdir (req->ptr1); break; case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break; @@ -1295,7 +1331,7 @@ if (!reqq_push (&res_queue, req)) { /* write a dummy byte to the pipe so fh becomes ready */ - send (respipe_osf [1], (const void *)&respipe_osf, 1, 0); + respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); /* optionally signal the main thread asynchronously */ if (main_sig) @@ -1372,10 +1408,7 @@ nready = 0; npending = 0; - PerlSock_closesocket (respipe [0]); - PerlSock_closesocket (respipe [1]); - - create_pipe (respipe); + create_respipe (); atfork_parent (); } @@ -1407,7 +1440,7 @@ BOOT: { - HV *stash = gv_stashpv ("IO::AIO", 1); + stash = gv_stashpv ("IO::AIO", 1); newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); @@ -1428,7 +1461,7 @@ newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); #endif - create_pipe (respipe); + create_respipe (); X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); } @@ -1485,10 +1518,9 @@ } void -aio_close (SV *fh, SV *callback=&PL_sv_undef) +aio_fsync (SV *fh, SV *callback=&PL_sv_undef) PROTOTYPE: $;$ ALIAS: - aio_close = REQ_CLOSE aio_fsync = REQ_FSYNC aio_fdatasync = REQ_FDATASYNC PPCODE: @@ -1503,6 +1535,30 @@ } void +aio_close (SV *fh, SV *callback=&PL_sv_undef) + PROTOTYPE: $;$ + PPCODE: +{ + PerlIO *io = IoIFP (sv_2io (fh)); + int fd = PerlIO_fileno (io); + + if (fd < 0) + croak ("aio_close called with fd-less filehandle"); + + PerlIO_binmode (io, 0, 0, 0); + + { + dREQ; + + req->type = REQ_CLOSE; + req->sv1 = newSVsv (fh); + req->ptr1 = (void *)io; + + REQ_SEND (req); + } +} + +void aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef) ALIAS: aio_read = REQ_READ