ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.117
Committed: Thu May 9 05:40:14 2013 UTC (11 years ago) by root
Branch: MAIN
CVS Tags: rel-6_32, rel-6_31
Changes since 1.116: +1 -1 lines
Log Message:
6.31

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.65 Coro::Handle - non-blocking I/O with a blocking interface.
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user; you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.86 use common::sense;
40 root 1.13
41 root 1.16 use Carp ();
42 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
43 root 1.37
44 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 root 1.113 use AnyEvent::Socket ();
46 root 1.36
47 root 1.5 use base 'Exporter';
48 root 1.1
49 root 1.117 our $VERSION = 6.31;
50 root 1.16 our @EXPORT = qw(unblock);
51 root 1.1
52 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53 root 1.1
54     Create a new non-blocking io-handle using the given
55 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56     other supported argument is "timeout", which sets a timeout for each
57     operation.
58 root 1.1
59     =cut
60    
# Constructor: wraps the given perl filehandle in a fresh glob that is tied
# to Coro::Handle::FH. Any further arguments are handed to the tie class as
# key => value pairs. Returns nothing when no (true) filehandle is given.
sub new_from_fh {
   my ($class, $fh, @args) = @_;

   # bare return: undef in scalar context, empty list in list context
   return unless $fh;

   # localising a glob and returning it yields a new anonymous glob
   my $glob = do { local *Coro::Handle };

   tie *$glob, 'Coro::Handle::FH', fh => $fh, @args;

   # usable both as class method and as object method
   bless \$glob, (ref $class) || $class
}
70    
71 root 1.5 =item $fh = unblock $fh
72    
73 root 1.94 This is a convenience function that just calls C<new_from_fh> on the
74 root 1.22 given filehandle. Use it to replace a normal perl filehandle by a
75     non-(coroutine-)blocking equivalent.
76 root 1.5
77     =cut
78    
# Exported convenience wrapper: same as Coro::Handle->new_from_fh ($fh).
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
82    
83 root 1.1 =item $fh->writable, $fh->readable
84    
85     Wait until the filehandle is readable or writable (and return true) or
86     until an error condition happens (and return false).
87    
88     =cut
89    
# Both methods fetch the tied implementation object behind the handle and
# delegate to the function of the same name in Coro::Handle::FH.
sub readable {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::readable ($tie)
}

