ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.38
Committed: Sun May 25 03:05:42 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_72
Changes since 1.37: +9 -9 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user; you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.16 no warnings;
40     use strict;
41 root 1.13
42 root 1.16 use Carp ();
43 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
44 root 1.37
45 root 1.38 use AnyEvent::Util qw(WSAWOULDBLOCK WSAEINPROGRESS);
46 root 1.36
47 root 1.5 use base 'Exporter';
48 root 1.1
49 root 1.38 our $VERSION = 4.72;
50 root 1.16 our @EXPORT = qw(unblock);
51 root 1.1
52 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53 root 1.1
54     Create a new non-blocking io-handle using the given
55 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56     other supported argument is "timeout", which sets a timeout for each
57     operation.
58 root 1.1
59     =cut
60    
# Constructor: wraps a perl filehandle in a tied glob blessed into the class.
#   $class - class name, or an existing object whose class is reused
#   $fh    - filehandle to wrap; nothing is returned when it is false
#   @args  - further key => value pairs, passed through to the tie class
sub new_from_fh {
   my ($class, $fh, @args) = @_;

   return unless $fh;

   # private copy of a localised glob, used as the thing that gets tied
   my $self = do { local *Coro::Handle };

   tie $self, 'Coro::Handle::FH', fh => $fh, @args;

   bless \$self, (ref $class) || $class
}
70    
71 root 1.5 =item $fh = unblock $fh
72    
73 root 1.22 This is a convenience function that just calls C<new_from_fh> on the
74     given filehandle. Use it to replace a normal perl filehandle by a
75     non-(coroutine-)blocking equivalent.
76 root 1.5
77     =cut
78    
# Exported shortcut for Coro::Handle->new_from_fh on a single filehandle.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
82    
83 root 1.1 =item $fh->writable, $fh->readable
84    
85     Wait until the filehandle is readable or writable (and return true) or
86     until an error condition happens (and return false).
87    
88     =cut
89    
# Both forward to the function of the same name in Coro::Handle::FH, handing
# it the tied object behind this handle.
sub readable {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::readable ($tie)
}

sub writable {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::writable ($tie)
}
92 root 1.1
93 root 1.16 =item $fh->readline ([$terminator])
94 root 1.4
95     Like the builtin of the same name, but allows you to specify the input
96 root 1.7 record separator in a coroutine-safe manner (i.e. not using a global
97 root 1.4 variable).
98    
99     =cut
100    
# Forwards to READLINE on the tied object; any remaining arguments (the
# optional terminator) are passed along unchanged.
sub readline {
   my $self = shift;
   tied (${$self})->READLINE (@_)
}
102 root 1.4
103 root 1.16 =item $fh->autoflush ([...])
104 root 1.4
105     Always returns true; arguments are ignored (exists for compatibility
106 root 1.8 only). Might change in the future.
107 root 1.4
108     =cut
109    
# Compatibility stub: ignores its arguments and always reports true.
sub autoflush {
   !0
}
111    
112 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113 root 1.8
114 root 1.13 Work like their function equivalents, except read, which works like
115 root 1.18 sysread. You should not use the read function with Coro::Handles; it will
116 root 1.13 work, but it is not efficient.
117 root 1.8
118     =cut
119    
# Method forms of the tied-handle operations. Each one looks up the tied
# object behind the handle and calls the matching Coro::Handle::FH function.
# Buffers are passed as $_[1] directly so the callee writes into the caller's
# variable.

sub read {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::READ ($tie, $_[1], $_[2], $_[3])
}

sub sysread {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::READ ($tie, $_[1], $_[2], $_[3])
}

sub syswrite {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::WRITE ($tie, $_[1], $_[2], $_[3])
}

sub print {
   my $self = shift;
   Coro::Handle::FH::WRITE (tied (${$self}), join "", @_)
}

sub printf {
   my $self = shift;
   Coro::Handle::FH::PRINTF (tied (${$self}), @_)
}

sub fileno {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::FILENO ($tie)
}

sub close {
   my $tie = tied ${$_[0]};
   Coro::Handle::FH::CLOSE ($tie)
}

# this handler always blocks the caller
sub blocking {
   !0
}
128    
# Accessor for slot 8 of the tied object (the "partial" flag). Returns the
# value the slot had before the call; when an argument is given it is stored
# as the new value.
sub partial {
   my $tie      = tied ${$_[0]};
   my $previous = $tie->[8];

   if (@_ > 1) {
      $tie->[8] = $_[1];
   }

   $previous
}
136 root 1.8
137 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
138     send, recv, peername, sockname, shutdown, peerport, peerhost
139    
140     Do the same thing as the perl builtins or IO::Socket methods (but return
141     true on EINPROGRESS). Remember that these must be method calls.
142    
143     =cut
144    
# Socket builtins applied to the real filehandle stored in slot 0 of the tied
# object.

# Also yields true when the connect is merely still in progress.
sub connect {
   my $sock = tied (${$_[0]})->[0];

   CORE::connect ($sock, $_[1])
      or $! == EINPROGRESS
      or $! == WSAEINPROGRESS
}

sub bind {
   my $sock = tied (${$_[0]})->[0];
   CORE::bind ($sock, $_[1])
}

sub listen {
   my $sock = tied (${$_[0]})->[0];
   CORE::listen ($sock, $_[1])
}

sub getsockopt {
   my $sock = tied (${$_[0]})->[0];
   CORE::getsockopt ($sock, $_[1], $_[2])
}

sub setsockopt {
   my $sock = tied (${$_[0]})->[0];
   CORE::setsockopt ($sock, $_[1], $_[2], $_[3])
}
# $fh->send ($msg, $flags [, $to]) - the send builtin applied to the real
# filehandle in slot 0 of the tied object.
#
# The builtin treats any fourth argument, even an undefined one, as a
# destination address and then uses sendto semantics. TO is therefore only
# passed on when the caller actually supplied one; otherwise the three
# argument form is used.
sub send {
   my $sock = tied (${$_[0]})->[0];

   defined $_[3]
      ? CORE::send ($sock, $_[1], $_[2], $_[3])
      : CORE::send ($sock, $_[1], $_[2])
}
# More socket builtins applied to the real filehandle in slot 0 of the tied
# object. recv fills the caller's buffer through the $_[1] alias.

sub recv {
   my $sock = tied (${$_[0]})->[0];
   CORE::recv ($sock, $_[1], $_[2], @_ > 2 ? $_[3] : ())
}

sub sockname {
   my $sock = tied (${$_[0]})->[0];
   CORE::getsockname ($sock)
}

sub peername {
   my $sock = tied (${$_[0]})->[0];
   CORE::getpeername ($sock)
}

sub shutdown {
   my $sock = tied (${$_[0]})->[0];
   CORE::shutdown ($sock, $_[1])
}
155    
156     =item ($fh, $peername) = $listen_fh->accept
157    
158     In scalar context, returns the newly accepted socket (or undef) and in
159     list context returns the ($fh, $peername) pair (or nothing).
160    
161     =cut
162    
# Accepts a connection on the listening handle, waiting for readability
# whenever accept would block. Returns the wrapped new socket (plus the peer
# name in list context), or nothing on a hard error or when waiting fails.
sub accept {
   my ($peername, $fh);

   while () {
      if ($peername = accept $fh, tied(${$_[0]})->[0]) {
         return wantarray
            ? ($_[0]->new_from_fh($fh), $peername)
            : $_[0]->new_from_fh($fh);
      }

      # anything but "try again" is a real failure
      unless ($! == EAGAIN || $! == EINTR || $! == WSAWOULDBLOCK) {
         return;
      }

      return unless $_[0]->readable;
   }
}
176    
177 root 1.21 =item $fh->timeout ([...])
178 root 1.8
179 root 1.13 The optional argument sets the new timeout (in seconds) for this
180 root 1.8 handle. Returns the current (new) value.
181    
182     C<0> is a valid timeout, use C<undef> to disable the timeout.
183    
184     =cut
185    
# Gets or sets the timeout kept in slot 2 of the tied object. A new value is
# also pushed to the read and write watchers (slots 5 and 6) if they exist.
# Returns the value that is in effect after the call.
sub timeout {
   my $tie = tied ${$_[0]};

   if (@_ > 1) {
      $tie->[2] = $_[1];

      for my $slot (5, 6) {
         my $watcher = $tie->[$slot]
            or next;
         $watcher->timeout ($_[1]);
      }
   }

   $tie->[2]
}
195    
196     =item $fh->fh
197    
198     Returns the "real" (non-blocking) filehandle. Use this if you want to
199     do operations on the file handle you cannot do using the Coro::Handle
200     interface.
201    
202     =item $fh->rbuf
203    
204     Returns the current contents of the read buffer (this is an lvalue, so you
205     can change the read buffer if you like).
206    
207     You can use this function to implement your own optimized reader when neither
208     readline nor sysread are viable candidates, like this:
209    
210     # first get the _real_ non-blocking filehandle
211     # and fetch a reference to the read buffer
212     my $nb_fh = $fh->fh;
213     my $buf = \$fh->rbuf;
214    
215 root 1.18 while () {
216 root 1.13 # now use buffer contents, modifying
217     # if necessary to reflect the removed data
218    
219     last if $$buf ne ""; # we have leftover data
220    
221     # read another buffer full of data
222     $fh->readable or die "end of file";
223     sysread $nb_fh, $$buf, 8192;
224     }
225    
226     =cut
227    
# Returns the underlying filehandle (slot 0 of the tied object).
sub fh {
   my $tie = tied ${$_[0]};
   $tie->[0]
}
231    
# Lvalue accessor for the read buffer (slot 3 of the tied object), so callers
# can both inspect and modify it.
sub rbuf : lvalue {
   tied (${$_[0]})->[3]
}
235    
# Deliberately empty. Defining it keeps object destruction from being routed
# through AUTOLOAD in this package.
sub DESTROY { }
239    
our $AUTOLOAD;

# Fallback for methods this package does not define: looks the method up in
# the forward class (slot 7 of the tied object) and jumps to it with the
# original arguments. Dies if the forward class cannot do it.
sub AUTOLOAD {
   my $tie = tied ${$_[0]};

   # strip the package qualification from the requested name
   my $method = $AUTOLOAD;
   $method =~ s/^(.*):://;

   my $code = UNIVERSAL::can ($tie->[7], $method);

   die "Can't locate object method \"$method\" via package \"" . (ref $tie) . "\""
      unless $code;

   goto &$code;
}
254    
255 root 1.1 package Coro::Handle::FH;
256    
257 root 1.16 no warnings;
258     use strict;
259 root 1.13
260 root 1.10 use Carp 'croak';
261 root 1.37 use Errno qw(EAGAIN EINTR);
262 root 1.1
263 root 1.36 use AnyEvent ();
264 root 1.38 use AnyEvent::Util qw(WSAWOULDBLOCK);
265 root 1.1
266 root 1.13 # formerly a hash, but we are speed-critical, so try
267     # to be faster even if it hurts.
268     #
269     # 0 FH
270     # 1 desc
271     # 2 timeout
272     # 3 rb
273     # 4 wb # unused
274 root 1.17 # 5 read watcher, if Coro::Event used
275     # 6 write watcher, if Coro::Event used
276 root 1.13 # 7 forward class
277     # 8 partial (if true, READ stops after its first sysread attempt)
278 root 1.1
# tie constructor. Takes key => value arguments (fh, desc, timeout,
# forward_class, partial), stores them in their array slots, starts with
# empty read and write buffers and switches the filehandle to non-blocking
# mode.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [], $class;

   # slots 5 and 6 (the watchers) are intentionally left unset here
   @$self[0 .. 4, 7, 8] = (
      @arg{qw(fh desc timeout)},
      "", "",
      @arg{qw(forward_class partial)},
   );

   AnyEvent::Util::fh_nonblocking ($self->[0], 1);

   $self
}
295    
# Cancels the read and write watchers (slots 5 and 6) if present, then
# empties the object array completely.
sub cleanup {
   my $self = $_[0];

   for my $slot (5, 6) {
      my $watcher = $self->[$slot]
         or next;
      $watcher->cancel;
   }

   @$self = ();
}
301    
# Tied open: resets the object via cleanup, opens slot 0 with the given mode
# and path (plus one optional extra argument), and on success switches the
# new handle to non-blocking mode. Croaks if fcntl fails; returns the result
# of open.
sub OPEN {
   &cleanup;
   my $self = shift;

   my $r = @_ == 2
      ? open ($self->[0], $_[0], $_[1])
      : open ($self->[0], $_[0], $_[1], $_[2]);

   if ($r) {
      # nothing in this package loads Fcntl, so make sure its constants exist
      require Fcntl;

      fcntl $self->[0], Fcntl::F_SETFL (), Fcntl::O_NONBLOCK ()
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
315    
# Tied print: concatenates all arguments and hands the result to WRITE.
sub PRINT {
   my $self = shift;
   WRITE ($self, join "", @_)
}
319    
# Tied printf: formats the arguments with sprintf and hands the result to
# WRITE.
sub PRINTF {
   my $self   = shift;
   my $format = shift;
   WRITE ($self, sprintf $format, @_)
}
323    
# Tied getc: reads a single unit through READ and returns the buffer it
# filled.
sub GETC {
   my $char;

   READ ($_[0], $char, 1);

   $char
}
329    
# Tied binmode: calls binmode without a layer on the real filehandle; any
# layer argument passed by the caller is not used.
sub BINMODE {
   my $fh = $_[0][0];
   binmode $fh;
}
333    
# Positioning and end-of-file queries are not supported on these handles;
# each of the following simply croaks.

sub TELL {
   Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   Carp::croak ("Coro::Handle's don't support eof()");
}
345    
# Tied close: cancels the watchers, resets the object and closes the real
# filehandle. Returns the result of close.
sub CLOSE {
   # cleanup empties the object array, so fetch the handle before calling it;
   # otherwise close would be handed undef
   my $fh = $_[0][0];

   &cleanup;

   close $fh
}
350    
# Destructor: releases the watchers and clears the object via cleanup.
sub DESTROY {
   cleanup ($_[0]);
}
354    
# Tied fileno: reports the file descriptor number of the real filehandle.
sub FILENO {
   my $fh = $_[0][0];
   fileno $fh
}
358    
359 root 1.13 # seems to be called for stringification (how weird), at least
360     # when DumpValue::dumpValue is used to print this.
# Returns the stringified object followed by its description (slot 1) in
# angle brackets.
sub FETCH {
   my $self = $_[0];
   $self . "<" . $self->[1] . ">"
}
364    
# Generic AnyEvent implementation of "wait until readable". Puts the current
# coroutine to sleep until either the io watcher or the optional timeout
# timer fires. Returns 1 when woken by the io watcher and 0 when woken by the
# timer.
sub readable_anyevent {
   my $waiter = $Coro::current;
   my $got_io = 1;

   # wakes the waiting coroutine once; later invocations do nothing
   my $wake = sub {
      $waiter->ready if $waiter;
      undef $waiter;
   };

   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'r',
      cb   => sub { &$wake },
   );

   # only armed when a timeout (slot 2) is configured
   my $timer = (defined $_[0][2]) && AnyEvent->timer (
      after => $_[0][2],
      cb    => sub {
         $got_io = 0;
         &$wake;
      },
   );

   # sleep at least once, then keep sleeping until a callback cleared $waiter
   do { &Coro::schedule } while $waiter;

   $got_io
}
392    
# Generic AnyEvent implementation of "wait until writable". Puts the current
# coroutine to sleep until either the io watcher or the optional timeout
# timer fires. Returns 1 when woken by the io watcher and 0 when woken by the
# timer.
sub writable_anyevent {
   my $waiter = $Coro::current;
   my $got_io = 1;

   # wakes the waiting coroutine once; later invocations do nothing
   my $wake = sub {
      $waiter->ready if $waiter;
      undef $waiter;
   };

   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'w',
      cb   => sub { &$wake },
   );

   # only armed when a timeout (slot 2) is configured
   my $timer = (defined $_[0][2]) && AnyEvent->timer (
      after => $_[0][2],
      cb    => sub {
         $got_io = 0;
         &$wake;
      },
   );

   # sleep until one of the callbacks has cleared $waiter
   while ($waiter) {
      &Coro::schedule;
   }

   $got_io
}
419    
# Coro::Event implementation of "wait until readable". The watcher is created
# on first use and cached in slot 5; the result is the R bit of the event
# returned by the watcher's next method.
sub readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}
428    
# Coro::Event implementation of "wait until writable". The watcher is created
# on first use and cached in slot 6; the result is the W bit of the event
# returned by the watcher's next method.
sub writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
437    
438 root 1.33 #sub readable_ev {
439     # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
440     #}
441     #
442     #sub writable_ev {
443     # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
444     #}
445 root 1.30
446 root 1.21 # decide on event model at runtime
# Installs self-replacing stubs named "readable" and "writable". On its first
# call a stub lets AnyEvent detect the event model, picks the matching
# implementation, installs it under its own name and then jumps to it with
# the original arguments.
for my $rw (qw(readable writable)) {
   no strict 'refs';

   *$rw = sub {
      AnyEvent::detect;

      my $model = $AnyEvent::MODEL;
      my $impl;

      if ($model eq "AnyEvent::Impl::Coro" or $model eq "AnyEvent::Impl::Event") {
         require Coro::Event;
         $impl = "$rw\_coro";
      } elsif ($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV") {
         require Coro::EV;
         $impl = "Coro::EV::$rw\_ev";
      } else {
         $impl = "$rw\_anyevent";
      }

      *$rw = \&$impl;

      goto &$rw
   };
};
464    
# WRITE ($self, $data [, $len [, $offset]]) - writes $len bytes of $data,
# starting at $offset, to the real filehandle, waiting for writability
# whenever the write could not complete. Returns the number of bytes written;
# this is short of the request when a hard error occurs or the wait for
# writability fails.
sub WRITE {
   my $ofs = $_[3];
   my $len = $_[2];
   my $res = 0;

   unless (defined $len) {
      # Default to "everything from the offset to the end". Using the full
      # string length together with an offset would leave $len non-zero after
      # the last byte went out, and the loop would keep retrying.
      my $start = $ofs || 0;
      $len = $start < 0 ? -$start : length ($_[1]) - $start;
      $len = 0 if $len < 0;
   }

   while () {
      my $r = syswrite ($_[0][0], $_[1], $len, $ofs);

      if (defined $r) {
         $len -= $r;
         $ofs += $r;
         $res += $r;
         last unless $len;
      } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
         # a real error, not just "try again later"
         last;
      }

      last unless &writable;
   }

   return $res;
}
485    
# READ ($self, $buf, $len [, $offset]) - fills the caller's buffer ($_[1])
# with up to $len bytes at $offset. Data left in the read buffer (slot 3) is
# handed out first; the rest comes from sysread on the real filehandle,
# waiting for readability in between. If the partial flag (slot 8) is set,
# the loop stops after the first sysread attempt. Returns the number of bytes
# stored.
sub READ {
   my $want  = $_[2];
   my $pos   = $_[3];
   my $total = 0;

   # serve buffered data before touching the filehandle
   if (length $_[0][3]) {
      my $have = length $_[0][3];

      if ($have > $want) {
         # the buffer alone satisfies the request; keep the remainder
         substr ($_[1], $pos) = substr ($_[0][3], 0, $want);
         substr ($_[0][3], 0, $want) = "";
         return $want;
      }

      # hand out the whole buffer and continue with whatever is still missing
      substr ($_[1], $pos) = $_[0][3];
      $_[0][3] = "";

      $want  -= $have;
      $pos   += $have;
      $total += $have;

      return $total unless $want;
   }

   while () {
      my $got = sysread $_[0][0], $_[1], $want, $pos;

      if (defined $got) {
         $want  -= $got;
         $pos   += $got;
         $total += $got;

         # done when the request is complete or end of file was reached
         last unless $want && $got;
      } elsif ($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) {
         last;
      }

      last if $_[0][8] || !&readable;
   }

   return $total;
}
522    
# READLINE ($self [, $terminator]) - returns the next record from the read
# buffer (slot 3), including its terminator, refilling the buffer from the
# real filehandle as needed. The terminator defaults to $/; with an undefined
# terminator everything up to end of file is returned. At end of file, on a
# hard error or when waiting for readability fails, whatever is left in the
# buffer is returned, or undef if it is empty.
sub READLINE {
   my $irs = @_ > 1 ? $_[1] : $/;

   while () {
      if (defined $irs) {
         my $at = index $_[0][3], $irs;

         # cut the record, terminator included, out of the buffer
         return substr $_[0][3], 0, $at + length $irs, ""
            if $at >= 0;
      }

      my $got = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];

      my $finished = defined $got
         ? !$got                                                               # end of file
         : (($! != EAGAIN && $! != EINTR && $! != WSAWOULDBLOCK) || !&readable);

      next unless $finished;

      return length $_[0][3] ? delete $_[0][3] : undef;
   }
}
545    
546 root 1.13 1;
547 root 1.1
548 root 1.13 =back
549 root 1.1
550     =head1 BUGS
551    
552     - Perl's IO-Handle model is THE bug.
553    
554     =head1 AUTHOR
555    
556 root 1.13 Marc Lehmann <schmorp@schmorp.de>
557     http://home.schmorp.de/
558 root 1.1
559     =cut
560