--- IO-AIO/AIO.xs 2009/07/01 08:11:24 1.150 +++ IO-AIO/AIO.xs 2010/01/02 14:11:32 1.158 @@ -6,6 +6,8 @@ #include "perl.h" #include "XSUB.h" +#include "schmorp.h" + #include #include #include @@ -99,13 +101,11 @@ /*****************************************************************************/ -static HV *stash; typedef SV SV8; /* byte-sv, used for argument-checking */ typedef int aio_rfd; /* read file desriptor */ typedef int aio_wfd; /* write file descriptor */ -#define AIO_REQ_KLASS "IO::AIO::REQ" -#define AIO_GRP_KLASS "IO::AIO::GRP" +static HV *aio_stash, *aio_req_stash, *aio_grp_stash; #define EIO_REQ_MEMBERS \ SV *callback; \ @@ -160,7 +160,7 @@ static int next_pri = EIO_PRI_DEFAULT; static int max_outstanding; -static int respipe_osf [2], respipe [2] = { -1, -1 }; +static s_epipe respipe; static void req_destroy (aio_req req); static void req_cancel (aio_req req); @@ -168,19 +168,17 @@ static void want_poll (void) { /* write a dummy byte to the pipe so fh becomes ready */ - respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); + s_epipe_signal (&respipe); } static void done_poll (void) { /* read any signals sent by the worker threads */ - char buf [4]; - while (respipe_read (respipe [0], buf, 4) == 4) - ; + s_epipe_drain (&respipe); } /* must be called at most once */ -static SV *req_sv (aio_req req, const char *klass) +static SV *req_sv (aio_req req, HV *stash) { if (!req->self) { @@ -188,15 +186,18 @@ sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0); } - return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1))); + return sv_2mortal (sv_bless (newRV_inc (req->self), stash)); } static aio_req SvAIO_REQ (SV *sv) { MAGIC *mg; - if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv)) - croak ("object of class " AIO_REQ_KLASS " expected"); + if (!SvROK (sv) + || (SvSTASH (SvRV (sv)) != aio_grp_stash + && SvSTASH (SvRV (sv)) != aio_req_stash + && !sv_derived_from (sv, "IO::AIO::REQ"))) + croak ("object of class IO::AIO::REQ expected"); mg = mg_find (SvRV (sv), PERL_MAGIC_ext); @@ -212,7 +213,7 @@ ENTER; SAVETMPS; PUSHMARK (SP); - XPUSHs (req_sv (grp, AIO_GRP_KLASS)); + XPUSHs (req_sv (grp, aio_grp_stash)); PUTBACK; call_sv (grp->sv2, G_VOID | G_EVAL | G_KEEPERR); SPAGAIN; @@ -328,7 +329,7 @@ int symlen; symlen = snprintf (sym, sizeof (sym), "fd#%d", (int)req->result); - gv_init (gv, stash, sym, symlen, 0); + gv_init (gv, aio_stash, sym, symlen, 0); symlen = snprintf ( sym, @@ -441,54 +442,15 @@ eio_grp_cancel (grp); } -#ifdef USE_SOCKETS_AS_HANDLES -# define TO_SOCKET(x) (win32_get_osfhandle (x)) -#else -# define TO_SOCKET(x) (x) -#endif - static void create_respipe (void) { - int old_readfd = respipe [0]; - - if (respipe [1] >= 0) - respipe_close (TO_SOCKET (respipe [1])); - -#ifdef _WIN32 - if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe)) -#else - if (pipe (respipe)) -#endif - croak ("unable to initialize result pipe"); - - if (old_readfd >= 0) - { - if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0) - croak ("unable to initialize result pipe(2)"); - - respipe_close (respipe [0]); - respipe [0] = old_readfd; - } - -#ifdef _WIN32 - int arg = 1; - if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) - || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) -#else - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) - || fcntl (respipe [1], F_SETFL, O_NONBLOCK)) -#endif - croak ("unable to initialize result pipe(3)"); - - respipe_osf [0] = TO_SOCKET (respipe [0]); - respipe_osf [1] = TO_SOCKET (respipe [1]); + if (s_epipe_renew (&respipe)) + croak ("IO::AIO: unable to initialize result pipe"); } static void poll_wait (void) { - fd_set rfd; - while (eio_nreqs ()) { int size; @@ -502,10 +464,7 @@ etp_maybe_start_thread (); - FD_ZERO (&rfd); - FD_SET (respipe [0], &rfd); - - PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); + s_epipe_wait (&respipe); } } @@ -533,19 +492,8 @@ static SV * get_cb (SV *cb_sv) { - HV *st; - GV *gvp; - CV *cv; - - if (!SvOK (cb_sv)) - return 0; - - cv = sv_2cv (cb_sv, &st, &gvp, 0); - - if (!cv) - croak ("IO::AIO callback must be undef or a CODE reference"); - - return (SV *)cv; + SvGETMAGIC (cb_sv); + return SvOK (cb_sv) ? s_get_cv_croak (cb_sv) : 0; } #define dREQ \ @@ -569,18 +517,7 @@ SPAGAIN; \ \ if (GIMME_V != G_VOID) \ - XPUSHs (req_sv (req, AIO_REQ_KLASS)); - -static int -extract_fd (SV *fh, int wr) -{ - int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh))); - - if (fd < 0) - croak ("illegal fh argument, either not an OS file or read/write mode mismatch"); - - return fd; -} + XPUSHs (req_sv (req, aio_req_stash)); MODULE = IO::AIO PACKAGE = IO::AIO @@ -610,6 +547,12 @@ const_iv (FADV_WILLNEED , POSIX_FADV_WILLNEED) const_iv (FADV_DONTNEED , POSIX_FADV_DONTNEED) + const_eio (MS_ASYNC) + const_eio (MS_INVALIDATE) + const_eio (MS_SYNC) + + const_eio (MT_MODIFY) + const_eio (SYNC_FILE_RANGE_WAIT_BEFORE) const_eio (SYNC_FILE_RANGE_WRITE) const_eio (SYNC_FILE_RANGE_WAIT_AFTER) @@ -630,10 +573,12 @@ const_eio (DT_WHT) }; - stash = gv_stashpv ("IO::AIO", 1); + aio_stash = gv_stashpv ("IO::AIO" , 1); + aio_req_stash = gv_stashpv ("IO::AIO::REQ", 1); + aio_grp_stash = gv_stashpv ("IO::AIO::GRP", 1); for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) - newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); + newCONSTSUB (aio_stash, (char *)civ->name, newSViv (civ->iv)); create_respipe (); @@ -704,7 +649,7 @@ aio_fdatasync = EIO_FDATASYNC PPCODE: { - int fd = extract_fd (fh, 0); + int fd = s_fileno_croak (fh, 0); dREQ; req->type = ix; @@ -719,7 +664,7 @@ PROTOTYPE: $$$$;$ PPCODE: { - int fd = extract_fd (fh, 0); + int fd = s_fileno_croak (fh, 0); dREQ; req->type = EIO_SYNC_FILE_RANGE; @@ -738,7 +683,7 @@ PPCODE: { static int close_pipe = -1; /* dummy fd to close fds via dup2 */ - int fd = extract_fd (fh, 0); + int fd = s_fileno_croak (fh, 0); dREQ; if (close_pipe < 0) @@ -770,7 +715,7 @@ PPCODE: { STRLEN svlen; - int fd = extract_fd (fh, ix == EIO_WRITE); + int fd = s_fileno_croak (fh, ix == EIO_WRITE); char *svptr = SvPVbyte (data, svlen); UV len = SvUV (length); @@ -835,8 +780,8 @@ PROTOTYPE: $$$$;$ PPCODE: { - int ifd = extract_fd (in_fh , 0); - int ofd = extract_fd (out_fh, 0); + int ifd = s_fileno_croak (in_fh , 0); + int ofd = s_fileno_croak (out_fh, 1); dREQ; req->type = EIO_SENDFILE; @@ -855,7 +800,7 @@ PROTOTYPE: $$$;$ PPCODE: { - int fd = extract_fd (fh, 0); + int fd = s_fileno_croak (fh, 0); dREQ; req->type = EIO_READAHEAD; @@ -1058,6 +1003,40 @@ } void +aio_mtouch (SV8 *data, IV offset = 0, SV *length = &PL_sv_undef, int flags = 0, SV *callback=&PL_sv_undef) + ALIAS: + aio_mtouch = EIO_MTOUCH + aio_msync = EIO_MSYNC + PROTOTYPE: $$$$;$ + PPCODE: +{ + STRLEN svlen; + UV len = SvUV (length); + char *svptr = SvPVbyte (data, svlen); + + if (offset < 0) + offset += svlen; + + if (offset < 0 || offset > svlen) + croak ("offset outside of scalar"); + + if (!SvOK (length) || len + offset > svlen) + len = svlen - offset; + + { + dREQ; + + req->type = ix; + req->size = len; + req->sv2 = SvREFCNT_inc (data); + req->ptr2 = (char *)svptr + offset; + req->int1 = flags; + + REQ_SEND; + } +} + +void aio_busy (double delay, SV *callback=&PL_sv_undef) PPCODE: { @@ -1079,7 +1058,7 @@ req->type = EIO_GROUP; req_submit (req); - XPUSHs (req_sv (req, AIO_GRP_KLASS)); + XPUSHs (req_sv (req, aio_grp_stash)); } void @@ -1141,7 +1120,7 @@ poll_fileno() PROTOTYPE: CODE: - RETVAL = respipe [0]; + RETVAL = s_epipe_fd (&respipe); OUTPUT: RETVAL @@ -1207,7 +1186,9 @@ sendfile (aio_wfd ofh, aio_rfd ifh, off_t offset, size_t count) PROTOTYPE: $$$$ CODE: - eio_sendfile_sync (ofh, ifh, offset, count); + RETVAL = eio_sendfile_sync (ofh, ifh, offset, count); + OUTPUT: + RETVAL void _on_next_submit (SV *cb) CODE: @@ -1232,7 +1213,7 @@ if (items > 1) { - SV *cb_cv = get_cb (callback); + SV *cb_cv =get_cb (callback); SvREFCNT_dec (req->callback); req->callback = SvREFCNT_inc (cb_cv);