ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.71 by root, Tue Oct 24 16:35:04 2006 UTC vs.
Revision 1.75 by root, Thu Oct 26 06:44:48 2006 UTC

47/* used for struct dirent, AIX doesn't provide it */ 47/* used for struct dirent, AIX doesn't provide it */
48#ifndef NAME_MAX 48#ifndef NAME_MAX
49# define NAME_MAX 4096 49# define NAME_MAX 4096
50#endif 50#endif
51 51
52#ifndef PTHREAD_STACK_MIN
53/* care for broken platforms, e.g. windows */
54# define PTHREAD_STACK_MIN 16384
55#endif
56
52#if __ia64 57#if __ia64
53# define STACKSIZE 65536 58# define STACKSIZE 65536
54#elif __i386 || __x86_64 /* 16k is unreasonably high :( */ 59#elif __i386 || __x86_64 /* 16k is unreasonably high :( */
55# define STACKSIZE PTHREAD_STACK_MIN 60# define STACKSIZE PTHREAD_STACK_MIN
56#else 61#else
128 133
129static int next_pri = DEFAULT_PRI + PRI_BIAS; 134static int next_pri = DEFAULT_PRI + PRI_BIAS;
130 135
131static int started, wanted; 136static int started, wanted;
132static volatile int nreqs; 137static volatile int nreqs;
133static int max_outstanding = 1<<30;
134static int respipe [2]; 138static int respipe [2];
135 139
136#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 140#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
137# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 141# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
138#else 142#else
238 } 242 }
239 243
240 abort (); 244 abort ();
241} 245}
242 246
247static int poll_cb ();
243static void req_invoke (aio_req req); 248static void req_invoke (aio_req req);
244static void req_free (aio_req req); 249static void req_free (aio_req req);
250static void req_cancel (aio_req req);
245 251
246/* must be called at most once */ 252/* must be called at most once */
247static SV *req_sv (aio_req req, const char *klass) 253static SV *req_sv (aio_req req, const char *klass)
248{ 254{
249 if (!req->self) 255 if (!req->self)
468 free (req->data2ptr); 474 free (req->data2ptr);
469 475
470 Safefree (req); 476 Safefree (req);
471} 477}
472 478
479static void req_cancel_subs (aio_req grp)
480{
481 aio_req sub;
482
483 if (grp->type != REQ_GROUP)
484 return;
485
486 SvREFCNT_dec (grp->fh2);
487 grp->fh2 = 0;
488
489 for (sub = grp->grp_first; sub; sub = sub->grp_next)
490 req_cancel (sub);
491}
492
473static void req_cancel (aio_req req) 493static void req_cancel (aio_req req)
474{ 494{
475 req->flags |= FLAG_CANCELLED; 495 req->flags |= FLAG_CANCELLED;
476 496
477 if (req->type == REQ_GROUP) 497 req_cancel_subs (req);
478 {
479 aio_req sub;
480
481 for (sub = req->grp_first; sub; sub = sub->grp_next)
482 req_cancel (sub);
483 }
484} 498}
485 499
486static int poll_cb () 500static int poll_cb ()
487{ 501{
488 dSP; 502 dSP;
548 562
549static void *aio_proc(void *arg); 563static void *aio_proc(void *arg);
550 564
551static void start_thread (void) 565static void start_thread (void)
552{ 566{
567 sigset_t fullsigset, oldsigset;
568 pthread_attr_t attr;
569
553 worker *wrk = calloc (1, sizeof (worker)); 570 worker *wrk = calloc (1, sizeof (worker));
554 571
555 if (!wrk) 572 if (!wrk)
556 croak ("unable to allocate worker thread data"); 573 croak ("unable to allocate worker thread data");
557
558 sigset_t fullsigset, oldsigset;
559 pthread_attr_t attr;
560 574
561 pthread_attr_init (&attr); 575 pthread_attr_init (&attr);
562 pthread_attr_setstacksize (&attr, STACKSIZE); 576 pthread_attr_setstacksize (&attr, STACKSIZE);
563 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); 577 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
564 578
591 605
592 LOCK (reqlock); 606 LOCK (reqlock);
593 reqq_push (&req_queue, req); 607 reqq_push (&req_queue, req);
594 pthread_cond_signal (&reqwait); 608 pthread_cond_signal (&reqwait);
595 UNLOCK (reqlock); 609 UNLOCK (reqlock);
596
597 if (nreqs > max_outstanding)
598 for (;;)
599 {
600 poll_cb ();
601
602 if (nreqs <= max_outstanding)
603 break;
604
605 poll_wait ();
606 }
607} 610}
608 611
609static void end_thread (void) 612static void end_thread (void)
610{ 613{
611 aio_req req; 614 aio_req req;
704#if !HAVE_FDATASYNC 707#if !HAVE_FDATASYNC
705# define fdatasync fsync 708# define fdatasync fsync
706#endif 709#endif
707 710
708#if !HAVE_READAHEAD 711#if !HAVE_READAHEAD
709# define readahead aio_readahead 712# define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
710 713
711static ssize_t readahead (int fd, off_t offset, size_t count) 714static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
712{ 715{
713 dBUF; 716 dBUF;
714 717
715 while (count > 0) 718 while (count > 0)
716 { 719 {
721 count -= len; 724 count -= len;
722 } 725 }
723 726
724 errno = 0; 727 errno = 0;
725} 728}
729
726#endif 730#endif
727 731
728#if !HAVE_READDIR_R 732#if !HAVE_READDIR_R
729# define readdir_r aio_readdir_r 733# define readdir_r aio_readdir_r
730 734
1107void 1111void
1108max_parallel (nthreads) 1112max_parallel (nthreads)
1109 int nthreads 1113 int nthreads
1110 PROTOTYPE: $ 1114 PROTOTYPE: $
1111 1115
1112int
1113max_outstanding (nreqs)
1114 int nreqs
1115 PROTOTYPE: $
1116 CODE:
1117 RETVAL = max_outstanding;
1118 max_outstanding = nreqs;
1119
1120void 1116void
1121aio_open (pathname,flags,mode,callback=&PL_sv_undef) 1117aio_open (pathname,flags,mode,callback=&PL_sv_undef)
1122 SV * pathname 1118 SV * pathname
1123 int flags 1119 int flags
1124 int mode 1120 int mode
1495 } 1491 }
1496 } 1492 }
1497} 1493}
1498 1494
1499void 1495void
1496cancel_subs (aio_req_ornot req)
1497 CODE:
1498 req_cancel_subs (req);
1499
1500void
1500result (aio_req grp, ...) 1501result (aio_req grp, ...)
1501 CODE: 1502 CODE:
1502{ 1503{
1503 int i; 1504 int i;
1504 AV *av = newAV (); 1505 AV *av = newAV ();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines