ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.84 by root, Sat Oct 28 01:24:19 2006 UTC vs.
Revision 1.85 by root, Sat Oct 28 23:32:29 2006 UTC

68 68
69/* wether word reads are potentially non-atomic. 69/* wether word reads are potentially non-atomic.
70 * this is conservatice, likely most arches this runs 70 * this is conservatice, likely most arches this runs
71 * on have atomic word read/writes. 71 * on have atomic word read/writes.
72 */ 72 */
73#ifndef WORDREAD_UNSAFE 73#ifndef WORDACCESS_UNSAFE
74# if __i386 || __x86_64 74# if __i386 || __x86_64
75# define WORDREAD_UNSAFE 0 75# define WORDACCESS_UNSAFE 0
76# else 76# else
77# define WORDREAD_UNSAFE 1 77# define WORDACCESS_UNSAFE 1
78# endif 78# endif
79#endif 79#endif
80 80
81/* buffer size for various temporary buffers */ 81/* buffer size for various temporary buffers */
82#define AIO_BUFSIZE 65536 82#define AIO_BUFSIZE 65536
145 DEFAULT_PRI = 0, 145 DEFAULT_PRI = 0,
146 PRI_BIAS = -PRI_MIN, 146 PRI_BIAS = -PRI_MIN,
147 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
148}; 148};
149 149
150#define AIO_TICKS ((1000000 + 1023) >> 10)
151
152static unsigned int max_poll_time = 0;
153static unsigned int max_poll_reqs = 0;
154
155/* calculcate time difference in ~1/AIO_TICKS of a second */
156static int tvdiff (struct timeval *tv1, struct timeval *tv2)
157{
158 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
159 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
160}
161
150static int next_pri = DEFAULT_PRI + PRI_BIAS; 162static int next_pri = DEFAULT_PRI + PRI_BIAS;
151 163
152static unsigned int started, idle, wanted; 164static unsigned int started, idle, wanted;
153 165
154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 166#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
207 219
208static pthread_mutex_t reslock = AIO_MUTEX_INIT; 220static pthread_mutex_t reslock = AIO_MUTEX_INIT;
209static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 221static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
210static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 222static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
211 223
212#if WORDREAD_UNSAFE 224#if WORDACCESS_UNSAFE
213 225
214static unsigned int get_nready () 226static unsigned int get_nready ()
215{ 227{
216 unsigned int retval; 228 unsigned int retval;
217 229
231 UNLOCK (reslock); 243 UNLOCK (reslock);
232 244
233 return retval; 245 return retval;
234} 246}
235 247
248static unsigned int get_nthreads ()
249{
250 unsigned int retval;
251
252 LOCK (wrklock);
253 retval = started;
254 UNLOCK (wrklock);
255
256 return retval;
257}
258
236#else 259#else
237 260
238# define get_nready() nready 261# define get_nready() nready
239# define get_npending() npending 262# define get_npending() npending
263# define get_nthreads() started
240 264
241#endif 265#endif
242 266
243/* 267/*
244 * a somewhat faster data structure might be nice, but 268 * a somewhat faster data structure might be nice, but
292 } 316 }
293 317
294 abort (); 318 abort ();
295} 319}
296 320
297static int poll_cb (int max); 321static int poll_cb ();
298static void req_invoke (aio_req req); 322static void req_invoke (aio_req req);
299static void req_free (aio_req req); 323static void req_free (aio_req req);
300static void req_cancel (aio_req req); 324static void req_cancel (aio_req req);
301 325
302/* must be called at most once */ 326/* must be called at most once */
560 UNLOCK (wrklock); 584 UNLOCK (wrklock);
561} 585}
562 586
563static void maybe_start_thread () 587static void maybe_start_thread ()
564{ 588{
565 if (started >= wanted) 589 if (get_nthreads () >= wanted)
566 return; 590 return;
567 591
568 /* todo: maybe use idle here, but might be less exact */ 592 /* todo: maybe use idle here, but might be less exact */
569 if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ())) 593 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
570 return; 594 return;
571 595
572 start_thread (); 596 start_thread ();
573} 597}
574 598
602 LOCK (wrklock); 626 LOCK (wrklock);
603 --started; 627 --started;
604 UNLOCK (wrklock); 628 UNLOCK (wrklock);
605} 629}
606 630
631static void set_max_idle (int nthreads)
632{
633 if (WORDACCESS_UNSAFE) LOCK (reqlock);
634 max_idle = nthreads <= 0 ? 1 : nthreads;
635 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
636}
637
607static void min_parallel (int nthreads) 638static void min_parallel (int nthreads)
608{ 639{
609 if (wanted < nthreads) 640 if (wanted < nthreads)
610 wanted = nthreads; 641 wanted = nthreads;
611} 642}
624 fd_set rfd; 655 fd_set rfd;
625 656
626 while (nreqs) 657 while (nreqs)
627 { 658 {
628 int size; 659 int size;
629 if (WORDREAD_UNSAFE) LOCK (reslock); 660 if (WORDACCESS_UNSAFE) LOCK (reslock);
630 size = res_queue.size; 661 size = res_queue.size;
631 if (WORDREAD_UNSAFE) UNLOCK (reslock); 662 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
632 663
633 if (size) 664 if (size)
634 return; 665 return;
635 666
636 maybe_start_thread (); 667 maybe_start_thread ();
640 671
641 select (respipe [0] + 1, &rfd, 0, 0, 0); 672 select (respipe [0] + 1, &rfd, 0, 0, 0);
642 } 673 }
643} 674}
644 675
645static int poll_cb (int max) 676static int poll_cb ()
646{ 677{
647 dSP; 678 dSP;
648 int count = 0; 679 int count = 0;
680 int maxreqs = max_poll_reqs;
649 int do_croak = 0; 681 int do_croak = 0;
682 struct timeval tv_start, tv_now;
650 aio_req req; 683 aio_req req;
651 684
685 if (max_poll_time)
686 gettimeofday (&tv_start, 0);
687
652 for (;;) 688 for (;;)
653 { 689 {
654 while (max <= 0 || count < max) 690 for (;;)
655 { 691 {
656 maybe_start_thread (); 692 maybe_start_thread ();
657 693
658 LOCK (reslock); 694 LOCK (reslock);
659 req = reqq_shift (&res_queue); 695 req = reqq_shift (&res_queue);
702 738
703 count++; 739 count++;
704 } 740 }
705 741
706 req_free (req); 742 req_free (req);
743
744 if (maxreqs && !--maxreqs)
745 break;
746
747 if (max_poll_time)
748 {
749 gettimeofday (&tv_now, 0);
750
751 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
752 break;
753 }
707 } 754 }
708 755
709 if (nreqs <= max_outstanding) 756 if (nreqs <= max_outstanding)
710 break; 757 break;
711 758
712 poll_wait (); 759 poll_wait ();
713 760
714 max = 0; 761 ++maxreqs;
715 } 762 }
716 763
717 return count; 764 return count;
718} 765}
719 766
1210 create_pipe (); 1257 create_pipe ();
1211 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1258 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1212} 1259}
1213 1260
1214void 1261void
1262max_poll_reqs (int nreqs)
1263 PROTOTYPE: $
1264 CODE:
1265 max_poll_reqs = nreqs;
1266
1267void
1268max_poll_time (double nseconds)
1269 PROTOTYPE: $
1270 CODE:
1271 max_poll_time = nseconds * AIO_TICKS;
1272
1273void
1215min_parallel (int nthreads) 1274min_parallel (int nthreads)
1216 PROTOTYPE: $ 1275 PROTOTYPE: $
1217 1276
1218void 1277void
1219max_parallel (int nthreads) 1278max_parallel (int nthreads)
1220 PROTOTYPE: $ 1279 PROTOTYPE: $
1280
1281void
1282max_idle (int nthreads)
1283 PROTOTYPE: $
1284 CODE:
1285 set_max_idle (nthreads);
1221 1286
1222int 1287int
1223max_outstanding (int maxreqs) 1288max_outstanding (int maxreqs)
1224 PROTOTYPE: $ 1289 PROTOTYPE: $
1225 CODE: 1290 CODE:
1562 1627
1563int 1628int
1564poll_cb(...) 1629poll_cb(...)
1565 PROTOTYPE: 1630 PROTOTYPE:
1566 CODE: 1631 CODE:
1567 RETVAL = poll_cb (0); 1632 RETVAL = poll_cb ();
1568 OUTPUT:
1569 RETVAL
1570
1571int
1572poll_some(int max = 0)
1573 PROTOTYPE: $
1574 CODE:
1575 RETVAL = poll_cb (max);
1576 OUTPUT: 1633 OUTPUT:
1577 RETVAL 1634 RETVAL
1578 1635
1579void 1636void
1580poll_wait() 1637poll_wait()
1605 CODE: 1662 CODE:
1606 RETVAL = get_npending (); 1663 RETVAL = get_npending ();
1607 OUTPUT: 1664 OUTPUT:
1608 RETVAL 1665 RETVAL
1609 1666
1667int
1668nthreads()
1669 PROTOTYPE:
1670 CODE:
1671 if (WORDACCESS_UNSAFE) LOCK (wrklock);
1672 RETVAL = started;
1673 if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
1674 OUTPUT:
1675 RETVAL
1676
1610PROTOTYPES: DISABLE 1677PROTOTYPES: DISABLE
1611 1678
1612MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1679MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1613 1680
1614void 1681void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines