ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.103 by root, Sun Jul 8 09:09:34 2007 UTC vs.
Revision 1.106 by root, Mon Sep 24 18:14:00 2007 UTC

24 24
25// perl overrides all those nice win32 functions 25// perl overrides all those nice win32 functions
26# undef open 26# undef open
27# undef read 27# undef read
28# undef write 28# undef write
29# undef send
30# undef recv
29# undef stat 31# undef stat
30# undef fstat 32# undef fstat
31# define lstat stat 33# define lstat stat
32# undef truncate 34# undef truncate
33# undef ftruncate 35# undef ftruncate
262} 264}
263 265
264static volatile unsigned int nreqs, nready, npending; 266static volatile unsigned int nreqs, nready, npending;
265static volatile unsigned int max_idle = 4; 267static volatile unsigned int max_idle = 4;
266static volatile unsigned int max_outstanding = 0xffffffff; 268static volatile unsigned int max_outstanding = 0xffffffff;
267static int respipe [2]; 269static int respipe_osf [2], respipe [2] = { -1, -1 };
268 270
269static mutex_t reslock = X_MUTEX_INIT; 271static mutex_t reslock = X_MUTEX_INIT;
270static mutex_t reqlock = X_MUTEX_INIT; 272static mutex_t reqlock = X_MUTEX_INIT;
271static cond_t reqwait = X_COND_INIT; 273static cond_t reqwait = X_COND_INIT;
272 274
625 req->flags |= FLAG_CANCELLED; 627 req->flags |= FLAG_CANCELLED;
626 628
627 req_cancel_subs (req); 629 req_cancel_subs (req);
628} 630}
629 631
632#ifdef USE_SOCKETS_AS_HANDLES
633# define TO_SOCKET(x) (win32_get_osfhandle (x))
634#else
635# define TO_SOCKET(x) (x)
636#endif
637
638static void
639create_respipe ()
640{
641 int old_readfd = respipe [0];
642
643 if (respipe [1] >= 0)
644 respipe_close (TO_SOCKET (respipe [1]));
645
646#ifdef _WIN32
647 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
648#else
649 if (pipe (respipe))
650#endif
651 croak ("unable to initialize result pipe");
652
653 if (old_readfd >= 0)
654 {
655 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
656 croak ("unable to initialize result pipe(2)");
657
658 respipe_close (respipe [0]);
659 respipe [0] = old_readfd;
660 }
661
662#ifdef _WIN32
663 int arg = 1;
664 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
665 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
666#else
667 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
668 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
669#endif
670 croak ("unable to initialize result pipe(3)");
671
672 respipe_osf [0] = TO_SOCKET (respipe [0]);
673 respipe_osf [1] = TO_SOCKET (respipe [1]);
674}
675
630X_THREAD_PROC (aio_proc); 676X_THREAD_PROC (aio_proc);
631 677
632static void start_thread (void) 678static void start_thread (void)
633{ 679{
634 worker *wrk = calloc (1, sizeof (worker)); 680 worker *wrk = calloc (1, sizeof (worker));
736 if (size) 782 if (size)
737 return; 783 return;
738 784
739 maybe_start_thread (); 785 maybe_start_thread ();
740 786
741 FD_ZERO(&rfd); 787 FD_ZERO (&rfd);
742 FD_SET(respipe [0], &rfd); 788 FD_SET (respipe [0], &rfd);
743 789
744 select (respipe [0] + 1, &rfd, 0, 0, 0); 790 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
745 } 791 }
746} 792}
747 793
748static int poll_cb () 794static int poll_cb ()
749{ 795{
774 820
775 if (!res_queue.size) 821 if (!res_queue.size)
776 { 822 {
777 /* read any signals sent by the worker threads */ 823 /* read any signals sent by the worker threads */
778 char buf [4]; 824 char buf [4];
779 while (read (respipe [0], buf, 4) == 4) 825 while (respipe_read (respipe [0], buf, 4) == 4)
780 ; 826 ;
781 } 827 }
782 } 828 }
783 829
784 X_UNLOCK (reslock); 830 X_UNLOCK (reslock);
1266 ++npending; 1312 ++npending;
1267 1313
1268 if (!reqq_push (&res_queue, req)) 1314 if (!reqq_push (&res_queue, req))
1269 { 1315 {
1270 /* write a dummy byte to the pipe so fh becomes ready */ 1316 /* write a dummy byte to the pipe so fh becomes ready */
1271 write (respipe [1], &respipe, 1); 1317 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
1272 1318
1273 /* optionally signal the main thread asynchronously */ 1319 /* optionally signal the main thread asynchronously */
1274 if (main_sig) 1320 if (main_sig)
1275 pthread_kill (main_tid, main_sig); 1321 pthread_kill (main_tid, main_sig);
1276 } 1322 }
1343 idle = 0; 1389 idle = 0;
1344 nreqs = 0; 1390 nreqs = 0;
1345 nready = 0; 1391 nready = 0;
1346 npending = 0; 1392 npending = 0;
1347 1393
1348 close (respipe [0]); 1394 create_respipe ();
1349 close (respipe [1]);
1350
1351 if (!create_pipe (respipe))
1352 croak ("cannot set result pipe to nonblocking mode");
1353 1395
1354 atfork_parent (); 1396 atfork_parent ();
1355} 1397}
1356 1398
1357#define dREQ \ 1399#define dREQ \
1393 X_MUTEX_CHECK (reslock); 1435 X_MUTEX_CHECK (reslock);
1394 X_MUTEX_CHECK (reqlock); 1436 X_MUTEX_CHECK (reqlock);
1395 X_MUTEX_CHECK (reqwait); 1437 X_MUTEX_CHECK (reqwait);
1396 X_MUTEX_CHECK (preadwritelock); 1438 X_MUTEX_CHECK (preadwritelock);
1397 X_MUTEX_CHECK (readdirlock); 1439 X_MUTEX_CHECK (readdirlock);
1440
1441 X_COND_CHECK (reqwait);
1398#else 1442#else
1399 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1443 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1400 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); 1444 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1401#endif 1445#endif
1402 1446
1403 if (!create_pipe (respipe)) 1447 create_respipe ();
1404 croak ("cannot set result pipe to nonblocking mode");
1405 1448
1406 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); 1449 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1407} 1450}
1408 1451
1409void 1452void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines