ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.80 by root, Fri Oct 27 19:17:23 2006 UTC vs.
Revision 1.84 by root, Sat Oct 28 01:24:19 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
92 REQ_READ, REQ_WRITE, REQ_READAHEAD, 95 REQ_READ, REQ_WRITE, REQ_READAHEAD,
93 REQ_SENDFILE, 96 REQ_SENDFILE,
94 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 97 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
95 REQ_FSYNC, REQ_FDATASYNC, 98 REQ_FSYNC, REQ_FDATASYNC,
96 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 99 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
97 REQ_READDIR, 100 REQ_MKNOD, REQ_READDIR,
98 REQ_LINK, REQ_SYMLINK, 101 REQ_LINK, REQ_SYMLINK,
99 REQ_GROUP, REQ_NOP, 102 REQ_GROUP, REQ_NOP,
100 REQ_BUSY, 103 REQ_BUSY,
101}; 104};
102 105
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 150static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 151
149static unsigned int started, wanted; 152static unsigned int started, idle, wanted;
150 153
151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
153#else 156#else
154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 157# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
196 199
197 free (wrk); 200 free (wrk);
198} 201}
199 202
200static volatile unsigned int nreqs, nready, npending; 203static volatile unsigned int nreqs, nready, npending;
204static volatile unsigned int max_idle = 4;
201static volatile unsigned int max_outstanding = 0xffffffff; 205static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2]; 206static int respipe [2];
203 207
204static pthread_mutex_t reslock = AIO_MUTEX_INIT; 208static pthread_mutex_t reslock = AIO_MUTEX_INIT;
205static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 209static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
556 UNLOCK (wrklock); 560 UNLOCK (wrklock);
557} 561}
558 562
559static void maybe_start_thread () 563static void maybe_start_thread ()
560{ 564{
561#if 0
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted) 565 if (started >= wanted)
567 return; 566 return;
568 567
568 /* todo: maybe use idle here, but might be less exact */
569 if (nready <= nreqs - get_nready () - get_npending ()) 569 if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ()))
570 return; 570 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589 571
590 start_thread (); 572 start_thread ();
591} 573}
592 574
593static void req_send (aio_req req) 575static void req_send (aio_req req)
610 Newz (0, req, 1, aio_cb); 592 Newz (0, req, 1, aio_cb);
611 593
612 req->type = REQ_QUIT; 594 req->type = REQ_QUIT;
613 req->pri = PRI_MAX + PRI_BIAS; 595 req->pri = PRI_MAX + PRI_BIAS;
614 596
615 req_send (req); 597 LOCK (reqlock);
598 reqq_push (&req_queue, req);
599 pthread_cond_signal (&reqwait);
600 UNLOCK (reqlock);
616 601
617 LOCK (wrklock); 602 LOCK (wrklock);
618 --started; 603 --started;
619 UNLOCK (wrklock); 604 UNLOCK (wrklock);
620} 605}
1001/*****************************************************************************/ 986/*****************************************************************************/
1002 987
1003static void *aio_proc (void *thr_arg) 988static void *aio_proc (void *thr_arg)
1004{ 989{
1005 aio_req req; 990 aio_req req;
991 struct timespec ts;
1006 worker *self = (worker *)thr_arg; 992 worker *self = (worker *)thr_arg;
1007 993
994 /* try to distribute timeouts somewhat evenly */
995 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
996 * (1000000000UL / 1024UL);
997
1008 for (;;) 998 for (;;)
1009 { 999 {
1000 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1001
1010 LOCK (reqlock); 1002 LOCK (reqlock);
1011 1003
1012 for (;;) 1004 for (;;)
1013 { 1005 {
1014 self->req = req = reqq_shift (&req_queue); 1006 self->req = req = reqq_shift (&req_queue);
1015 1007
1016 if (req) 1008 if (req)
1017 break; 1009 break;
1018 1010
1011 ++idle;
1012
1013 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1014 == ETIMEDOUT)
1015 {
1016 if (idle > max_idle)
1017 {
1018 --idle;
1019 UNLOCK (reqlock);
1020 LOCK (wrklock);
1021 --started;
1022 UNLOCK (wrklock);
1023 goto quit;
1024 }
1025
1026 /* we are allowed to idle, so do so without any timeout */
1019 pthread_cond_wait (&reqwait, &reqlock); 1027 pthread_cond_wait (&reqwait, &reqlock);
1028 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1029 }
1030
1031 --idle;
1020 } 1032 }
1021 1033
1022 --nready; 1034 --nready;
1023 1035
1024 UNLOCK (reqlock); 1036 UNLOCK (reqlock);
1043 case REQ_UNLINK: req->result = unlink (req->dataptr); break; 1055 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
1044 case REQ_RMDIR: req->result = rmdir (req->dataptr); break; 1056 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
1045 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; 1057 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
1046 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; 1058 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
1047 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; 1059 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
1060 case REQ_MKNOD: req->result = mknod (req->data2ptr, req->mode, (dev_t)req->offset); break;
1048 1061
1049 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; 1062 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
1050 case REQ_FSYNC: req->result = fsync (req->fd); break; 1063 case REQ_FSYNC: req->result = fsync (req->fd); break;
1051 case REQ_READDIR: scandir_ (req, self); break; 1064 case REQ_READDIR: scandir_ (req, self); break;
1052 1065
1063 case REQ_GROUP: 1076 case REQ_GROUP:
1064 case REQ_NOP: 1077 case REQ_NOP:
1065 break; 1078 break;
1066 1079
1067 case REQ_QUIT: 1080 case REQ_QUIT:
1068 LOCK (wrklock); 1081 goto quit;
1069 worker_free (self);
1070 --started;
1071 UNLOCK (wrklock);
1072 return 0;
1073 1082
1074 default: 1083 default:
1075 req->result = ENOSYS; 1084 req->result = ENOSYS;
1076 break; 1085 break;
1077 } 1086 }
1089 self->req = 0; 1098 self->req = 0;
1090 worker_clear (self); 1099 worker_clear (self);
1091 1100
1092 UNLOCK (reslock); 1101 UNLOCK (reslock);
1093 } 1102 }
1103
1104quit:
1105 LOCK (wrklock);
1106 worker_free (self);
1107 UNLOCK (wrklock);
1108
1109 return 0;
1094} 1110}
1095 1111
1096/*****************************************************************************/ 1112/*****************************************************************************/
1097 1113
1098static void atfork_prepare (void) 1114static void atfork_prepare (void)
1140 1156
1141 worker_clear (wrk); 1157 worker_clear (wrk);
1142 worker_free (wrk); 1158 worker_free (wrk);
1143 } 1159 }
1144 1160
1145 started = 0; 1161 started = 0;
1162 idle = 0;
1146 nreqs = 0; 1163 nreqs = 0;
1164 nready = 0;
1165 npending = 0;
1147 1166
1148 close (respipe [0]); 1167 close (respipe [0]);
1149 close (respipe [1]); 1168 close (respipe [1]);
1150 create_pipe (); 1169 create_pipe ();
1151 1170
1178PROTOTYPES: ENABLE 1197PROTOTYPES: ENABLE
1179 1198
1180BOOT: 1199BOOT:
1181{ 1200{
1182 HV *stash = gv_stashpv ("IO::AIO", 1); 1201 HV *stash = gv_stashpv ("IO::AIO", 1);
1202
1183 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1203 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1184 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1204 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1185 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1205 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1206 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1207 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1208 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1186 1209
1187 create_pipe (); 1210 create_pipe ();
1188 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1211 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1189
1190 start_thread ();
1191} 1212}
1192 1213
1193void 1214void
1194min_parallel (int nthreads) 1215min_parallel (int nthreads)
1195 PROTOTYPE: $ 1216 PROTOTYPE: $
1423 req->type = ix; 1444 req->type = ix;
1424 req->fh = newSVsv (oldpath); 1445 req->fh = newSVsv (oldpath);
1425 req->data2ptr = SvPVbyte_nolen (req->fh); 1446 req->data2ptr = SvPVbyte_nolen (req->fh);
1426 req->data = newSVsv (newpath); 1447 req->data = newSVsv (newpath);
1427 req->dataptr = SvPVbyte_nolen (req->data); 1448 req->dataptr = SvPVbyte_nolen (req->data);
1449
1450 REQ_SEND;
1451}
1452
1453void
1454aio_mknod (pathname,mode,dev,callback=&PL_sv_undef)
1455 SV * pathname
1456 SV * callback
1457 UV mode
1458 UV dev
1459 PPCODE:
1460{
1461 dREQ;
1462
1463 req->type = REQ_MKNOD;
1464 req->data = newSVsv (pathname);
1465 req->dataptr = SvPVbyte_nolen (req->data);
1466 req->mode = (mode_t)mode;
1467 req->offset = dev;
1428 1468
1429 REQ_SEND; 1469 REQ_SEND;
1430} 1470}
1431 1471
1432void 1472void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines