ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.103 by root, Sun Jul 8 09:09:34 2007 UTC vs.
Revision 1.104 by root, Sun Jul 8 11:05:36 2007 UTC

24 24
25// perl overrides all those nice win32 functions 25// perl overrides all those nice win32 functions
26# undef open 26# undef open
27# undef read 27# undef read
28# undef write 28# undef write
29# undef send
30# undef recv
29# undef stat 31# undef stat
30# undef fstat 32# undef fstat
31# define lstat stat 33# define lstat stat
32# undef truncate 34# undef truncate
33# undef ftruncate 35# undef ftruncate
262} 264}
263 265
264static volatile unsigned int nreqs, nready, npending; 266static volatile unsigned int nreqs, nready, npending;
265static volatile unsigned int max_idle = 4; 267static volatile unsigned int max_idle = 4;
266static volatile unsigned int max_outstanding = 0xffffffff; 268static volatile unsigned int max_outstanding = 0xffffffff;
267static int respipe [2]; 269static int respipe [2], respipe_osf [2];
268 270
269static mutex_t reslock = X_MUTEX_INIT; 271static mutex_t reslock = X_MUTEX_INIT;
270static mutex_t reqlock = X_MUTEX_INIT; 272static mutex_t reqlock = X_MUTEX_INIT;
271static cond_t reqwait = X_COND_INIT; 273static cond_t reqwait = X_COND_INIT;
272 274
625 req->flags |= FLAG_CANCELLED; 627 req->flags |= FLAG_CANCELLED;
626 628
627 req_cancel_subs (req); 629 req_cancel_subs (req);
628} 630}
629 631
632#ifdef USE_SOCKETS_AS_HANDLES
633# define TO_SOCKET(x) (win32_get_osfhandle (x))
634#else
635# define TO_SOCKET(x) (x)
636#endif
637
638static void
639create_pipe (int fd[2])
640{
641#ifdef _WIN32
642 int arg = 1;
643 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd)
644 || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg)
645 || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg))
646#else
647 if (pipe (fd)
648 || fcntl (fd [0], F_SETFL, O_NONBLOCK)
649 || fcntl (fd [1], F_SETFL, O_NONBLOCK))
650#endif
651 croak ("unable to initialize result pipe");
652
653 respipe_osf [0] = TO_SOCKET (respipe [0]);
654 respipe_osf [1] = TO_SOCKET (respipe [1]);
655}
656
630X_THREAD_PROC (aio_proc); 657X_THREAD_PROC (aio_proc);
631 658
632static void start_thread (void) 659static void start_thread (void)
633{ 660{
634 worker *wrk = calloc (1, sizeof (worker)); 661 worker *wrk = calloc (1, sizeof (worker));
736 if (size) 763 if (size)
737 return; 764 return;
738 765
739 maybe_start_thread (); 766 maybe_start_thread ();
740 767
741 FD_ZERO(&rfd); 768 FD_ZERO (&rfd);
742 FD_SET(respipe [0], &rfd); 769 FD_SET (respipe [0], &rfd);
743 770
744 select (respipe [0] + 1, &rfd, 0, 0, 0); 771 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
745 } 772 }
746} 773}
747 774
748static int poll_cb () 775static int poll_cb ()
749{ 776{
774 801
775 if (!res_queue.size) 802 if (!res_queue.size)
776 { 803 {
777 /* read any signals sent by the worker threads */ 804 /* read any signals sent by the worker threads */
778 char buf [4]; 805 char buf [4];
779 while (read (respipe [0], buf, 4) == 4) 806 while (PerlSock_recv (respipe [0], buf, 4, 0) == 4)
780 ; 807 ;
781 } 808 }
782 } 809 }
783 810
784 X_UNLOCK (reslock); 811 X_UNLOCK (reslock);
1266 ++npending; 1293 ++npending;
1267 1294
1268 if (!reqq_push (&res_queue, req)) 1295 if (!reqq_push (&res_queue, req))
1269 { 1296 {
1270 /* write a dummy byte to the pipe so fh becomes ready */ 1297 /* write a dummy byte to the pipe so fh becomes ready */
1271 write (respipe [1], &respipe, 1); 1298 send (respipe_osf [1], (const void *)&respipe_osf, 1, 0);
1272 1299
1273 /* optionally signal the main thread asynchronously */ 1300 /* optionally signal the main thread asynchronously */
1274 if (main_sig) 1301 if (main_sig)
1275 pthread_kill (main_tid, main_sig); 1302 pthread_kill (main_tid, main_sig);
1276 } 1303 }
1343 idle = 0; 1370 idle = 0;
1344 nreqs = 0; 1371 nreqs = 0;
1345 nready = 0; 1372 nready = 0;
1346 npending = 0; 1373 npending = 0;
1347 1374
1348 close (respipe [0]); 1375 PerlSock_closesocket (respipe [0]);
1349 close (respipe [1]); 1376 PerlSock_closesocket (respipe [1]);
1350 1377
1351 if (!create_pipe (respipe)) 1378 create_pipe (respipe);
1352 croak ("cannot set result pipe to nonblocking mode");
1353 1379
1354 atfork_parent (); 1380 atfork_parent ();
1355} 1381}
1356 1382
1357#define dREQ \ 1383#define dREQ \
1393 X_MUTEX_CHECK (reslock); 1419 X_MUTEX_CHECK (reslock);
1394 X_MUTEX_CHECK (reqlock); 1420 X_MUTEX_CHECK (reqlock);
1395 X_MUTEX_CHECK (reqwait); 1421 X_MUTEX_CHECK (reqwait);
1396 X_MUTEX_CHECK (preadwritelock); 1422 X_MUTEX_CHECK (preadwritelock);
1397 X_MUTEX_CHECK (readdirlock); 1423 X_MUTEX_CHECK (readdirlock);
1424
1425 X_COND_CHECK (reqwait);
1398#else 1426#else
1399 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1427 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1400 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); 1428 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1401#endif 1429#endif
1402 1430
1403 if (!create_pipe (respipe)) 1431 create_pipe (respipe);
1404 croak ("cannot set result pipe to nonblocking mode");
1405 1432
1406 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); 1433 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1407} 1434}
1408 1435
1409void 1436void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines