ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.95
Committed: Sun Feb 13 04:39:15 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-5_36
Changes since 1.94: +1 -1 lines
Log Message:
5.26

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.65 Coro::Handle - non-blocking I/O with a blocking interface.
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user; you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.86 use common::sense;
40 root 1.13
41 root 1.16 use Carp ();
42 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
43 root 1.37
44 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 root 1.36
46 root 1.5 use base 'Exporter';
47 root 1.1
48 root 1.95 our $VERSION = 5.26;
49 root 1.16 our @EXPORT = qw(unblock);
50 root 1.1
51 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
52 root 1.1
53     Create a new non-blocking io-handle using the given
54 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
55     other supported argument is "timeout", which sets a timeout for each
56     operation.
57 root 1.1
58     =cut
59    
# Constructor. Wraps the given perl filehandle in a glob value that is tied
# to Coro::Handle::FH and returns a reference to it, blessed into the
# invocant's class. Returns nothing if no (true) filehandle is passed.
# Any further key/value arguments are handed to the tie constructor.
sub new_from_fh {
   my ($class, $fh, @args) = @_;

   return unless $fh;

   my $glob = do { local *Coro::Handle };

   tie $glob, 'Coro::Handle::FH', fh => $fh, @args;

   # usable both as a class method and as an instance method
   bless \$glob, ref ($class) || $class
}
69    
70 root 1.5 =item $fh = unblock $fh
71    
72 root 1.94 This is a convenience function that just calls C<new_from_fh> on the
73 root 1.22 given filehandle. Use it to replace a normal perl filehandle by a
74     non-(coroutine-)blocking equivalent.
75 root 1.5
76     =cut
77    
# Exported shortcut: unblock $fh is the same as Coro::Handle->new_from_fh ($fh).
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
81    
82 root 1.1 =item $fh->writable, $fh->readable
83    
84     Wait until the filehandle is readable or writable (and return true) or
85     until an error condition happens (and return false).
86    
87     =cut
88    
# Both methods simply delegate to the tied Coro::Handle::FH object behind the handle.
sub readable {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::readable ($tie)
}

