ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.104 by root, Sun Jul 8 11:05:36 2007 UTC vs.
Revision 1.108 by root, Thu Oct 4 12:50:35 2007 UTC

97# define SvVAL64 SvIV 97# define SvVAL64 SvIV
98#else 98#else
99# define SvVAL64 SvNV 99# define SvVAL64 SvNV
100#endif 100#endif
101 101
102static HV *stash;
103
102#define dBUF \ 104#define dBUF \
103 char *aio_buf; \ 105 char *aio_buf; \
104 X_LOCK (wrklock); \ 106 X_LOCK (wrklock); \
105 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \ 107 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \
106 X_UNLOCK (wrklock); \ 108 X_UNLOCK (wrklock); \
264} 266}
265 267
266static volatile unsigned int nreqs, nready, npending; 268static volatile unsigned int nreqs, nready, npending;
267static volatile unsigned int max_idle = 4; 269static volatile unsigned int max_idle = 4;
268static volatile unsigned int max_outstanding = 0xffffffff; 270static volatile unsigned int max_outstanding = 0xffffffff;
269static int respipe [2], respipe_osf [2]; 271static int respipe_osf [2], respipe [2] = { -1, -1 };
270 272
271static mutex_t reslock = X_MUTEX_INIT; 273static mutex_t reslock = X_MUTEX_INIT;
272static mutex_t reqlock = X_MUTEX_INIT; 274static mutex_t reqlock = X_MUTEX_INIT;
273static cond_t reqwait = X_COND_INIT; 275static cond_t reqwait = X_COND_INIT;
274 276
501 break; 503 break;
502 504
503 case REQ_OPEN: 505 case REQ_OPEN:
504 { 506 {
505 /* convert fd to fh */ 507 /* convert fd to fh */
506 SV *fh; 508 SV *fh = &PL_sv_undef;
507 509
508 PUSHs (sv_2mortal (newSViv (req->result))); 510 if (req->result >= 0)
509 PUTBACK; 511 {
510 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 512 GV *gv = (GV *)sv_newmortal ();
511 SPAGAIN; 513 int flags = req->int1 & (O_RDONLY | O_WRONLY | O_RDWR);
514 char sym [64];
515 int symlen;
516
517 symlen = snprintf (sym, sizeof (sym), "fd#%d", req->result);
518 gv_init (gv, stash, sym, symlen, 0);
512 519
513 fh = POPs; 520 symlen = snprintf (
514 PUSHMARK (SP); 521 sym,
522 sizeof (sym),
523 "%s&=%d",
524 flags == O_RDONLY ? "<" : flags == O_WRONLY ? ">" : "+<",
525 req->result
526 );
527
528 if (do_open (gv, sym, symlen, 0, 0, 0, 0))
529 fh = (SV *)gv;
530 }
531
515 XPUSHs (fh); 532 XPUSHs (fh);
516 } 533 }
517 break; 534 break;
518 535
519 case REQ_GROUP: 536 case REQ_GROUP:
634#else 651#else
635# define TO_SOCKET(x) (x) 652# define TO_SOCKET(x) (x)
636#endif 653#endif
637 654
638static void 655static void
639create_pipe (int fd[2]) 656create_respipe ()
640{ 657{
658 int old_readfd = respipe [0];
659
660 if (respipe [1] >= 0)
661 respipe_close (TO_SOCKET (respipe [1]));
662
663#ifdef _WIN32
664 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
665#else
666 if (pipe (respipe))
667#endif
668 croak ("unable to initialize result pipe");
669
670 if (old_readfd >= 0)
671 {
672 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
673 croak ("unable to initialize result pipe(2)");
674
675 respipe_close (respipe [0]);
676 respipe [0] = old_readfd;
677 }
678
641#ifdef _WIN32 679#ifdef _WIN32
642 int arg = 1; 680 int arg = 1;
643 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd)
644 || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) 681 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
645 || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) 682 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
646#else 683#else
647 if (pipe (fd)
648 || fcntl (fd [0], F_SETFL, O_NONBLOCK) 684 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
649 || fcntl (fd [1], F_SETFL, O_NONBLOCK)) 685 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
650#endif 686#endif
651 croak ("unable to initialize result pipe"); 687 croak ("unable to initialize result pipe(3)");
652 688
653 respipe_osf [0] = TO_SOCKET (respipe [0]); 689 respipe_osf [0] = TO_SOCKET (respipe [0]);
654 respipe_osf [1] = TO_SOCKET (respipe [1]); 690 respipe_osf [1] = TO_SOCKET (respipe [1]);
655} 691}
656 692
801 837
802 if (!res_queue.size) 838 if (!res_queue.size)
803 { 839 {
804 /* read any signals sent by the worker threads */ 840 /* read any signals sent by the worker threads */
805 char buf [4]; 841 char buf [4];
806 while (PerlSock_recv (respipe [0], buf, 4, 0) == 4) 842 while (respipe_read (respipe [0], buf, 4) == 4)
807 ; 843 ;
808 } 844 }
809 } 845 }
810 846
811 X_UNLOCK (reslock); 847 X_UNLOCK (reslock);
1219 case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break; 1255 case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break;
1220 case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break; 1256 case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
1221 case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; 1257 case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
1222 1258
1223 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; 1259 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break;
1224 case REQ_CLOSE: req->result = close (req->int1); break; 1260 case REQ_CLOSE: req->result = PerlIO_close ((PerlIO *)req->ptr1); break;
1225 case REQ_UNLINK: req->result = unlink (req->ptr1); break; 1261 case REQ_UNLINK: req->result = unlink (req->ptr1); break;
1226 case REQ_RMDIR: req->result = rmdir (req->ptr1); break; 1262 case REQ_RMDIR: req->result = rmdir (req->ptr1); break;
1227 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break; 1263 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break;
1228 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break; 1264 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break;
1229 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break; 1265 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break;
1293 ++npending; 1329 ++npending;
1294 1330
1295 if (!reqq_push (&res_queue, req)) 1331 if (!reqq_push (&res_queue, req))
1296 { 1332 {
1297 /* write a dummy byte to the pipe so fh becomes ready */ 1333 /* write a dummy byte to the pipe so fh becomes ready */
1298 send (respipe_osf [1], (const void *)&respipe_osf, 1, 0); 1334 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
1299 1335
1300 /* optionally signal the main thread asynchronously */ 1336 /* optionally signal the main thread asynchronously */
1301 if (main_sig) 1337 if (main_sig)
1302 pthread_kill (main_tid, main_sig); 1338 pthread_kill (main_tid, main_sig);
1303 } 1339 }
1370 idle = 0; 1406 idle = 0;
1371 nreqs = 0; 1407 nreqs = 0;
1372 nready = 0; 1408 nready = 0;
1373 npending = 0; 1409 npending = 0;
1374 1410
1375 PerlSock_closesocket (respipe [0]);
1376 PerlSock_closesocket (respipe [1]);
1377
1378 create_pipe (respipe); 1411 create_respipe ();
1379 1412
1380 atfork_parent (); 1413 atfork_parent ();
1381} 1414}
1382 1415
1383#define dREQ \ 1416#define dREQ \
1405 1438
1406PROTOTYPES: ENABLE 1439PROTOTYPES: ENABLE
1407 1440
1408BOOT: 1441BOOT:
1409{ 1442{
1410 HV *stash = gv_stashpv ("IO::AIO", 1); 1443 stash = gv_stashpv ("IO::AIO", 1);
1411 1444
1412 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1445 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1413 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1446 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1414 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1447 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1415 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1448 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1426#else 1459#else
1427 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1460 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1428 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); 1461 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1429#endif 1462#endif
1430 1463
1431 create_pipe (respipe); 1464 create_respipe ();
1432 1465
1433 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); 1466 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1434} 1467}
1435 1468
1436void 1469void
1483 1516
1484 REQ_SEND; 1517 REQ_SEND;
1485} 1518}
1486 1519
1487void 1520void
1488aio_close (SV *fh, SV *callback=&PL_sv_undef) 1521aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1489 PROTOTYPE: $;$ 1522 PROTOTYPE: $;$
1490 ALIAS: 1523 ALIAS:
1491 aio_close = REQ_CLOSE
1492 aio_fsync = REQ_FSYNC 1524 aio_fsync = REQ_FSYNC
1493 aio_fdatasync = REQ_FDATASYNC 1525 aio_fdatasync = REQ_FDATASYNC
1494 PPCODE: 1526 PPCODE:
1495{ 1527{
1496 dREQ; 1528 dREQ;
1498 req->type = ix; 1530 req->type = ix;
1499 req->sv1 = newSVsv (fh); 1531 req->sv1 = newSVsv (fh);
1500 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh))); 1532 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1501 1533
1502 REQ_SEND (req); 1534 REQ_SEND (req);
1535}
1536
1537void
1538aio_close (SV *fh, SV *callback=&PL_sv_undef)
1539 PROTOTYPE: $;$
1540 PPCODE:
1541{
1542 PerlIO *io = IoIFP (sv_2io (fh));
1543 int fd = PerlIO_fileno (io);
1544
1545 if (fd < 0)
1546 croak ("aio_close called with fd-less filehandle");
1547
1548 PerlIO_binmode (aTHX_ io, 0, 0, 0);
1549
1550 {
1551 dREQ;
1552
1553 req->type = REQ_CLOSE;
1554 req->sv1 = newSVsv (fh);
1555 req->ptr1 = (void *)io;
1556
1557 REQ_SEND (req);
1558 }
1503} 1559}
1504 1560
1505void 1561void
1506aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef) 1562aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef)
1507 ALIAS: 1563 ALIAS:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines