ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.150 by root, Wed Jul 1 08:11:24 2009 UTC vs.
Revision 1.153 by root, Wed Jul 15 01:36:04 2009 UTC

3#include <errno.h> 3#include <errno.h>
4 4
5#include "EXTERN.h" 5#include "EXTERN.h"
6#include "perl.h" 6#include "perl.h"
7#include "XSUB.h" 7#include "XSUB.h"
8
9#include "schmorp.h"
8 10
9#include <stddef.h> 11#include <stddef.h>
10#include <stdlib.h> 12#include <stdlib.h>
11#include <errno.h> 13#include <errno.h>
12#include <sys/types.h> 14#include <sys/types.h>
158 160
159static SV *on_next_submit; 161static SV *on_next_submit;
160static int next_pri = EIO_PRI_DEFAULT; 162static int next_pri = EIO_PRI_DEFAULT;
161static int max_outstanding; 163static int max_outstanding;
162 164
163static int respipe_osf [2], respipe [2] = { -1, -1 }; 165static int respipe_osf [2];
166static s_epipe respipe;
164 167
165static void req_destroy (aio_req req); 168static void req_destroy (aio_req req);
166static void req_cancel (aio_req req); 169static void req_cancel (aio_req req);
167 170
168static void want_poll (void) 171static void want_poll (void)
169{ 172{
170 /* write a dummy byte to the pipe so fh becomes ready */ 173 /* write a dummy byte to the pipe so fh becomes ready */
171 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); 174 s_epipe_signal (&respipe);
172} 175}
173 176
174static void done_poll (void) 177static void done_poll (void)
175{ 178{
176 /* read any signals sent by the worker threads */ 179 /* read any signals sent by the worker threads */
177 char buf [4]; 180 s_epipe_drain (&respipe);
178 while (respipe_read (respipe [0], buf, 4) == 4)
179 ;
180} 181}
181 182
182/* must be called at most once */ 183/* must be called at most once */
183static SV *req_sv (aio_req req, const char *klass) 184static SV *req_sv (aio_req req, const char *klass)
184{ 185{
439 grp->sv2 = 0; 440 grp->sv2 = 0;
440 441
441 eio_grp_cancel (grp); 442 eio_grp_cancel (grp);
442} 443}
443 444
444#ifdef USE_SOCKETS_AS_HANDLES
445# define TO_SOCKET(x) (win32_get_osfhandle (x))
446#else
447# define TO_SOCKET(x) (x)
448#endif
449
450static void 445static void
451create_respipe (void) 446create_respipe (void)
452{ 447{
453 int old_readfd = respipe [0];
454
455 if (respipe [1] >= 0)
456 respipe_close (TO_SOCKET (respipe [1]));
457
458#ifdef _WIN32
459 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
460#else
461 if (pipe (respipe)) 448 if (s_epipe_renew (&respipe))
462#endif
463 croak ("unable to initialize result pipe"); 449 croak ("unable to initialize result pipe");
464 450
465 if (old_readfd >= 0)
466 {
467 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
468 croak ("unable to initialize result pipe(2)");
469
470 respipe_close (respipe [0]);
471 respipe [0] = old_readfd;
472 }
473
474#ifdef _WIN32
475 int arg = 1;
476 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
477 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
478#else
479 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
480 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
481#endif
482 croak ("unable to initialize result pipe(3)");
483
484 respipe_osf [0] = TO_SOCKET (respipe [0]); 451 respipe_osf [0] = S_TO_SOCKET (respipe.fd [0]);
485 respipe_osf [1] = TO_SOCKET (respipe [1]); 452 respipe_osf [1] = S_TO_SOCKET (respipe.fd [1]);
486} 453}
487 454
488static void poll_wait (void) 455static void poll_wait (void)
489{ 456{
490 fd_set rfd;
491
492 while (eio_nreqs ()) 457 while (eio_nreqs ())
493 { 458 {
494 int size; 459 int size;
495 460
496 X_LOCK (reslock); 461 X_LOCK (reslock);
500 if (size) 465 if (size)
501 return; 466 return;
502 467
503 etp_maybe_start_thread (); 468 etp_maybe_start_thread ();
504 469
505 FD_ZERO (&rfd); 470 s_epipe_wait (&respipe);
506 FD_SET (respipe [0], &rfd);
507
508 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
509 } 471 }
510} 472}
511 473
512static int poll_cb (void) 474static int poll_cb (void)
513{ 475{
531} 493}
532 494
533static SV * 495static SV *
534get_cb (SV *cb_sv) 496get_cb (SV *cb_sv)
535{ 497{
536 HV *st; 498 SvGETMAGIC (cb_sv);
537 GV *gvp; 499 return SvOK (cb_sv) ? s_get_cv_croak (cb_sv) : 0;
538 CV *cv;
539
540 if (!SvOK (cb_sv))
541 return 0;
542
543 cv = sv_2cv (cb_sv, &st, &gvp, 0);
544
545 if (!cv)
546 croak ("IO::AIO callback must be undef or a CODE reference");
547
548 return (SV *)cv;
549} 500}
550 501
551#define dREQ \ 502#define dREQ \
552 SV *cb_cv; \ 503 SV *cb_cv; \
553 aio_req req; \ 504 aio_req req; \
568 req_submit (req); \ 519 req_submit (req); \
569 SPAGAIN; \ 520 SPAGAIN; \
570 \ 521 \
571 if (GIMME_V != G_VOID) \ 522 if (GIMME_V != G_VOID) \
572 XPUSHs (req_sv (req, AIO_REQ_KLASS)); 523 XPUSHs (req_sv (req, AIO_REQ_KLASS));
573
574static int
575extract_fd (SV *fh, int wr)
576{
577 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
578
579 if (fd < 0)
580 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
581
582 return fd;
583}
584 524
585MODULE = IO::AIO PACKAGE = IO::AIO 525MODULE = IO::AIO PACKAGE = IO::AIO
586 526
587PROTOTYPES: ENABLE 527PROTOTYPES: ENABLE
588 528
702 ALIAS: 642 ALIAS:
703 aio_fsync = EIO_FSYNC 643 aio_fsync = EIO_FSYNC
704 aio_fdatasync = EIO_FDATASYNC 644 aio_fdatasync = EIO_FDATASYNC
705 PPCODE: 645 PPCODE:
706{ 646{
707 int fd = extract_fd (fh, 0); 647 int fd = s_fileno_croak (fh, 0);
708 dREQ; 648 dREQ;
709 649
710 req->type = ix; 650 req->type = ix;
711 req->sv1 = newSVsv (fh); 651 req->sv1 = newSVsv (fh);
712 req->int1 = fd; 652 req->int1 = fd;
717void 657void
718aio_sync_file_range (SV *fh, off_t offset, size_t nbytes, UV flags, SV *callback=&PL_sv_undef) 658aio_sync_file_range (SV *fh, off_t offset, size_t nbytes, UV flags, SV *callback=&PL_sv_undef)
719 PROTOTYPE: $$$$;$ 659 PROTOTYPE: $$$$;$
720 PPCODE: 660 PPCODE:
721{ 661{
722 int fd = extract_fd (fh, 0); 662 int fd = s_fileno_croak (fh, 0);
723 dREQ; 663 dREQ;
724 664
725 req->type = EIO_SYNC_FILE_RANGE; 665 req->type = EIO_SYNC_FILE_RANGE;
726 req->sv1 = newSVsv (fh); 666 req->sv1 = newSVsv (fh);
727 req->int1 = fd; 667 req->int1 = fd;
736aio_close (SV *fh, SV *callback=&PL_sv_undef) 676aio_close (SV *fh, SV *callback=&PL_sv_undef)
737 PROTOTYPE: $;$ 677 PROTOTYPE: $;$
738 PPCODE: 678 PPCODE:
739{ 679{
740 static int close_pipe = -1; /* dummy fd to close fds via dup2 */ 680 static int close_pipe = -1; /* dummy fd to close fds via dup2 */
741 int fd = extract_fd (fh, 0); 681 int fd = s_fileno_croak (fh, 0);
742 dREQ; 682 dREQ;
743 683
744 if (close_pipe < 0) 684 if (close_pipe < 0)
745 { 685 {
746 int pipefd [2]; 686 int pipefd [2];
768 aio_write = EIO_WRITE 708 aio_write = EIO_WRITE
769 PROTOTYPE: $$$$$;$ 709 PROTOTYPE: $$$$$;$
770 PPCODE: 710 PPCODE:
771{ 711{
772 STRLEN svlen; 712 STRLEN svlen;
773 int fd = extract_fd (fh, ix == EIO_WRITE); 713 int fd = s_fileno_croak (fh, ix == EIO_WRITE);
774 char *svptr = SvPVbyte (data, svlen); 714 char *svptr = SvPVbyte (data, svlen);
775 UV len = SvUV (length); 715 UV len = SvUV (length);
776 716
777 if (dataoffset < 0) 717 if (dataoffset < 0)
778 dataoffset += svlen; 718 dataoffset += svlen;
833void 773void
834aio_sendfile (SV *out_fh, SV *in_fh, off_t in_offset, size_t length, SV *callback=&PL_sv_undef) 774aio_sendfile (SV *out_fh, SV *in_fh, off_t in_offset, size_t length, SV *callback=&PL_sv_undef)
835 PROTOTYPE: $$$$;$ 775 PROTOTYPE: $$$$;$
836 PPCODE: 776 PPCODE:
837{ 777{
838 int ifd = extract_fd (in_fh , 0); 778 int ifd = s_fileno_croak (in_fh , 0);
839 int ofd = extract_fd (out_fh, 0); 779 int ofd = s_fileno_croak (out_fh, 1);
840 dREQ; 780 dREQ;
841 781
842 req->type = EIO_SENDFILE; 782 req->type = EIO_SENDFILE;
843 req->sv1 = newSVsv (out_fh); 783 req->sv1 = newSVsv (out_fh);
844 req->int1 = ofd; 784 req->int1 = ofd;
853void 793void
854aio_readahead (SV *fh, off_t offset, size_t length, SV *callback=&PL_sv_undef) 794aio_readahead (SV *fh, off_t offset, size_t length, SV *callback=&PL_sv_undef)
855 PROTOTYPE: $$$;$ 795 PROTOTYPE: $$$;$
856 PPCODE: 796 PPCODE:
857{ 797{
858 int fd = extract_fd (fh, 0); 798 int fd = s_fileno_croak (fh, 0);
859 dREQ; 799 dREQ;
860 800
861 req->type = EIO_READAHEAD; 801 req->type = EIO_READAHEAD;
862 req->sv1 = newSVsv (fh); 802 req->sv1 = newSVsv (fh);
863 req->int1 = fd; 803 req->int1 = fd;
1139 1079
1140int 1080int
1141poll_fileno() 1081poll_fileno()
1142 PROTOTYPE: 1082 PROTOTYPE:
1143 CODE: 1083 CODE:
1144 RETVAL = respipe [0]; 1084 RETVAL = s_epipe_fd (&respipe);
1145 OUTPUT: 1085 OUTPUT:
1146 RETVAL 1086 RETVAL
1147 1087
1148int 1088int
1149poll_cb(...) 1089poll_cb(...)
1230 if (GIMME_V != G_VOID) 1170 if (GIMME_V != G_VOID)
1231 XPUSHs (req->callback ? sv_2mortal (newRV_inc (req->callback)) : &PL_sv_undef); 1171 XPUSHs (req->callback ? sv_2mortal (newRV_inc (req->callback)) : &PL_sv_undef);
1232 1172
1233 if (items > 1) 1173 if (items > 1)
1234 { 1174 {
1235 SV *cb_cv = get_cb (callback); 1175 SV *cb_cv =get_cb (callback);
1236 1176
1237 SvREFCNT_dec (req->callback); 1177 SvREFCNT_dec (req->callback);
1238 req->callback = SvREFCNT_inc (cb_cv); 1178 req->callback = SvREFCNT_inc (cb_cv);
1239 } 1179 }
1240} 1180}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines