ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.85
Committed: Sat Aug 22 22:36:23 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-5_17
Changes since 1.84: +7 -35 lines
Log Message:
5.17

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.65 Coro::Handle - non-blocking I/O with a blocking interface.
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.16 no warnings;
40     use strict;
41 root 1.13
42 root 1.16 use Carp ();
43 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
44 root 1.37
45 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
46 root 1.36
47 root 1.5 use base 'Exporter';
48 root 1.1
49 root 1.85 our $VERSION = 5.17;
50 root 1.16 our @EXPORT = qw(unblock);
51 root 1.1
52 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53 root 1.1
54     Create a new non-blocking io-handle using the given
55 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56     other supported argument is "timeout", which sets a timeout for each
57     operation.
58 root 1.1
59     =cut
60    
61     sub new_from_fh {
62     my $class = shift;
63     my $fh = shift or return;
64     my $self = do { local *Coro::Handle };
65    
66 root 1.20 tie $self, 'Coro::Handle::FH', fh => $fh, @_;
67 root 1.1
68 root 1.22 bless \$self, ref $class ? ref $class : $class
69 root 1.1 }
70    
71 root 1.5 =item $fh = unblock $fh
72    
73 root 1.22 This is a convinience function that just calls C<new_from_fh> on the
74     given filehandle. Use it to replace a normal perl filehandle by a
75     non-(coroutine-)blocking equivalent.
76 root 1.5
77     =cut
78    
79     sub unblock($) {
80 root 1.19 new_from_fh Coro::Handle $_[0]
81 root 1.5 }
82    
83 root 1.1 =item $fh->writable, $fh->readable
84    
85     Wait until the filehandle is readable or writable (and return true) or
86     until an error condition happens (and return false).
87    
88     =cut
89    
90 root 1.16 sub readable { Coro::Handle::FH::readable (tied ${$_[0]}) }
91     sub writable { Coro::Handle::FH::writable (tied ${$_[0]}) }
92 root 1.1
93 root 1.16 =item $fh->readline ([$terminator])
94 root 1.4
95 root 1.79 Similar to the builtin of the same name, but allows you to specify the
96     input record separator in a coroutine-safe manner (i.e. not using a global
97     variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98     effect.
99 root 1.4
100     =cut
101    
102 root 1.18 sub readline { tied(${+shift})->READLINE (@_) }
103 root 1.4
104 root 1.16 =item $fh->autoflush ([...])
105 root 1.4
106     Always returns true, arguments are being ignored (exists for compatibility
107 root 1.8 only). Might change in the future.
108 root 1.4
109     =cut
110    
111     sub autoflush { !0 }
112    
113 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114 root 1.8
115 root 1.13 Work like their function equivalents (except read, which works like
116 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
117 root 1.13 work but it's not efficient).
118 root 1.8
119     =cut
120    
121 root 1.13 sub read { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
122     sub sysread { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
123     sub syswrite { Coro::Handle::FH::WRITE (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
124     sub print { Coro::Handle::FH::WRITE (tied ${+shift}, join "", @_) }
125     sub printf { Coro::Handle::FH::PRINTF (tied ${+shift}, @_) }
126     sub fileno { Coro::Handle::FH::FILENO (tied ${$_[0]}) }
127     sub close { Coro::Handle::FH::CLOSE (tied ${$_[0]}) }
128     sub blocking { !0 } # this handler always blocks the caller
129    
130     sub partial {
131     my $obj = tied ${$_[0]};
132    
133     my $retval = $obj->[8];
134     $obj->[8] = $_[1] if @_ > 1;
135     $retval
136     }
137 root 1.8
138 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
139     send, recv, peername, sockname, shutdown, peerport, peerhost
140    
141     Do the same thing as the perl builtins or IO::Socket methods (but return
142     true on EINPROGRESS). Remember that these must be method calls.
143    
144     =cut
145    
146 root 1.81 sub connect { connect tied(${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
147 root 1.27 sub bind { bind tied(${$_[0]})->[0], $_[1] }
148     sub listen { listen tied(${$_[0]})->[0], $_[1] }
149     sub getsockopt { getsockopt tied(${$_[0]})->[0], $_[1], $_[2] }
150     sub setsockopt { setsockopt tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }
151     sub send { send tied(${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
152     sub recv { recv tied(${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
153     sub sockname { getsockname tied(${$_[0]})->[0] }
154     sub peername { getpeername tied(${$_[0]})->[0] }
155     sub shutdown { shutdown tied(${$_[0]})->[0], $_[1] }
156    
157     =item ($fh, $peername) = $listen_fh->accept
158    
159     In scalar context, returns the newly accepted socket (or undef) and in
160     list context return the ($fh, $peername) pair (or nothing).
161    
162     =cut
163    
164     sub accept {
165     my ($peername, $fh);
166     while () {
167     $peername = accept $fh, tied(${$_[0]})->[0]
168     and return wantarray
169     ? ($_[0]->new_from_fh($fh), $peername)
170     : $_[0]->new_from_fh($fh);
171    
172 root 1.39 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
173 root 1.27
174     $_[0]->readable or return;
175     }
176     }
177    
178 root 1.21 =item $fh->timeout ([...])
179 root 1.8
180 root 1.13 The optional argument sets the new timeout (in seconds) for this
181 root 1.8 handle. Returns the current (new) value.
182    
183     C<0> is a valid timeout, use C<undef> to disable the timeout.
184    
185     =cut
186    
187     sub timeout {
188 root 1.18 my $self = tied ${$_[0]};
189 root 1.13 if (@_ > 1) {
190     $self->[2] = $_[1];
191 root 1.18 $self->[5]->timeout ($_[1]) if $self->[5];
192     $self->[6]->timeout ($_[1]) if $self->[6];
193 root 1.8 }
194 root 1.21 $self->[2]
195 root 1.13 }
196    
197     =item $fh->fh
198    
199     Returns the "real" (non-blocking) filehandle. Use this if you want to
200     do operations on the file handle you cannot do using the Coro::Handle
201     interface.
202    
203     =item $fh->rbuf
204    
205     Returns the current contents of the read buffer (this is an lvalue, so you
206     can change the read buffer if you like).
207    
208     You can use this function to implement your own optimized reader when neither
209     readline nor sysread are viable candidates, like this:
210    
211     # first get the _real_ non-blocking filehandle
212     # and fetch a reference to the read buffer
213     my $nb_fh = $fh->fh;
214     my $buf = \$fh->rbuf;
215    
216 root 1.18 while () {
217 root 1.13 # now use buffer contents, modifying
218     # if necessary to reflect the removed data
219    
220     last if $$buf ne ""; # we have leftover data
221    
222     # read another buffer full of data
223     $fh->readable or die "end of file";
224     sysread $nb_fh, $$buf, 8192;
225     }
226    
227     =cut
228    
229     sub fh {
230     (tied ${$_[0]})->[0];
231     }
232    
233     sub rbuf : lvalue {
234     (tied ${$_[0]})->[3];
235     }
236    
237     sub DESTROY {
238     # nop
239     }
240    
241 root 1.16 our $AUTOLOAD;
242    
243 root 1.13 sub AUTOLOAD {
244     my $self = tied ${$_[0]};
245    
246     (my $func = $AUTOLOAD) =~ s/^(.*):://;
247    
248     my $forward = UNIVERSAL::can $self->[7], $func;
249    
250     $forward or
251     die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
252    
253     goto &$forward;
254 root 1.8 }
255    
256 root 1.1 package Coro::Handle::FH;
257    
258 root 1.16 no warnings;
259     use strict;
260 root 1.13
261 root 1.10 use Carp 'croak';
262 root 1.37 use Errno qw(EAGAIN EINTR);
263 root 1.1
264 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK);
265 root 1.1
266 root 1.66 use Coro::AnyEvent;
267    
268 root 1.13 # formerly a hash, but we are speed-critical, so try
269     # to be faster even if it hurts.
270     #
271     # 0 FH
272     # 1 desc
273     # 2 timeout
274     # 3 rb
275     # 4 wb # unused
276 root 1.67 # 5 read watcher, if Coro::Event|EV used
277     # 6 write watcher, if Coro::Event|EV used
278 root 1.13 # 7 forward class
279     # 8 blocking
280 root 1.1
281     sub TIEHANDLE {
282 root 1.13 my ($class, %arg) = @_;
283 root 1.1
284 root 1.13 my $self = bless [], $class;
285     $self->[0] = $arg{fh};
286     $self->[1] = $arg{desc};
287     $self->[2] = $arg{timeout};
288     $self->[3] = "";
289     $self->[4] = "";
290     $self->[7] = $arg{forward_class};
291     $self->[8] = $arg{partial};
292 root 1.1
293 root 1.36 AnyEvent::Util::fh_nonblocking $self->[0], 1;
294 root 1.6
295 root 1.13 $self
296     }
297    
298     sub cleanup {
299 root 1.75 # gets overriden for Coro::Event
300 root 1.29 @{$_[0]} = ();
301 root 1.1 }
302    
303     sub OPEN {
304 root 1.13 &cleanup;
305 root 1.1 my $self = shift;
306 root 1.13 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
307     : open $self->[0], $_[0], $_[1], $_[2];
308 root 1.18
309 root 1.1 if ($r) {
310 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
311 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
312 root 1.1 }
313 root 1.18
314     $r
315 root 1.1 }
316    
317 root 1.13 sub PRINT {
318 root 1.18 WRITE (shift, join "", @_)
319 root 1.13 }
320    
321     sub PRINTF {
322 root 1.62 WRITE (shift, sprintf shift, @_)
323 root 1.13 }
324    
325     sub GETC {
326     my $buf;
327 root 1.16 READ ($_[0], $buf, 1);
328 root 1.18 $buf
329 root 1.13 }
330    
331     sub BINMODE {
332     binmode $_[0][0];
333     }
334    
335     sub TELL {
336 root 1.16 Carp::croak "Coro::Handle's don't support tell()";
337 root 1.13 }
338    
339     sub SEEK {
340 root 1.16 Carp::croak "Coro::Handle's don't support seek()";
341 root 1.13 }
342    
343     sub EOF {
344 root 1.16 Carp::croak "Coro::Handle's don't support eof()";
345 root 1.13 }
346    
347 root 1.1 sub CLOSE {
348 root 1.13 &cleanup;
349 root 1.18 close $_[0][0]
350 root 1.13 }
351    
352     sub DESTROY {
353     &cleanup;
354 root 1.8 }
355    
356     sub FILENO {
357 root 1.18 fileno $_[0][0]
358 root 1.1 }
359    
360 root 1.13 # seems to be called for stringification (how weird), at least
361     # when DumpValue::dumpValue is used to print this.
362     sub FETCH {
363 root 1.18 "$_[0]<$_[0][1]>"
364 root 1.1 }
365    
366 root 1.75 sub _readable_anyevent {
367 root 1.67 my $cb = Coro::rouse_cb;
368 root 1.13
369 root 1.85 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
370     my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
371 root 1.13
372 root 1.85 Coro::rouse_wait
373 root 1.13 }
374    
375 root 1.75 sub _writable_anyevent {
376 root 1.67 my $cb = Coro::rouse_cb;
377 root 1.13
378 root 1.85 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
379     my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
380 root 1.15
381 root 1.85 Coro::rouse_wait
382 root 1.1 }
383    
384 root 1.75 sub _readable_coro {
385 root 1.18 ($_[0][5] ||= "Coro::Event"->io (
386 root 1.17 fd => $_[0][0],
387     desc => "fh $_[0][1] read watcher",
388     timeout => $_[0][2],
389     poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
390 root 1.26 ))->next->[4] & &Event::Watcher::R
391 root 1.17 }
392    
393 root 1.75 sub _writable_coro {
394 root 1.18 ($_[0][6] ||= "Coro::Event"->io (
395 root 1.17 fd => $_[0][0],
396     desc => "fh $_[0][1] write watcher",
397     timeout => $_[0][2],
398     poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
399 root 1.26 ))->next->[4] & &Event::Watcher::W
400 root 1.17 }
401    
402 root 1.75 #sub _readable_ev {
403 root 1.33 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
404     #}
405     #
406 root 1.75 #sub _writable_ev {
407 root 1.33 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
408     #}
409 root 1.30
410 root 1.21 # decide on event model at runtime
411 root 1.17 for my $rw (qw(readable writable)) {
412     no strict 'refs';
413    
414     *$rw = sub {
415 root 1.25 AnyEvent::detect;
416 root 1.75 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
417     *$rw = \&{"_$rw\_coro"};
418     *cleanup = sub {
419     eval {
420     $_[0][5]->cancel if $_[0][5];
421     $_[0][6]->cancel if $_[0][6];
422     };
423     @{$_[0]} = ();
424     };
425    
426     } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
427     *$rw = \&{"Coro::EV::_$rw\_ev"};
428 root 1.74 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
429 root 1.75
430 root 1.17 } else {
431 root 1.75 *$rw = \&{"_$rw\_anyevent"};
432 root 1.17 }
433 root 1.25 goto &$rw
434 root 1.17 };
435     };
436    
437 root 1.1 sub WRITE {
438     my $len = defined $_[2] ? $_[2] : length $_[1];
439     my $ofs = $_[3];
440     my $res = 0;
441    
442 root 1.16 while () {
443     my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
444 root 1.1 if (defined $r) {
445     $len -= $r;
446     $ofs += $r;
447     $res += $r;
448     last unless $len;
449 root 1.39 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
450 root 1.1 last;
451     }
452 root 1.13 last unless &writable;
453 root 1.1 }
454    
455     return $res;
456     }
457    
458     sub READ {
459     my $len = $_[2];
460     my $ofs = $_[3];
461     my $res = 0;
462    
463 root 1.7 # first deplete the read buffer
464 root 1.13 if (length $_[0][3]) {
465     my $l = length $_[0][3];
466 root 1.7 if ($l <= $len) {
467 root 1.18 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
468 root 1.7 $len -= $l;
469 root 1.13 $ofs += $l;
470 root 1.7 $res += $l;
471     return $res unless $len;
472     } else {
473 root 1.18 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
474     substr ($_[0][3], 0, $len) = "";
475 root 1.7 return $len;
476     }
477     }
478    
479 root 1.2 while() {
480 root 1.13 my $r = sysread $_[0][0], $_[1], $len, $ofs;
481 root 1.1 if (defined $r) {
482     $len -= $r;
483     $ofs += $r;
484     $res += $r;
485     last unless $len && $r;
486 root 1.39 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
487 root 1.1 last;
488     }
489 root 1.13 last if $_[0][8] || !&readable;
490 root 1.1 }
491    
492     return $res;
493     }
494    
495     sub READLINE {
496 root 1.13 my $irs = @_ > 1 ? $_[1] : $/;
497 root 1.82 my ($ofs, $len, $pos);
498 root 1.1
499 root 1.71 while () {
500 root 1.82 if (length $irs) {
501     $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
502    
503     return substr $_[0][3], 0, $pos + length $irs, ""
504     if $pos >= 0;
505    
506     $ofs = (length $_[0][3]) - (length $irs);
507     } elsif (defined $irs) {
508     $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
509    
510 root 1.24 if ($pos >= 0) {
511 root 1.82 my $res = substr $_[0][3], 0, $pos + 2, "";
512     $res =~ s/\A\n+//;
513 root 1.24 return $res;
514     }
515 root 1.51
516 root 1.82 $ofs = (length $_[0][3]) - 1;
517 root 1.1 }
518 root 1.13
519 root 1.52 $len = sysread $_[0][0], $_[0][3], $len + 4096, length $_[0][3];
520 root 1.82
521     unless ($len) {
522     if (defined $len) {
523     # EOF
524     return undef unless length $_[0][3];
525    
526     $_[0][3] =~ s/\A\n+//
527     if ! length $irs && defined $irs;
528    
529     return delete $_[0][3];
530     } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
531     return length $_[0][3] ? delete $_[0][3] : undef;
532     }
533 root 1.1 }
534     }
535 root 1.6 }
536    
537 root 1.13 1;
538 root 1.1
539 root 1.13 =back
540 root 1.1
541     =head1 BUGS
542    
543     - Perl's IO-Handle model is THE bug.
544    
545     =head1 AUTHOR
546    
547 root 1.13 Marc Lehmann <schmorp@schmorp.de>
548     http://home.schmorp.de/
549 root 1.1
550     =cut
551