ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.93 by root, Wed Nov 8 02:01:02 2006 UTC vs.
Revision 1.97 by root, Sun Jan 7 22:59:57 2007 UTC

17#include "autoconf/config.h" 17#include "autoconf/config.h"
18 18
19#include <pthread.h> 19#include <pthread.h>
20 20
21#include <stddef.h> 21#include <stddef.h>
22#include <stdlib.h>
22#include <errno.h> 23#include <errno.h>
23#include <sys/time.h> 24#include <sys/time.h>
24#include <sys/select.h> 25#include <sys/select.h>
25#include <sys/types.h> 26#include <sys/types.h>
26#include <sys/stat.h> 27#include <sys/stat.h>
96 REQ_OPEN, REQ_CLOSE, 97 REQ_OPEN, REQ_CLOSE,
97 REQ_READ, REQ_WRITE, REQ_READAHEAD, 98 REQ_READ, REQ_WRITE, REQ_READAHEAD,
98 REQ_SENDFILE, 99 REQ_SENDFILE,
99 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 100 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
100 REQ_FSYNC, REQ_FDATASYNC, 101 REQ_FSYNC, REQ_FDATASYNC,
101 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 102 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME,
102 REQ_MKNOD, REQ_READDIR, 103 REQ_MKNOD, REQ_READDIR,
103 REQ_LINK, REQ_SYMLINK, REQ_READLINK, 104 REQ_LINK, REQ_SYMLINK, REQ_READLINK,
104 REQ_GROUP, REQ_NOP, 105 REQ_GROUP, REQ_NOP,
105 REQ_BUSY, 106 REQ_BUSY,
106}; 107};
158/* calculcate time difference in ~1/AIO_TICKS of a second */ 159/* calculcate time difference in ~1/AIO_TICKS of a second */
159static int tvdiff (struct timeval *tv1, struct timeval *tv2) 160static int tvdiff (struct timeval *tv1, struct timeval *tv2)
160{ 161{
161 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS 162 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
162 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 163 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
164}
165
166static pthread_t main_tid;
167static int main_sig;
168static int block_sig_level;
169
170void block_sig ()
171{
172 sigset_t ss;
173
174 if (block_sig_level++)
175 return;
176
177 if (!main_sig)
178 return;
179
180 sigemptyset (&ss);
181 sigaddset (&ss, main_sig);
182 pthread_sigmask (SIG_BLOCK, &ss, 0);
183}
184
185void unblock_sig ()
186{
187 sigset_t ss;
188
189 if (--block_sig_level)
190 return;
191
192 if (!main_sig)
193 return;
194
195 sigemptyset (&ss);
196 sigaddset (&ss, main_sig);
197 pthread_sigmask (SIG_UNBLOCK, &ss, 0);
163} 198}
164 199
165static int next_pri = DEFAULT_PRI + PRI_BIAS; 200static int next_pri = DEFAULT_PRI + PRI_BIAS;
166 201
167static unsigned int started, idle, wanted; 202static unsigned int started, idle, wanted;
320 355
321 abort (); 356 abort ();
322} 357}
323 358
324static int poll_cb (); 359static int poll_cb ();
325static void req_invoke (aio_req req); 360static int req_invoke (aio_req req);
326static void req_free (aio_req req); 361static void req_free (aio_req req);
327static void req_cancel (aio_req req); 362static void req_cancel (aio_req req);
328 363
329/* must be called at most once */ 364/* must be called at most once */
330static SV *req_sv (aio_req req, const char *klass) 365static SV *req_sv (aio_req req, const char *klass)
350 return mg ? (aio_req)mg->mg_ptr : 0; 385 return mg ? (aio_req)mg->mg_ptr : 0;
351} 386}
352 387
353static void aio_grp_feed (aio_req grp) 388static void aio_grp_feed (aio_req grp)
354{ 389{
390 block_sig ();
391
355 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) 392 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED))
356 { 393 {
357 int old_len = grp->size; 394 int old_len = grp->size;
358 395
359 if (grp->sv2 && SvOK (grp->sv2)) 396 if (grp->sv2 && SvOK (grp->sv2))
377 SvREFCNT_dec (grp->sv2); 414 SvREFCNT_dec (grp->sv2);
378 grp->sv2 = 0; 415 grp->sv2 = 0;
379 break; 416 break;
380 } 417 }
381 } 418 }
419
420 unblock_sig ();
382} 421}
383 422
384static void aio_grp_dec (aio_req grp) 423static void aio_grp_dec (aio_req grp)
385{ 424{
386 --grp->size; 425 --grp->size;
389 aio_grp_feed (grp); 428 aio_grp_feed (grp);
390 429
391 /* finish, if done */ 430 /* finish, if done */
392 if (!grp->size && grp->int1) 431 if (!grp->size && grp->int1)
393 { 432 {
433 block_sig ();
434
394 req_invoke (grp); 435 if (!req_invoke (grp))
436 {
437 req_free (grp);
438 unblock_sig ();
439 croak (0);
440 }
441
395 req_free (grp); 442 req_free (grp);
443 unblock_sig ();
396 } 444 }
397} 445}
398 446
399static void req_invoke (aio_req req) 447static int req_invoke (aio_req req)
400{ 448{
401 dSP; 449 dSP;
402 450
403 if (req->flags & FLAG_SV1_RO_OFF) 451 if (req->flags & FLAG_SV1_RO_OFF)
404 SvREADONLY_off (req->sv1); 452 SvREADONLY_off (req->sv1);
524 grp->grp_first = req->grp_next; 572 grp->grp_first = req->grp_next;
525 573
526 aio_grp_dec (grp); 574 aio_grp_dec (grp);
527 } 575 }
528 576
529 if (SvTRUE (ERRSV)) 577 return !SvTRUE (ERRSV);
530 {
531 req_free (req);
532 croak (0);
533 }
534} 578}
535 579
536static void req_free (aio_req req) 580static void req_free (aio_req req)
537{ 581{
538 if (req->self) 582 if (req->self)
593#endif 637#endif
594 638
595 sigfillset (&fullsigset); 639 sigfillset (&fullsigset);
596 640
597 LOCK (wrklock); 641 LOCK (wrklock);
598 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); 642 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
599 643
600 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) 644 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
601 { 645 {
602 wrk->prev = &wrk_first; 646 wrk->prev = &wrk_first;
603 wrk->next = wrk_first.next; 647 wrk->next = wrk_first.next;
606 ++started; 650 ++started;
607 } 651 }
608 else 652 else
609 free (wrk); 653 free (wrk);
610 654
611 sigprocmask (SIG_SETMASK, &oldsigset, 0); 655 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
612 UNLOCK (wrklock); 656 UNLOCK (wrklock);
613} 657}
614 658
615static void maybe_start_thread () 659static void maybe_start_thread ()
616{ 660{
624 start_thread (); 668 start_thread ();
625} 669}
626 670
627static void req_send (aio_req req) 671static void req_send (aio_req req)
628{ 672{
673 block_sig ();
674
629 ++nreqs; 675 ++nreqs;
630 676
631 LOCK (reqlock); 677 LOCK (reqlock);
632 ++nready; 678 ++nready;
633 reqq_push (&req_queue, req); 679 reqq_push (&req_queue, req);
634 pthread_cond_signal (&reqwait); 680 pthread_cond_signal (&reqwait);
635 UNLOCK (reqlock); 681 UNLOCK (reqlock);
682
683 unblock_sig ();
636 684
637 maybe_start_thread (); 685 maybe_start_thread ();
638} 686}
639 687
640static void end_thread (void) 688static void end_thread (void)
711 aio_req req; 759 aio_req req;
712 760
713 if (max_poll_time) 761 if (max_poll_time)
714 gettimeofday (&tv_start, 0); 762 gettimeofday (&tv_start, 0);
715 763
764 block_sig ();
765
716 for (;;) 766 for (;;)
717 { 767 {
718 for (;;) 768 for (;;)
719 { 769 {
720 maybe_start_thread (); 770 maybe_start_thread ();
727 --npending; 777 --npending;
728 778
729 if (!res_queue.size) 779 if (!res_queue.size)
730 { 780 {
731 /* read any signals sent by the worker threads */ 781 /* read any signals sent by the worker threads */
732 char buf [32]; 782 char buf [4];
733 while (read (respipe [0], buf, 32) == 32) 783 while (read (respipe [0], buf, 4) == 4)
734 ; 784 ;
735 } 785 }
736 } 786 }
737 787
738 UNLOCK (reslock); 788 UNLOCK (reslock);
747 req->int1 = 1; /* mark request as delayed */ 797 req->int1 = 1; /* mark request as delayed */
748 continue; 798 continue;
749 } 799 }
750 else 800 else
751 { 801 {
752 req_invoke (req); 802 if (!req_invoke (req))
803 {
804 req_free (req);
805 unblock_sig ();
806 croak (0);
807 }
753 808
754 count++; 809 count++;
755 } 810 }
756 811
757 req_free (req); 812 req_free (req);
774 poll_wait (); 829 poll_wait ();
775 830
776 ++maxreqs; 831 ++maxreqs;
777 } 832 }
778 833
834 unblock_sig ();
779 return count; 835 return count;
780} 836}
781 837
782static void create_pipe () 838static void create_pipe ()
783{ 839{
1115 1171
1116 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; 1172 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break;
1117 case REQ_CLOSE: req->result = close (req->int1); break; 1173 case REQ_CLOSE: req->result = close (req->int1); break;
1118 case REQ_UNLINK: req->result = unlink (req->ptr1); break; 1174 case REQ_UNLINK: req->result = unlink (req->ptr1); break;
1119 case REQ_RMDIR: req->result = rmdir (req->ptr1); break; 1175 case REQ_RMDIR: req->result = rmdir (req->ptr1); break;
1176 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break;
1120 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break; 1177 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break;
1121 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break; 1178 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break;
1122 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break; 1179 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break;
1123 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break; 1180 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break;
1124 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break; 1181 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break;
1154 LOCK (reslock); 1211 LOCK (reslock);
1155 1212
1156 ++npending; 1213 ++npending;
1157 1214
1158 if (!reqq_push (&res_queue, req)) 1215 if (!reqq_push (&res_queue, req))
1216 {
1159 /* write a dummy byte to the pipe so fh becomes ready */ 1217 /* write a dummy byte to the pipe so fh becomes ready */
1160 write (respipe [1], &respipe, 1); 1218 write (respipe [1], &respipe, 1);
1219
1220 /* optionally signal the main thread asynchronously */
1221 if (main_sig)
1222 pthread_kill (main_tid, main_sig);
1223 }
1161 1224
1162 self->req = 0; 1225 self->req = 0;
1163 worker_clear (self); 1226 worker_clear (self);
1164 1227
1165 UNLOCK (reslock); 1228 UNLOCK (reslock);
1268 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1331 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1269 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1332 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1270 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1333 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1271 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1334 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1272 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1335 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1336 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1273 1337
1274 create_pipe (); 1338 create_pipe ();
1275 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1339 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1276} 1340}
1277 1341
1383 length = svlen - dataoffset; 1447 length = svlen - dataoffset;
1384 } 1448 }
1385 else 1449 else
1386 { 1450 {
1387 /* read: grow scalar as necessary */ 1451 /* read: grow scalar as necessary */
1388 svptr = SvGROW (data, length + dataoffset); 1452 svptr = SvGROW (data, length + dataoffset + 1);
1389 } 1453 }
1390 1454
1391 if (length < 0) 1455 if (length < 0)
1392 croak ("length must not be negative"); 1456 croak ("length must not be negative");
1393 1457
1533 1597
1534 REQ_SEND; 1598 REQ_SEND;
1535} 1599}
1536 1600
1537void 1601void
1602aio_mkdir (pathname,mode,callback=&PL_sv_undef)
1603 SV8 * pathname
1604 UV mode
1605 SV * callback
1606 PPCODE:
1607{
1608 dREQ;
1609
1610 req->type = REQ_MKDIR;
1611 req->sv1 = newSVsv (pathname);
1612 req->ptr1 = SvPVbyte_nolen (req->sv1);
1613 req->mode = mode;
1614
1615 REQ_SEND;
1616}
1617
1618void
1538aio_link (oldpath,newpath,callback=&PL_sv_undef) 1619aio_link (oldpath,newpath,callback=&PL_sv_undef)
1539 SV8 * oldpath 1620 SV8 * oldpath
1540 SV8 * newpath 1621 SV8 * newpath
1541 SV * callback 1622 SV * callback
1542 ALIAS: 1623 ALIAS:
1557} 1638}
1558 1639
1559void 1640void
1560aio_mknod (pathname,mode,dev,callback=&PL_sv_undef) 1641aio_mknod (pathname,mode,dev,callback=&PL_sv_undef)
1561 SV8 * pathname 1642 SV8 * pathname
1562 SV * callback
1563 UV mode 1643 UV mode
1564 UV dev 1644 UV dev
1645 SV * callback
1565 PPCODE: 1646 PPCODE:
1566{ 1647{
1567 dREQ; 1648 dREQ;
1568 1649
1569 req->type = REQ_MKNOD; 1650 req->type = REQ_MKNOD;
1677poll_wait() 1758poll_wait()
1678 PROTOTYPE: 1759 PROTOTYPE:
1679 CODE: 1760 CODE:
1680 poll_wait (); 1761 poll_wait ();
1681 1762
1763void
1764setsig (int signum = SIGIO)
1765 PROTOTYPE: ;$
1766 CODE:
1767{
1768 if (block_sig_level)
1769 croak ("cannot call IO::AIO::setsig from within aio_block/callback");
1770
1771 LOCK (reslock);
1772 main_tid = pthread_self ();
1773 main_sig = signum;
1774 UNLOCK (reslock);
1775
1776 if (main_sig && npending)
1777 pthread_kill (main_tid, main_sig);
1778}
1779
1780void
1781aio_block (SV *cb)
1782 PROTOTYPE: &
1783 PPCODE:
1784{
1785 int count;
1786
1787 block_sig ();
1788 PUSHMARK (SP);
1789 PUTBACK;
1790 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
1791 SPAGAIN;
1792 unblock_sig ();
1793
1794 if (SvTRUE (ERRSV))
1795 croak (0);
1796
1797 XSRETURN (count);
1798}
1799
1682int 1800int
1683nreqs() 1801nreqs()
1684 PROTOTYPE: 1802 PROTOTYPE:
1685 CODE: 1803 CODE:
1686 RETVAL = nreqs; 1804 RETVAL = nreqs;
1735 PPCODE: 1853 PPCODE:
1736{ 1854{
1737 int i; 1855 int i;
1738 aio_req req; 1856 aio_req req;
1739 1857
1858 if (main_sig && !block_sig_level)
1859 croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use");
1860
1740 if (grp->int1 == 2) 1861 if (grp->int1 == 2)
1741 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1862 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1742 1863
1743 for (i = 1; i < items; ++i ) 1864 for (i = 1; i < items; ++i )
1744 { 1865 {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines