ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.127
Committed: Mon Jun 29 22:42:33 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-6_44
Changes since 1.126: +1 -1 lines
Log Message:
6.44

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.65 Coro::Handle - non-blocking I/O with a blocking interface.
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.86 use common::sense;
40 root 1.13
41 root 1.16 use Carp ();
42 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
43 root 1.37
44 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 root 1.113 use AnyEvent::Socket ();
46 root 1.36
47 root 1.5 use base 'Exporter';
48 root 1.1
49 root 1.127 our $VERSION = 6.44;
50 root 1.16 our @EXPORT = qw(unblock);
51 root 1.1
52 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53 root 1.1
54     Create a new non-blocking io-handle using the given
55 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56     other supported argument is "timeout", which sets a timeout for each
57     operation.
58 root 1.1
59     =cut
60    
61     sub new_from_fh {
62     my $class = shift;
63     my $fh = shift or return;
64     my $self = do { local *Coro::Handle };
65    
66 root 1.96 tie *$self, 'Coro::Handle::FH', fh => $fh, @_;
67 root 1.1
68 root 1.22 bless \$self, ref $class ? ref $class : $class
69 root 1.1 }
70    
71 root 1.5 =item $fh = unblock $fh
72    
73 root 1.94 This is a convenience function that just calls C<new_from_fh> on the
74 root 1.22 given filehandle. Use it to replace a normal perl filehandle by a
75     non-(coroutine-)blocking equivalent.
76 root 1.5
77     =cut
78    
79     sub unblock($) {
80 root 1.19 new_from_fh Coro::Handle $_[0]
81 root 1.5 }
82    
83 root 1.1 =item $fh->writable, $fh->readable
84    
85     Wait until the filehandle is readable or writable (and return true) or
86     until an error condition happens (and return false).
87    
88     =cut
89    
90 root 1.96 sub readable { Coro::Handle::FH::readable (tied *${$_[0]}) }
91     sub writable { Coro::Handle::FH::writable (tied *${$_[0]}) }
92 root 1.1
93 root 1.16 =item $fh->readline ([$terminator])
94 root 1.4
95 root 1.79 Similar to the builtin of the same name, but allows you to specify the
96     input record separator in a coroutine-safe manner (i.e. not using a global
97     variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98     effect.
99 root 1.4
100     =cut
101    
102 root 1.96 sub readline { tied(*${+shift})->READLINE (@_) }
103 root 1.4
104 root 1.16 =item $fh->autoflush ([...])
105 root 1.4
106     Always returns true, arguments are being ignored (exists for compatibility
107 root 1.8 only). Might change in the future.
108 root 1.4
109     =cut
110    
111     sub autoflush { !0 }
112    
113 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114 root 1.8
115 root 1.13 Work like their function equivalents (except read, which works like
116 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
117 root 1.13 work but it's not efficient).
118 root 1.8
119     =cut
120    
121 root 1.96 sub read { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
122     sub sysread { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
123     sub syswrite { Coro::Handle::FH::WRITE (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
124     sub print { Coro::Handle::FH::WRITE (tied *${+shift}, join "", @_) }
125     sub printf { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
126     sub fileno { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
127     sub close { Coro::Handle::FH::CLOSE (tied *${$_[0]}) }
128 root 1.13 sub blocking { !0 } # this handler always blocks the caller
129    
130     sub partial {
131 root 1.96 my $obj = tied *${$_[0]};
132 root 1.13
133     my $retval = $obj->[8];
134     $obj->[8] = $_[1] if @_ > 1;
135     $retval
136     }
137 root 1.8
138 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
139     send, recv, peername, sockname, shutdown, peerport, peerhost
140    
141     Do the same thing as the perl builtins or IO::Socket methods (but return
142     true on EINPROGRESS). Remember that these must be method calls.
143    
144     =cut
145    
146 root 1.96 sub connect { connect tied(*${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
147     sub bind { bind tied(*${$_[0]})->[0], $_[1] }
148     sub listen { listen tied(*${$_[0]})->[0], $_[1] }
149     sub getsockopt { getsockopt tied(*${$_[0]})->[0], $_[1], $_[2] }
150     sub setsockopt { setsockopt tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }
151     sub send { send tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
152     sub recv { recv tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
153     sub sockname { getsockname tied(*${$_[0]})->[0] }
154     sub peername { getpeername tied(*${$_[0]})->[0] }
155     sub shutdown { shutdown tied(*${$_[0]})->[0], $_[1] }
156 root 1.27
157 root 1.113 =item peeraddr, peerhost, peerport
158    
159     Return the peer host (as numericla IP address) and peer port (as integer).
160    
161     =cut
162    
163     sub peeraddr {
164     (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[1]
165     }
166    
167     sub peerport {
168     (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[0]
169     }
170    
171     sub peerhost {
172     AnyEvent::Socket::format_address &peeraddr
173     }
174    
175 root 1.27 =item ($fh, $peername) = $listen_fh->accept
176    
177     In scalar context, returns the newly accepted socket (or undef) and in
178     list context return the ($fh, $peername) pair (or nothing).
179    
180     =cut
181    
182     sub accept {
183     my ($peername, $fh);
184     while () {
185 root 1.96 $peername = accept $fh, tied(*${$_[0]})->[0]
186 root 1.27 and return wantarray
187     ? ($_[0]->new_from_fh($fh), $peername)
188     : $_[0]->new_from_fh($fh);
189    
190 root 1.39 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
191 root 1.27
192     $_[0]->readable or return;
193     }
194     }
195    
196 root 1.21 =item $fh->timeout ([...])
197 root 1.8
198 root 1.13 The optional argument sets the new timeout (in seconds) for this
199 root 1.8 handle. Returns the current (new) value.
200    
201     C<0> is a valid timeout, use C<undef> to disable the timeout.
202    
203     =cut
204    
205     sub timeout {
206 root 1.96 my $self = tied *${$_[0]};
207 root 1.13 if (@_ > 1) {
208     $self->[2] = $_[1];
209 root 1.18 $self->[5]->timeout ($_[1]) if $self->[5];
210     $self->[6]->timeout ($_[1]) if $self->[6];
211 root 1.8 }
212 root 1.21 $self->[2]
213 root 1.13 }
214    
215     =item $fh->fh
216    
217     Returns the "real" (non-blocking) filehandle. Use this if you want to
218     do operations on the file handle you cannot do using the Coro::Handle
219     interface.
220    
221     =item $fh->rbuf
222    
223     Returns the current contents of the read buffer (this is an lvalue, so you
224     can change the read buffer if you like).
225    
226     You can use this function to implement your own optimized reader when neither
227     readline nor sysread are viable candidates, like this:
228    
229     # first get the _real_ non-blocking filehandle
230     # and fetch a reference to the read buffer
231     my $nb_fh = $fh->fh;
232     my $buf = \$fh->rbuf;
233    
234 root 1.18 while () {
235 root 1.13 # now use buffer contents, modifying
236     # if necessary to reflect the removed data
237    
238     last if $$buf ne ""; # we have leftover data
239    
240     # read another buffer full of data
241     $fh->readable or die "end of file";
242     sysread $nb_fh, $$buf, 8192;
243     }
244    
245     =cut
246    
247     sub fh {
248 root 1.96 (tied *${$_[0]})->[0];
249 root 1.13 }
250    
251     sub rbuf : lvalue {
252 root 1.96 (tied *${$_[0]})->[3];
253 root 1.13 }
254    
255     sub DESTROY {
256     # nop
257     }
258    
259 root 1.16 our $AUTOLOAD;
260    
261 root 1.13 sub AUTOLOAD {
262 root 1.96 my $self = tied *${$_[0]};
263 root 1.13
264     (my $func = $AUTOLOAD) =~ s/^(.*):://;
265    
266     my $forward = UNIVERSAL::can $self->[7], $func;
267    
268     $forward or
269     die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
270    
271     goto &$forward;
272 root 1.8 }
273    
274 root 1.1 package Coro::Handle::FH;
275    
276 root 1.86 use common::sense;
277 root 1.13
278 root 1.10 use Carp 'croak';
279 root 1.37 use Errno qw(EAGAIN EINTR);
280 root 1.1
281 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK);
282 root 1.1
283 root 1.66 use Coro::AnyEvent;
284    
285 root 1.13 # formerly a hash, but we are speed-critical, so try
286     # to be faster even if it hurts.
287     #
288     # 0 FH
289     # 1 desc
290     # 2 timeout
291     # 3 rb
292     # 4 wb # unused
293 root 1.67 # 5 read watcher, if Coro::Event|EV used
294     # 6 write watcher, if Coro::Event|EV used
295 root 1.13 # 7 forward class
296     # 8 blocking
297 root 1.1
298     sub TIEHANDLE {
299 root 1.13 my ($class, %arg) = @_;
300 root 1.1
301 root 1.13 my $self = bless [], $class;
302     $self->[0] = $arg{fh};
303     $self->[1] = $arg{desc};
304     $self->[2] = $arg{timeout};
305     $self->[3] = "";
306     $self->[4] = "";
307 root 1.122 $self->[5] = undef; # work around changes in 5.20, which requires initialisation
308     $self->[6] = undef; # work around changes in 5.20, which requires initialisation
309 root 1.13 $self->[7] = $arg{forward_class};
310     $self->[8] = $arg{partial};
311 root 1.1
312 root 1.36 AnyEvent::Util::fh_nonblocking $self->[0], 1;
313 root 1.6
314 root 1.13 $self
315     }
316    
317     sub cleanup {
318 root 1.75 # gets overriden for Coro::Event
319 root 1.29 @{$_[0]} = ();
320 root 1.1 }
321    
322     sub OPEN {
323 root 1.13 &cleanup;
324 root 1.1 my $self = shift;
325 root 1.13 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
326     : open $self->[0], $_[0], $_[1], $_[2];
327 root 1.18
328 root 1.1 if ($r) {
329 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
330 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
331 root 1.1 }
332 root 1.18
333     $r
334 root 1.1 }
335    
336 root 1.13 sub PRINT {
337 root 1.18 WRITE (shift, join "", @_)
338 root 1.13 }
339    
340     sub PRINTF {
341 root 1.62 WRITE (shift, sprintf shift, @_)
342 root 1.13 }
343    
344     sub GETC {
345     my $buf;
346 root 1.16 READ ($_[0], $buf, 1);
347 root 1.18 $buf
348 root 1.13 }
349    
350     sub BINMODE {
351     binmode $_[0][0];
352     }
353    
354     sub TELL {
355 root 1.16 Carp::croak "Coro::Handle's don't support tell()";
356 root 1.13 }
357    
358     sub SEEK {
359 root 1.16 Carp::croak "Coro::Handle's don't support seek()";
360 root 1.13 }
361    
362     sub EOF {
363 root 1.16 Carp::croak "Coro::Handle's don't support eof()";
364 root 1.13 }
365    
366 root 1.1 sub CLOSE {
367 root 1.89 my $fh = $_[0][0];
368 root 1.13 &cleanup;
369 root 1.89 close $fh
370 root 1.13 }
371    
372     sub DESTROY {
373     &cleanup;
374 root 1.8 }
375    
376     sub FILENO {
377 root 1.18 fileno $_[0][0]
378 root 1.1 }
379    
380 root 1.13 # seems to be called for stringification (how weird), at least
381     # when DumpValue::dumpValue is used to print this.
382     sub FETCH {
383 root 1.18 "$_[0]<$_[0][1]>"
384 root 1.1 }
385    
386 root 1.75 sub _readable_anyevent {
387 root 1.67 my $cb = Coro::rouse_cb;
388 root 1.13
389 root 1.85 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
390     my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
391 root 1.13
392 root 1.85 Coro::rouse_wait
393 root 1.13 }
394    
395 root 1.75 sub _writable_anyevent {
396 root 1.67 my $cb = Coro::rouse_cb;
397 root 1.13
398 root 1.85 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
399     my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
400 root 1.15
401 root 1.85 Coro::rouse_wait
402 root 1.1 }
403    
404 root 1.75 sub _readable_coro {
405 root 1.18 ($_[0][5] ||= "Coro::Event"->io (
406 root 1.17 fd => $_[0][0],
407     desc => "fh $_[0][1] read watcher",
408     timeout => $_[0][2],
409     poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
410 root 1.26 ))->next->[4] & &Event::Watcher::R
411 root 1.17 }
412    
413 root 1.75 sub _writable_coro {
414 root 1.18 ($_[0][6] ||= "Coro::Event"->io (
415 root 1.17 fd => $_[0][0],
416     desc => "fh $_[0][1] write watcher",
417     timeout => $_[0][2],
418     poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
419 root 1.26 ))->next->[4] & &Event::Watcher::W
420 root 1.17 }
421    
422 root 1.75 #sub _readable_ev {
423 root 1.33 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
424     #}
425     #
426 root 1.75 #sub _writable_ev {
427 root 1.33 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
428     #}
429 root 1.30
430 root 1.21 # decide on event model at runtime
431 root 1.17 for my $rw (qw(readable writable)) {
432     *$rw = sub {
433 root 1.25 AnyEvent::detect;
434 root 1.75 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
435     *$rw = \&{"_$rw\_coro"};
436     *cleanup = sub {
437     eval {
438     $_[0][5]->cancel if $_[0][5];
439     $_[0][6]->cancel if $_[0][6];
440     };
441     @{$_[0]} = ();
442     };
443    
444     } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
445     *$rw = \&{"Coro::EV::_$rw\_ev"};
446 root 1.74 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
447 root 1.75
448 root 1.17 } else {
449 root 1.75 *$rw = \&{"_$rw\_anyevent"};
450 root 1.17 }
451 root 1.25 goto &$rw
452 root 1.17 };
453     };
454    
455 root 1.1 sub WRITE {
456     my $len = defined $_[2] ? $_[2] : length $_[1];
457     my $ofs = $_[3];
458 root 1.90 my $res;
459 root 1.1
460 root 1.16 while () {
461     my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
462 root 1.1 if (defined $r) {
463     $len -= $r;
464     $ofs += $r;
465     $res += $r;
466     last unless $len;
467 root 1.39 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
468 root 1.1 last;
469     }
470 root 1.13 last unless &writable;
471 root 1.1 }
472    
473 root 1.90 $res
474 root 1.1 }
475    
476     sub READ {
477     my $len = $_[2];
478     my $ofs = $_[3];
479 root 1.90 my $res;
480 root 1.1
481 root 1.7 # first deplete the read buffer
482 root 1.13 if (length $_[0][3]) {
483     my $l = length $_[0][3];
484 root 1.7 if ($l <= $len) {
485 root 1.18 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
486 root 1.7 $len -= $l;
487 root 1.13 $ofs += $l;
488 root 1.7 $res += $l;
489     return $res unless $len;
490     } else {
491 root 1.18 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
492     substr ($_[0][3], 0, $len) = "";
493 root 1.7 return $len;
494     }
495     }
496    
497 root 1.2 while() {
498 root 1.13 my $r = sysread $_[0][0], $_[1], $len, $ofs;
499 root 1.1 if (defined $r) {
500     $len -= $r;
501     $ofs += $r;
502     $res += $r;
503     last unless $len && $r;
504 root 1.39 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
505 root 1.1 last;
506     }
507 root 1.13 last if $_[0][8] || !&readable;
508 root 1.1 }
509    
510 root 1.90 $res
511 root 1.1 }
512    
513     sub READLINE {
514 root 1.13 my $irs = @_ > 1 ? $_[1] : $/;
515 root 1.82 my ($ofs, $len, $pos);
516 root 1.106 my $bufsize = 1020;
517 root 1.1
518 root 1.71 while () {
519 root 1.82 if (length $irs) {
520     $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
521    
522     return substr $_[0][3], 0, $pos + length $irs, ""
523     if $pos >= 0;
524    
525     $ofs = (length $_[0][3]) - (length $irs);
526     } elsif (defined $irs) {
527     $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
528    
529 root 1.24 if ($pos >= 0) {
530 root 1.82 my $res = substr $_[0][3], 0, $pos + 2, "";
531     $res =~ s/\A\n+//;
532 root 1.24 return $res;
533     }
534 root 1.51
535 root 1.82 $ofs = (length $_[0][3]) - 1;
536 root 1.1 }
537 root 1.13
538 root 1.106 $len = $bufsize - length $_[0][3];
539     $len = $bufsize *= 2 if $len < $bufsize * 0.5;
540     $len = sysread $_[0][0], $_[0][3], $len, length $_[0][3];
541 root 1.82
542     unless ($len) {
543     if (defined $len) {
544     # EOF
545     return undef unless length $_[0][3];
546    
547     $_[0][3] =~ s/\A\n+//
548     if ! length $irs && defined $irs;
549    
550     return delete $_[0][3];
551     } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
552     return length $_[0][3] ? delete $_[0][3] : undef;
553     }
554 root 1.1 }
555     }
556 root 1.6 }
557    
558 root 1.13 1;
559 root 1.1
560 root 1.13 =back
561 root 1.1
562     =head1 BUGS
563    
564     - Perl's IO-Handle model is THE bug.
565    
566 root 1.126 =head1 AUTHOR/SUPPORT/CONTACT
567 root 1.1
568 root 1.126 Marc A. Lehmann <schmorp@schmorp.de>
569     http://software.schmorp.de/pkg/Coro.html
570 root 1.1
571     =cut
572