--- Coro-Mysql/Mysql.xs 2009/06/20 20:44:09 1.4 +++ Coro-Mysql/Mysql.xs 2012/09/10 20:00:58 1.9 @@ -8,12 +8,18 @@ #include "perl.h" #include "XSUB.h" +#if HAVE_EV +# include "EVAPI.h" +# include "CoroAPI.h" +#endif + #define IN_DESTRUCT PL_dirty typedef U16 uint16; /* cached function gv's */ static CV *readable, *writable; +static int use_ev; #include "violite.h" @@ -23,15 +29,18 @@ typedef struct { int magic; - SV *corosocket; + SV *corohandle_sv, *corohandle; int bufofs, bufcnt; +#if HAVE_EV + ev_io rw, ww; +#endif char buf[VIO_READ_BUFFER_SIZE]; } ourdata; #define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET))) -static int -our_read (Vio *vio, gptr p, int len) +static xlen +our_read (Vio *vio, xgptr p, xlen len) { ourdata *our = OURDATAPTR; @@ -49,13 +58,23 @@ if (rd >= 0 || errno != EAGAIN) break; - { - dSP; - PUSHMARK (SP); - XPUSHs (our->corosocket); - PUTBACK; - call_sv ((SV *)readable, G_VOID | G_DISCARD); - } +#if HAVE_EV + if (use_ev) + { + our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT)); + ev_io_start (EV_DEFAULT_UC, &(our->rw)); + CORO_SCHEDULE; + ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */ + } + else +#endif + { + dSP; + PUSHMARK (SP); + XPUSHs (our->corohandle); + PUTBACK; + call_sv ((SV *)readable, G_VOID | G_DISCARD); + } } if (rd <= 0) @@ -75,8 +94,8 @@ return len; } -static int -our_write (Vio *vio, const gptr p, int len) +static xlen +our_write (Vio *vio, cxgptr p, xlen len) { char *ptr = (char *)p; my_bool dummy; @@ -94,11 +113,25 @@ } else if (errno == EAGAIN) { - dSP; - PUSHMARK (SP); - XPUSHs (OURDATAPTR->corosocket); - PUTBACK; - call_sv ((SV *)writable, G_VOID | G_DISCARD); + ourdata *our = OURDATAPTR; + +#if HAVE_EV + if (use_ev) + { + our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT)); + ev_io_start (EV_DEFAULT_UC, &(our->ww)); + CORO_SCHEDULE; + ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */ + } + else +#endif + { + dSP; + PUSHMARK (SP); + XPUSHs (our->corohandle); + PUTBACK; + call_sv ((SV *)writable, G_VOID | G_DISCARD); + } } else if (ptr == (char *)p) return -1; @@ -109,6 +142,49 @@ return ptr - (char *)p; } +static int +our_close (Vio *vio) +{ + ourdata *our = OURDATAPTR; + + if (vio->read != our_read) + croak ("vio.read has unexpected content during unpatch - wtf?"); + + if (vio->write != our_write) + croak ("vio.write has unexpected content during unpatch - wtf?"); + + if (vio->vioclose != our_close) + croak ("vio.vioclose has unexpected content during unpatch - wtf?"); + +#if HAVE_EV + if (use_ev) + { + ev_io_stop (EV_DEFAULT_UC, &(our->rw)); + ev_io_stop (EV_DEFAULT_UC, &(our->ww)); + } +#endif + + SvREFCNT_dec (our->corohandle); + SvREFCNT_dec (our->corohandle_sv); + + Safefree (our); + + vio->read = vio_read; + vio->write = vio_write; + vio->vioclose = vio_close; + + vio->vioclose (vio); +} + +#if HAVE_EV +static void +iocb (EV_P_ ev_io *w, int revents) +{ + ev_io_stop (EV_A, w); + CORO_READY ((SV *)w->data); +} +#endif + MODULE = Coro::Mysql PACKAGE = Coro::Mysql BOOT: @@ -120,60 +196,70 @@ PROTOTYPES: ENABLE void -_patch (IV sock, int fd, SV *corosocket) +_use_ev () + PPCODE: +{ + static int onceonly; + + if (!onceonly) + { + onceonly = 1; +#if HAVE_EV + I_EV_API ("Coro::Mysql"); + I_CORO_API ("Coro::Mysql"); + use_ev = 1; +#endif + } + + XPUSHs (use_ev ? &PL_sv_yes : &PL_sv_no); +} + +void +_patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle) CODE: { MYSQL *my = (MYSQL *)sock; Vio *vio = my->net.vio; ourdata *our; + /* matching versions are required but not sufficient */ + if (client_version != mysql_get_client_version ()) + croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).", + client_version, mysql_get_client_version ()); + if (fd != my->net.fd) croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?"); if (fd != vio->sd) croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?"); + if (vio->vioclose != vio_close) + croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?"); + if (vio->write != vio_write) croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?"); - if (vio->read != vio_read && vio->read != vio_read_buff) + if (vio->read != vio_read + && vio->read != vio_read_buff) croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?"); Newz (0, our, 1, ourdata); our->magic = CoMy_MAGIC; - our->corosocket = newSVsv (corosocket); + our->corohandle_sv = newSVsv (corohandle_sv); + our->corohandle = newSVsv (corohandle); +#if HAVE_EV + if (use_ev) + { + ev_io_init (&(our->rw), iocb, vio->sd, EV_READ); + ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE); + } +#endif vio->desc [DESC_OFFSET - 1] = 0; OURDATAPTR = our; - vio->write = our_write; - vio->read = our_read; + vio->vioclose = our_close; + vio->write = our_write; + vio->read = our_read; } -void -_unpatch (IV sock) - CODE: - if (!IN_DESTRUCT) - { - /* we currently leak data during global destruction */ - /* perl makes it extremely hard to do otherwise, though */ - MYSQL *my = (MYSQL *)sock; - Vio *vio = my->net.vio; - my_bool dummy; - - if (vio->read != our_read) - croak ("vio.read has unexpected content during unpatch - wtf?"); - - if (vio->write != our_write) - croak ("vio.write has unexpected content during unpatch - wtf?"); - - SvREFCNT_dec (OURDATAPTR->corosocket); - - Safefree (OURDATAPTR); - - vio->read = vio_read; - vio->write = vio_write; - } - - -