sub writable {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::writable ($tie)
}
91 root 1.1
92 root 1.16 =item $fh->readline ([$terminator])
93 root 1.4
94 root 1.79 Similar to the builtin of the same name, but allows you to specify the
95     input record separator in a coroutine-safe manner (i.e. not using a global
96     variable). Paragraph mode is not supported, use "\n\n" to achieve the same
97     effect.
98 root 1.4
99     =cut
100    
# Forwards to READLINE of the tied object; an optional terminator is passed through.
sub readline {
   my $self = shift;

   tied ($$self)->READLINE (@_)
}
102 root 1.4
103 root 1.16 =item $fh->autoflush ([...])
104 root 1.4
105     Always returns true, arguments are being ignored (exists for compatibility
106 root 1.8 only). Might change in the future.
107 root 1.4
108     =cut
109    
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush {
   !!1
}
111    
112 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113 root 1.8
114 root 1.13 Work like their function equivalents (except read, which works like
115 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
116 root 1.13 work but it's not efficient).
117 root 1.8
118     =cut
119    
# Thin method wrappers around the tied object's handlers.
# read and sysread are the same operation (both map to READ); the buffer
# argument ($_[1]) is passed on by alias so the callee fills the caller's variable.
sub read {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::READ ($tie, $_[1], $_[2], $_[3])
}

sub sysread {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::READ ($tie, $_[1], $_[2], $_[3])
}

sub syswrite {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::WRITE ($tie, $_[1], $_[2], $_[3])
}

sub print {
   my $self = shift;
   Coro::Handle::FH::WRITE (tied ($$self), join "", @_)
}

sub printf {
   my $self = shift;
   Coro::Handle::FH::PRINTF (tied ($$self), @_)
}

sub fileno {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::FILENO ($tie)
}

sub close {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::CLOSE ($tie)
}

# this handler always blocks the caller
sub blocking {
   !!1
}
128    
# Accessor for slot 8 of the tied object. Returns the value that was stored
# before the call; when an argument is given, it becomes the new value.
sub partial {
   my $state    = tied ${$_[0]};
   my $previous = $state->[8];

   if (@_ >= 2) {
      $state->[8] = $_[1];
   }

   $previous
}
136 root 1.8
137 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
138     send, recv, peername, sockname, shutdown, peerport, peerhost
139    
140     Do the same thing as the perl builtins or IO::Socket methods (but return
141     true on EINPROGRESS). Remember that these must be method calls.
142    
143     =cut
144    
# Socket method wrappers: each one applies the perl builtin of the same
# name to the underlying filehandle (slot 0 of the tied object).

# connect additionally reports success when the connection is merely in
# progress (EINPROGRESS, EAGAIN or WSAEWOULDBLOCK).
sub connect    { connect    tied(${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
sub bind       { bind       tied(${$_[0]})->[0], $_[1] }
sub listen     { listen     tied(${$_[0]})->[0], $_[1] }
sub getsockopt { getsockopt tied(${$_[0]})->[0], $_[1], $_[2] }
sub setsockopt { setsockopt tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }

# $fh->send ($msg, $flags [, $to])
# @_ includes the object itself, so a destination address is present only
# when there are more than three elements. The three- and four-argument
# forms of the builtin are called separately, because an optional fourth
# argument written as "cond ? $to : ()" is evaluated in scalar context and
# would always be passed on (as undef).
sub send {
   my $fh = tied(${$_[0]})->[0];

   @_ > 3
      ? send ($fh, $_[1], $_[2], $_[3])
      : send ($fh, $_[1], $_[2])
}

# $fh->recv ($buf, $len [, $flags])
# The builtin always needs a flags argument; a missing one means 0.
sub recv       { recv       tied(${$_[0]})->[0], $_[1], $_[2], $_[3] || 0 }

sub sockname   { getsockname tied(${$_[0]})->[0] }
sub peername   { getpeername tied(${$_[0]})->[0] }
sub shutdown   { shutdown   tied(${$_[0]})->[0], $_[1] }
155    
156     =item ($fh, $peername) = $listen_fh->accept
157    
158     In scalar context, returns the newly accepted socket (or undef) and in
159     list context returns the ($fh, $peername) pair (or nothing).
160    
161     =cut
162    
# Accepts a connection on the listening handle, waiting (via ->readable)
# while the underlying accept would block.
# Scalar context: the new Coro::Handle; list context: (handle, peername).
# Returns nothing on a hard error or when waiting for readability fails.
sub accept {
   my ($sock, $addr);

   while () {
      if ($addr = accept $sock, tied(${$_[0]})->[0]) {
         my $new = $_[0]->new_from_fh ($sock);
         return wantarray ? ($new, $addr) : $new;
      }

      # anything other than "try again" is a real failure
      return unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;

      return unless $_[0]->readable;
   }
}
176    
177 root 1.21 =item $fh->timeout ([...])
178 root 1.8
179 root 1.13 The optional argument sets the new timeout (in seconds) for this
180 root 1.8 handle. Returns the current (new) value.
181    
182     C<0> is a valid timeout, use C<undef> to disable the timeout.
183    
184     =cut
185    
# Gets or sets the per-operation timeout (slot 2 of the tied object).
# When setting, watcher objects already stored in slots 5 and 6 are
# updated as well. Always returns the value that is current afterwards.
sub timeout {
   my $state = tied ${$_[0]};

   if (@_ >= 2) {
      my $secs = $_[1];

      $state->[2] = $secs;

      for my $slot (5, 6) {
         my $watcher = $state->[$slot]
            or next;
         $watcher->timeout ($secs);
      }
   }

   $state->[2]
}
195    
196     =item $fh->fh
197    
198     Returns the "real" (non-blocking) filehandle. Use this if you want to
199     do operations on the file handle you cannot do using the Coro::Handle
200     interface.
201    
202     =item $fh->rbuf
203    
204     Returns the current contents of the read buffer (this is an lvalue, so you
205     can change the read buffer if you like).
206    
207     You can use this function to implement your own optimized reader when neither
208     readline nor sysread are viable candidates, like this:
209    
210     # first get the _real_ non-blocking filehandle
211     # and fetch a reference to the read buffer
212     my $nb_fh = $fh->fh;
213     my $buf = \$fh->rbuf;
214    
215 root 1.18 while () {
216 root 1.13 # now use buffer contents, modifying
217     # if necessary to reflect the removed data
218    
219     last if $$buf ne ""; # we have leftover data
220    
221     # read another buffer full of data
222     $fh->readable or die "end of file";
223     sysread $nb_fh, $$buf, 8192;
224     }
225    
226     =cut
227    
# Returns the underlying filehandle stored in slot 0 of the tied object.
sub fh {
   my $state = tied ${$_[0]};

   $state->[0]
}
231    
# Lvalue accessor for the read buffer (slot 3 of the tied object);
# assigning to the result replaces the buffer contents.
sub rbuf : lvalue {
   tied (${$_[0]})->[3]
}
235    
# Intentionally empty: with an explicit DESTROY, object destruction is not
# dispatched to AUTOLOAD.
sub DESTROY { }
239    
240 root 1.16 our $AUTOLOAD;
241    
# Forwards unknown methods to the class stored in slot 7 of the tied
# object ("forward class"). The call is made via goto, so the target sees
# the original argument list. Dies if that class cannot handle the method.
sub AUTOLOAD {
   my $state = tied ${$_[0]};

   # strip the package qualification from the requested name
   my $method = $AUTOLOAD;
   $method =~ s/^(.*):://;

   my $target = UNIVERSAL::can ($state->[7], $method)
      or die "Can't locate object method \"$method\" via package \"" . (ref $state) . "\"";

   goto &$target;
}
254    
255 root 1.1 package Coro::Handle::FH;
256    
257 root 1.86 use common::sense;
258 root 1.13
259 root 1.10 use Carp 'croak';
260 root 1.37 use Errno qw(EAGAIN EINTR);
261 root 1.1
262 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK);
263 root 1.1
264 root 1.66 use Coro::AnyEvent;
265    
266 root 1.13 # formerly a hash, but we are speed-critical, so try
267     # to be faster even if it hurts.
268     #
269     # 0 FH
270     # 1 desc
271     # 2 timeout
272     # 3 rb
273     # 4 wb # unused
274 root 1.67 # 5 read watcher, if Coro::Event|EV used
275     # 6 write watcher, if Coro::Event|EV used
276 root 1.13 # 7 forward class
277     # 8 blocking
278 root 1.1
# Tie constructor. Recognised arguments: fh, desc, timeout, forward_class
# and partial. Builds the array-based object and switches the filehandle
# to non-blocking mode.
sub TIEHANDLE {
   my $class = shift;
   my %opt   = @_;

   my $self = bless [
      $opt{fh},      # 0 FH
      $opt{desc},    # 1 desc
      $opt{timeout}, # 2 timeout
      "",            # 3 rb
      "",            # 4 wb
   ], $class;

   # slots 5 and 6 (the watchers) are filled in lazily elsewhere
   @$self[7, 8] = @opt{qw(forward_class partial)};

   AnyEvent::Util::fh_nonblocking $self->[0], 1;

   $self
}
295    
# Empties the object, dropping every slot.
# gets overridden for Coro::Event
sub cleanup {
   my $state = $_[0];

   @$state = ();
}
300    
# Tied open: discards the current state, opens a new handle into slot 0
# and switches it to non-blocking mode.
# Arguments after the object are handed to the builtin open unchanged, so
# the two-argument, three-argument and list forms all work.
# Returns the result of open; croaks if the handle cannot be made non-blocking.
sub OPEN {
   &cleanup; # called with our @_, i.e. on the object itself
   my $self = shift;

   my $r = @_ == 1 ? open ($self->[0], $_[0])
         : @_ == 2 ? open ($self->[0], $_[0], $_[1])
         :           open ($self->[0], $_[0], $_[1], @_[2 .. $#_]);

   if ($r) {
      # make sure the constants used below really exist
      require Fcntl;

      fcntl $self->[0], Fcntl::F_SETFL (), Fcntl::O_NONBLOCK ()
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
314    
# print: concatenates all arguments and writes them in one go.
sub PRINT {
   my $self = shift;

   WRITE ($self, join "", @_)
}

# printf: formats with sprintf, then writes the result.
sub PRINTF {
   my ($self, $format, @args) = @_;

   WRITE ($self, sprintf $format, @args)
}

# getc: reads a single character through READ and returns it.
sub GETC {
   my $char;

   READ ($_[0], $char, 1);

   $char
}
328    
# Tied binmode: applies binmode to the underlying filehandle (slot 0).
# A layer argument, if given, is passed on instead of being dropped.
# Returns whatever the builtin returns.
sub BINMODE {
   @_ > 1
      ? binmode ($_[0][0], $_[1])
      : binmode ($_[0][0])
}
332    
# Positioning and eof queries are not available on these handles;
# each of the three handlers just throws.
sub TELL { croak "Coro::Handle's don't support tell()" }

sub SEEK { croak "Coro::Handle's don't support seek()" }

sub EOF  { croak "Coro::Handle's don't support eof()" }
344    
# Tied close: remembers the filehandle first, because cleanup wipes the
# object, then closes it and returns the result of close.
sub CLOSE {
   my ($self) = @_;
   my $handle = $self->[0];

   &cleanup; # receives our @_

   close $handle
}

# Destruction only needs to release the object's state.
sub DESTROY {
   &cleanup;
}

# Tied fileno: the descriptor number of the underlying filehandle.
sub FILENO {
   my ($self) = @_;

   fileno $self->[0]
}
358    
# Apparently invoked when the handle gets stringified (observed when
# DumpValue::dumpValue prints it). Yields the stringified object followed
# by the description (slot 1) in angle brackets.
sub FETCH {
   my $self = $_[0];

   $self . "<" . $self->[1] . ">"
}
364    
# Generic AnyEvent implementation of "wait until readable".
# Returns 1 when the I/O watcher fires and 0 when the timeout (slot 2,
# if defined) expires first.
sub _readable_anyevent {
   my ($fh, $timeout) = @{$_[0]}[0, 2];

   my $done = Coro::rouse_cb;

   # both watchers have to stay referenced until rouse_wait returns
   my $io    = AE::io ($fh, 0, sub { $done->(1) });
   my $timer = defined $timeout
      ? AE::timer ($timeout, 0, sub { $done->(0) })
      : "";

   Coro::rouse_wait
}

# Same as above, but waits for the handle to become writable.
sub _writable_anyevent {
   my ($fh, $timeout) = @{$_[0]}[0, 2];

   my $done = Coro::rouse_cb;

   # both watchers have to stay referenced until rouse_wait returns
   my $io    = AE::io ($fh, 1, sub { $done->(1) });
   my $timer = defined $timeout
      ? AE::timer ($timeout, 0, sub { $done->(0) })
      : "";

   Coro::rouse_wait
}
382    
# Coro::Event implementation of "wait until readable". The watcher is
# created on first use and cached in slot 5. The result is the R bit of
# element 4 of what ->next returns.
sub _readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}

# Coro::Event implementation of "wait until writable". The watcher is
# created on first use and cached in slot 6. The result is the W bit of
# element 4 of what ->next returns.
sub _writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
400    
401 root 1.75 #sub _readable_ev {
402 root 1.33 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
403     #}
404     #
405 root 1.75 #sub _writable_ev {
406 root 1.33 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
407     #}
408 root 1.30
409 root 1.21 # decide on event model at runtime
# Installs "readable" and "writable" as stubs. On its first call a stub
# detects the event model, replaces itself with the matching
# implementation and then runs that implementation with the same arguments.
for my $kind (qw(readable writable)) {
   *$kind = sub {
      AnyEvent::detect ();

      if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" && eval { require Coro::Event }) {
         *$kind = \&{"_${kind}_coro"};

         # with Coro::Event the cached watchers must be cancelled as well
         *cleanup = sub {
            my $state = $_[0];

            eval {
               for my $slot (5, 6) {
                  my $watcher = $state->[$slot];
                  $watcher->cancel if $watcher;
               }
            };

            @$state = ();
         };
      }
      elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" && eval { require Coro::EV }) {
         *$kind = \&{"Coro::EV::_${kind}_ev"};

         # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
         return &$kind;
      }
      else {
         *$kind = \&{"_${kind}_anyevent"};
      }

      goto &$kind
   };
};
433    
# WRITE ($self, $data [, $length [, $offset]])
#
# Writes $length bytes of $data (default: everything) starting at $offset,
# waiting for the handle to become writable whenever the underlying
# syswrite would block. Returns the number of bytes written, which is
# less than requested when an error occurs or waiting fails (undef if
# nothing could be written at all).
sub WRITE {
   my $len = defined $_[2] ? $_[2] : length $_[1];
   my $ofs = $_[3];
   my $res;

   # Never ask for more than the string holds from $ofs onwards (a
   # negative offset counts from the end). syswrite silently truncates
   # such requests, so without this $len would never reach zero and the
   # loop below would not terminate. Offsets outside the string are left
   # alone so that syswrite reports them itself.
   my $avail = length $_[1];
   $avail = $ofs < 0 ? -$ofs : $avail - $ofs
      if $ofs;
   $len = $avail
      if $avail >= 0 && $len > $avail;

   while () {
      my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
      if (defined $r) {
         $len -= $r;
         $ofs += $r;
         $res += $r;
         last unless $len;
      } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
         last;
      }
      last unless &writable; # passes our own @_, i.e. the object
   }

   $res
}
454    
# READ ($self, $buffer, $length [, $offset])
#
# Fills $buffer (modified in place through @_) from the internal read
# buffer first and from the filehandle afterwards, waiting for readability
# where needed. When slot 8 of the object is true, the loop stops after
# the first sysread attempt instead of waiting for more data.
# Returns the number of bytes stored.
sub READ {
   my ($want, $pos) = @_[2, 3];
   my $total;

   # first deplete the read buffer
   if (my $buffered = length $_[0][3]) {
      if ($buffered > $want) {
         # the buffer alone satisfies the request
         substr ($_[1], $pos) = substr ($_[0][3], 0, $want, "");
         return $want;
      }

      substr ($_[1], $pos) = $_[0][3];
      $_[0][3] = "";

      $want  -= $buffered;
      $pos   += $buffered;
      $total += $buffered;

      return $total unless $want;
   }

   while () {
      my $got = sysread $_[0][0], $_[1], $want, $pos;

      if (defined $got) {
         $want  -= $got;
         $pos   += $got;
         $total += $got;

         # done when the request is satisfied or at end of file
         last if !$want || !$got;
      } else {
         last unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
      }

      last if $_[0][8];
      last unless &readable; # passes our own @_, i.e. the object
   }

   $total
}
491    
# READLINE ($self [, $terminator])
#
# Returns the next record from the handle, using the given terminator or
# $/ when none is passed. A non-empty terminator is searched for
# literally, an empty (but defined) one splits on "\n\n" and strips
# leading newlines, and an undefined one reads until end of file.
# Returns undef at end of file or on error when no buffered data is left.
sub READLINE {
   my $sep = @_ > 1 ? $_[1] : $/;
   my ($from, $got, $hit);

   while () {
      if (length $sep) {
         $hit = index $_[0][3], $sep, ($from < 0 ? 0 : $from);

         if ($hit >= 0) {
            return substr ($_[0][3], 0, $hit + length ($sep), "");
         }

         # resume the search where a terminator could still start
         $from = length ($_[0][3]) - length ($sep);
      } elsif (defined $sep) {
         $hit = index $_[0][3], "\n\n", ($from < 1 ? 1 : $from);

         if ($hit >= 0) {
            my $para = substr ($_[0][3], 0, $hit + 2, "");
            $para =~ s/\A\n+//;
            return $para;
         }

         $from = length ($_[0][3]) - 1;
      }

      # append to the buffer; the request grows with the previous read size
      $got = sysread $_[0][0], $_[0][3], $got + 4096, length $_[0][3];

      next if $got;

      if (defined $got) {
         # EOF
         return undef unless length $_[0][3];

         $_[0][3] =~ s/\A\n+//
            if defined $sep && !length $sep;

         return delete $_[0][3];
      }

      if (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
         return length $_[0][3] ? delete $_[0][3] : undef;
      }
   }
}
533    
534 root 1.13 1;
535 root 1.1
536 root 1.13 =back
537 root 1.1
538     =head1 BUGS
539    
540     - Perl's IO-Handle model is THE bug.
541    
542     =head1 AUTHOR
543    
544 root 1.13 Marc Lehmann <schmorp@schmorp.de>
545     http://home.schmorp.de/
546 root 1.1
547     =cut
548