ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.151 by root, Tue Jul 14 00:13:36 2009 UTC vs.
Revision 1.154 by root, Wed Jul 15 01:39:15 2009 UTC

160 160
161static SV *on_next_submit; 161static SV *on_next_submit;
162static int next_pri = EIO_PRI_DEFAULT; 162static int next_pri = EIO_PRI_DEFAULT;
163static int max_outstanding; 163static int max_outstanding;
164 164
165static int respipe_osf [2], respipe [2] = { -1, -1 }; 165static s_epipe respipe;
166 166
167static void req_destroy (aio_req req); 167static void req_destroy (aio_req req);
168static void req_cancel (aio_req req); 168static void req_cancel (aio_req req);
169 169
170static void want_poll (void) 170static void want_poll (void)
171{ 171{
172 /* write a dummy byte to the pipe so fh becomes ready */ 172 /* write a dummy byte to the pipe so fh becomes ready */
173 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); 173 s_epipe_signal (&respipe);
174} 174}
175 175
176static void done_poll (void) 176static void done_poll (void)
177{ 177{
178 /* read any signals sent by the worker threads */ 178 /* read any signals sent by the worker threads */
179 char buf [4]; 179 s_epipe_drain (&respipe);
180 while (respipe_read (respipe [0], buf, 4) == 4)
181 ;
182} 180}
183 181
184/* must be called at most once */ 182/* must be called at most once */
185static SV *req_sv (aio_req req, const char *klass) 183static SV *req_sv (aio_req req, const char *klass)
186{ 184{
441 grp->sv2 = 0; 439 grp->sv2 = 0;
442 440
443 eio_grp_cancel (grp); 441 eio_grp_cancel (grp);
444} 442}
445 443
446#ifdef USE_SOCKETS_AS_HANDLES
447# define TO_SOCKET(x) (win32_get_osfhandle (x))
448#else
449# define TO_SOCKET(x) (x)
450#endif
451
452static void 444static void
453create_respipe (void) 445create_respipe (void)
454{ 446{
455 int old_readfd = respipe [0];
456
457 if (respipe [1] >= 0)
458 respipe_close (TO_SOCKET (respipe [1]));
459
460#ifdef _WIN32
461 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
462#else
463 if (pipe (respipe)) 447 if (s_epipe_renew (&respipe))
464#endif
465 croak ("unable to initialize result pipe"); 448 croak ("unable to initialize result pipe");
466
467 if (old_readfd >= 0)
468 {
469 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
470 croak ("unable to initialize result pipe(2)");
471
472 respipe_close (respipe [0]);
473 respipe [0] = old_readfd;
474 }
475
476#ifdef _WIN32
477 int arg = 1;
478 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
479 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
480#else
481 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
482 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
483#endif
484 croak ("unable to initialize result pipe(3)");
485
486 respipe_osf [0] = TO_SOCKET (respipe [0]);
487 respipe_osf [1] = TO_SOCKET (respipe [1]);
488} 449}
489 450
490static void poll_wait (void) 451static void poll_wait (void)
491{ 452{
492 fd_set rfd;
493
494 while (eio_nreqs ()) 453 while (eio_nreqs ())
495 { 454 {
496 int size; 455 int size;
497 456
498 X_LOCK (reslock); 457 X_LOCK (reslock);
502 if (size) 461 if (size)
503 return; 462 return;
504 463
505 etp_maybe_start_thread (); 464 etp_maybe_start_thread ();
506 465
507 FD_ZERO (&rfd); 466 s_epipe_wait (&respipe);
508 FD_SET (respipe [0], &rfd);
509
510 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
511 } 467 }
512} 468}
513 469
514static int poll_cb (void) 470static int poll_cb (void)
515{ 471{
533} 489}
534 490
535static SV * 491static SV *
536get_cb (SV *cb_sv) 492get_cb (SV *cb_sv)
537{ 493{
538 HV *st; 494 SvGETMAGIC (cb_sv);
539 GV *gvp; 495 return SvOK (cb_sv) ? s_get_cv_croak (cb_sv) : 0;
540 CV *cv;
541
542 if (!SvOK (cb_sv))
543 return 0;
544
545 cv = sv_2cv (cb_sv, &st, &gvp, 0);
546
547 if (!cv)
548 croak ("IO::AIO callback must be undef or a CODE reference");
549
550 return (SV *)cv;
551} 496}
552 497
553#define dREQ \ 498#define dREQ \
554 SV *cb_cv; \ 499 SV *cb_cv; \
555 aio_req req; \ 500 aio_req req; \
1130 1075
1131int 1076int
1132poll_fileno() 1077poll_fileno()
1133 PROTOTYPE: 1078 PROTOTYPE:
1134 CODE: 1079 CODE:
1135 RETVAL = respipe [0]; 1080 RETVAL = s_epipe_fd (&respipe);
1136 OUTPUT: 1081 OUTPUT:
1137 RETVAL 1082 RETVAL
1138 1083
1139int 1084int
1140poll_cb(...) 1085poll_cb(...)
1221 if (GIMME_V != G_VOID) 1166 if (GIMME_V != G_VOID)
1222 XPUSHs (req->callback ? sv_2mortal (newRV_inc (req->callback)) : &PL_sv_undef); 1167 XPUSHs (req->callback ? sv_2mortal (newRV_inc (req->callback)) : &PL_sv_undef);
1223 1168
1224 if (items > 1) 1169 if (items > 1)
1225 { 1170 {
1226 SV *cb_cv = get_cb (callback); 1171 SV *cb_cv =get_cb (callback);
1227 1172
1228 SvREFCNT_dec (req->callback); 1173 SvREFCNT_dec (req->callback);
1229 req->callback = SvREFCNT_inc (cb_cv); 1174 req->callback = SvREFCNT_inc (cb_cv);
1230 } 1175 }
1231} 1176}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines