ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.91 by root, Wed Nov 8 01:57:43 2006 UTC vs.
Revision 1.97 by root, Sun Jan 7 22:59:57 2007 UTC

17#include "autoconf/config.h" 17#include "autoconf/config.h"
18 18
19#include <pthread.h> 19#include <pthread.h>
20 20
21#include <stddef.h> 21#include <stddef.h>
22#include <stdlib.h>
22#include <errno.h> 23#include <errno.h>
23#include <sys/time.h> 24#include <sys/time.h>
24#include <sys/select.h> 25#include <sys/select.h>
25#include <sys/types.h> 26#include <sys/types.h>
26#include <sys/stat.h> 27#include <sys/stat.h>
96 REQ_OPEN, REQ_CLOSE, 97 REQ_OPEN, REQ_CLOSE,
97 REQ_READ, REQ_WRITE, REQ_READAHEAD, 98 REQ_READ, REQ_WRITE, REQ_READAHEAD,
98 REQ_SENDFILE, 99 REQ_SENDFILE,
99 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 100 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
100 REQ_FSYNC, REQ_FDATASYNC, 101 REQ_FSYNC, REQ_FDATASYNC,
101 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 102 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME,
102 REQ_MKNOD, REQ_READDIR, 103 REQ_MKNOD, REQ_READDIR,
103 REQ_LINK, REQ_SYMLINK, REQ_READLINK, 104 REQ_LINK, REQ_SYMLINK, REQ_READLINK,
104 REQ_GROUP, REQ_NOP, 105 REQ_GROUP, REQ_NOP,
105 REQ_BUSY, 106 REQ_BUSY,
106}; 107};
158/* calculcate time difference in ~1/AIO_TICKS of a second */ 159/* calculcate time difference in ~1/AIO_TICKS of a second */
159static int tvdiff (struct timeval *tv1, struct timeval *tv2) 160static int tvdiff (struct timeval *tv1, struct timeval *tv2)
160{ 161{
161 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS 162 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
162 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 163 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
164}
165
166static pthread_t main_tid;
167static int main_sig;
168static int block_sig_level;
169
170void block_sig ()
171{
172 sigset_t ss;
173
174 if (block_sig_level++)
175 return;
176
177 if (!main_sig)
178 return;
179
180 sigemptyset (&ss);
181 sigaddset (&ss, main_sig);
182 pthread_sigmask (SIG_BLOCK, &ss, 0);
183}
184
185void unblock_sig ()
186{
187 sigset_t ss;
188
189 if (--block_sig_level)
190 return;
191
192 if (!main_sig)
193 return;
194
195 sigemptyset (&ss);
196 sigaddset (&ss, main_sig);
197 pthread_sigmask (SIG_UNBLOCK, &ss, 0);
163} 198}
164 199
165static int next_pri = DEFAULT_PRI + PRI_BIAS; 200static int next_pri = DEFAULT_PRI + PRI_BIAS;
166 201
167static unsigned int started, idle, wanted; 202static unsigned int started, idle, wanted;
320 355
321 abort (); 356 abort ();
322} 357}
323 358
324static int poll_cb (); 359static int poll_cb ();
325static void req_invoke (aio_req req); 360static int req_invoke (aio_req req);
326static void req_free (aio_req req); 361static void req_free (aio_req req);
327static void req_cancel (aio_req req); 362static void req_cancel (aio_req req);
328 363
329/* must be called at most once */ 364/* must be called at most once */
330static SV *req_sv (aio_req req, const char *klass) 365static SV *req_sv (aio_req req, const char *klass)
350 return mg ? (aio_req)mg->mg_ptr : 0; 385 return mg ? (aio_req)mg->mg_ptr : 0;
351} 386}
352 387
353static void aio_grp_feed (aio_req grp) 388static void aio_grp_feed (aio_req grp)
354{ 389{
390 block_sig ();
391
355 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) 392 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED))
356 { 393 {
357 int old_len = grp->size; 394 int old_len = grp->size;
358 395
359 if (grp->sv2 && SvOK (grp->sv2)) 396 if (grp->sv2 && SvOK (grp->sv2))
377 SvREFCNT_dec (grp->sv2); 414 SvREFCNT_dec (grp->sv2);
378 grp->sv2 = 0; 415 grp->sv2 = 0;
379 break; 416 break;
380 } 417 }
381 } 418 }
419
420 unblock_sig ();
382} 421}
383 422
384static void aio_grp_dec (aio_req grp) 423static void aio_grp_dec (aio_req grp)
385{ 424{
386 --grp->size; 425 --grp->size;
389 aio_grp_feed (grp); 428 aio_grp_feed (grp);
390 429
391 /* finish, if done */ 430 /* finish, if done */
392 if (!grp->size && grp->int1) 431 if (!grp->size && grp->int1)
393 { 432 {
433 block_sig ();
434
394 req_invoke (grp); 435 if (!req_invoke (grp))
436 {
437 req_free (grp);
438 unblock_sig ();
439 croak (0);
440 }
441
395 req_free (grp); 442 req_free (grp);
443 unblock_sig ();
396 } 444 }
397} 445}
398 446
399static void req_invoke (aio_req req) 447static int req_invoke (aio_req req)
400{ 448{
401 dSP; 449 dSP;
402 450
403 if (req->flags & FLAG_SV1_RO_OFF) 451 if (req->flags & FLAG_SV1_RO_OFF)
404 SvREADONLY_off (req->sv1); 452 SvREADONLY_off (req->sv1);
447 PUSHs (sv_2mortal (newSViv (req->result))); 495 PUSHs (sv_2mortal (newSViv (req->result)));
448 PUTBACK; 496 PUTBACK;
449 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 497 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
450 SPAGAIN; 498 SPAGAIN;
451 499
452 fh = SvREFCNT_inc (POPs); 500 fh = POPs;
453
454 PUSHMARK (SP); 501 PUSHMARK (SP);
455 XPUSHs (sv_2mortal (fh)); 502 XPUSHs (fh);
456 } 503 }
457 break; 504 break;
458 505
459 case REQ_GROUP: 506 case REQ_GROUP:
460 req->int1 = 2; /* mark group as finished */ 507 req->int1 = 2; /* mark group as finished */
525 grp->grp_first = req->grp_next; 572 grp->grp_first = req->grp_next;
526 573
527 aio_grp_dec (grp); 574 aio_grp_dec (grp);
528 } 575 }
529 576
530 if (SvTRUE (ERRSV)) 577 return !SvTRUE (ERRSV);
531 {
532 req_free (req);
533 croak (0);
534 }
535} 578}
536 579
537static void req_free (aio_req req) 580static void req_free (aio_req req)
538{ 581{
539 if (req->self) 582 if (req->self)
594#endif 637#endif
595 638
596 sigfillset (&fullsigset); 639 sigfillset (&fullsigset);
597 640
598 LOCK (wrklock); 641 LOCK (wrklock);
599 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); 642 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
600 643
601 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) 644 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
602 { 645 {
603 wrk->prev = &wrk_first; 646 wrk->prev = &wrk_first;
604 wrk->next = wrk_first.next; 647 wrk->next = wrk_first.next;
607 ++started; 650 ++started;
608 } 651 }
609 else 652 else
610 free (wrk); 653 free (wrk);
611 654
612 sigprocmask (SIG_SETMASK, &oldsigset, 0); 655 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
613 UNLOCK (wrklock); 656 UNLOCK (wrklock);
614} 657}
615 658
616static void maybe_start_thread () 659static void maybe_start_thread ()
617{ 660{
625 start_thread (); 668 start_thread ();
626} 669}
627 670
628static void req_send (aio_req req) 671static void req_send (aio_req req)
629{ 672{
673 block_sig ();
674
630 ++nreqs; 675 ++nreqs;
631 676
632 LOCK (reqlock); 677 LOCK (reqlock);
633 ++nready; 678 ++nready;
634 reqq_push (&req_queue, req); 679 reqq_push (&req_queue, req);
635 pthread_cond_signal (&reqwait); 680 pthread_cond_signal (&reqwait);
636 UNLOCK (reqlock); 681 UNLOCK (reqlock);
682
683 unblock_sig ();
637 684
638 maybe_start_thread (); 685 maybe_start_thread ();
639} 686}
640 687
641static void end_thread (void) 688static void end_thread (void)
712 aio_req req; 759 aio_req req;
713 760
714 if (max_poll_time) 761 if (max_poll_time)
715 gettimeofday (&tv_start, 0); 762 gettimeofday (&tv_start, 0);
716 763
764 block_sig ();
765
717 for (;;) 766 for (;;)
718 { 767 {
719 for (;;) 768 for (;;)
720 { 769 {
721 maybe_start_thread (); 770 maybe_start_thread ();
728 --npending; 777 --npending;
729 778
730 if (!res_queue.size) 779 if (!res_queue.size)
731 { 780 {
732 /* read any signals sent by the worker threads */ 781 /* read any signals sent by the worker threads */
733 char buf [32]; 782 char buf [4];
734 while (read (respipe [0], buf, 32) == 32) 783 while (read (respipe [0], buf, 4) == 4)
735 ; 784 ;
736 } 785 }
737 } 786 }
738 787
739 UNLOCK (reslock); 788 UNLOCK (reslock);
748 req->int1 = 1; /* mark request as delayed */ 797 req->int1 = 1; /* mark request as delayed */
749 continue; 798 continue;
750 } 799 }
751 else 800 else
752 { 801 {
753 req_invoke (req); 802 if (!req_invoke (req))
803 {
804 req_free (req);
805 unblock_sig ();
806 croak (0);
807 }
754 808
755 count++; 809 count++;
756 } 810 }
757 811
758 req_free (req); 812 req_free (req);
775 poll_wait (); 829 poll_wait ();
776 830
777 ++maxreqs; 831 ++maxreqs;
778 } 832 }
779 833
834 unblock_sig ();
780 return count; 835 return count;
781} 836}
782 837
783static void create_pipe () 838static void create_pipe ()
784{ 839{
1116 1171
1117 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; 1172 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break;
1118 case REQ_CLOSE: req->result = close (req->int1); break; 1173 case REQ_CLOSE: req->result = close (req->int1); break;
1119 case REQ_UNLINK: req->result = unlink (req->ptr1); break; 1174 case REQ_UNLINK: req->result = unlink (req->ptr1); break;
1120 case REQ_RMDIR: req->result = rmdir (req->ptr1); break; 1175 case REQ_RMDIR: req->result = rmdir (req->ptr1); break;
1176 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break;
1121 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break; 1177 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break;
1122 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break; 1178 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break;
1123 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break; 1179 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break;
1124 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break; 1180 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break;
1125 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break; 1181 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break;
1155 LOCK (reslock); 1211 LOCK (reslock);
1156 1212
1157 ++npending; 1213 ++npending;
1158 1214
1159 if (!reqq_push (&res_queue, req)) 1215 if (!reqq_push (&res_queue, req))
1216 {
1160 /* write a dummy byte to the pipe so fh becomes ready */ 1217 /* write a dummy byte to the pipe so fh becomes ready */
1161 write (respipe [1], &respipe, 1); 1218 write (respipe [1], &respipe, 1);
1219
1220 /* optionally signal the main thread asynchronously */
1221 if (main_sig)
1222 pthread_kill (main_tid, main_sig);
1223 }
1162 1224
1163 self->req = 0; 1225 self->req = 0;
1164 worker_clear (self); 1226 worker_clear (self);
1165 1227
1166 UNLOCK (reslock); 1228 UNLOCK (reslock);
1269 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1331 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1270 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1332 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1271 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1333 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1272 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1334 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1273 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1335 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1336 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1274 1337
1275 create_pipe (); 1338 create_pipe ();
1276 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1339 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1277} 1340}
1278 1341
1384 length = svlen - dataoffset; 1447 length = svlen - dataoffset;
1385 } 1448 }
1386 else 1449 else
1387 { 1450 {
1388 /* read: grow scalar as necessary */ 1451 /* read: grow scalar as necessary */
1389 svptr = SvGROW (data, length + dataoffset); 1452 svptr = SvGROW (data, length + dataoffset + 1);
1390 } 1453 }
1391 1454
1392 if (length < 0) 1455 if (length < 0)
1393 croak ("length must not be negative"); 1456 croak ("length must not be negative");
1394 1457
1534 1597
1535 REQ_SEND; 1598 REQ_SEND;
1536} 1599}
1537 1600
1538void 1601void
1602aio_mkdir (pathname,mode,callback=&PL_sv_undef)
1603 SV8 * pathname
1604 UV mode
1605 SV * callback
1606 PPCODE:
1607{
1608 dREQ;
1609
1610 req->type = REQ_MKDIR;
1611 req->sv1 = newSVsv (pathname);
1612 req->ptr1 = SvPVbyte_nolen (req->sv1);
1613 req->mode = mode;
1614
1615 REQ_SEND;
1616}
1617
1618void
1539aio_link (oldpath,newpath,callback=&PL_sv_undef) 1619aio_link (oldpath,newpath,callback=&PL_sv_undef)
1540 SV8 * oldpath 1620 SV8 * oldpath
1541 SV8 * newpath 1621 SV8 * newpath
1542 SV * callback 1622 SV * callback
1543 ALIAS: 1623 ALIAS:
1558} 1638}
1559 1639
1560void 1640void
1561aio_mknod (pathname,mode,dev,callback=&PL_sv_undef) 1641aio_mknod (pathname,mode,dev,callback=&PL_sv_undef)
1562 SV8 * pathname 1642 SV8 * pathname
1563 SV * callback
1564 UV mode 1643 UV mode
1565 UV dev 1644 UV dev
1645 SV * callback
1566 PPCODE: 1646 PPCODE:
1567{ 1647{
1568 dREQ; 1648 dREQ;
1569 1649
1570 req->type = REQ_MKNOD; 1650 req->type = REQ_MKNOD;
1651 1731
1652int 1732int
1653poll() 1733poll()
1654 PROTOTYPE: 1734 PROTOTYPE:
1655 CODE: 1735 CODE:
1656 if (nreqs)
1657 {
1658 poll_wait (); 1736 poll_wait ();
1659 RETVAL = poll_cb (); 1737 RETVAL = poll_cb ();
1660 }
1661 else
1662 RETVAL = 0;
1663 OUTPUT: 1738 OUTPUT:
1664 RETVAL 1739 RETVAL
1665 1740
1666int 1741int
1667poll_fileno() 1742poll_fileno()
1681 1756
1682void 1757void
1683poll_wait() 1758poll_wait()
1684 PROTOTYPE: 1759 PROTOTYPE:
1685 CODE: 1760 CODE:
1686 if (nreqs)
1687 poll_wait (); 1761 poll_wait ();
1762
1763void
1764setsig (int signum = SIGIO)
1765 PROTOTYPE: ;$
1766 CODE:
1767{
1768 if (block_sig_level)
1769 croak ("cannot call IO::AIO::setsig from within aio_block/callback");
1770
1771 LOCK (reslock);
1772 main_tid = pthread_self ();
1773 main_sig = signum;
1774 UNLOCK (reslock);
1775
1776 if (main_sig && npending)
1777 pthread_kill (main_tid, main_sig);
1778}
1779
1780void
1781aio_block (SV *cb)
1782 PROTOTYPE: &
1783 PPCODE:
1784{
1785 int count;
1786
1787 block_sig ();
1788 PUSHMARK (SP);
1789 PUTBACK;
1790 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
1791 SPAGAIN;
1792 unblock_sig ();
1793
1794 if (SvTRUE (ERRSV))
1795 croak (0);
1796
1797 XSRETURN (count);
1798}
1688 1799
1689int 1800int
1690nreqs() 1801nreqs()
1691 PROTOTYPE: 1802 PROTOTYPE:
1692 CODE: 1803 CODE:
1742 PPCODE: 1853 PPCODE:
1743{ 1854{
1744 int i; 1855 int i;
1745 aio_req req; 1856 aio_req req;
1746 1857
1858 if (main_sig && !block_sig_level)
1859 croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use");
1860
1747 if (grp->int1 == 2) 1861 if (grp->int1 == 2)
1748 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1862 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1749 1863
1750 for (i = 1; i < items; ++i ) 1864 for (i = 1; i < items; ++i )
1751 { 1865 {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines