ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.79 by root, Thu Oct 26 16:28:33 2006 UTC vs.
Revision 1.85 by root, Sat Oct 28 23:32:29 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
65 68
66/* wether word reads are potentially non-atomic. 69/* wether word reads are potentially non-atomic.
67 * this is conservatice, likely most arches this runs 70 * this is conservatice, likely most arches this runs
68 * on have atomic word read/writes. 71 * on have atomic word read/writes.
69 */ 72 */
70#ifndef WORDREAD_UNSAFE 73#ifndef WORDACCESS_UNSAFE
71# if __i386 || __x86_64 74# if __i386 || __x86_64
72# define WORDREAD_UNSAFE 0 75# define WORDACCESS_UNSAFE 0
73# else 76# else
74# define WORDREAD_UNSAFE 1 77# define WORDACCESS_UNSAFE 1
75# endif 78# endif
76#endif 79#endif
77 80
78/* buffer size for various temporary buffers */ 81/* buffer size for various temporary buffers */
79#define AIO_BUFSIZE 65536 82#define AIO_BUFSIZE 65536
92 REQ_READ, REQ_WRITE, REQ_READAHEAD, 95 REQ_READ, REQ_WRITE, REQ_READAHEAD,
93 REQ_SENDFILE, 96 REQ_SENDFILE,
94 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 97 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
95 REQ_FSYNC, REQ_FDATASYNC, 98 REQ_FSYNC, REQ_FDATASYNC,
96 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 99 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
97 REQ_READDIR, 100 REQ_MKNOD, REQ_READDIR,
98 REQ_LINK, REQ_SYMLINK, 101 REQ_LINK, REQ_SYMLINK,
99 REQ_GROUP, REQ_NOP, 102 REQ_GROUP, REQ_NOP,
100 REQ_BUSY, 103 REQ_BUSY,
101}; 104};
102 105
142 DEFAULT_PRI = 0, 145 DEFAULT_PRI = 0,
143 PRI_BIAS = -PRI_MIN, 146 PRI_BIAS = -PRI_MIN,
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
150#define AIO_TICKS ((1000000 + 1023) >> 10)
151
152static unsigned int max_poll_time = 0;
153static unsigned int max_poll_reqs = 0;
154
155/* calculcate time difference in ~1/AIO_TICKS of a second */
156static int tvdiff (struct timeval *tv1, struct timeval *tv2)
157{
158 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
159 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
160}
161
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 162static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 163
149static unsigned int started, wanted; 164static unsigned int started, idle, wanted;
150static volatile unsigned int nreqs, nready, npending;
151static volatile unsigned int max_outstanding = 0xffffffff;
152static int respipe [2];
153 165
154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 166#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 167# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
156#else 168#else
157# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 169# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
198 wrk->prev->next = wrk->next; 210 wrk->prev->next = wrk->next;
199 211
200 free (wrk); 212 free (wrk);
201} 213}
202 214
215static volatile unsigned int nreqs, nready, npending;
216static volatile unsigned int max_idle = 4;
217static volatile unsigned int max_outstanding = 0xffffffff;
218static int respipe [2];
219
203static pthread_mutex_t reslock = AIO_MUTEX_INIT; 220static pthread_mutex_t reslock = AIO_MUTEX_INIT;
204static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 221static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
205static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 222static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
223
224#if WORDACCESS_UNSAFE
225
226static unsigned int get_nready ()
227{
228 unsigned int retval;
229
230 LOCK (reqlock);
231 retval = nready;
232 UNLOCK (reqlock);
233
234 return retval;
235}
236
237static unsigned int get_npending ()
238{
239 unsigned int retval;
240
241 LOCK (reslock);
242 retval = npending;
243 UNLOCK (reslock);
244
245 return retval;
246}
247
248static unsigned int get_nthreads ()
249{
250 unsigned int retval;
251
252 LOCK (wrklock);
253 retval = started;
254 UNLOCK (wrklock);
255
256 return retval;
257}
258
259#else
260
261# define get_nready() nready
262# define get_npending() npending
263# define get_nthreads() started
264
265#endif
206 266
207/* 267/*
208 * a somewhat faster data structure might be nice, but 268 * a somewhat faster data structure might be nice, but
209 * with 8 priorities this actually needs <20 insns 269 * with 8 priorities this actually needs <20 insns
210 * per shift, the most expensive operation. 270 * per shift, the most expensive operation.
256 } 316 }
257 317
258 abort (); 318 abort ();
259} 319}
260 320
261static int poll_cb (int max); 321static int poll_cb ();
262static void req_invoke (aio_req req); 322static void req_invoke (aio_req req);
263static void req_free (aio_req req); 323static void req_free (aio_req req);
264static void req_cancel (aio_req req); 324static void req_cancel (aio_req req);
265 325
266/* must be called at most once */ 326/* must be called at most once */
331 req_invoke (grp); 391 req_invoke (grp);
332 req_free (grp); 392 req_free (grp);
333 } 393 }
334} 394}
335 395
336static void poll_wait ()
337{
338 fd_set rfd;
339
340 while (nreqs)
341 {
342 int size;
343 if (WORDREAD_UNSAFE) LOCK (reslock);
344 size = res_queue.size;
345 if (WORDREAD_UNSAFE) UNLOCK (reslock);
346
347 if (size)
348 return;
349
350 FD_ZERO(&rfd);
351 FD_SET(respipe [0], &rfd);
352
353 select (respipe [0] + 1, &rfd, 0, 0, 0);
354 }
355}
356
357static void req_invoke (aio_req req) 396static void req_invoke (aio_req req)
358{ 397{
359 dSP; 398 dSP;
360 399
361 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback)) 400 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback))
504 req->flags |= FLAG_CANCELLED; 543 req->flags |= FLAG_CANCELLED;
505 544
506 req_cancel_subs (req); 545 req_cancel_subs (req);
507} 546}
508 547
548static void *aio_proc(void *arg);
549
550static void start_thread (void)
551{
552 sigset_t fullsigset, oldsigset;
553 pthread_attr_t attr;
554
555 worker *wrk = calloc (1, sizeof (worker));
556
557 if (!wrk)
558 croak ("unable to allocate worker thread data");
559
560 pthread_attr_init (&attr);
561 pthread_attr_setstacksize (&attr, STACKSIZE);
562 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
563#ifdef PTHREAD_SCOPE_PROCESS
564 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
565#endif
566
567 sigfillset (&fullsigset);
568
569 LOCK (wrklock);
570 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
571
572 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
573 {
574 wrk->prev = &wrk_first;
575 wrk->next = wrk_first.next;
576 wrk_first.next->prev = wrk;
577 wrk_first.next = wrk;
578 ++started;
579 }
580 else
581 free (wrk);
582
583 sigprocmask (SIG_SETMASK, &oldsigset, 0);
584 UNLOCK (wrklock);
585}
586
587static void maybe_start_thread ()
588{
589 if (get_nthreads () >= wanted)
590 return;
591
592 /* todo: maybe use idle here, but might be less exact */
593 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
594 return;
595
596 start_thread ();
597}
598
599static void req_send (aio_req req)
600{
601 ++nreqs;
602
603 LOCK (reqlock);
604 ++nready;
605 reqq_push (&req_queue, req);
606 pthread_cond_signal (&reqwait);
607 UNLOCK (reqlock);
608
609 maybe_start_thread ();
610}
611
612static void end_thread (void)
613{
614 aio_req req;
615
616 Newz (0, req, 1, aio_cb);
617
618 req->type = REQ_QUIT;
619 req->pri = PRI_MAX + PRI_BIAS;
620
621 LOCK (reqlock);
622 reqq_push (&req_queue, req);
623 pthread_cond_signal (&reqwait);
624 UNLOCK (reqlock);
625
626 LOCK (wrklock);
627 --started;
628 UNLOCK (wrklock);
629}
630
631static void set_max_idle (int nthreads)
632{
633 if (WORDACCESS_UNSAFE) LOCK (reqlock);
634 max_idle = nthreads <= 0 ? 1 : nthreads;
635 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
636}
637
638static void min_parallel (int nthreads)
639{
640 if (wanted < nthreads)
641 wanted = nthreads;
642}
643
644static void max_parallel (int nthreads)
645{
646 if (wanted > nthreads)
647 wanted = nthreads;
648
649 while (started > wanted)
650 end_thread ();
651}
652
653static void poll_wait ()
654{
655 fd_set rfd;
656
657 while (nreqs)
658 {
659 int size;
660 if (WORDACCESS_UNSAFE) LOCK (reslock);
661 size = res_queue.size;
662 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
663
664 if (size)
665 return;
666
667 maybe_start_thread ();
668
669 FD_ZERO(&rfd);
670 FD_SET(respipe [0], &rfd);
671
672 select (respipe [0] + 1, &rfd, 0, 0, 0);
673 }
674}
675
509static int poll_cb (int max) 676static int poll_cb ()
510{ 677{
511 dSP; 678 dSP;
512 int count = 0; 679 int count = 0;
680 int maxreqs = max_poll_reqs;
513 int do_croak = 0; 681 int do_croak = 0;
682 struct timeval tv_start, tv_now;
514 aio_req req; 683 aio_req req;
515 684
685 if (max_poll_time)
686 gettimeofday (&tv_start, 0);
687
516 for (;;) 688 for (;;)
517 { 689 {
518 while (max <= 0 || count < max) 690 for (;;)
519 { 691 {
692 maybe_start_thread ();
693
520 LOCK (reslock); 694 LOCK (reslock);
521 req = reqq_shift (&res_queue); 695 req = reqq_shift (&res_queue);
522 696
523 if (req) 697 if (req)
524 { 698 {
538 if (!req) 712 if (!req)
539 break; 713 break;
540 714
541 --nreqs; 715 --nreqs;
542 716
543 if (req->type == REQ_QUIT)
544 --started;
545 else if (req->type == REQ_GROUP && req->length) 717 if (req->type == REQ_GROUP && req->length)
546 { 718 {
547 req->fd = 1; /* mark request as delayed */ 719 req->fd = 1; /* mark request as delayed */
548 continue; 720 continue;
549 } 721 }
550 else 722 else
566 738
567 count++; 739 count++;
568 } 740 }
569 741
570 req_free (req); 742 req_free (req);
743
744 if (maxreqs && !--maxreqs)
745 break;
746
747 if (max_poll_time)
748 {
749 gettimeofday (&tv_now, 0);
750
751 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
752 break;
753 }
571 } 754 }
572 755
573 if (nreqs <= max_outstanding) 756 if (nreqs <= max_outstanding)
574 break; 757 break;
575 758
576 poll_wait (); 759 poll_wait ();
577 760
578 max = 0; 761 ++maxreqs;
579 } 762 }
580 763
581 return count; 764 return count;
582}
583
584static void *aio_proc(void *arg);
585
586static void start_thread (void)
587{
588 sigset_t fullsigset, oldsigset;
589 pthread_attr_t attr;
590
591 worker *wrk = calloc (1, sizeof (worker));
592
593 if (!wrk)
594 croak ("unable to allocate worker thread data");
595
596 pthread_attr_init (&attr);
597 pthread_attr_setstacksize (&attr, STACKSIZE);
598 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
599
600 sigfillset (&fullsigset);
601
602 LOCK (wrklock);
603 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
604
605 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
606 {
607 wrk->prev = &wrk_first;
608 wrk->next = wrk_first.next;
609 wrk_first.next->prev = wrk;
610 wrk_first.next = wrk;
611 ++started;
612 }
613 else
614 free (wrk);
615
616 sigprocmask (SIG_SETMASK, &oldsigset, 0);
617 UNLOCK (wrklock);
618}
619
620static void req_send (aio_req req)
621{
622 while (started < wanted && nreqs >= started)
623 start_thread ();
624
625 ++nreqs;
626
627 LOCK (reqlock);
628 ++nready;
629 reqq_push (&req_queue, req);
630 pthread_cond_signal (&reqwait);
631 UNLOCK (reqlock);
632}
633
634static void end_thread (void)
635{
636 aio_req req;
637
638 Newz (0, req, 1, aio_cb);
639
640 req->type = REQ_QUIT;
641 req->pri = PRI_MAX + PRI_BIAS;
642
643 req_send (req);
644}
645
646static void min_parallel (int nthreads)
647{
648 if (wanted < nthreads)
649 wanted = nthreads;
650}
651
652static void max_parallel (int nthreads)
653{
654 int cur = started;
655
656 if (wanted > nthreads)
657 wanted = nthreads;
658
659 while (cur > wanted)
660 {
661 end_thread ();
662 cur--;
663 }
664
665 while (started > wanted)
666 {
667 poll_wait ();
668 poll_cb (0);
669 }
670} 765}
671 766
672static void create_pipe () 767static void create_pipe ()
673{ 768{
674 if (pipe (respipe)) 769 if (pipe (respipe))
938/*****************************************************************************/ 1033/*****************************************************************************/
939 1034
940static void *aio_proc (void *thr_arg) 1035static void *aio_proc (void *thr_arg)
941{ 1036{
942 aio_req req; 1037 aio_req req;
943 int type; 1038 struct timespec ts;
944 worker *self = (worker *)thr_arg; 1039 worker *self = (worker *)thr_arg;
945 1040
946 do 1041 /* try to distribute timeouts somewhat evenly */
1042 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
1043 * (1000000000UL / 1024UL);
1044
1045 for (;;)
947 { 1046 {
1047 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1048
948 LOCK (reqlock); 1049 LOCK (reqlock);
949 1050
950 for (;;) 1051 for (;;)
951 { 1052 {
952 self->req = req = reqq_shift (&req_queue); 1053 self->req = req = reqq_shift (&req_queue);
953 1054
954 if (req) 1055 if (req)
955 break; 1056 break;
956 1057
1058 ++idle;
1059
1060 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1061 == ETIMEDOUT)
1062 {
1063 if (idle > max_idle)
1064 {
1065 --idle;
1066 UNLOCK (reqlock);
1067 LOCK (wrklock);
1068 --started;
1069 UNLOCK (wrklock);
1070 goto quit;
1071 }
1072
1073 /* we are allowed to idle, so do so without any timeout */
957 pthread_cond_wait (&reqwait, &reqlock); 1074 pthread_cond_wait (&reqwait, &reqlock);
1075 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1076 }
1077
1078 --idle;
958 } 1079 }
959 1080
960 --nready; 1081 --nready;
961 1082
962 UNLOCK (reqlock); 1083 UNLOCK (reqlock);
963 1084
964 errno = 0; /* strictly unnecessary */ 1085 errno = 0; /* strictly unnecessary */
965 type = req->type; /* remember type for QUIT check */
966 1086
967 if (!(req->flags & FLAG_CANCELLED)) 1087 if (!(req->flags & FLAG_CANCELLED))
968 switch (type) 1088 switch (req->type)
969 { 1089 {
970 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 1090 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
971 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 1091 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
972 1092
973 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 1093 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
982 case REQ_UNLINK: req->result = unlink (req->dataptr); break; 1102 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
983 case REQ_RMDIR: req->result = rmdir (req->dataptr); break; 1103 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
984 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; 1104 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
985 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; 1105 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
986 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; 1106 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
1107 case REQ_MKNOD: req->result = mknod (req->data2ptr, req->mode, (dev_t)req->offset); break;
987 1108
988 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; 1109 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
989 case REQ_FSYNC: req->result = fsync (req->fd); break; 1110 case REQ_FSYNC: req->result = fsync (req->fd); break;
990 case REQ_READDIR: scandir_ (req, self); break; 1111 case REQ_READDIR: scandir_ (req, self); break;
991 1112
999 req->result = select (0, 0, 0, 0, &tv); 1120 req->result = select (0, 0, 0, 0, &tv);
1000 } 1121 }
1001 1122
1002 case REQ_GROUP: 1123 case REQ_GROUP:
1003 case REQ_NOP: 1124 case REQ_NOP:
1125 break;
1126
1004 case REQ_QUIT: 1127 case REQ_QUIT:
1005 break; 1128 goto quit;
1006 1129
1007 default: 1130 default:
1008 req->result = ENOSYS; 1131 req->result = ENOSYS;
1009 break; 1132 break;
1010 } 1133 }
1022 self->req = 0; 1145 self->req = 0;
1023 worker_clear (self); 1146 worker_clear (self);
1024 1147
1025 UNLOCK (reslock); 1148 UNLOCK (reslock);
1026 } 1149 }
1027 while (type != REQ_QUIT);
1028 1150
1151quit:
1029 LOCK (wrklock); 1152 LOCK (wrklock);
1030 worker_free (self); 1153 worker_free (self);
1031 UNLOCK (wrklock); 1154 UNLOCK (wrklock);
1032 1155
1033 return 0; 1156 return 0;
1080 1203
1081 worker_clear (wrk); 1204 worker_clear (wrk);
1082 worker_free (wrk); 1205 worker_free (wrk);
1083 } 1206 }
1084 1207
1085 started = 0; 1208 started = 0;
1209 idle = 0;
1086 nreqs = 0; 1210 nreqs = 0;
1211 nready = 0;
1212 npending = 0;
1087 1213
1088 close (respipe [0]); 1214 close (respipe [0]);
1089 close (respipe [1]); 1215 close (respipe [1]);
1090 create_pipe (); 1216 create_pipe ();
1091 1217
1118PROTOTYPES: ENABLE 1244PROTOTYPES: ENABLE
1119 1245
1120BOOT: 1246BOOT:
1121{ 1247{
1122 HV *stash = gv_stashpv ("IO::AIO", 1); 1248 HV *stash = gv_stashpv ("IO::AIO", 1);
1249
1123 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1250 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1124 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1251 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1125 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1252 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1253 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1254 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1255 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1126 1256
1127 create_pipe (); 1257 create_pipe ();
1128 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1258 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1129} 1259}
1130 1260
1131void 1261void
1262max_poll_reqs (int nreqs)
1263 PROTOTYPE: $
1264 CODE:
1265 max_poll_reqs = nreqs;
1266
1267void
1268max_poll_time (double nseconds)
1269 PROTOTYPE: $
1270 CODE:
1271 max_poll_time = nseconds * AIO_TICKS;
1272
1273void
1132min_parallel (int nthreads) 1274min_parallel (int nthreads)
1133 PROTOTYPE: $ 1275 PROTOTYPE: $
1134 1276
1135void 1277void
1136max_parallel (int nthreads) 1278max_parallel (int nthreads)
1137 PROTOTYPE: $ 1279 PROTOTYPE: $
1280
1281void
1282max_idle (int nthreads)
1283 PROTOTYPE: $
1284 CODE:
1285 set_max_idle (nthreads);
1138 1286
1139int 1287int
1140max_outstanding (int maxreqs) 1288max_outstanding (int maxreqs)
1141 PROTOTYPE: $ 1289 PROTOTYPE: $
1142 CODE: 1290 CODE:
1361 req->type = ix; 1509 req->type = ix;
1362 req->fh = newSVsv (oldpath); 1510 req->fh = newSVsv (oldpath);
1363 req->data2ptr = SvPVbyte_nolen (req->fh); 1511 req->data2ptr = SvPVbyte_nolen (req->fh);
1364 req->data = newSVsv (newpath); 1512 req->data = newSVsv (newpath);
1365 req->dataptr = SvPVbyte_nolen (req->data); 1513 req->dataptr = SvPVbyte_nolen (req->data);
1514
1515 REQ_SEND;
1516}
1517
1518void
1519aio_mknod (pathname,mode,dev,callback=&PL_sv_undef)
1520 SV * pathname
1521 SV * callback
1522 UV mode
1523 UV dev
1524 PPCODE:
1525{
1526 dREQ;
1527
1528 req->type = REQ_MKNOD;
1529 req->data = newSVsv (pathname);
1530 req->dataptr = SvPVbyte_nolen (req->data);
1531 req->mode = (mode_t)mode;
1532 req->offset = dev;
1366 1533
1367 REQ_SEND; 1534 REQ_SEND;
1368} 1535}
1369 1536
1370void 1537void
1460 1627
1461int 1628int
1462poll_cb(...) 1629poll_cb(...)
1463 PROTOTYPE: 1630 PROTOTYPE:
1464 CODE: 1631 CODE:
1465 RETVAL = poll_cb (0); 1632 RETVAL = poll_cb ();
1466 OUTPUT:
1467 RETVAL
1468
1469int
1470poll_some(int max = 0)
1471 PROTOTYPE: $
1472 CODE:
1473 RETVAL = poll_cb (max);
1474 OUTPUT: 1633 OUTPUT:
1475 RETVAL 1634 RETVAL
1476 1635
1477void 1636void
1478poll_wait() 1637poll_wait()
1491 1650
1492int 1651int
1493nready() 1652nready()
1494 PROTOTYPE: 1653 PROTOTYPE:
1495 CODE: 1654 CODE:
1496 if (WORDREAD_UNSAFE) LOCK (reqlock);
1497 RETVAL = nready; 1655 RETVAL = get_nready ();
1498 if (WORDREAD_UNSAFE) UNLOCK (reqlock);
1499 OUTPUT: 1656 OUTPUT:
1500 RETVAL 1657 RETVAL
1501 1658
1502int 1659int
1503npending() 1660npending()
1504 PROTOTYPE: 1661 PROTOTYPE:
1505 CODE: 1662 CODE:
1506 if (WORDREAD_UNSAFE) LOCK (reslock);
1507 RETVAL = npending; 1663 RETVAL = get_npending ();
1664 OUTPUT:
1665 RETVAL
1666
1667int
1668nthreads()
1669 PROTOTYPE:
1670 CODE:
1671 if (WORDACCESS_UNSAFE) LOCK (wrklock);
1672 RETVAL = started;
1508 if (WORDREAD_UNSAFE) UNLOCK (reslock); 1673 if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
1509 OUTPUT: 1674 OUTPUT:
1510 RETVAL 1675 RETVAL
1511 1676
1512PROTOTYPES: DISABLE 1677PROTOTYPES: DISABLE
1513 1678

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines