--- IO-AIO/schmorp.h 2009/07/14 00:51:31 1.4 +++ IO-AIO/schmorp.h 2009/07/15 01:36:04 1.5 @@ -5,6 +5,12 @@ * This header file is a shared resource between many modules. */ +#include + +#ifndef _WIN32 +# include +#endif + /* useful stuff, used by schmorp mostly */ #include "patchlevel.h" @@ -76,6 +82,7 @@ static char sig_size [] = { SIG_NUM }; # define SIG_SIZE (sizeof (sig_size) + 1) #endif + dTHX; int signum; SvGETMAGIC (sig); @@ -98,7 +105,10 @@ int signum = s_signum (sig); if (signum < 0) - croak ("%s: invalid signal name or number", SvPV_nolen (sig)); + { + dTHX; + croak ("%s: invalid signal name or number", SvPV_nolen (sig)); + } return signum; } @@ -106,6 +116,7 @@ static int s_fileno (SV *fh, int wr) { + dTHX; SvGETMAGIC (fh); if (SvROK (fh)) @@ -129,7 +140,10 @@ int fd = s_fileno (fh, wr); if (fd < 0) - croak ("%s: illegal fh argument, either not an OS file or read/write mode mismatch", SvPV_nolen (fh)); + { + dTHX; + croak ("%s: illegal fh argument, either not an OS file or read/write mode mismatch", SvPV_nolen (fh)); + } return fd; } @@ -137,6 +151,7 @@ static SV * s_get_cv (SV *cb_sv) { + dTHX; HV *st; GV *gvp; @@ -149,7 +164,10 @@ SV *cv = s_get_cv (cb_sv); if (!cv) - croak ("%s: callback must be a CODE reference or another callable object", SvPV_nolen (cb_sv)); + { + dTHX; + croak ("%s: callback must be a CODE reference or another callable object", SvPV_nolen (cb_sv)); + } return cv; } @@ -177,5 +195,274 @@ return newRV_noinc ((SV *)cv); } +/** portable pipe/socketpair */ + +#ifdef USE_SOCKETS_AS_HANDLES +# define S_TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define S_TO_SOCKET(x) (x) +#endif + +#ifdef _WIN32 +/* taken almost verbatim from libev's ev_win32.c */ +/* oh, the humanity! */ +static int +s_pipe (int filedes [2]) +{ + struct sockaddr_in addr = { 0 }; + int addr_size = sizeof (addr); + struct sockaddr_in adr2; + int adr2_size = sizeof (adr2); + SOCKET listener; + SOCKET sock [2] = { -1, -1 }; + + if ((listener = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + return -1; + + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + addr.sin_port = 0; + + if (bind (listener, (struct sockaddr *)&addr, addr_size)) + goto fail; + + if (getsockname (listener, (struct sockaddr *)&addr, &addr_size)) + goto fail; + + if (listen (listener, 1)) + goto fail; + + if ((sock [0] = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + goto fail; + + if (connect (sock [0], (struct sockaddr *)&addr, addr_size)) + goto fail; + + if ((sock [1] = accept (listener, 0, 0)) < 0) + goto fail; + + /* windows vista returns fantasy port numbers for getpeername. + * example for two interconnected tcp sockets: + * + * (Socket::unpack_sockaddr_in getsockname $sock0)[0] == 53364 + * (Socket::unpack_sockaddr_in getpeername $sock0)[0] == 53363 + * (Socket::unpack_sockaddr_in getsockname $sock1)[0] == 53363 + * (Socket::unpack_sockaddr_in getpeername $sock1)[0] == 53365 + * + * wow! tridirectional sockets! + * + * this way of checking ports seems to work: + */ + if (getpeername (sock [0], (struct sockaddr *)&addr, &addr_size)) + goto fail; + + if (getsockname (sock [1], (struct sockaddr *)&adr2, &adr2_size)) + goto fail; + + errno = WSAEINVAL; + if (addr_size != adr2_size + || addr.sin_addr.s_addr != adr2.sin_addr.s_addr /* just to be sure, I mean, it's windows */ + || addr.sin_port != adr2.sin_port) + goto fail; + + closesocket (listener); + +#ifdef USE_SOCKETS_AS_HANDLES + /* when select isn't winsocket, we also expect socket, connect, accept etc. + * to work on fds */ + filedes [0] = sock [0]; + filedes [1] = sock [1]; +#else + filedes [0] = _open_osfhandle (sock [0], 0); + filedes [1] = _open_osfhandle (sock [1], 0); +#endif + + return 0; + +fail: + closesocket (listener); + + if (sock [0] != INVALID_SOCKET) closesocket (sock [0]); + if (sock [1] != INVALID_SOCKET) closesocket (sock [1]); + + return -1; +} + +#define s_socketpair(domain,type,protocol,filedes) s_pipe (filedes) + +static int +s_fd_blocking (int fd, int blocking) +{ + blocking = !blocking; + + return ioctlsocket (S_TO_SOCKET (fd), FIONBIO, &blocking); +} + +#define s_fd_prepare(fd) s_fd_blocking (fd, 0) + +#else + +#define s_socketpair(domain,type,protocol,filedes) socketpair (domain, type, protocol, filedes) +#define s_pipe(filedes) pipe (filedes) + +static int +s_fd_blocking (int fd, int blocking) +{ + return fcntl (fd, F_SETFL, blocking ? 0 : O_NONBLOCK); +} + +static int +s_fd_prepare (int fd) +{ + return s_fd_blocking (fd, 0) + || fcntl (fd, F_SETFD, FD_CLOEXEC); +} + +#endif + +#if __linux && (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 7)) +/* our minimum requirement is glibc 2.7 which has the stub, but not the header */ +# include +# ifdef __cplusplus +extern "C" { +# endif + int eventfd (unsigned int initval, int flags); +# ifdef __cplusplus +} +# endif +#else +# define eventfd(initval,flags) -1 +#endif + +typedef struct { + int fd[2]; /* read, write fd, might be equal */ + int len; /* write length (1 pipe/socket, 8 eventfd) */ + volatile sig_atomic_t sent; +} s_epipe; + +static int +s_epipe_new (s_epipe *epp) +{ + s_epipe ep; + + ep.fd [0] = ep.fd [1] = eventfd (0, 0); + + if (ep.fd [0] >= 0) + { + s_fd_prepare (ep.fd [0]); + ep.len = 8; + } + else + { + if (s_pipe (ep.fd)) + return -1; + + if (s_fd_prepare (ep.fd [0]) + || s_fd_prepare (ep.fd [1])) + { + close (ep.fd [0]); + close (ep.fd [1]); + return -1; + } + + ep.len = 1; + } + + ep.sent = 0; + *epp = ep; + return 0; +} + +static void +s_epipe_destroy (s_epipe *epp) +{ + close (epp->fd [0]); + + if (epp->fd [1] != epp->fd [0]) + close (epp->fd [1]); + + epp->len = 0; +} + +static void +s_epipe_signal (s_epipe *epp) +{ + if (epp->sent) + return; + + epp->sent = 1; +#ifdef _WIN32 + send (epp->fd [1], epp, 1); +#else + static uint64_t counter = 1; + write (epp->fd [1], &counter, epp->len); +#endif +} + +static void +s_epipe_drain (s_epipe *epp) +{ + char buf [9]; + +#ifdef _WIN32 + PerlSock_recv (epp->fd [0], buf, sizeof (buf), 0); +#else + read (epp->fd [0], buf, sizeof (buf)); +#endif + + epp->sent = 0; +} + +/* like new, but dups over old */ +static int +s_epipe_renew (s_epipe *epp) +{ + s_epipe epn; + + if (epp->fd [1] != epp->fd [0]) + close (epp->fd [1]); + + if (s_epipe_new (&epn)) + return -1; + + if (epp->len) + { + if (dup2 (S_TO_SOCKET (epn.fd [0]), S_TO_SOCKET (epp->fd [0])) < 0) + croak ("unable to dup over old event pipe"); /* should not croak */ + + if (epp->fd [1] != epp->fd [0]) + close (epn.fd [0]); + + epn.fd [0] = epp->fd [0]; + } + + *epp = epn; + + return 0; +} + +#define s_epipe_fd(epp) ((epp)->fd [0]) + +static int +s_epipe_wait (s_epipe *epp) +{ +#ifdef _WIN32 + fd_set rfd; + + FD_ZERO (&rfd); + FD_SET (s_epipe_fd (epp), &rfd); + + return PerlSock_select (s_epipe_fd (epp) + 1, &rfd, 0, 0, 0); +#else + /* poll is preferable on posix systems */ + struct pollfd pfd; + + pfd.fd = s_epipe_fd (epp); + pfd.events = POLLIN; + + return poll (&pfd, 1, 0); +#endif +} + #endif