sub writable {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::writable ($tie)
}
92 root 1.1
93 root 1.16 =item $fh->readline ([$terminator])
94 root 1.4
95 root 1.79 Similar to the builtin of the same name, but allows you to specify the
96     input record separator in a coroutine-safe manner (i.e. not using a global
97     variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98     effect.
99 root 1.4
100     =cut
101    
# Forwards to READLINE on the tied object; the optional terminator (and any
# other argument) is passed through unchanged.
sub readline {
   my $fh = shift;
   (tied *$$fh)->READLINE (@_)
}
103 root 1.4
104 root 1.16 =item $fh->autoflush ([...])
105 root 1.4
106     Always returns true, arguments are being ignored (exists for compatibility
107 root 1.8 only). Might change in the future.
108 root 1.4
109     =cut
110    
# Compatibility stub: ignores all arguments and always reports success.
sub autoflush {
   return !0;
}
112    
113 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114 root 1.8
115 root 1.13 Work like their function equivalents (except read, which works like
116 root 1.18 sysread). You should not use the read function with Coro::Handle's: it will
117 root 1.13 work, but it's not efficient.
118 root 1.8
119     =cut
120    
# Thin method wrappers: each one looks up the tied Coro::Handle::FH object
# behind the handle and calls the corresponding tie-level function directly.
# Arguments are passed as elements of @_ (not copied), so READ can fill the
# caller's buffer in place.

# read and sysread are the same operation (see READ).
sub read {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::READ ($tie, $_[1], $_[2], $_[3])
}

sub sysread {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::READ ($tie, $_[1], $_[2], $_[3])
}

sub syswrite {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::WRITE ($tie, $_[1], $_[2], $_[3])
}

# all arguments are concatenated and written in one go
sub print {
   my $tie = tied *${+shift};
   Coro::Handle::FH::WRITE ($tie, join "", @_)
}

sub printf {
   my $tie = tied *${+shift};
   Coro::Handle::FH::PRINTF ($tie, @_)
}

sub fileno {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::FILENO ($tie)
}

sub close {
   my $tie = tied *${$_[0]};
   Coro::Handle::FH::CLOSE ($tie)
}

# this handler always blocks the caller
sub blocking {
   !0
}
129    
# Accessor for slot 8 of the tied object (the flag READ checks to decide
# whether to stop after the first read attempt). Always returns the value
# the slot had before the call; stores the argument only when one was passed.
sub partial {
   my $tie      = tied *${$_[0]};
   my $previous = $tie->[8];

   if (@_ > 1) {
      $tie->[8] = $_[1];
   }

   $previous
}
137 root 1.8
138 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
139     send, recv, peername, sockname, shutdown, peerport, peerhost
140    
141     Do the same thing as the perl builtins or IO::Socket methods (but return
142     true on EINPROGRESS). Remember that these must be method calls.
143    
144     =cut
145    
# Socket method wrappers: each applies the builtin of the same name to the
# real filehandle stored in slot 0 of the tied object.

# Also returns true when the connect is merely still in progress
# (EINPROGRESS, EAGAIN or WSAEWOULDBLOCK).
sub connect {
   my $sock = (tied *${$_[0]})->[0];

   CORE::connect ($sock, $_[1])
      || $! == EINPROGRESS
      || $! == EAGAIN
      || $! == WSAEWOULDBLOCK
}

sub bind {
   my $sock = (tied *${$_[0]})->[0];
   CORE::bind ($sock, $_[1])
}

sub listen {
   my $sock = (tied *${$_[0]})->[0];
   CORE::listen ($sock, $_[1])
}

sub getsockopt {
   my $sock = (tied *${$_[0]})->[0];
   CORE::getsockopt ($sock, $_[1], $_[2])
}

sub setsockopt {
   my $sock = (tied *${$_[0]})->[0];
   CORE::setsockopt ($sock, $_[1], $_[2], $_[3])
}
# $fh->send ($msg, $flags [, $to])
#
# Applies the send builtin to the real filehandle (slot 0 of the tied
# object). The destination is only handed to the builtin when the caller
# actually supplied one: @_ also contains the invocant, so a destination is
# present only when @_ > 3. Passing an (undefined) TO unconditionally would
# always select the four-argument form of send.
sub send {
   my $sock = (tied *${$_[0]})->[0];

   @_ > 3
      ? CORE::send ($sock, $_[1], $_[2], $_[3])
      : CORE::send ($sock, $_[1], $_[2])
}

# $fh->recv ($buffer, $length [, $flags])
#
# Applies the recv builtin to the real filehandle. $_[1] is used directly so
# the caller's buffer is filled in place; flags default to 0 when omitted.
sub recv {
   my $sock = (tied *${$_[0]})->[0];

   CORE::recv ($sock, $_[1], $_[2], @_ > 3 ? $_[3] : 0)
}
# Builtin wrappers operating on the real filehandle (slot 0 of the tied
# object).
sub sockname {
   my $sock = (tied *${$_[0]})->[0];
   CORE::getsockname ($sock)
}

sub peername {
   my $sock = (tied *${$_[0]})->[0];
   CORE::getpeername ($sock)
}

sub shutdown {
   my $sock = (tied *${$_[0]})->[0];
   CORE::shutdown ($sock, $_[1])
}
156 root 1.27
157 root 1.113 =item peeraddr, peerhost, peerport
158    
159     Return the peer address (in the form accepted by C<AnyEvent::Socket::format_address>), the peer host (as numerical IP address) and the peer port (as integer).
160    
161     =cut
162    
# Second element of the unpacked peer socket address (the host part).
sub peeraddr {
   my $sockaddr = getpeername ((tied *${$_[0]})->[0]);

   (AnyEvent::Socket::unpack_sockaddr ($sockaddr))[1]
}

# First element of the unpacked peer socket address (the port).
sub peerport {
   my $sockaddr = getpeername ((tied *${$_[0]})->[0]);

   (AnyEvent::Socket::unpack_sockaddr ($sockaddr))[0]
}

# peeraddr, run through AnyEvent::Socket::format_address.
sub peerhost {
   # &peeraddr without an argument list re-uses our own @_
   AnyEvent::Socket::format_address (&peeraddr)
}
174    
175 root 1.27 =item ($fh, $peername) = $listen_fh->accept
176    
177     In scalar context, returns the newly accepted socket (or undef) and in
178     list context return the ($fh, $peername) pair (or nothing).
179    
180     =cut
181    
# Accepts a connection on the listening handle without blocking other
# coroutines. Returns the new Coro::Handle (scalar context) or the pair
# ($handle, $peername) (list context). Returns nothing on a non-transient
# error or when waiting for readability fails.
sub accept {
   my ($peername, $fh);

   for (;;) {
      # the real handle is looked up afresh on every attempt
      if ($peername = CORE::accept ($fh, (tied *${$_[0]})->[0])) {
         my $handle = $_[0]->new_from_fh ($fh);

         return wantarray ? ($handle, $peername) : $handle;
      }

      # only "try again" style errors are worth waiting for
      return unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;

      return unless $_[0]->readable;
   }
}
195    
196 root 1.21 =item $fh->timeout ([...])
197 root 1.8
198 root 1.13 The optional argument sets the new timeout (in seconds) for this
199 root 1.8 handle. Returns the current (new) value.
200    
201     C<0> is a valid timeout, use C<undef> to disable the timeout.
202    
203     =cut
204    
# Gets or sets the per-operation timeout (slot 2 of the tied object). When a
# new value is passed it is also pushed to the read and write watchers
# (slots 5 and 6) if they exist. Returns the (new) current value.
sub timeout {
   my $tie = tied *${$_[0]};

   if (@_ > 1) {
      my $seconds = $_[1];

      $tie->[2] = $seconds;

      for my $slot (5, 6) {
         my $watcher = $tie->[$slot]
            or next;

         $watcher->timeout ($seconds);
      }
   }

   $tie->[2]
}
214    
215     =item $fh->fh
216    
217     Returns the "real" (non-blocking) filehandle. Use this if you want to
218     do operations on the file handle you cannot do using the Coro::Handle
219     interface.
220    
221     =item $fh->rbuf
222    
223     Returns the current contents of the read buffer (this is an lvalue, so you
224     can change the read buffer if you like).
225    
226     You can use this function to implement your own optimized reader when neither
227     readline nor sysread are viable candidates, like this:
228    
229     # first get the _real_ non-blocking filehandle
230     # and fetch a reference to the read buffer
231     my $nb_fh = $fh->fh;
232     my $buf = \$fh->rbuf;
233    
234 root 1.18 while () {
235 root 1.13 # now use buffer contents, modifying
236     # if necessary to reflect the removed data
237    
238     last if $$buf ne ""; # we have leftover data
239    
240     # read another buffer full of data
241     $fh->readable or die "end of file";
242     sysread $nb_fh, $$buf, 8192;
243     }
244    
245     =cut
246    
# The real filehandle is kept in slot 0 of the tied object.
sub fh {
   my $tie = tied *${$_[0]};
   $tie->[0]
}
250    
# The read buffer is slot 3 of the tied object; it is returned as an lvalue
# so callers can modify it or take a reference to it.
sub rbuf : lvalue {
   my $tie = tied *${$_[0]};
   $tie->[3]
}
254    
# Defined (and intentionally empty) so that object destruction is never
# routed through AUTOLOAD.
sub DESTROY {
}
258    
259 root 1.16 our $AUTOLOAD;
260    
# Fallback for unknown methods: looks the method up in the "forward class"
# (slot 7 of the tied object) and, when found, jumps to it with the original
# @_ intact. Dies with a perl-style "Can't locate object method" message
# otherwise.
sub AUTOLOAD {
   my $self = tied *${$_[0]};

   # strip the package qualification from the requested name
   (my $func = $AUTOLOAD) =~ s/^(.*):://;

   my $target = UNIVERSAL::can ($self->[7], $func);

   die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\""
      unless $target;

   goto &$target;
}
273    
274 root 1.1 package Coro::Handle::FH;
275    
276 root 1.86 use common::sense;
277 root 1.13
278 root 1.10 use Carp 'croak';
279 root 1.37 use Errno qw(EAGAIN EINTR);
280 root 1.1
281 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK);
282 root 1.1
283 root 1.66 use Coro::AnyEvent;
284    
285 root 1.13 # formerly a hash, but we are speed-critical, so try
286     # to be faster even if it hurts.
287     #
288     # 0 FH
289     # 1 desc
290     # 2 timeout
291     # 3 rb
292     # 4 wb # unused
293 root 1.67 # 5 read watcher, if Coro::Event|EV used
294     # 6 write watcher, if Coro::Event|EV used
295 root 1.13 # 7 forward class
296     # 8 blocking
297 root 1.1
# tie constructor. Takes key => value pairs (fh, desc, timeout,
# forward_class, partial), stores them in the positional array object and
# switches the filehandle to non-blocking mode.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [
      $arg{fh},            # 0 FH
      $arg{desc},          # 1 desc
      $arg{timeout},       # 2 timeout
      "",                  # 3 rb
      "",                  # 4 wb (unused)
      undef,               # 5 read watcher, created on demand
      undef,               # 6 write watcher, created on demand
      $arg{forward_class}, # 7 forward class
      $arg{partial},       # 8 blocking
   ], $class;

   AnyEvent::Util::fh_nonblocking ($self->[0], 1);

   $self
}
314    
# Empties the object array, dropping the filehandle, buffers and watchers.
# This default gets overridden for Coro::Event.
sub cleanup {
   my ($self) = @_;

   @$self = ();
}
319    
# tie hook for open() on the handle.
#
# Resets the object (see cleanup), then re-opens slot 0 with the arguments
# the caller gave to open: the two-argument, three-argument and list forms
# are all passed on with exactly the arguments received. On success the new
# handle is put into non-blocking mode.
#
# Returns the result of open; croaks if non-blocking mode cannot be set.
sub OPEN {
   &cleanup;
   my $self = shift;

   # dispatch on the number of arguments so no spurious undefs reach open
   my $r = @_ == 1 ? open ($self->[0], $_[0])
         : @_ == 2 ? open ($self->[0], $_[0], $_[1])
         :           open ($self->[0], $_[0], $_[1], @_[2 .. $#_]);

   if ($r) {
      # the constants below live in Fcntl, make sure it is loaded
      require Fcntl;

      # add O_NONBLOCK to the existing status flags instead of replacing
      # them, so flags such as O_APPEND survive
      my $flags = fcntl $self->[0], &Fcntl::F_GETFL, 0
         or croak "fcntl(F_GETFL): $!";

      fcntl $self->[0], &Fcntl::F_SETFL, $flags | &Fcntl::O_NONBLOCK
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
333    
# tie hook for print: concatenates all arguments and writes them via WRITE.
sub PRINT {
   my $self = shift;

   WRITE ($self, join "", @_)
}
337    
# tie hook for printf: formats the arguments with sprintf and writes the
# result via WRITE.
sub PRINTF {
   my $self   = shift;
   my $format = shift;

   WRITE ($self, sprintf $format, @_)
}
341    
# tie hook for getc: requests a single character through READ and returns
# whatever ended up in the buffer (undef if nothing was read).
sub GETC {
   my $char;

   READ ($_[0], $char, 1);

   $char
}
347    
# tie hook for binmode: applies binmode (without a layer argument) to the
# real filehandle.
sub BINMODE {
   my ($self) = @_;

   binmode $self->[0];
}
351    
# Positioning and eof queries are not available on these handles: each hook
# simply croaks.
sub TELL {
   Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   Carp::croak ("Coro::Handle's don't support eof()");
}
363    
# tie hook for close: the filehandle is saved first because cleanup empties
# the object, then the saved handle is closed.
sub CLOSE {
   my $handle = $_[0][0];

   &cleanup;

   close $handle
}
369    
# Destructor: releases everything the object still holds.
sub DESTROY {
   cleanup ($_[0]);
}
373    
# tie hook for fileno: the descriptor number of the real filehandle.
sub FILENO {
   my ($self) = @_;

   fileno $self->[0]
}
377    
378 root 1.13 # seems to be called for stringification (how weird), at least
379     # when DumpValue::dumpValue is used to print this.
# Stringification helper: the stringified object followed by the handle
# description (slot 1) in angle brackets.
sub FETCH {
   my ($self) = @_;

   "$self" . "<$self->[1]>"
}
383    
# Generic AnyEvent implementation of "readable": suspends the current
# coroutine until the io watcher fires (result 1) or, when a timeout is set
# (slot 2), the timer fires first (result 0).
sub _readable_anyevent {
   my $done = Coro::rouse_cb;

   my $io = AE::io ($_[0][0], 0, sub { $done->(1) });

   # both watchers only have to live until rouse_wait returns
   my $timer;
   $timer = AE::timer ($_[0][2], 0, sub { $done->(0) })
      if defined $_[0][2];

   Coro::rouse_wait
}
392    
# Generic AnyEvent implementation of "writable": suspends the current
# coroutine until the io watcher fires (result 1) or, when a timeout is set
# (slot 2), the timer fires first (result 0).
sub _writable_anyevent {
   my $done = Coro::rouse_cb;

   my $io = AE::io ($_[0][0], 1, sub { $done->(1) });

   # both watchers only have to live until rouse_wait returns
   my $timer;
   $timer = AE::timer ($_[0][2], 0, sub { $done->(0) })
      if defined $_[0][2];

   Coro::rouse_wait
}
401    
# Coro::Event implementation of "readable": creates the read watcher on
# first use (cached in slot 5), waits for its next event and reports
# whether the R bit is set in the event's element 4.
sub _readable_coro {
   my $self = $_[0];

   my $watcher = $self->[5] ||= "Coro::Event"->io (
      fd      => $self->[0],
      desc    => "fh $self->[1] read watcher",
      timeout => $self->[2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}
410    
# Coro::Event implementation of "writable": creates the write watcher on
# first use (cached in slot 6), waits for its next event and reports
# whether the W bit is set in the event's element 4.
sub _writable_coro {
   my $self = $_[0];

   my $watcher = $self->[6] ||= "Coro::Event"->io (
      fd      => $self->[0],
      desc    => "fh $self->[1] write watcher",
      timeout => $self->[2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
419    
420 root 1.75 #sub _readable_ev {
421 root 1.33 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
422     #}
423     #
424 root 1.75 #sub _writable_ev {
425 root 1.33 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
426     #}
427 root 1.30
428 root 1.21 # decide on event model at runtime
# Install self-replacing stubs for "readable" and "writable". The first call
# of a stub detects the event model, replaces the stub by the matching
# implementation and then continues in that implementation with the same @_.
for my $name ("readable", "writable") {
   *$name = sub {
      AnyEvent::detect;

      if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" && eval { require Coro::Event }) {
         *$name = \&{"_${name}_coro"};

         # with Coro::Event the cached watchers have to be cancelled as well
         *cleanup = sub {
            eval {
               $_[0][5]->cancel if $_[0][5];
               $_[0][6]->cancel if $_[0][6];
            };
            @{$_[0]} = ();
         };

      } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" && eval { require Coro::EV }) {
         *$name = \&{"Coro::EV::_${name}_ev"};

         # Coro 5.0+ doesn't support goto &SLF, so call it normally;
         # this line is executed once only
         return &$name;

      } else {
         *$name = \&{"_${name}_anyevent"};
      }

      goto &$name
   };
}
452    
# WRITE ($self, $data [, $length [, $offset]])
#
# Writes $length bytes of $data (default: all of it), starting at $offset,
# waiting via "writable" whenever the write would block. Stops on a
# non-transient error or when waiting fails. Returns the number of bytes
# written so far (undef if nothing was written at all).
#
# The arguments are accessed through @_ so the data is never copied and
# &writable sees the object as its first argument.
sub WRITE {
   my $remaining = defined $_[2] ? $_[2] : length $_[1];
   my $offset    = $_[3];
   my $written;

   for (;;) {
      my $n = syswrite $_[0][0], $_[1], $remaining, $offset;

      if (defined $n) {
         $written   += $n;
         $offset    += $n;
         $remaining -= $n;

         last unless $remaining;
      } else {
         # anything but a "try again" style error ends the write
         last unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
      }

      &writable
         or last;
   }

   $written
}
473    
# READ ($self, $buffer, $length [, $offset])
#
# Fills the caller's $buffer (accessed in place through $_[1]) with up to
# $length bytes at $offset. Data left in the read buffer (slot 3) is used
# first; after that the filehandle is read, waiting via "readable" when
# necessary. Unless the partial flag (slot 8) is set, reading continues
# until $length bytes have arrived, end of file is reached, a non-transient
# error occurs or waiting fails. Returns the number of bytes delivered.
sub READ {
   my $want   = $_[2];
   my $offset = $_[3];
   my $total;

   # first deplete the read buffer
   my $buffered = length $_[0][3];

   if ($buffered) {
      if ($buffered > $want) {
         # more buffered than requested: hand out a prefix only
         substr ($_[1], $offset) = substr ($_[0][3], 0, $want, "");

         return $want;
      }

      # the whole buffer fits into the request
      substr ($_[1], $offset) = $_[0][3];
      $_[0][3] = "";

      $want   -= $buffered;
      $offset += $buffered;
      $total  += $buffered;

      return $total unless $want;
   }

   for (;;) {
      my $n = sysread $_[0][0], $_[1], $want, $offset;

      if (defined $n) {
         $want   -= $n;
         $offset += $n;
         $total  += $n;

         # done when the request is satisfied or at end of file
         last unless $want && $n;
      } else {
         last unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
      }

      # partial mode: never wait for more data
      last if $_[0][8];

      &readable
         or last;
   }

   $total
}
510    
# READLINE ($self [, $separator])
#
# Returns the next record from the read buffer (slot 3), reading more data
# from the filehandle as needed. The separator defaults to $/:
#
#   non-empty string - record ends after the first occurrence of it
#   empty string     - record ends at "\n\n", leading newlines are removed
#   undef            - everything up to end of file
#
# At end of file, on a non-transient error or when waiting for data fails,
# the remaining buffer contents are returned (undef if there are none).
sub READLINE {
   my $sep = @_ > 1 ? $_[1] : $/;
   my ($from, $got, $hit);
   my $chunk = 1020;

   for (;;) {
      if (length $sep) {
         $hit = index ($_[0][3], $sep, $from < 0 ? 0 : $from);

         return substr ($_[0][3], 0, $hit + length ($sep), "")
            if $hit >= 0;

         # the separator might straddle the old and the new data, so
         # resume the search slightly before the current end
         $from = (length $_[0][3]) - (length $sep);
      } elsif (defined $sep) {
         $hit = index ($_[0][3], "\n\n", $from < 1 ? 1 : $from);

         if ($hit >= 0) {
            my $record = substr ($_[0][3], 0, $hit + 2, "");
            $record =~ s/\A\n+//;
            return $record;
         }

         $from = (length $_[0][3]) - 1;
      }

      # top up the buffer, doubling the target size once it is half full
      $got = $chunk - length $_[0][3];
      $got = $chunk *= 2 if $got < $chunk * 0.5;
      $got = sysread $_[0][0], $_[0][3], $got, length $_[0][3];

      # new data arrived, search again
      next if $got;

      if (defined $got) {
         # EOF
         return undef unless length $_[0][3];

         $_[0][3] =~ s/\A\n+//
            if ! length $sep && defined $sep;

         return delete $_[0][3];
      }

      # read error: only transient ones are worth waiting for
      my $transient = $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;

      unless ($transient && &readable) {
         return length $_[0][3] ? delete $_[0][3] : undef;
      }
   }
}
555    
556 root 1.13 1;
557 root 1.1
558 root 1.13 =back
559 root 1.1
560     =head1 BUGS
561    
562     - Perl's IO-Handle model is THE bug.
563    
564     =head1 AUTHOR
565    
566 root 1.13 Marc Lehmann <schmorp@schmorp.de>
567     http://home.schmorp.de/
568 root 1.1
569     =cut
570