ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.90 by root, Tue Oct 31 00:32:19 2006 UTC vs.
Revision 1.94 by root, Sun Nov 26 18:28:37 2006 UTC

17#include "autoconf/config.h" 17#include "autoconf/config.h"
18 18
19#include <pthread.h> 19#include <pthread.h>
20 20
21#include <stddef.h> 21#include <stddef.h>
22#include <stdlib.h>
22#include <errno.h> 23#include <errno.h>
23#include <sys/time.h> 24#include <sys/time.h>
24#include <sys/select.h> 25#include <sys/select.h>
25#include <sys/types.h> 26#include <sys/types.h>
26#include <sys/stat.h> 27#include <sys/stat.h>
160{ 161{
161 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS 162 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
162 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 163 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
163} 164}
164 165
166static pthread_t main_tid;
167static int main_sig;
168static int block_sig_level;
169
170void block_sig ()
171{
172 sigset_t ss;
173
174 if (block_sig_level++)
175 return;
176
177 if (!main_sig)
178 return;
179
180 sigemptyset (&ss);
181 sigaddset (&ss, main_sig);
182 pthread_sigmask (SIG_BLOCK, &ss, 0);
183}
184
185void unblock_sig ()
186{
187 sigset_t ss;
188
189 if (--block_sig_level)
190 return;
191
192 if (!main_sig)
193 return;
194
195 sigemptyset (&ss);
196 sigaddset (&ss, main_sig);
197 pthread_sigmask (SIG_UNBLOCK, &ss, 0);
198}
199
165static int next_pri = DEFAULT_PRI + PRI_BIAS; 200static int next_pri = DEFAULT_PRI + PRI_BIAS;
166 201
167static unsigned int started, idle, wanted; 202static unsigned int started, idle, wanted;
168 203
169#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 204#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
320 355
321 abort (); 356 abort ();
322} 357}
323 358
324static int poll_cb (); 359static int poll_cb ();
325static void req_invoke (aio_req req); 360static int req_invoke (aio_req req);
326static void req_free (aio_req req); 361static void req_free (aio_req req);
327static void req_cancel (aio_req req); 362static void req_cancel (aio_req req);
328 363
329/* must be called at most once */ 364/* must be called at most once */
330static SV *req_sv (aio_req req, const char *klass) 365static SV *req_sv (aio_req req, const char *klass)
350 return mg ? (aio_req)mg->mg_ptr : 0; 385 return mg ? (aio_req)mg->mg_ptr : 0;
351} 386}
352 387
353static void aio_grp_feed (aio_req grp) 388static void aio_grp_feed (aio_req grp)
354{ 389{
390 block_sig ();
391
355 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) 392 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED))
356 { 393 {
357 int old_len = grp->size; 394 int old_len = grp->size;
358 395
359 if (grp->sv2 && SvOK (grp->sv2)) 396 if (grp->sv2 && SvOK (grp->sv2))
377 SvREFCNT_dec (grp->sv2); 414 SvREFCNT_dec (grp->sv2);
378 grp->sv2 = 0; 415 grp->sv2 = 0;
379 break; 416 break;
380 } 417 }
381 } 418 }
419
420 unblock_sig ();
382} 421}
383 422
384static void aio_grp_dec (aio_req grp) 423static void aio_grp_dec (aio_req grp)
385{ 424{
386 --grp->size; 425 --grp->size;
389 aio_grp_feed (grp); 428 aio_grp_feed (grp);
390 429
391 /* finish, if done */ 430 /* finish, if done */
392 if (!grp->size && grp->int1) 431 if (!grp->size && grp->int1)
393 { 432 {
433 block_sig ();
434
394 req_invoke (grp); 435 if (!req_invoke (grp))
436 {
437 req_free (grp);
438 unblock_sig ();
439 croak (0);
440 }
441
395 req_free (grp); 442 req_free (grp);
443 unblock_sig ();
396 } 444 }
397} 445}
398 446
399static void req_invoke (aio_req req) 447static int req_invoke (aio_req req)
400{ 448{
401 dSP; 449 dSP;
402 450
403 if (req->flags & FLAG_SV1_RO_OFF) 451 if (req->flags & FLAG_SV1_RO_OFF)
404 SvREADONLY_off (req->sv1); 452 SvREADONLY_off (req->sv1);
447 PUSHs (sv_2mortal (newSViv (req->result))); 495 PUSHs (sv_2mortal (newSViv (req->result)));
448 PUTBACK; 496 PUTBACK;
449 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 497 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
450 SPAGAIN; 498 SPAGAIN;
451 499
452 fh = SvREFCNT_inc (POPs); 500 fh = POPs;
453
454 PUSHMARK (SP); 501 PUSHMARK (SP);
455 XPUSHs (sv_2mortal (fh)); 502 XPUSHs (fh);
456 } 503 }
457 break; 504 break;
458 505
459 case REQ_GROUP: 506 case REQ_GROUP:
460 req->int1 = 2; /* mark group as finished */ 507 req->int1 = 2; /* mark group as finished */
525 grp->grp_first = req->grp_next; 572 grp->grp_first = req->grp_next;
526 573
527 aio_grp_dec (grp); 574 aio_grp_dec (grp);
528 } 575 }
529 576
530 if (SvTRUE (ERRSV)) 577 return !SvTRUE (ERRSV);
531 {
532 req_free (req);
533 croak (0);
534 }
535} 578}
536 579
537static void req_free (aio_req req) 580static void req_free (aio_req req)
538{ 581{
539 if (req->self) 582 if (req->self)
594#endif 637#endif
595 638
596 sigfillset (&fullsigset); 639 sigfillset (&fullsigset);
597 640
598 LOCK (wrklock); 641 LOCK (wrklock);
599 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); 642 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
600 643
601 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) 644 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
602 { 645 {
603 wrk->prev = &wrk_first; 646 wrk->prev = &wrk_first;
604 wrk->next = wrk_first.next; 647 wrk->next = wrk_first.next;
607 ++started; 650 ++started;
608 } 651 }
609 else 652 else
610 free (wrk); 653 free (wrk);
611 654
612 sigprocmask (SIG_SETMASK, &oldsigset, 0); 655 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
613 UNLOCK (wrklock); 656 UNLOCK (wrklock);
614} 657}
615 658
616static void maybe_start_thread () 659static void maybe_start_thread ()
617{ 660{
625 start_thread (); 668 start_thread ();
626} 669}
627 670
628static void req_send (aio_req req) 671static void req_send (aio_req req)
629{ 672{
673 block_sig ();
674
630 ++nreqs; 675 ++nreqs;
631 676
632 LOCK (reqlock); 677 LOCK (reqlock);
633 ++nready; 678 ++nready;
634 reqq_push (&req_queue, req); 679 reqq_push (&req_queue, req);
635 pthread_cond_signal (&reqwait); 680 pthread_cond_signal (&reqwait);
636 UNLOCK (reqlock); 681 UNLOCK (reqlock);
682
683 unblock_sig ();
637 684
638 maybe_start_thread (); 685 maybe_start_thread ();
639} 686}
640 687
641static void end_thread (void) 688static void end_thread (void)
711 struct timeval tv_start, tv_now; 758 struct timeval tv_start, tv_now;
712 aio_req req; 759 aio_req req;
713 760
714 if (max_poll_time) 761 if (max_poll_time)
715 gettimeofday (&tv_start, 0); 762 gettimeofday (&tv_start, 0);
763
764 block_sig ();
716 765
717 for (;;) 766 for (;;)
718 { 767 {
719 for (;;) 768 for (;;)
720 { 769 {
748 req->int1 = 1; /* mark request as delayed */ 797 req->int1 = 1; /* mark request as delayed */
749 continue; 798 continue;
750 } 799 }
751 else 800 else
752 { 801 {
753 req_invoke (req); 802 if (!req_invoke (req))
803 {
804 req_free (req);
805 unblock_sig ();
806 croak (0);
807 }
754 808
755 count++; 809 count++;
756 } 810 }
757 811
758 req_free (req); 812 req_free (req);
775 poll_wait (); 829 poll_wait ();
776 830
777 ++maxreqs; 831 ++maxreqs;
778 } 832 }
779 833
834 unblock_sig ();
780 return count; 835 return count;
781} 836}
782 837
783static void create_pipe () 838static void create_pipe ()
784{ 839{
1155 LOCK (reslock); 1210 LOCK (reslock);
1156 1211
1157 ++npending; 1212 ++npending;
1158 1213
1159 if (!reqq_push (&res_queue, req)) 1214 if (!reqq_push (&res_queue, req))
1215 {
1160 /* write a dummy byte to the pipe so fh becomes ready */ 1216 /* write a dummy byte to the pipe so fh becomes ready */
1161 write (respipe [1], &respipe, 1); 1217 write (respipe [1], &respipe, 1);
1218
1219 /* optionally signal the main thread asynchronously */
1220 if (main_sig)
1221 pthread_kill (main_tid, main_sig);
1222 }
1162 1223
1163 self->req = 0; 1224 self->req = 0;
1164 worker_clear (self); 1225 worker_clear (self);
1165 1226
1166 UNLOCK (reslock); 1227 UNLOCK (reslock);
1269 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1330 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1270 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1331 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1271 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1332 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1272 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1333 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1273 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1334 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1335 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1274 1336
1275 create_pipe (); 1337 create_pipe ();
1276 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1338 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1277} 1339}
1278 1340
1644 PROTOTYPE: 1706 PROTOTYPE:
1645 CODE: 1707 CODE:
1646 while (nreqs) 1708 while (nreqs)
1647 { 1709 {
1648 poll_wait (); 1710 poll_wait ();
1649 poll_cb (0); 1711 poll_cb ();
1650 } 1712 }
1651 1713
1652void 1714int
1653poll() 1715poll()
1654 PROTOTYPE: 1716 PROTOTYPE:
1655 CODE: 1717 CODE:
1656 if (nreqs)
1657 {
1658 poll_wait (); 1718 poll_wait ();
1659 poll_cb (0); 1719 RETVAL = poll_cb ();
1660 } 1720 OUTPUT:
1721 RETVAL
1661 1722
1662int 1723int
1663poll_fileno() 1724poll_fileno()
1664 PROTOTYPE: 1725 PROTOTYPE:
1665 CODE: 1726 CODE:
1677 1738
1678void 1739void
1679poll_wait() 1740poll_wait()
1680 PROTOTYPE: 1741 PROTOTYPE:
1681 CODE: 1742 CODE:
1682 if (nreqs)
1683 poll_wait (); 1743 poll_wait ();
1744
1745void
1746setsig (int signum = SIGIO)
1747 PROTOTYPE: ;$
1748 CODE:
1749{
1750 if (block_sig_level)
1751 croak ("cannot call IO::AIO::setsig from within aio_block/callback");
1752
1753 LOCK (reslock);
1754 main_tid = pthread_self ();
1755 main_sig = signum;
1756 UNLOCK (reslock);
1757
1758 if (main_sig && npending)
1759 pthread_kill (main_tid, main_sig);
1760}
1761
1762void
1763aio_block (SV *cb)
1764 PROTOTYPE: &
1765 PPCODE:
1766{
1767 int count;
1768
1769 block_sig ();
1770 PUSHMARK (SP);
1771 PUTBACK;
1772 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
1773 SPAGAIN;
1774 unblock_sig ();
1775
1776 if (SvTRUE (ERRSV))
1777 croak (0);
1778
1779 XSRETURN (count);
1780}
1684 1781
1685int 1782int
1686nreqs() 1783nreqs()
1687 PROTOTYPE: 1784 PROTOTYPE:
1688 CODE: 1785 CODE:
1738 PPCODE: 1835 PPCODE:
1739{ 1836{
1740 int i; 1837 int i;
1741 aio_req req; 1838 aio_req req;
1742 1839
1840 if (main_sig && !block_sig_level)
1841 croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use");
1842
1743 if (grp->int1 == 2) 1843 if (grp->int1 == 2)
1744 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1844 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1745 1845
1746 for (i = 1; i < items; ++i ) 1846 for (i = 1; i < items; ++i )
1747 { 1847 {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines