--- IO-AIO/AIO.xs 2007/07/08 09:09:34 1.103 +++ IO-AIO/AIO.xs 2007/07/08 11:05:36 1.104 @@ -26,6 +26,8 @@ # undef open # undef read # undef write +# undef send +# undef recv # undef stat # undef fstat # define lstat stat @@ -264,7 +266,7 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe [2], respipe_osf [2]; static mutex_t reslock = X_MUTEX_INIT; static mutex_t reqlock = X_MUTEX_INIT; @@ -627,6 +629,31 @@ req_cancel_subs (req); } +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif + +static void +create_pipe (int fd[2]) +{ +#ifdef _WIN32 + int arg = 1; + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd) + || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) +#else + if (pipe (fd) + || fcntl (fd [0], F_SETFL, O_NONBLOCK) + || fcntl (fd [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} + X_THREAD_PROC (aio_proc); static void start_thread (void) @@ -738,10 +765,10 @@ maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } @@ -776,7 +803,7 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (PerlSock_recv (respipe [0], buf, 4, 0) == 4) ; } } @@ -1268,7 +1295,7 @@ if (!reqq_push (&res_queue, req)) { /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + send (respipe_osf [1], (const void *)&respipe_osf, 1, 0); /* optionally signal the main thread asynchronously */ if (main_sig) @@ -1345,11 +1372,10 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); + PerlSock_closesocket (respipe [0]); + PerlSock_closesocket (respipe [1]); - if (!create_pipe (respipe)) - croak ("cannot set result pipe to nonblocking mode"); + create_pipe (respipe); atfork_parent (); } @@ -1395,13 +1421,14 @@ X_MUTEX_CHECK (reqwait); X_MUTEX_CHECK (preadwritelock); X_MUTEX_CHECK (readdirlock); + + X_COND_CHECK (reqwait); #else newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); #endif - if (!create_pipe (respipe)) - croak ("cannot set result pipe to nonblocking mode"); + create_pipe (respipe); X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); }