ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.104 by root, Sun Jul 8 11:05:36 2007 UTC vs.
Revision 1.113 by root, Wed Apr 16 16:45:18 2008 UTC

97# define SvVAL64 SvIV 97# define SvVAL64 SvIV
98#else 98#else
99# define SvVAL64 SvNV 99# define SvVAL64 SvNV
100#endif 100#endif
101 101
102static HV *stash;
103
102#define dBUF \ 104#define dBUF \
103 char *aio_buf; \ 105 char *aio_buf; \
104 X_LOCK (wrklock); \ 106 X_LOCK (wrklock); \
105 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \ 107 self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \
106 X_UNLOCK (wrklock); \ 108 X_UNLOCK (wrklock); \
117 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 119 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
118 REQ_TRUNCATE, REQ_FTRUNCATE, 120 REQ_TRUNCATE, REQ_FTRUNCATE,
119 REQ_UTIME, REQ_FUTIME, 121 REQ_UTIME, REQ_FUTIME,
120 REQ_CHMOD, REQ_FCHMOD, 122 REQ_CHMOD, REQ_FCHMOD,
121 REQ_CHOWN, REQ_FCHOWN, 123 REQ_CHOWN, REQ_FCHOWN,
122 REQ_FSYNC, REQ_FDATASYNC, 124 REQ_SYNC, REQ_FSYNC, REQ_FDATASYNC,
123 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME, 125 REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME,
124 REQ_MKNOD, REQ_READDIR, 126 REQ_MKNOD, REQ_READDIR,
125 REQ_LINK, REQ_SYMLINK, REQ_READLINK, 127 REQ_LINK, REQ_SYMLINK, REQ_READLINK,
126 REQ_GROUP, REQ_NOP, 128 REQ_GROUP, REQ_NOP,
127 REQ_BUSY, 129 REQ_BUSY,
187 189
188static thread_t main_tid; 190static thread_t main_tid;
189static int main_sig; 191static int main_sig;
190static int block_sig_level; 192static int block_sig_level;
191 193
192void block_sig () 194void block_sig (void)
193{ 195{
194 sigset_t ss; 196 sigset_t ss;
195 197
196 if (block_sig_level++) 198 if (block_sig_level++)
197 return; 199 return;
202 sigemptyset (&ss); 204 sigemptyset (&ss);
203 sigaddset (&ss, main_sig); 205 sigaddset (&ss, main_sig);
204 pthread_sigmask (SIG_BLOCK, &ss, 0); 206 pthread_sigmask (SIG_BLOCK, &ss, 0);
205} 207}
206 208
207void unblock_sig () 209void unblock_sig (void)
208{ 210{
209 sigset_t ss; 211 sigset_t ss;
210 212
211 if (--block_sig_level) 213 if (--block_sig_level)
212 return; 214 return;
264} 266}
265 267
266static volatile unsigned int nreqs, nready, npending; 268static volatile unsigned int nreqs, nready, npending;
267static volatile unsigned int max_idle = 4; 269static volatile unsigned int max_idle = 4;
268static volatile unsigned int max_outstanding = 0xffffffff; 270static volatile unsigned int max_outstanding = 0xffffffff;
269static int respipe [2], respipe_osf [2]; 271static int respipe_osf [2], respipe [2] = { -1, -1 };
270 272
271static mutex_t reslock = X_MUTEX_INIT; 273static mutex_t reslock = X_MUTEX_INIT;
272static mutex_t reqlock = X_MUTEX_INIT; 274static mutex_t reqlock = X_MUTEX_INIT;
273static cond_t reqwait = X_COND_INIT; 275static cond_t reqwait = X_COND_INIT;
274 276
275#if WORDACCESS_UNSAFE 277#if WORDACCESS_UNSAFE
276 278
277static unsigned int get_nready () 279static unsigned int get_nready (void)
278{ 280{
279 unsigned int retval; 281 unsigned int retval;
280 282
281 X_LOCK (reqlock); 283 X_LOCK (reqlock);
282 retval = nready; 284 retval = nready;
283 X_UNLOCK (reqlock); 285 X_UNLOCK (reqlock);
284 286
285 return retval; 287 return retval;
286} 288}
287 289
288static unsigned int get_npending () 290static unsigned int get_npending (void)
289{ 291{
290 unsigned int retval; 292 unsigned int retval;
291 293
292 X_LOCK (reslock); 294 X_LOCK (reslock);
293 retval = npending; 295 retval = npending;
294 X_UNLOCK (reslock); 296 X_UNLOCK (reslock);
295 297
296 return retval; 298 return retval;
297} 299}
298 300
299static unsigned int get_nthreads () 301static unsigned int get_nthreads (void)
300{ 302{
301 unsigned int retval; 303 unsigned int retval;
302 304
303 X_LOCK (wrklock); 305 X_LOCK (wrklock);
304 retval = started; 306 retval = started;
367 } 369 }
368 370
369 abort (); 371 abort ();
370} 372}
371 373
372static int poll_cb (); 374static int poll_cb (void);
373static int req_invoke (aio_req req); 375static int req_invoke (aio_req req);
374static void req_destroy (aio_req req); 376static void req_destroy (aio_req req);
375static void req_cancel (aio_req req); 377static void req_cancel (aio_req req);
376 378
377/* must be called at most once */ 379/* must be called at most once */
501 break; 503 break;
502 504
503 case REQ_OPEN: 505 case REQ_OPEN:
504 { 506 {
505 /* convert fd to fh */ 507 /* convert fd to fh */
506 SV *fh; 508 SV *fh = &PL_sv_undef;
507 509
508 PUSHs (sv_2mortal (newSViv (req->result))); 510 if (req->result >= 0)
509 PUTBACK; 511 {
510 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 512 GV *gv = (GV *)sv_newmortal ();
511 SPAGAIN; 513 int flags = req->int1 & (O_RDONLY | O_WRONLY | O_RDWR);
514 char sym [64];
515 int symlen;
516
517 symlen = snprintf (sym, sizeof (sym), "fd#%d", req->result);
518 gv_init (gv, stash, sym, symlen, 0);
512 519
513 fh = POPs; 520 symlen = snprintf (
514 PUSHMARK (SP); 521 sym,
522 sizeof (sym),
523 "%s&=%d",
524 flags == O_RDONLY ? "<" : flags == O_WRONLY ? ">" : "+<",
525 req->result
526 );
527
528 if (do_open (gv, sym, symlen, 0, 0, 0, 0))
529 fh = (SV *)gv;
530 }
531
515 XPUSHs (fh); 532 PUSHs (fh);
516 } 533 }
517 break; 534 break;
518 535
519 case REQ_GROUP: 536 case REQ_GROUP:
520 req->int1 = 2; /* mark group as finished */ 537 req->int1 = 2; /* mark group as finished */
564 } 581 }
565 582
566 errno = req->errorno; 583 errno = req->errorno;
567 584
568 PUTBACK; 585 PUTBACK;
569 call_sv (req->callback, G_VOID | G_EVAL); 586 call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
570 SPAGAIN; 587 SPAGAIN;
571 588
572 FREETMPS; 589 FREETMPS;
573 LEAVE; 590 LEAVE;
591
592 PUTBACK;
574 } 593 }
575 594
576 if (req->grp) 595 if (req->grp)
577 { 596 {
578 aio_req grp = req->grp; 597 aio_req grp = req->grp;
634#else 653#else
635# define TO_SOCKET(x) (x) 654# define TO_SOCKET(x) (x)
636#endif 655#endif
637 656
638static void 657static void
639create_pipe (int fd[2]) 658create_respipe (void)
640{ 659{
660 int old_readfd = respipe [0];
661
662 if (respipe [1] >= 0)
663 respipe_close (TO_SOCKET (respipe [1]));
664
665#ifdef _WIN32
666 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
667#else
668 if (pipe (respipe))
669#endif
670 croak ("unable to initialize result pipe");
671
672 if (old_readfd >= 0)
673 {
674 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
675 croak ("unable to initialize result pipe(2)");
676
677 respipe_close (respipe [0]);
678 respipe [0] = old_readfd;
679 }
680
641#ifdef _WIN32 681#ifdef _WIN32
642 int arg = 1; 682 int arg = 1;
643 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd)
644 || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) 683 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
645 || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) 684 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
646#else 685#else
647 if (pipe (fd)
648 || fcntl (fd [0], F_SETFL, O_NONBLOCK) 686 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
649 || fcntl (fd [1], F_SETFL, O_NONBLOCK)) 687 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
650#endif 688#endif
651 croak ("unable to initialize result pipe"); 689 croak ("unable to initialize result pipe(3)");
652 690
653 respipe_osf [0] = TO_SOCKET (respipe [0]); 691 respipe_osf [0] = TO_SOCKET (respipe [0]);
654 respipe_osf [1] = TO_SOCKET (respipe [1]); 692 respipe_osf [1] = TO_SOCKET (respipe [1]);
655} 693}
656 694
677 free (wrk); 715 free (wrk);
678 716
679 X_UNLOCK (wrklock); 717 X_UNLOCK (wrklock);
680} 718}
681 719
682static void maybe_start_thread () 720static void maybe_start_thread (void)
683{ 721{
684 if (get_nthreads () >= wanted) 722 if (get_nthreads () >= wanted)
685 return; 723 return;
686 724
687 /* todo: maybe use idle here, but might be less exact */ 725 /* todo: maybe use idle here, but might be less exact */
747 785
748 while (started > wanted) 786 while (started > wanted)
749 end_thread (); 787 end_thread ();
750} 788}
751 789
752static void poll_wait () 790static void poll_wait (void)
753{ 791{
754 fd_set rfd; 792 fd_set rfd;
755 793
756 while (nreqs) 794 while (nreqs)
757 { 795 {
770 808
771 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); 809 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
772 } 810 }
773} 811}
774 812
775static int poll_cb () 813static int poll_cb (void)
776{ 814{
777 dSP; 815 dSP;
778 int count = 0; 816 int count = 0;
779 int maxreqs = max_poll_reqs; 817 int maxreqs = max_poll_reqs;
780 int do_croak = 0; 818 int do_croak = 0;
801 839
802 if (!res_queue.size) 840 if (!res_queue.size)
803 { 841 {
804 /* read any signals sent by the worker threads */ 842 /* read any signals sent by the worker threads */
805 char buf [4]; 843 char buf [4];
806 while (PerlSock_recv (respipe [0], buf, 4, 0) == 4) 844 while (respipe_read (respipe [0], buf, 4) == 4)
807 ; 845 ;
808 } 846 }
809 } 847 }
810 848
811 X_UNLOCK (reslock); 849 X_UNLOCK (reslock);
1140 res = -1; 1178 res = -1;
1141 1179
1142 req->result = res; 1180 req->result = res;
1143} 1181}
1144 1182
1183static int
1184aio_close (int fd)
1185{
1186 static int close_pipe = -1; /* dummy fd to close fds via dup2 */
1187
1188 X_LOCK (wrklock);
1189
1190 if (close_pipe < 0)
1191 {
1192 int pipefd [2];
1193
1194 if (pipe (pipefd) < 0
1195 || close (pipefd [1]) < 0
1196 || fcntl (pipefd [0], F_SETFD, FD_CLOEXEC) < 0)
1197 {
1198 X_UNLOCK (wrklock);
1199 return -1;
1200 }
1201
1202 close_pipe = pipefd [0];
1203 }
1204
1205 X_UNLOCK (wrklock);
1206
1207 return dup2 (close_pipe, fd) < 0 ? -1 : 0;
1208}
1209
1145/*****************************************************************************/ 1210/*****************************************************************************/
1146 1211
1147X_THREAD_PROC (aio_proc) 1212X_THREAD_PROC (aio_proc)
1148{ 1213{
1149 {//D
1150 aio_req req; 1214 aio_req req;
1151 struct timespec ts; 1215 struct timespec ts;
1152 worker *self = (worker *)thr_arg; 1216 worker *self = (worker *)thr_arg;
1153 1217
1154 /* try to distribute timeouts somewhat randomly */ 1218 /* try to distribute timeouts somewhat randomly */
1219 case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break; 1283 case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break;
1220 case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break; 1284 case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
1221 case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; 1285 case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
1222 1286
1223 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; 1287 case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break;
1224 case REQ_CLOSE: req->result = close (req->int1); break; 1288 case REQ_CLOSE: req->result = aio_close (req->int1); break;
1225 case REQ_UNLINK: req->result = unlink (req->ptr1); break; 1289 case REQ_UNLINK: req->result = unlink (req->ptr1); break;
1226 case REQ_RMDIR: req->result = rmdir (req->ptr1); break; 1290 case REQ_RMDIR: req->result = rmdir (req->ptr1); break;
1227 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break; 1291 case REQ_MKDIR: req->result = mkdir (req->ptr1, req->mode); break;
1228 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break; 1292 case REQ_RENAME: req->result = rename (req->ptr2, req->ptr1); break;
1229 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break; 1293 case REQ_LINK: req->result = link (req->ptr2, req->ptr1); break;
1230 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break; 1294 case REQ_SYMLINK: req->result = symlink (req->ptr2, req->ptr1); break;
1231 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break; 1295 case REQ_MKNOD: req->result = mknod (req->ptr2, req->mode, (dev_t)req->offs); break;
1232 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break; 1296 case REQ_READLINK: req->result = readlink (req->ptr2, req->ptr1, NAME_MAX); break;
1233 1297
1298 case REQ_SYNC: req->result = 0; sync (); break;
1299 case REQ_FSYNC: req->result = fsync (req->int1); break;
1234 case REQ_FDATASYNC: req->result = fdatasync (req->int1); break; 1300 case REQ_FDATASYNC: req->result = fdatasync (req->int1); break;
1235 case REQ_FSYNC: req->result = fsync (req->int1); break; 1301
1236 case REQ_READDIR: scandir_ (req, self); break; 1302 case REQ_READDIR: scandir_ (req, self); break;
1237 1303
1238 case REQ_BUSY: 1304 case REQ_BUSY:
1239#ifdef _WIN32 1305#ifdef _WIN32
1240 Sleep (req->nv1 * 1000.); 1306 Sleep (req->nv1 * 1000.);
1293 ++npending; 1359 ++npending;
1294 1360
1295 if (!reqq_push (&res_queue, req)) 1361 if (!reqq_push (&res_queue, req))
1296 { 1362 {
1297 /* write a dummy byte to the pipe so fh becomes ready */ 1363 /* write a dummy byte to the pipe so fh becomes ready */
1298 send (respipe_osf [1], (const void *)&respipe_osf, 1, 0); 1364 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
1299 1365
1300 /* optionally signal the main thread asynchronously */ 1366 /* optionally signal the main thread asynchronously */
1301 if (main_sig) 1367 if (main_sig)
1302 pthread_kill (main_tid, main_sig); 1368 pthread_kill (main_tid, main_sig);
1303 } 1369 }
1312 X_LOCK (wrklock); 1378 X_LOCK (wrklock);
1313 worker_free (self); 1379 worker_free (self);
1314 X_UNLOCK (wrklock); 1380 X_UNLOCK (wrklock);
1315 1381
1316 return 0; 1382 return 0;
1317 }//D
1318} 1383}
1319 1384
1320/*****************************************************************************/ 1385/*****************************************************************************/
1321 1386
1322static void atfork_prepare (void) 1387static void atfork_prepare (void)
1370 idle = 0; 1435 idle = 0;
1371 nreqs = 0; 1436 nreqs = 0;
1372 nready = 0; 1437 nready = 0;
1373 npending = 0; 1438 npending = 0;
1374 1439
1375 PerlSock_closesocket (respipe [0]);
1376 PerlSock_closesocket (respipe [1]);
1377
1378 create_pipe (respipe); 1440 create_respipe ();
1379 1441
1380 atfork_parent (); 1442 atfork_parent ();
1381} 1443}
1382 1444
1383#define dREQ \ 1445#define dREQ \
1405 1467
1406PROTOTYPES: ENABLE 1468PROTOTYPES: ENABLE
1407 1469
1408BOOT: 1470BOOT:
1409{ 1471{
1410 HV *stash = gv_stashpv ("IO::AIO", 1); 1472 stash = gv_stashpv ("IO::AIO", 1);
1411 1473
1412 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1474 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1413 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1475 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1414 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1476 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1415 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1477 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1426#else 1488#else
1427 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1489 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1428 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); 1490 newCONSTSUB (stash, "SIGIO", newSViv (SIGIO));
1429#endif 1491#endif
1430 1492
1431 create_pipe (respipe); 1493 create_respipe ();
1432 1494
1433 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); 1495 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1434} 1496}
1435 1497
1436void 1498void
1483 1545
1484 REQ_SEND; 1546 REQ_SEND;
1485} 1547}
1486 1548
1487void 1549void
1488aio_close (SV *fh, SV *callback=&PL_sv_undef) 1550aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1489 PROTOTYPE: $;$ 1551 PROTOTYPE: $;$
1490 ALIAS: 1552 ALIAS:
1491 aio_close = REQ_CLOSE
1492 aio_fsync = REQ_FSYNC 1553 aio_fsync = REQ_FSYNC
1493 aio_fdatasync = REQ_FDATASYNC 1554 aio_fdatasync = REQ_FDATASYNC
1494 PPCODE: 1555 PPCODE:
1495{ 1556{
1496 dREQ; 1557 dREQ;
1497 1558
1498 req->type = ix; 1559 req->type = ix;
1499 req->sv1 = newSVsv (fh); 1560 req->sv1 = newSVsv (fh);
1561 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1562
1563 REQ_SEND (req);
1564}
1565
1566void
1567aio_close (SV *fh, SV *callback=&PL_sv_undef)
1568 PROTOTYPE: $;$
1569 PPCODE:
1570{
1571 dREQ;
1572
1573 req->type = REQ_CLOSE;
1500 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh))); 1574 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1501 1575
1502 REQ_SEND (req); 1576 REQ_SEND (req);
1503} 1577}
1504 1578
1832 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1906 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1833} 1907}
1834 1908
1835void 1909void
1836aio_nop (SV *callback=&PL_sv_undef) 1910aio_nop (SV *callback=&PL_sv_undef)
1911 ALIAS:
1912 aio_nop = REQ_NOP
1913 aio_sync = REQ_SYNC
1837 PPCODE: 1914 PPCODE:
1838{ 1915{
1839 dREQ; 1916 dREQ;
1840 1917
1841 req->type = REQ_NOP; 1918 req->type = ix;
1842 1919
1843 REQ_SEND; 1920 REQ_SEND;
1844} 1921}
1845 1922
1846int 1923int
1932 2009
1933 block_sig (); 2010 block_sig ();
1934 PUSHMARK (SP); 2011 PUSHMARK (SP);
1935 PUTBACK; 2012 PUTBACK;
1936 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL); 2013 count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL);
1937 SPAGAIN;
1938 unblock_sig (); 2014 unblock_sig ();
1939 2015
1940 if (SvTRUE (ERRSV)) 2016 if (SvTRUE (ERRSV))
1941 croak (0); 2017 croak (0);
1942 2018

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines