ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.80 by root, Fri Oct 27 19:17:23 2006 UTC vs.
Revision 1.85 by root, Sat Oct 28 23:32:29 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
65 68
66/* wether word reads are potentially non-atomic. 69/* wether word reads are potentially non-atomic.
67 * this is conservatice, likely most arches this runs 70 * this is conservatice, likely most arches this runs
68 * on have atomic word read/writes. 71 * on have atomic word read/writes.
69 */ 72 */
70#ifndef WORDREAD_UNSAFE 73#ifndef WORDACCESS_UNSAFE
71# if __i386 || __x86_64 74# if __i386 || __x86_64
72# define WORDREAD_UNSAFE 0 75# define WORDACCESS_UNSAFE 0
73# else 76# else
74# define WORDREAD_UNSAFE 1 77# define WORDACCESS_UNSAFE 1
75# endif 78# endif
76#endif 79#endif
77 80
78/* buffer size for various temporary buffers */ 81/* buffer size for various temporary buffers */
79#define AIO_BUFSIZE 65536 82#define AIO_BUFSIZE 65536
92 REQ_READ, REQ_WRITE, REQ_READAHEAD, 95 REQ_READ, REQ_WRITE, REQ_READAHEAD,
93 REQ_SENDFILE, 96 REQ_SENDFILE,
94 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 97 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
95 REQ_FSYNC, REQ_FDATASYNC, 98 REQ_FSYNC, REQ_FDATASYNC,
96 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 99 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
97 REQ_READDIR, 100 REQ_MKNOD, REQ_READDIR,
98 REQ_LINK, REQ_SYMLINK, 101 REQ_LINK, REQ_SYMLINK,
99 REQ_GROUP, REQ_NOP, 102 REQ_GROUP, REQ_NOP,
100 REQ_BUSY, 103 REQ_BUSY,
101}; 104};
102 105
142 DEFAULT_PRI = 0, 145 DEFAULT_PRI = 0,
143 PRI_BIAS = -PRI_MIN, 146 PRI_BIAS = -PRI_MIN,
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
150#define AIO_TICKS ((1000000 + 1023) >> 10)
151
152static unsigned int max_poll_time = 0;
153static unsigned int max_poll_reqs = 0;
154
155/* calculcate time difference in ~1/AIO_TICKS of a second */
156static int tvdiff (struct timeval *tv1, struct timeval *tv2)
157{
158 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
159 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
160}
161
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 162static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 163
149static unsigned int started, wanted; 164static unsigned int started, idle, wanted;
150 165
151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 166#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 167# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
153#else 168#else
154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 169# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
196 211
197 free (wrk); 212 free (wrk);
198} 213}
199 214
200static volatile unsigned int nreqs, nready, npending; 215static volatile unsigned int nreqs, nready, npending;
216static volatile unsigned int max_idle = 4;
201static volatile unsigned int max_outstanding = 0xffffffff; 217static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2]; 218static int respipe [2];
203 219
204static pthread_mutex_t reslock = AIO_MUTEX_INIT; 220static pthread_mutex_t reslock = AIO_MUTEX_INIT;
205static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 221static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
206static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 222static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
207 223
208#if WORDREAD_UNSAFE 224#if WORDACCESS_UNSAFE
209 225
210static unsigned int get_nready () 226static unsigned int get_nready ()
211{ 227{
212 unsigned int retval; 228 unsigned int retval;
213 229
227 UNLOCK (reslock); 243 UNLOCK (reslock);
228 244
229 return retval; 245 return retval;
230} 246}
231 247
248static unsigned int get_nthreads ()
249{
250 unsigned int retval;
251
252 LOCK (wrklock);
253 retval = started;
254 UNLOCK (wrklock);
255
256 return retval;
257}
258
232#else 259#else
233 260
234# define get_nready() nready 261# define get_nready() nready
235# define get_npending() npending 262# define get_npending() npending
263# define get_nthreads() started
236 264
237#endif 265#endif
238 266
239/* 267/*
240 * a somewhat faster data structure might be nice, but 268 * a somewhat faster data structure might be nice, but
288 } 316 }
289 317
290 abort (); 318 abort ();
291} 319}
292 320
293static int poll_cb (int max); 321static int poll_cb ();
294static void req_invoke (aio_req req); 322static void req_invoke (aio_req req);
295static void req_free (aio_req req); 323static void req_free (aio_req req);
296static void req_cancel (aio_req req); 324static void req_cancel (aio_req req);
297 325
298/* must be called at most once */ 326/* must be called at most once */
556 UNLOCK (wrklock); 584 UNLOCK (wrklock);
557} 585}
558 586
559static void maybe_start_thread () 587static void maybe_start_thread ()
560{ 588{
561#if 0 589 if (get_nthreads () >= wanted)
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted)
567 return; 590 return;
568 591
569 if (nready <= nreqs - get_nready () - get_npending ()) 592 /* todo: maybe use idle here, but might be less exact */
593 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
570 return; 594 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589 595
590 start_thread (); 596 start_thread ();
591} 597}
592 598
593static void req_send (aio_req req) 599static void req_send (aio_req req)
610 Newz (0, req, 1, aio_cb); 616 Newz (0, req, 1, aio_cb);
611 617
612 req->type = REQ_QUIT; 618 req->type = REQ_QUIT;
613 req->pri = PRI_MAX + PRI_BIAS; 619 req->pri = PRI_MAX + PRI_BIAS;
614 620
615 req_send (req); 621 LOCK (reqlock);
622 reqq_push (&req_queue, req);
623 pthread_cond_signal (&reqwait);
624 UNLOCK (reqlock);
616 625
617 LOCK (wrklock); 626 LOCK (wrklock);
618 --started; 627 --started;
619 UNLOCK (wrklock); 628 UNLOCK (wrklock);
620} 629}
621 630
631static void set_max_idle (int nthreads)
632{
633 if (WORDACCESS_UNSAFE) LOCK (reqlock);
634 max_idle = nthreads <= 0 ? 1 : nthreads;
635 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
636}
637
622static void min_parallel (int nthreads) 638static void min_parallel (int nthreads)
623{ 639{
624 if (wanted < nthreads) 640 if (wanted < nthreads)
625 wanted = nthreads; 641 wanted = nthreads;
626} 642}
639 fd_set rfd; 655 fd_set rfd;
640 656
641 while (nreqs) 657 while (nreqs)
642 { 658 {
643 int size; 659 int size;
644 if (WORDREAD_UNSAFE) LOCK (reslock); 660 if (WORDACCESS_UNSAFE) LOCK (reslock);
645 size = res_queue.size; 661 size = res_queue.size;
646 if (WORDREAD_UNSAFE) UNLOCK (reslock); 662 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
647 663
648 if (size) 664 if (size)
649 return; 665 return;
650 666
651 maybe_start_thread (); 667 maybe_start_thread ();
655 671
656 select (respipe [0] + 1, &rfd, 0, 0, 0); 672 select (respipe [0] + 1, &rfd, 0, 0, 0);
657 } 673 }
658} 674}
659 675
660static int poll_cb (int max) 676static int poll_cb ()
661{ 677{
662 dSP; 678 dSP;
663 int count = 0; 679 int count = 0;
680 int maxreqs = max_poll_reqs;
664 int do_croak = 0; 681 int do_croak = 0;
682 struct timeval tv_start, tv_now;
665 aio_req req; 683 aio_req req;
666 684
685 if (max_poll_time)
686 gettimeofday (&tv_start, 0);
687
667 for (;;) 688 for (;;)
668 { 689 {
669 while (max <= 0 || count < max) 690 for (;;)
670 { 691 {
671 maybe_start_thread (); 692 maybe_start_thread ();
672 693
673 LOCK (reslock); 694 LOCK (reslock);
674 req = reqq_shift (&res_queue); 695 req = reqq_shift (&res_queue);
717 738
718 count++; 739 count++;
719 } 740 }
720 741
721 req_free (req); 742 req_free (req);
743
744 if (maxreqs && !--maxreqs)
745 break;
746
747 if (max_poll_time)
748 {
749 gettimeofday (&tv_now, 0);
750
751 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
752 break;
753 }
722 } 754 }
723 755
724 if (nreqs <= max_outstanding) 756 if (nreqs <= max_outstanding)
725 break; 757 break;
726 758
727 poll_wait (); 759 poll_wait ();
728 760
729 max = 0; 761 ++maxreqs;
730 } 762 }
731 763
732 return count; 764 return count;
733} 765}
734 766
1001/*****************************************************************************/ 1033/*****************************************************************************/
1002 1034
1003static void *aio_proc (void *thr_arg) 1035static void *aio_proc (void *thr_arg)
1004{ 1036{
1005 aio_req req; 1037 aio_req req;
1038 struct timespec ts;
1006 worker *self = (worker *)thr_arg; 1039 worker *self = (worker *)thr_arg;
1007 1040
1041 /* try to distribute timeouts somewhat evenly */
1042 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
1043 * (1000000000UL / 1024UL);
1044
1008 for (;;) 1045 for (;;)
1009 { 1046 {
1047 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1048
1010 LOCK (reqlock); 1049 LOCK (reqlock);
1011 1050
1012 for (;;) 1051 for (;;)
1013 { 1052 {
1014 self->req = req = reqq_shift (&req_queue); 1053 self->req = req = reqq_shift (&req_queue);
1015 1054
1016 if (req) 1055 if (req)
1017 break; 1056 break;
1018 1057
1058 ++idle;
1059
1060 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1061 == ETIMEDOUT)
1062 {
1063 if (idle > max_idle)
1064 {
1065 --idle;
1066 UNLOCK (reqlock);
1067 LOCK (wrklock);
1068 --started;
1069 UNLOCK (wrklock);
1070 goto quit;
1071 }
1072
1073 /* we are allowed to idle, so do so without any timeout */
1019 pthread_cond_wait (&reqwait, &reqlock); 1074 pthread_cond_wait (&reqwait, &reqlock);
1075 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1076 }
1077
1078 --idle;
1020 } 1079 }
1021 1080
1022 --nready; 1081 --nready;
1023 1082
1024 UNLOCK (reqlock); 1083 UNLOCK (reqlock);
1043 case REQ_UNLINK: req->result = unlink (req->dataptr); break; 1102 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
1044 case REQ_RMDIR: req->result = rmdir (req->dataptr); break; 1103 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
1045 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; 1104 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
1046 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; 1105 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
1047 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; 1106 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
1107 case REQ_MKNOD: req->result = mknod (req->data2ptr, req->mode, (dev_t)req->offset); break;
1048 1108
1049 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; 1109 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
1050 case REQ_FSYNC: req->result = fsync (req->fd); break; 1110 case REQ_FSYNC: req->result = fsync (req->fd); break;
1051 case REQ_READDIR: scandir_ (req, self); break; 1111 case REQ_READDIR: scandir_ (req, self); break;
1052 1112
1063 case REQ_GROUP: 1123 case REQ_GROUP:
1064 case REQ_NOP: 1124 case REQ_NOP:
1065 break; 1125 break;
1066 1126
1067 case REQ_QUIT: 1127 case REQ_QUIT:
1068 LOCK (wrklock); 1128 goto quit;
1069 worker_free (self);
1070 --started;
1071 UNLOCK (wrklock);
1072 return 0;
1073 1129
1074 default: 1130 default:
1075 req->result = ENOSYS; 1131 req->result = ENOSYS;
1076 break; 1132 break;
1077 } 1133 }
1089 self->req = 0; 1145 self->req = 0;
1090 worker_clear (self); 1146 worker_clear (self);
1091 1147
1092 UNLOCK (reslock); 1148 UNLOCK (reslock);
1093 } 1149 }
1150
1151quit:
1152 LOCK (wrklock);
1153 worker_free (self);
1154 UNLOCK (wrklock);
1155
1156 return 0;
1094} 1157}
1095 1158
1096/*****************************************************************************/ 1159/*****************************************************************************/
1097 1160
1098static void atfork_prepare (void) 1161static void atfork_prepare (void)
1140 1203
1141 worker_clear (wrk); 1204 worker_clear (wrk);
1142 worker_free (wrk); 1205 worker_free (wrk);
1143 } 1206 }
1144 1207
1145 started = 0; 1208 started = 0;
1209 idle = 0;
1146 nreqs = 0; 1210 nreqs = 0;
1211 nready = 0;
1212 npending = 0;
1147 1213
1148 close (respipe [0]); 1214 close (respipe [0]);
1149 close (respipe [1]); 1215 close (respipe [1]);
1150 create_pipe (); 1216 create_pipe ();
1151 1217
1178PROTOTYPES: ENABLE 1244PROTOTYPES: ENABLE
1179 1245
1180BOOT: 1246BOOT:
1181{ 1247{
1182 HV *stash = gv_stashpv ("IO::AIO", 1); 1248 HV *stash = gv_stashpv ("IO::AIO", 1);
1249
1183 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1250 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1184 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1251 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1185 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1252 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1253 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1254 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1255 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1186 1256
1187 create_pipe (); 1257 create_pipe ();
1188 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1258 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1189
1190 start_thread ();
1191} 1259}
1260
1261void
1262max_poll_reqs (int nreqs)
1263 PROTOTYPE: $
1264 CODE:
1265 max_poll_reqs = nreqs;
1266
1267void
1268max_poll_time (double nseconds)
1269 PROTOTYPE: $
1270 CODE:
1271 max_poll_time = nseconds * AIO_TICKS;
1192 1272
1193void 1273void
1194min_parallel (int nthreads) 1274min_parallel (int nthreads)
1195 PROTOTYPE: $ 1275 PROTOTYPE: $
1196 1276
1197void 1277void
1198max_parallel (int nthreads) 1278max_parallel (int nthreads)
1199 PROTOTYPE: $ 1279 PROTOTYPE: $
1280
1281void
1282max_idle (int nthreads)
1283 PROTOTYPE: $
1284 CODE:
1285 set_max_idle (nthreads);
1200 1286
1201int 1287int
1202max_outstanding (int maxreqs) 1288max_outstanding (int maxreqs)
1203 PROTOTYPE: $ 1289 PROTOTYPE: $
1204 CODE: 1290 CODE:
1423 req->type = ix; 1509 req->type = ix;
1424 req->fh = newSVsv (oldpath); 1510 req->fh = newSVsv (oldpath);
1425 req->data2ptr = SvPVbyte_nolen (req->fh); 1511 req->data2ptr = SvPVbyte_nolen (req->fh);
1426 req->data = newSVsv (newpath); 1512 req->data = newSVsv (newpath);
1427 req->dataptr = SvPVbyte_nolen (req->data); 1513 req->dataptr = SvPVbyte_nolen (req->data);
1514
1515 REQ_SEND;
1516}
1517
1518void
1519aio_mknod (pathname,mode,dev,callback=&PL_sv_undef)
1520 SV * pathname
1521 SV * callback
1522 UV mode
1523 UV dev
1524 PPCODE:
1525{
1526 dREQ;
1527
1528 req->type = REQ_MKNOD;
1529 req->data = newSVsv (pathname);
1530 req->dataptr = SvPVbyte_nolen (req->data);
1531 req->mode = (mode_t)mode;
1532 req->offset = dev;
1428 1533
1429 REQ_SEND; 1534 REQ_SEND;
1430} 1535}
1431 1536
1432void 1537void
1522 1627
1523int 1628int
1524poll_cb(...) 1629poll_cb(...)
1525 PROTOTYPE: 1630 PROTOTYPE:
1526 CODE: 1631 CODE:
1527 RETVAL = poll_cb (0); 1632 RETVAL = poll_cb ();
1528 OUTPUT:
1529 RETVAL
1530
1531int
1532poll_some(int max = 0)
1533 PROTOTYPE: $
1534 CODE:
1535 RETVAL = poll_cb (max);
1536 OUTPUT: 1633 OUTPUT:
1537 RETVAL 1634 RETVAL
1538 1635
1539void 1636void
1540poll_wait() 1637poll_wait()
1565 CODE: 1662 CODE:
1566 RETVAL = get_npending (); 1663 RETVAL = get_npending ();
1567 OUTPUT: 1664 OUTPUT:
1568 RETVAL 1665 RETVAL
1569 1666
1667int
1668nthreads()
1669 PROTOTYPE:
1670 CODE:
1671 if (WORDACCESS_UNSAFE) LOCK (wrklock);
1672 RETVAL = started;
1673 if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
1674 OUTPUT:
1675 RETVAL
1676
1570PROTOTYPES: DISABLE 1677PROTOTYPES: DISABLE
1571 1678
1572MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1679MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1573 1680
1574void 1681void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines