ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.103 by root, Sun Jul 8 09:09:34 2007 UTC vs.
Revision 1.112 by root, Tue Apr 1 19:40:47 2008 UTC

24 24
25// perl overrides all those nice win32 functions 25// perl overrides all those nice win32 functions
26# undef open 26# undef open
27# undef read 27# undef read
28# undef write 28# undef write
29# undef send
30# undef recv
29# undef stat 31# undef stat
30# undef fstat 32# undef fstat
31# define lstat stat 33# define lstat stat
32# undef truncate 34# undef truncate
33# undef ftruncate 35# undef ftruncate
95# define SvVAL64 SvIV 97# define SvVAL64 SvIV
96#else 98#else
97# define SvVAL64 SvNV 99# define SvVAL64 SvNV
98#endif 100#endif
99 101
102static HV *stash;
103
100#define dBUF \ 104#define dBUF \
101 char *aio_buf; \ 105 char *aio_buf; \
102 X_LOCK (wrklock); \ 106 X_LOCK (wrklock); \
103 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \ 107 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \
104 X_UNLOCK (wrklock); \ 108 X_UNLOCK (wrklock); \
115 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 119 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
116 REQ_TRUNCATE, REQ_FTRUNCATE, 120 REQ_TRUNCATE, REQ_FTRUNCATE,
117 REQ_UTIME, REQ_FUTIME, 121 REQ_UTIME, REQ_FUTIME,
118 REQ_CHMOD, REQ_FCHMOD, 122 REQ_CHMOD, REQ_FCHMOD,
119 REQ_CHOWN, REQ_FCHOWN, 123 REQ_CHOWN, REQ_FCHOWN,
120 REQ_FSYNC, REQ_FDATASYNC, 124 REQ_SYNC, REQ_FSYNC, REQ_FDATASYNC,
121 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME, 125 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME,
122 REQ_MKNOD, REQ_READDIR, 126 REQ_MKNOD, REQ_READDIR,
123 REQ_LINK, REQ_SYMLINK, REQ_READLINK, 127 REQ_LINK, REQ_SYMLINK, REQ_READLINK,
124 REQ_GROUP, REQ_NOP, 128 REQ_GROUP, REQ_NOP,
125 REQ_BUSY, 129 REQ_BUSY,
185 189
186static thread_t main_tid; 190static thread_t main_tid;
187static int main_sig; 191static int main_sig;
188static int block_sig_level; 192static int block_sig_level;
189 193
190void block_sig () 194void block_sig (void)
191{ 195{
192 sigset_t ss; 196 sigset_t ss;
193 197
194 if (block_sig_level++) 198 if (block_sig_level++)
195 return; 199 return;
200 sigemptyset (&ss); 204 sigemptyset (&ss);
201 sigaddset (&ss, main_sig); 205 sigaddset (&ss, main_sig);
202 pthread_sigmask (SIG_BLOCK, &ss, 0); 206 pthread_sigmask (SIG_BLOCK, &ss, 0);
203} 207}
204 208
205void unblock_sig () 209void unblock_sig (void)
206{ 210{
207 sigset_t ss; 211 sigset_t ss;
208 212
209 if (--block_sig_level) 213 if (--block_sig_level)
210 return; 214 return;
262} 266}
263 267
264static volatile unsigned int nreqs, nready, npending; 268static volatile unsigned int nreqs, nready, npending;
265static volatile unsigned int max_idle = 4; 269static volatile unsigned int max_idle = 4;
266static volatile unsigned int max_outstanding = 0xffffffff; 270static volatile unsigned int max_outstanding = 0xffffffff;
267static int respipe [2]; 271static int respipe_osf [2], respipe [2] = { -1, -1 };
268 272
269static mutex_t reslock = X_MUTEX_INIT; 273static mutex_t reslock = X_MUTEX_INIT;
270static mutex_t reqlock = X_MUTEX_INIT; 274static mutex_t reqlock = X_MUTEX_INIT;
271static cond_t reqwait = X_COND_INIT; 275static cond_t reqwait = X_COND_INIT;
272 276
273#if WORDACCESS_UNSAFE 277#if WORDACCESS_UNSAFE
274 278
275static unsigned int get_nready () 279static unsigned int get_nready (void)
276{ 280{
277 unsigned int retval; 281 unsigned int retval;
278 282
279 X_LOCK (reqlock); 283 X_LOCK (reqlock);
280 retval = nready; 284 retval = nready;
281 X_UNLOCK (reqlock); 285 X_UNLOCK (reqlock);
282 286
283 return retval; 287 return retval;
284} 288}
285 289
286static unsigned int get_npending () 290static unsigned int get_npending (void)
287{ 291{
288 unsigned int retval; 292 unsigned int retval;
289 293
290 X_LOCK (reslock); 294 X_LOCK (reslock);
291 retval = npending; 295 retval = npending;
292 X_UNLOCK (reslock); 296 X_UNLOCK (reslock);
293 297
294 return retval; 298 return retval;
295} 299}
296 300
297static unsigned int get_nthreads () 301static unsigned int get_nthreads (void)
298{ 302{
299 unsigned int retval; 303 unsigned int retval;
300 304
301 X_LOCK (wrklock); 305 X_LOCK (wrklock);
302 retval = started; 306 retval = started;
365 } 369 }
366 370
367 abort (); 371 abort ();
368} 372}
369 373
370static int poll_cb (); 374static int poll_cb (void);
371static int req_invoke (aio_req req); 375static int req_invoke (aio_req req);
372static void req_destroy (aio_req req); 376static void req_destroy (aio_req req);
373static void req_cancel (aio_req req); 377static void req_cancel (aio_req req);
374 378
375/* must be called at most once */ 379/* must be called at most once */
499 break; 503 break;
500 504
501 case REQ_OPEN: 505 case REQ_OPEN:
502 { 506 {
503 /* convert fd to fh */ 507 /* convert fd to fh */
504 SV *fh; 508 SV *fh = &PL_sv_undef;
505 509
506 PUSHs (sv_2mortal (newSViv (req->result))); 510 if (req->result >= 0)
507 PUTBACK; 511 {
508 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 512 GV *gv = (GV *)sv_newmortal ();
509 SPAGAIN; 513 int flags = req->int1 & (O_RDONLY | O_WRONLY | O_RDWR);
514 char sym [64];
515 int symlen;
516
517 symlen = snprintf (sym, sizeof (sym), "fd#%d", req->result);
518 gv_init (gv, stash, sym, symlen, 0);
510 519
511 fh = POPs; 520 symlen = snprintf (
512 PUSHMARK (SP); 521 sym,
522 sizeof (sym),
523 "%s&=%d",
524 flags == O_RDONLY ? "<" : flags == O_WRONLY ? ">" : "+<",
525 req->result
526 );
527
528 if (do_open (gv, sym, symlen, 0, 0, 0, 0))
529 fh = (SV *)gv;
530 }
531
513 XPUSHs (fh); 532 PUSHs (fh);
514 } 533 }
515 break; 534 break;
516 535
517 case REQ_GROUP: 536 case REQ_GROUP:
518 req->int1 = 2; /* mark group as finished */ 537 req->int1 = 2; /* mark group as finished */
562 } 581 }
563 582
564 errno = req->errorno; 583 errno = req->errorno;
565 584
566 PUTBACK; 585 PUTBACK;
567 call_sv (req->callback, G_VOID | G_EVAL); 586 call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
568 SPAGAIN; 587 SPAGAIN;
569 588
570 FREETMPS; 589 FREETMPS;
571 LEAVE; 590 LEAVE;
591
592 PUTBACK;
572 } 593 }
573 594
574 if (req->grp) 595 if (req->grp)
575 { 596 {
576 aio_req grp = req->grp; 597 aio_req grp = req->grp;
623static void req_cancel (aio_req req) 644static void req_cancel (aio_req req)
624{ 645{
625 req->flags |= FLAG_CANCELLED; 646 req->flags |= FLAG_CANCELLED;
626 647
627 req_cancel_subs (req); 648 req_cancel_subs (req);
649}
650
651#ifdef USE_SOCKETS_AS_HANDLES
652# define TO_SOCKET(x) (win32_get_osfhandle (x))
653#else
654# define TO_SOCKET(x) (x)
655#endif
656
657static void
658create_respipe (void)
659{
660 int old_readfd = respipe [0];
661
662 if (respipe [1] >= 0)
663 respipe_close (TO_SOCKET (respipe [1]));
664
665#ifdef _WIN32
666 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
667#else
668 if (pipe (respipe))
669#endif
670 croak ("unable to initialize result pipe");
671
672 if (old_readfd >= 0)
673 {
674 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
675 croak ("unable to initialize result pipe(2)");
676
677 respipe_close (respipe [0]);
678 respipe [0] = old_readfd;
679 }
680
681#ifdef _WIN32
682 int arg = 1;
683 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
684 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
685#else
686 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
687 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
688#endif
689 croak ("unable to initialize result pipe(3)");
690
691 respipe_osf [0] = TO_SOCKET (respipe [0]);
692 respipe_osf [1] = TO_SOCKET (respipe [1]);
628} 693}
629 694
630X_THREAD_PROC (aio_proc); 695X_THREAD_PROC (aio_proc);
631 696
632static void start_thread (void) 697static void start_thread (void)
650 free (wrk); 715 free (wrk);
651 716
652 X_UNLOCK (wrklock); 717 X_UNLOCK (wrklock);
653} 718}
654 719
655static void maybe_start_thread () 720static void maybe_start_thread (void)
656{ 721{
657 if (get_nthreads () >= wanted) 722 if (get_nthreads () >= wanted)
658 return; 723 return;
659 724
660 /* todo: maybe use idle here, but might be less exact */ 725 /* todo: maybe use idle here, but might be less exact */
720 785
721 while (started > wanted) 786 while (started > wanted)
722 end_thread (); 787 end_thread ();
723} 788}
724 789
725static void poll_wait () 790static void poll_wait (void)
726{ 791{
727 fd_set rfd; 792 fd_set rfd;
728 793
729 while (nreqs) 794 while (nreqs)
730 { 795 {
736 if (size) 801 if (size)
737 return; 802 return;
738 803
739 maybe_start_thread (); 804 maybe_start_thread ();
740 805
741 FD_ZERO(&rfd); 806 FD_ZERO (&rfd);
742 FD_SET(respipe [0], &rfd); 807 FD_SET (respipe [0], &rfd);
743 808
744 select (respipe [0] + 1, &rfd, 0, 0, 0); 809 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
745 } 810 }
746} 811}
747 812
748static int poll_cb () 813static int poll_cb (void)
749{ 814{
750 dSP; 815 dSP;
751 int count = 0; 816 int count = 0;
752 int maxreqs = max_poll_reqs; 817 int maxreqs = max_poll_reqs;
753 int do_croak = 0; 818 int do_croak = 0;
774 839
775 if (!res_queue.size) 840 if (!res_queue.size)
776 { 841 {
777 /* read any signals sent by the worker threads */ 842 /* read any signals sent by the worker threads */
778 char buf [4]; 843 char buf [4];
779 while (read (respipe [0], buf, 4) == 4) 844 while (respipe_read (respipe [0], buf, 4) == 4)
780 ; 845 ;
781 } 846 }
782 } 847 }
783 848
784 X_UNLOCK (reslock); 849 X_UNLOCK (reslock);
1117 1182
1118/*****************************************************************************/ 1183/*****************************************************************************/
1119 1184
1120X_THREAD_PROC (aio_proc) 1185X_THREAD_PROC (aio_proc)
1121{ 1186{
1122 {//D
1123 aio_req req; 1187 aio_req req;
1124 struct timespec ts; 1188 struct timespec ts;
1125 worker *self = (worker *)thr_arg; 1189 worker *self = (worker *)thr_arg;
1126 1190
1127 /* try to distribute timeouts somewhat randomly */ 1191 /* try to distribute timeouts somewhat randomly */
1202 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break; 1266 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break;
1203 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break; 1267 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break;
1204 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break; 1268 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break;
1205 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break; 1269 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break;
1206 1270
1271 case REQ_SYNC: req->result = 0; sync (); break;
1272 case REQ_FSYNC: req->result = fsync (req->int1); break;
1207 case REQ_FDATASYNC: req->result = fdatasync (req->int1); break; 1273 case REQ_FDATASYNC: req->result = fdatasync (req->int1); break;
1208 case REQ_FSYNC: req->result = fsync (req->int1); break; 1274
1209 case REQ_READDIR: scandir_ (req, self); break; 1275 case REQ_READDIR: scandir_ (req, self); break;
1210 1276
1211 case REQ_BUSY: 1277 case REQ_BUSY:
1212#ifdef _WIN32 1278#ifdef _WIN32
1213 Sleep (req->nv1 * 1000.); 1279 Sleep (req->nv1 * 1000.);
1266 ++npending; 1332 ++npending;
1267 1333
1268 if (!reqq_push (&res_queue, req)) 1334 if (!reqq_push (&res_queue, req))
1269 { 1335 {
1270 /* write a dummy byte to the pipe so fh becomes ready */ 1336 /* write a dummy byte to the pipe so fh becomes ready */
1271 write (respipe [1], &respipe, 1); 1337 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
1272 1338
1273 /* optionally signal the main thread asynchronously */ 1339 /* optionally signal the main thread asynchronously */
1274 if (main_sig) 1340 if (main_sig)
1275 pthread_kill (main_tid, main_sig); 1341 pthread_kill (main_tid, main_sig);
1276 } 1342 }
1285 X_LOCK (wrklock); 1351 X_LOCK (wrklock);
1286 worker_free (self); 1352 worker_free (self);
1287 X_UNLOCK (wrklock); 1353 X_UNLOCK (wrklock);
1288 1354
1289 return 0; 1355 return 0;
1290 }//D
1291} 1356}
1292 1357
1293/*****************************************************************************/ 1358/*****************************************************************************/
1294 1359
1295static void atfork_prepare (void) 1360static void atfork_prepare (void)
1343 idle = 0; 1408 idle = 0;
1344 nreqs = 0; 1409 nreqs = 0;
1345 nready = 0; 1410 nready = 0;
1346 npending = 0; 1411 npending = 0;
1347 1412
1348 close (respipe [0]); 1413 create_respipe ();
1349 close (respipe [1]);
1350
1351 if (!create_pipe (respipe))
1352 croak ("cannot set result pipe to nonblocking mode");
1353 1414
1354 atfork_parent (); 1415 atfork_parent ();
1355} 1416}
1356 1417
1357#define dREQ \ 1418#define dREQ \
1379 1440
1380PROTOTYPES: ENABLE 1441PROTOTYPES: ENABLE
1381 1442
1382BOOT: 1443BOOT:
1383{ 1444{
1384 HV *stash = gv_stashpv ("IO::AIO", 1); 1445 stash = gv_stashpv ("IO::AIO", 1);
1385 1446
1386 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1447 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1387 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1448 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1388 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1449 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1389 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1450 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1393 X_MUTEX_CHECK (reslock); 1454 X_MUTEX_CHECK (reslock);
1394 X_MUTEX_CHECK (reqlock); 1455 X_MUTEX_CHECK (reqlock);
1395 X_MUTEX_CHECK (reqwait); 1456 X_MUTEX_CHECK (reqwait);
1396 X_MUTEX_CHECK (preadwritelock); 1457 X_MUTEX_CHECK (preadwritelock);
1397 X_MUTEX_CHECK (readdirlock); 1458 X_MUTEX_CHECK (readdirlock);
1459
1460 X_COND_CHECK (reqwait);
1398#else 1461#else
1399 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1462 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1400 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); 1463 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1401#endif 1464#endif
1402 1465
1403 if (!create_pipe (respipe)) 1466 create_respipe ();
1404 croak ("cannot set result pipe to nonblocking mode");
1405 1467
1406 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); 1468 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1407} 1469}
1408 1470
1409void 1471void
1456 1518
1457 REQ_SEND; 1519 REQ_SEND;
1458} 1520}
1459 1521
1460void 1522void
1461aio_close (SV *fh, SV *callback=&PL_sv_undef) 1523aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1462 PROTOTYPE: $;$ 1524 PROTOTYPE: $;$
1463 ALIAS: 1525 ALIAS:
1464 aio_close = REQ_CLOSE
1465 aio_fsync = REQ_FSYNC 1526 aio_fsync = REQ_FSYNC
1466 aio_fdatasync = REQ_FDATASYNC 1527 aio_fdatasync = REQ_FDATASYNC
1467 PPCODE: 1528 PPCODE:
1468{ 1529{
1469 dREQ; 1530 dREQ;
1470 1531
1471 req->type = ix; 1532 req->type = ix;
1472 req->sv1 = newSVsv (fh); 1533 req->sv1 = newSVsv (fh);
1473 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh))); 1534 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1535
1536 REQ_SEND (req);
1537}
1538
1539int
1540_dup (int fd)
1541 PROTOTYPE: $
1542 CODE:
1543 RETVAL = dup (fd);
1544 OUTPUT:
1545 RETVAL
1546
1547void
1548_aio_close (int fd, SV *callback=&PL_sv_undef)
1549 PROTOTYPE: $;$
1550 PPCODE:
1551{
1552 dREQ;
1553
1554 req->type = REQ_CLOSE;
1555 req->int1 = fd;
1474 1556
1475 REQ_SEND (req); 1557 REQ_SEND (req);
1476} 1558}
1477 1559
1478void 1560void
1805 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1887 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1806} 1888}
1807 1889
1808void 1890void
1809aio_nop (SV *callback=&PL_sv_undef) 1891aio_nop (SV *callback=&PL_sv_undef)
1892 ALIAS:
1893 aio_nop = REQ_NOP
1894 aio_sync = REQ_SYNC
1810 PPCODE: 1895 PPCODE:
1811{ 1896{
1812 dREQ; 1897 dREQ;
1813 1898
1814 req->type = REQ_NOP; 1899 req->type = ix;
1815 1900
1816 REQ_SEND; 1901 REQ_SEND;
1817} 1902}
1818 1903
1819int 1904int
1905 1990
1906 block_sig (); 1991 block_sig ();
1907 PUSHMARK (SP); 1992 PUSHMARK (SP);
1908 PUTBACK; 1993 PUTBACK;
1909 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL); 1994 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
1910 SPAGAIN;
1911 unblock_sig (); 1995 unblock_sig ();
1912 1996
1913 if (SvTRUE (ERRSV)) 1997 if (SvTRUE (ERRSV))
1914 croak (0); 1998 croak (0);
1915 1999

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines