ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.93 by root, Wed Nov 8 02:01:02 2006 UTC vs.
Revision 1.94 by root, Sun Nov 26 18:28:37 2006 UTC

17#include "autoconf/config.h" 17#include "autoconf/config.h"
18 18
19#include <pthread.h> 19#include <pthread.h>
20 20
21#include <stddef.h> 21#include <stddef.h>
22#include <stdlib.h>
22#include <errno.h> 23#include <errno.h>
23#include <sys/time.h> 24#include <sys/time.h>
24#include <sys/select.h> 25#include <sys/select.h>
25#include <sys/types.h> 26#include <sys/types.h>
26#include <sys/stat.h> 27#include <sys/stat.h>
160{ 161{
161 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS 162 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
162 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 163 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
163} 164}
164 165
166static pthread_t main_tid;
167static int main_sig;
168static int block_sig_level;
169
170void block_sig ()
171{
172 sigset_t ss;
173
174 if (block_sig_level++)
175 return;
176
177 if (!main_sig)
178 return;
179
180 sigemptyset (&ss);
181 sigaddset (&ss, main_sig);
182 pthread_sigmask (SIG_BLOCK, &ss, 0);
183}
184
185void unblock_sig ()
186{
187 sigset_t ss;
188
189 if (--block_sig_level)
190 return;
191
192 if (!main_sig)
193 return;
194
195 sigemptyset (&ss);
196 sigaddset (&ss, main_sig);
197 pthread_sigmask (SIG_UNBLOCK, &ss, 0);
198}
199
165static int next_pri = DEFAULT_PRI + PRI_BIAS; 200static int next_pri = DEFAULT_PRI + PRI_BIAS;
166 201
167static unsigned int started, idle, wanted; 202static unsigned int started, idle, wanted;
168 203
169#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 204#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
320 355
321 abort (); 356 abort ();
322} 357}
323 358
324static int poll_cb (); 359static int poll_cb ();
325static void req_invoke (aio_req req); 360static int req_invoke (aio_req req);
326static void req_free (aio_req req); 361static void req_free (aio_req req);
327static void req_cancel (aio_req req); 362static void req_cancel (aio_req req);
328 363
329/* must be called at most once */ 364/* must be called at most once */
330static SV *req_sv (aio_req req, const char *klass) 365static SV *req_sv (aio_req req, const char *klass)
350 return mg ? (aio_req)mg->mg_ptr : 0; 385 return mg ? (aio_req)mg->mg_ptr : 0;
351} 386}
352 387
353static void aio_grp_feed (aio_req grp) 388static void aio_grp_feed (aio_req grp)
354{ 389{
390 block_sig ();
391
355 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) 392 while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED))
356 { 393 {
357 int old_len = grp->size; 394 int old_len = grp->size;
358 395
359 if (grp->sv2 && SvOK (grp->sv2)) 396 if (grp->sv2 && SvOK (grp->sv2))
377 SvREFCNT_dec (grp->sv2); 414 SvREFCNT_dec (grp->sv2);
378 grp->sv2 = 0; 415 grp->sv2 = 0;
379 break; 416 break;
380 } 417 }
381 } 418 }
419
420 unblock_sig ();
382} 421}
383 422
384static void aio_grp_dec (aio_req grp) 423static void aio_grp_dec (aio_req grp)
385{ 424{
386 --grp->size; 425 --grp->size;
389 aio_grp_feed (grp); 428 aio_grp_feed (grp);
390 429
391 /* finish, if done */ 430 /* finish, if done */
392 if (!grp->size && grp->int1) 431 if (!grp->size && grp->int1)
393 { 432 {
433 block_sig ();
434
394 req_invoke (grp); 435 if (!req_invoke (grp))
436 {
437 req_free (grp);
438 unblock_sig ();
439 croak (0);
440 }
441
395 req_free (grp); 442 req_free (grp);
443 unblock_sig ();
396 } 444 }
397} 445}
398 446
399static void req_invoke (aio_req req) 447static int req_invoke (aio_req req)
400{ 448{
401 dSP; 449 dSP;
402 450
403 if (req->flags & FLAG_SV1_RO_OFF) 451 if (req->flags & FLAG_SV1_RO_OFF)
404 SvREADONLY_off (req->sv1); 452 SvREADONLY_off (req->sv1);
524 grp->grp_first = req->grp_next; 572 grp->grp_first = req->grp_next;
525 573
526 aio_grp_dec (grp); 574 aio_grp_dec (grp);
527 } 575 }
528 576
529 if (SvTRUE (ERRSV)) 577 return !SvTRUE (ERRSV);
530 {
531 req_free (req);
532 croak (0);
533 }
534} 578}
535 579
536static void req_free (aio_req req) 580static void req_free (aio_req req)
537{ 581{
538 if (req->self) 582 if (req->self)
593#endif 637#endif
594 638
595 sigfillset (&fullsigset); 639 sigfillset (&fullsigset);
596 640
597 LOCK (wrklock); 641 LOCK (wrklock);
598 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); 642 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
599 643
600 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) 644 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
601 { 645 {
602 wrk->prev = &wrk_first; 646 wrk->prev = &wrk_first;
603 wrk->next = wrk_first.next; 647 wrk->next = wrk_first.next;
606 ++started; 650 ++started;
607 } 651 }
608 else 652 else
609 free (wrk); 653 free (wrk);
610 654
611 sigprocmask (SIG_SETMASK, &oldsigset, 0); 655 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
612 UNLOCK (wrklock); 656 UNLOCK (wrklock);
613} 657}
614 658
615static void maybe_start_thread () 659static void maybe_start_thread ()
616{ 660{
624 start_thread (); 668 start_thread ();
625} 669}
626 670
627static void req_send (aio_req req) 671static void req_send (aio_req req)
628{ 672{
673 block_sig ();
674
629 ++nreqs; 675 ++nreqs;
630 676
631 LOCK (reqlock); 677 LOCK (reqlock);
632 ++nready; 678 ++nready;
633 reqq_push (&req_queue, req); 679 reqq_push (&req_queue, req);
634 pthread_cond_signal (&reqwait); 680 pthread_cond_signal (&reqwait);
635 UNLOCK (reqlock); 681 UNLOCK (reqlock);
682
683 unblock_sig ();
636 684
637 maybe_start_thread (); 685 maybe_start_thread ();
638} 686}
639 687
640static void end_thread (void) 688static void end_thread (void)
710 struct timeval tv_start, tv_now; 758 struct timeval tv_start, tv_now;
711 aio_req req; 759 aio_req req;
712 760
713 if (max_poll_time) 761 if (max_poll_time)
714 gettimeofday (&tv_start, 0); 762 gettimeofday (&tv_start, 0);
763
764 block_sig ();
715 765
716 for (;;) 766 for (;;)
717 { 767 {
718 for (;;) 768 for (;;)
719 { 769 {
747 req->int1 = 1; /* mark request as delayed */ 797 req->int1 = 1; /* mark request as delayed */
748 continue; 798 continue;
749 } 799 }
750 else 800 else
751 { 801 {
752 req_invoke (req); 802 if (!req_invoke (req))
803 {
804 req_free (req);
805 unblock_sig ();
806 croak (0);
807 }
753 808
754 count++; 809 count++;
755 } 810 }
756 811
757 req_free (req); 812 req_free (req);
774 poll_wait (); 829 poll_wait ();
775 830
776 ++maxreqs; 831 ++maxreqs;
777 } 832 }
778 833
834 unblock_sig ();
779 return count; 835 return count;
780} 836}
781 837
782static void create_pipe () 838static void create_pipe ()
783{ 839{
1154 LOCK (reslock); 1210 LOCK (reslock);
1155 1211
1156 ++npending; 1212 ++npending;
1157 1213
1158 if (!reqq_push (&res_queue, req)) 1214 if (!reqq_push (&res_queue, req))
1215 {
1159 /* write a dummy byte to the pipe so fh becomes ready */ 1216 /* write a dummy byte to the pipe so fh becomes ready */
1160 write (respipe [1], &respipe, 1); 1217 write (respipe [1], &respipe, 1);
1218
1219 /* optionally signal the main thread asynchronously */
1220 if (main_sig)
1221 pthread_kill (main_tid, main_sig);
1222 }
1161 1223
1162 self->req = 0; 1224 self->req = 0;
1163 worker_clear (self); 1225 worker_clear (self);
1164 1226
1165 UNLOCK (reslock); 1227 UNLOCK (reslock);
1268 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1330 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1269 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1331 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1270 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1332 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1271 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1333 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1272 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1334 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1335 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1273 1336
1274 create_pipe (); 1337 create_pipe ();
1275 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1338 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1276} 1339}
1277 1340
1677poll_wait() 1740poll_wait()
1678 PROTOTYPE: 1741 PROTOTYPE:
1679 CODE: 1742 CODE:
1680 poll_wait (); 1743 poll_wait ();
1681 1744
1745void
1746setsig (int signum = SIGIO)
1747 PROTOTYPE: ;$
1748 CODE:
1749{
1750 if (block_sig_level)
1751 croak ("cannot call IO::AIO::setsig from within aio_block/callback");
1752
1753 LOCK (reslock);
1754 main_tid = pthread_self ();
1755 main_sig = signum;
1756 UNLOCK (reslock);
1757
1758 if (main_sig && npending)
1759 pthread_kill (main_tid, main_sig);
1760}
1761
1762void
1763aio_block (SV *cb)
1764 PROTOTYPE: &
1765 PPCODE:
1766{
1767 int count;
1768
1769 block_sig ();
1770 PUSHMARK (SP);
1771 PUTBACK;
1772 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
1773 SPAGAIN;
1774 unblock_sig ();
1775
1776 if (SvTRUE (ERRSV))
1777 croak (0);
1778
1779 XSRETURN (count);
1780}
1781
1682int 1782int
1683nreqs() 1783nreqs()
1684 PROTOTYPE: 1784 PROTOTYPE:
1685 CODE: 1785 CODE:
1686 RETVAL = nreqs; 1786 RETVAL = nreqs;
1735 PPCODE: 1835 PPCODE:
1736{ 1836{
1737 int i; 1837 int i;
1738 aio_req req; 1838 aio_req req;
1739 1839
1840 if (main_sig && !block_sig_level)
1841 croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use");
1842
1740 if (grp->int1 == 2) 1843 if (grp->int1 == 2)
1741 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1844 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1742 1845
1743 for (i = 1; i < items; ++i ) 1846 for (i = 1; i < items; ++i )
1744 { 1847 {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines