ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.71 by root, Tue Oct 24 16:35:04 2006 UTC vs.
Revision 1.77 by root, Thu Oct 26 13:25:40 2006 UTC

1/* solaris */ 1/* solaris */
2#define _POSIX_PTHREAD_SEMANTICS 1 2#define _POSIX_PTHREAD_SEMANTICS 1
3 3
4#if __linux 4#if __linux && !defined(_GNU_SOURCE)
5# define _GNU_SOURCE 5# define _GNU_SOURCE
6#endif 6#endif
7 7
8/* just in case */
8#define _REENTRANT 1 9#define _REENTRANT 1
9 10
10#include <errno.h> 11#include <errno.h>
11 12
12#include "EXTERN.h" 13#include "EXTERN.h"
47/* used for struct dirent, AIX doesn't provide it */ 48/* used for struct dirent, AIX doesn't provide it */
48#ifndef NAME_MAX 49#ifndef NAME_MAX
49# define NAME_MAX 4096 50# define NAME_MAX 4096
50#endif 51#endif
51 52
53#ifndef PTHREAD_STACK_MIN
54/* care for broken platforms, e.g. windows */
55# define PTHREAD_STACK_MIN 16384
56#endif
57
52#if __ia64 58#if __ia64
53# define STACKSIZE 65536 59# define STACKSIZE 65536
54#elif __i386 || __x86_64 /* 16k is unreasonably high :( */ 60#elif __i386 || __x86_64 /* 16k is unreasonably high :( */
55# define STACKSIZE PTHREAD_STACK_MIN 61# define STACKSIZE PTHREAD_STACK_MIN
56#else 62#else
128 134
129static int next_pri = DEFAULT_PRI + PRI_BIAS; 135static int next_pri = DEFAULT_PRI + PRI_BIAS;
130 136
131static int started, wanted; 137static int started, wanted;
132static volatile int nreqs; 138static volatile int nreqs;
133static int max_outstanding = 1<<30;
134static int respipe [2]; 139static int respipe [2];
135 140
136#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 141#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
137# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 142# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
138#else 143#else
140#endif 145#endif
141 146
142#define LOCK(mutex) pthread_mutex_lock (&(mutex)) 147#define LOCK(mutex) pthread_mutex_lock (&(mutex))
143#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex)) 148#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
144 149
145/* worker threasd management */ 150/* worker threads management */
146static pthread_mutex_t wrklock = AIO_MUTEX_INIT; 151static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
147 152
148typedef struct worker { 153typedef struct worker {
149 /* locked by wrklock */ 154 /* locked by wrklock */
150 struct worker *prev, *next; 155 struct worker *prev, *next;
238 } 243 }
239 244
240 abort (); 245 abort ();
241} 246}
242 247
248static int poll_cb (int max);
243static void req_invoke (aio_req req); 249static void req_invoke (aio_req req);
244static void req_free (aio_req req); 250static void req_free (aio_req req);
251static void req_cancel (aio_req req);
245 252
246/* must be called at most once */ 253/* must be called at most once */
247static SV *req_sv (aio_req req, const char *klass) 254static SV *req_sv (aio_req req, const char *klass)
248{ 255{
249 if (!req->self) 256 if (!req->self)
468 free (req->data2ptr); 475 free (req->data2ptr);
469 476
470 Safefree (req); 477 Safefree (req);
471} 478}
472 479
480static void req_cancel_subs (aio_req grp)
481{
482 aio_req sub;
483
484 if (grp->type != REQ_GROUP)
485 return;
486
487 SvREFCNT_dec (grp->fh2);
488 grp->fh2 = 0;
489
490 for (sub = grp->grp_first; sub; sub = sub->grp_next)
491 req_cancel (sub);
492}
493
473static void req_cancel (aio_req req) 494static void req_cancel (aio_req req)
474{ 495{
475 req->flags |= FLAG_CANCELLED; 496 req->flags |= FLAG_CANCELLED;
476 497
477 if (req->type == REQ_GROUP) 498 req_cancel_subs (req);
478 {
479 aio_req sub;
480
481 for (sub = req->grp_first; sub; sub = sub->grp_next)
482 req_cancel (sub);
483 }
484} 499}
485 500
486static int poll_cb () 501static int poll_cb (int max)
487{ 502{
488 dSP; 503 dSP;
489 int count = 0; 504 int count = 0;
490 int do_croak = 0; 505 int do_croak = 0;
491 aio_req req; 506 aio_req req;
492 507
493 for (;;) 508 while (max <= 0 || count < max)
494 { 509 {
495 LOCK (reslock); 510 LOCK (reslock);
496 req = reqq_shift (&res_queue); 511 req = reqq_shift (&res_queue);
497 512
498 if (req) 513 if (req)
548 563
549static void *aio_proc(void *arg); 564static void *aio_proc(void *arg);
550 565
551static void start_thread (void) 566static void start_thread (void)
552{ 567{
568 sigset_t fullsigset, oldsigset;
569 pthread_attr_t attr;
570
553 worker *wrk = calloc (1, sizeof (worker)); 571 worker *wrk = calloc (1, sizeof (worker));
554 572
555 if (!wrk) 573 if (!wrk)
556 croak ("unable to allocate worker thread data"); 574 croak ("unable to allocate worker thread data");
557
558 sigset_t fullsigset, oldsigset;
559 pthread_attr_t attr;
560 575
561 pthread_attr_init (&attr); 576 pthread_attr_init (&attr);
562 pthread_attr_setstacksize (&attr, STACKSIZE); 577 pthread_attr_setstacksize (&attr, STACKSIZE);
563 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); 578 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
564 579
591 606
592 LOCK (reqlock); 607 LOCK (reqlock);
593 reqq_push (&req_queue, req); 608 reqq_push (&req_queue, req);
594 pthread_cond_signal (&reqwait); 609 pthread_cond_signal (&reqwait);
595 UNLOCK (reqlock); 610 UNLOCK (reqlock);
596
597 if (nreqs > max_outstanding)
598 for (;;)
599 {
600 poll_cb ();
601
602 if (nreqs <= max_outstanding)
603 break;
604
605 poll_wait ();
606 }
607} 611}
608 612
609static void end_thread (void) 613static void end_thread (void)
610{ 614{
611 aio_req req; 615 aio_req req;
638 } 642 }
639 643
640 while (started > wanted) 644 while (started > wanted)
641 { 645 {
642 poll_wait (); 646 poll_wait ();
643 poll_cb (); 647 poll_cb (0);
644 } 648 }
645} 649}
646 650
647static void create_pipe () 651static void create_pipe ()
648{ 652{
704#if !HAVE_FDATASYNC 708#if !HAVE_FDATASYNC
705# define fdatasync fsync 709# define fdatasync fsync
706#endif 710#endif
707 711
708#if !HAVE_READAHEAD 712#if !HAVE_READAHEAD
709# define readahead aio_readahead 713# define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
710 714
711static ssize_t readahead (int fd, off_t offset, size_t count) 715static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
712{ 716{
713 dBUF; 717 dBUF;
714 718
715 while (count > 0) 719 while (count > 0)
716 { 720 {
721 count -= len; 725 count -= len;
722 } 726 }
723 727
724 errno = 0; 728 errno = 0;
725} 729}
730
726#endif 731#endif
727 732
728#if !HAVE_READDIR_R 733#if !HAVE_READDIR_R
729# define readdir_r aio_readdir_r 734# define readdir_r aio_readdir_r
730 735
775 { 780 {
776 off_t sbytes; 781 off_t sbytes;
777 res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); 782 res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
778 783
779 if (res < 0 && sbytes) 784 if (res < 0 && sbytes)
780 /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */ 785 /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
781 res = sbytes; 786 res = sbytes;
782 } 787 }
783 788
784# elif __hpux 789# elif __hpux
785 res = sendfile (ofd, ifd, offset, count, 0, 0); 790 res = sendfile (ofd, ifd, offset, count, 0, 0);
865 int errorno; 870 int errorno;
866 871
867 LOCK (wrklock); 872 LOCK (wrklock);
868 self->dirp = dirp = opendir (req->dataptr); 873 self->dirp = dirp = opendir (req->dataptr);
869 self->dbuf = u = malloc (sizeof (*u)); 874 self->dbuf = u = malloc (sizeof (*u));
875 req->data2ptr = names = malloc (memlen);
870 UNLOCK (wrklock); 876 UNLOCK (wrklock);
871
872 req->data2ptr = names = malloc (memlen);
873 877
874 if (dirp && u && names) 878 if (dirp && u && names)
875 for (;;) 879 for (;;)
876 { 880 {
877 errno = 0; 881 errno = 0;
1107void 1111void
1108max_parallel (nthreads) 1112max_parallel (nthreads)
1109 int nthreads 1113 int nthreads
1110 PROTOTYPE: $ 1114 PROTOTYPE: $
1111 1115
1112int
1113max_outstanding (nreqs)
1114 int nreqs
1115 PROTOTYPE: $
1116 CODE:
1117 RETVAL = max_outstanding;
1118 max_outstanding = nreqs;
1119
1120void 1116void
1121aio_open (pathname,flags,mode,callback=&PL_sv_undef) 1117aio_open (pathname,flags,mode,callback=&PL_sv_undef)
1122 SV * pathname 1118 SV * pathname
1123 int flags 1119 int flags
1124 int mode 1120 int mode
1400 PROTOTYPE: 1396 PROTOTYPE:
1401 CODE: 1397 CODE:
1402 while (nreqs) 1398 while (nreqs)
1403 { 1399 {
1404 poll_wait (); 1400 poll_wait ();
1405 poll_cb (); 1401 poll_cb (0);
1406 } 1402 }
1407 1403
1408void 1404void
1409poll() 1405poll()
1410 PROTOTYPE: 1406 PROTOTYPE:
1411 CODE: 1407 CODE:
1412 if (nreqs) 1408 if (nreqs)
1413 { 1409 {
1414 poll_wait (); 1410 poll_wait ();
1415 poll_cb (); 1411 poll_cb (0);
1416 } 1412 }
1417 1413
1418int 1414int
1419poll_fileno() 1415poll_fileno()
1420 PROTOTYPE: 1416 PROTOTYPE:
1425 1421
1426int 1422int
1427poll_cb(...) 1423poll_cb(...)
1428 PROTOTYPE: 1424 PROTOTYPE:
1429 CODE: 1425 CODE:
1430 RETVAL = poll_cb (); 1426 RETVAL = poll_cb (0);
1427 OUTPUT:
1428 RETVAL
1429
1430int
1431poll_some(int max = 0)
1432 PROTOTYPE: $
1433 CODE:
1434 RETVAL = poll_cb (max);
1431 OUTPUT: 1435 OUTPUT:
1432 RETVAL 1436 RETVAL
1433 1437
1434void 1438void
1435poll_wait() 1439poll_wait()
1495 } 1499 }
1496 } 1500 }
1497} 1501}
1498 1502
1499void 1503void
1504cancel_subs (aio_req_ornot req)
1505 CODE:
1506 req_cancel_subs (req);
1507
1508void
1500result (aio_req grp, ...) 1509result (aio_req grp, ...)
1501 CODE: 1510 CODE:
1502{ 1511{
1503 int i; 1512 int i;
1504 AV *av = newAV (); 1513 AV *av = newAV ();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines