--- IO-AIO/AIO.xs 2007/07/08 09:09:34 1.103 +++ IO-AIO/AIO.xs 2008/04/26 12:00:23 1.114 @@ -26,6 +26,8 @@ # undef open # undef read # undef write +# undef send +# undef recv # undef stat # undef fstat # define lstat stat @@ -97,6 +99,8 @@ # define SvVAL64 SvNV #endif +static HV *stash; + #define dBUF \ char *aio_buf; \ X_LOCK (wrklock); \ @@ -117,7 +121,7 @@ REQ_UTIME, REQ_FUTIME, REQ_CHMOD, REQ_FCHMOD, REQ_CHOWN, REQ_FCHOWN, - REQ_FSYNC, REQ_FDATASYNC, + REQ_SYNC, REQ_FSYNC, REQ_FDATASYNC, REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME, REQ_MKNOD, REQ_READDIR, REQ_LINK, REQ_SYMLINK, REQ_READLINK, @@ -187,7 +191,7 @@ static int main_sig; static int block_sig_level; -void block_sig () +void block_sig (void) { sigset_t ss; @@ -202,7 +206,7 @@ pthread_sigmask (SIG_BLOCK, &ss, 0); } -void unblock_sig () +void unblock_sig (void) { sigset_t ss; @@ -264,7 +268,7 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe_osf [2], respipe [2] = { -1, -1 }; static mutex_t reslock = X_MUTEX_INIT; static mutex_t reqlock = X_MUTEX_INIT; @@ -272,7 +276,7 @@ #if WORDACCESS_UNSAFE -static unsigned int get_nready () +static unsigned int get_nready (void) { unsigned int retval; @@ -283,7 +287,7 @@ return retval; } -static unsigned int get_npending () +static unsigned int get_npending (void) { unsigned int retval; @@ -294,7 +298,7 @@ return retval; } -static unsigned int get_nthreads () +static unsigned int get_nthreads (void) { unsigned int retval; @@ -367,7 +371,7 @@ abort (); } -static int poll_cb (); +static int poll_cb (void); static int req_invoke (aio_req req); static void req_destroy (aio_req req); static void req_cancel (aio_req req); @@ -501,16 +505,31 @@ case REQ_OPEN: { /* convert fd to fh */ - SV *fh; + SV *fh = &PL_sv_undef; + + if (req->result >= 0) + { + GV *gv = (GV *)sv_newmortal (); + int flags = req->int1 & (O_RDONLY | O_WRONLY | O_RDWR); + char sym [64]; + int symlen; + + symlen = snprintf (sym, sizeof (sym), "fd#%d", req->result); + gv_init (gv, stash, sym, symlen, 0); + + symlen = snprintf ( + sym, + sizeof (sym), + "%s&=%d", + flags == O_RDONLY ? "<" : flags == O_WRONLY ? ">" : "+<", + req->result + ); - PUSHs (sv_2mortal (newSViv (req->result))); - PUTBACK; - call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); - SPAGAIN; - - fh = POPs; - PUSHMARK (SP); - XPUSHs (fh); + if (do_open (gv, sym, symlen, 0, 0, 0, 0)) + fh = (SV *)gv; + } + + PUSHs (fh); } break; @@ -564,11 +583,13 @@ errno = req->errorno; PUTBACK; - call_sv (req->callback, G_VOID | G_EVAL); + call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD); SPAGAIN; FREETMPS; LEAVE; + + PUTBACK; } if (req->grp) @@ -627,6 +648,50 @@ req_cancel_subs (req); } +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif + +static void +create_respipe (void) +{ + int old_readfd = respipe [0]; + + if (respipe [1] >= 0) + respipe_close (TO_SOCKET (respipe [1])); + +#ifdef _WIN32 + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe)) +#else + if (pipe (respipe)) +#endif + croak ("unable to initialize result pipe"); + + if (old_readfd >= 0) + { + if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0) + croak ("unable to initialize result pipe(2)"); + + respipe_close (respipe [0]); + respipe [0] = old_readfd; + } + +#ifdef _WIN32 + int arg = 1; + if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) +#else + if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) + || fcntl (respipe [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe(3)"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} + X_THREAD_PROC (aio_proc); static void start_thread (void) @@ -652,7 +717,7 @@ X_UNLOCK (wrklock); } -static void maybe_start_thread () +static void maybe_start_thread (void) { if (get_nthreads () >= wanted) return; @@ -722,7 +787,7 @@ end_thread (); } -static void poll_wait () +static void poll_wait (void) { fd_set rfd; @@ -738,14 +803,14 @@ maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } -static int poll_cb () +static int poll_cb (void) { dSP; int count = 0; @@ -776,7 +841,7 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (respipe_read (respipe [0], buf, 4) == 4) ; } } @@ -1115,11 +1180,37 @@ req->result = res; } +static int +aio_close (int fd) +{ + static int close_pipe = -1; /* dummy fd to close fds via dup2 */ + + X_LOCK (wrklock); + + if (close_pipe < 0) + { + int pipefd [2]; + + if (pipe (pipefd) < 0 + || close (pipefd [1]) < 0 + || fcntl (pipefd [0], F_SETFD, FD_CLOEXEC) < 0) + { + X_UNLOCK (wrklock); + return -1; + } + + close_pipe = pipefd [0]; + } + + X_UNLOCK (wrklock); + + return dup2 (close_pipe, fd) < 0 ? -1 : 0; +} + /*****************************************************************************/ X_THREAD_PROC (aio_proc) { - {//D aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; @@ -1194,7 +1285,7 @@ case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; - case REQ_CLOSE: req->result = close (req->int1); break; + case REQ_CLOSE: req->result = aio_close (req->int1); break; case REQ_UNLINK: req->result = unlink (req->ptr1); break; case REQ_RMDIR: req->result = rmdir (req->ptr1); break; case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break; @@ -1204,8 +1295,10 @@ case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break; case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break; - case REQ_FDATASYNC: req->result = fdatasync (req->int1); break; + case REQ_SYNC: req->result = 0; sync (); break; case REQ_FSYNC: req->result = fsync (req->int1); break; + case REQ_FDATASYNC: req->result = fdatasync (req->int1); break; + case REQ_READDIR: scandir_ (req, self); break; case REQ_BUSY: @@ -1268,7 +1361,7 @@ if (!reqq_push (&res_queue, req)) { /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); /* optionally signal the main thread asynchronously */ if (main_sig) @@ -1287,7 +1380,6 @@ X_UNLOCK (wrklock); return 0; - }//D } /*****************************************************************************/ @@ -1345,11 +1437,7 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); - - if (!create_pipe (respipe)) - croak ("cannot set result pipe to nonblocking mode"); + create_respipe (); atfork_parent (); } @@ -1381,7 +1469,7 @@ BOOT: { - HV *stash = gv_stashpv ("IO::AIO", 1); + stash = gv_stashpv ("IO::AIO", 1); newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); @@ -1395,13 +1483,14 @@ X_MUTEX_CHECK (reqwait); X_MUTEX_CHECK (preadwritelock); X_MUTEX_CHECK (readdirlock); + + X_COND_CHECK (reqwait); #else newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); #endif - if (!create_pipe (respipe)) - croak ("cannot set result pipe to nonblocking mode"); + create_respipe (); X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); } @@ -1458,10 +1547,9 @@ } void -aio_close (SV *fh, SV *callback=&PL_sv_undef) +aio_fsync (SV *fh, SV *callback=&PL_sv_undef) PROTOTYPE: $;$ ALIAS: - aio_close = REQ_CLOSE aio_fsync = REQ_FSYNC aio_fdatasync = REQ_FDATASYNC PPCODE: @@ -1476,6 +1564,20 @@ } void +aio_close (SV *fh, SV *callback=&PL_sv_undef) + PROTOTYPE: $;$ + PPCODE: +{ + dREQ; + + req->type = REQ_CLOSE; + req->sv1 = newSVsv (fh); + req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh))); + + REQ_SEND (req); +} + +void aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef) ALIAS: aio_read = REQ_READ @@ -1807,11 +1909,14 @@ void aio_nop (SV *callback=&PL_sv_undef) + ALIAS: + aio_nop = REQ_NOP + aio_sync = REQ_SYNC PPCODE: { dREQ; - req->type = REQ_NOP; + req->type = ix; REQ_SEND; } @@ -1907,7 +2012,6 @@ PUSHMARK (SP); PUTBACK; count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL); - SPAGAIN; unblock_sig (); if (SvTRUE (ERRSV))