ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.86
Committed: Thu Oct 1 23:16:27 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.85: +2 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.65 Coro::Handle - non-blocking I/O with a blocking interface.
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.86 use common::sense;
40 root 1.13
41 root 1.16 use Carp ();
42 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
43 root 1.37
44 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 root 1.36
46 root 1.5 use base 'Exporter';
47 root 1.1
48 root 1.85 our $VERSION = 5.17;
49 root 1.16 our @EXPORT = qw(unblock);
50 root 1.1
51 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
52 root 1.1
53     Create a new non-blocking io-handle using the given
54 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
55     other supported argument is "timeout", which sets a timeout for each
56     operation.
57 root 1.1
58     =cut
59    
60     sub new_from_fh {
61     my $class = shift;
62     my $fh = shift or return;
63     my $self = do { local *Coro::Handle };
64    
65 root 1.20 tie $self, 'Coro::Handle::FH', fh => $fh, @_;
66 root 1.1
67 root 1.22 bless \$self, ref $class ? ref $class : $class
68 root 1.1 }
69    
70 root 1.5 =item $fh = unblock $fh
71    
72 root 1.22 This is a convinience function that just calls C<new_from_fh> on the
73     given filehandle. Use it to replace a normal perl filehandle by a
74     non-(coroutine-)blocking equivalent.
75 root 1.5
76     =cut
77    
78     sub unblock($) {
79 root 1.19 new_from_fh Coro::Handle $_[0]
80 root 1.5 }
81    
82 root 1.1 =item $fh->writable, $fh->readable
83    
84     Wait until the filehandle is readable or writable (and return true) or
85     until an error condition happens (and return false).
86    
87     =cut
88    
89 root 1.16 sub readable { Coro::Handle::FH::readable (tied ${$_[0]}) }
90     sub writable { Coro::Handle::FH::writable (tied ${$_[0]}) }
91 root 1.1
92 root 1.16 =item $fh->readline ([$terminator])
93 root 1.4
94 root 1.79 Similar to the builtin of the same name, but allows you to specify the
95     input record separator in a coroutine-safe manner (i.e. not using a global
96     variable). Paragraph mode is not supported, use "\n\n" to achieve the same
97     effect.
98 root 1.4
99     =cut
100    
101 root 1.18 sub readline { tied(${+shift})->READLINE (@_) }
102 root 1.4
103 root 1.16 =item $fh->autoflush ([...])
104 root 1.4
105     Always returns true, arguments are being ignored (exists for compatibility
106 root 1.8 only). Might change in the future.
107 root 1.4
108     =cut
109    
110     sub autoflush { !0 }
111    
112 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113 root 1.8
114 root 1.13 Work like their function equivalents (except read, which works like
115 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
116 root 1.13 work but it's not efficient).
117 root 1.8
118     =cut
119    
120 root 1.13 sub read { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
121     sub sysread { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
122     sub syswrite { Coro::Handle::FH::WRITE (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
123     sub print { Coro::Handle::FH::WRITE (tied ${+shift}, join "", @_) }
124     sub printf { Coro::Handle::FH::PRINTF (tied ${+shift}, @_) }
125     sub fileno { Coro::Handle::FH::FILENO (tied ${$_[0]}) }
126     sub close { Coro::Handle::FH::CLOSE (tied ${$_[0]}) }
127     sub blocking { !0 } # this handler always blocks the caller
128    
129     sub partial {
130     my $obj = tied ${$_[0]};
131    
132     my $retval = $obj->[8];
133     $obj->[8] = $_[1] if @_ > 1;
134     $retval
135     }
136 root 1.8
137 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
138     send, recv, peername, sockname, shutdown, peerport, peerhost
139    
140     Do the same thing as the perl builtins or IO::Socket methods (but return
141     true on EINPROGRESS). Remember that these must be method calls.
142    
143     =cut
144    
145 root 1.81 sub connect { connect tied(${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
146 root 1.27 sub bind { bind tied(${$_[0]})->[0], $_[1] }
147     sub listen { listen tied(${$_[0]})->[0], $_[1] }
148     sub getsockopt { getsockopt tied(${$_[0]})->[0], $_[1], $_[2] }
149     sub setsockopt { setsockopt tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }
150     sub send { send tied(${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
151     sub recv { recv tied(${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
152     sub sockname { getsockname tied(${$_[0]})->[0] }
153     sub peername { getpeername tied(${$_[0]})->[0] }
154     sub shutdown { shutdown tied(${$_[0]})->[0], $_[1] }
155    
156     =item ($fh, $peername) = $listen_fh->accept
157    
158     In scalar context, returns the newly accepted socket (or undef) and in
159     list context return the ($fh, $peername) pair (or nothing).
160    
161     =cut
162    
163     sub accept {
164     my ($peername, $fh);
165     while () {
166     $peername = accept $fh, tied(${$_[0]})->[0]
167     and return wantarray
168     ? ($_[0]->new_from_fh($fh), $peername)
169     : $_[0]->new_from_fh($fh);
170    
171 root 1.39 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
172 root 1.27
173     $_[0]->readable or return;
174     }
175     }
176    
177 root 1.21 =item $fh->timeout ([...])
178 root 1.8
179 root 1.13 The optional argument sets the new timeout (in seconds) for this
180 root 1.8 handle. Returns the current (new) value.
181    
182     C<0> is a valid timeout, use C<undef> to disable the timeout.
183    
184     =cut
185    
186     sub timeout {
187 root 1.18 my $self = tied ${$_[0]};
188 root 1.13 if (@_ > 1) {
189     $self->[2] = $_[1];
190 root 1.18 $self->[5]->timeout ($_[1]) if $self->[5];
191     $self->[6]->timeout ($_[1]) if $self->[6];
192 root 1.8 }
193 root 1.21 $self->[2]
194 root 1.13 }
195    
196     =item $fh->fh
197    
198     Returns the "real" (non-blocking) filehandle. Use this if you want to
199     do operations on the file handle you cannot do using the Coro::Handle
200     interface.
201    
202     =item $fh->rbuf
203    
204     Returns the current contents of the read buffer (this is an lvalue, so you
205     can change the read buffer if you like).
206    
207     You can use this function to implement your own optimized reader when neither
208     readline nor sysread are viable candidates, like this:
209    
210     # first get the _real_ non-blocking filehandle
211     # and fetch a reference to the read buffer
212     my $nb_fh = $fh->fh;
213     my $buf = \$fh->rbuf;
214    
215 root 1.18 while () {
216 root 1.13 # now use buffer contents, modifying
217     # if necessary to reflect the removed data
218    
219     last if $$buf ne ""; # we have leftover data
220    
221     # read another buffer full of data
222     $fh->readable or die "end of file";
223     sysread $nb_fh, $$buf, 8192;
224     }
225    
226     =cut
227    
228     sub fh {
229     (tied ${$_[0]})->[0];
230     }
231    
232     sub rbuf : lvalue {
233     (tied ${$_[0]})->[3];
234     }
235    
236     sub DESTROY {
237     # nop
238     }
239    
240 root 1.16 our $AUTOLOAD;
241    
242 root 1.13 sub AUTOLOAD {
243     my $self = tied ${$_[0]};
244    
245     (my $func = $AUTOLOAD) =~ s/^(.*):://;
246    
247     my $forward = UNIVERSAL::can $self->[7], $func;
248    
249     $forward or
250     die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
251    
252     goto &$forward;
253 root 1.8 }
254    
255 root 1.1 package Coro::Handle::FH;
256    
257 root 1.86 use common::sense;
258 root 1.13
259 root 1.10 use Carp 'croak';
260 root 1.37 use Errno qw(EAGAIN EINTR);
261 root 1.1
262 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK);
263 root 1.1
264 root 1.66 use Coro::AnyEvent;
265    
266 root 1.13 # formerly a hash, but we are speed-critical, so try
267     # to be faster even if it hurts.
268     #
269     # 0 FH
270     # 1 desc
271     # 2 timeout
272     # 3 rb
273     # 4 wb # unused
274 root 1.67 # 5 read watcher, if Coro::Event|EV used
275     # 6 write watcher, if Coro::Event|EV used
276 root 1.13 # 7 forward class
277     # 8 blocking
278 root 1.1
279     sub TIEHANDLE {
280 root 1.13 my ($class, %arg) = @_;
281 root 1.1
282 root 1.13 my $self = bless [], $class;
283     $self->[0] = $arg{fh};
284     $self->[1] = $arg{desc};
285     $self->[2] = $arg{timeout};
286     $self->[3] = "";
287     $self->[4] = "";
288     $self->[7] = $arg{forward_class};
289     $self->[8] = $arg{partial};
290 root 1.1
291 root 1.36 AnyEvent::Util::fh_nonblocking $self->[0], 1;
292 root 1.6
293 root 1.13 $self
294     }
295    
296     sub cleanup {
297 root 1.75 # gets overriden for Coro::Event
298 root 1.29 @{$_[0]} = ();
299 root 1.1 }
300    
301     sub OPEN {
302 root 1.13 &cleanup;
303 root 1.1 my $self = shift;
304 root 1.13 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
305     : open $self->[0], $_[0], $_[1], $_[2];
306 root 1.18
307 root 1.1 if ($r) {
308 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
309 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
310 root 1.1 }
311 root 1.18
312     $r
313 root 1.1 }
314    
315 root 1.13 sub PRINT {
316 root 1.18 WRITE (shift, join "", @_)
317 root 1.13 }
318    
319     sub PRINTF {
320 root 1.62 WRITE (shift, sprintf shift, @_)
321 root 1.13 }
322    
323     sub GETC {
324     my $buf;
325 root 1.16 READ ($_[0], $buf, 1);
326 root 1.18 $buf
327 root 1.13 }
328    
329     sub BINMODE {
330     binmode $_[0][0];
331     }
332    
333     sub TELL {
334 root 1.16 Carp::croak "Coro::Handle's don't support tell()";
335 root 1.13 }
336    
337     sub SEEK {
338 root 1.16 Carp::croak "Coro::Handle's don't support seek()";
339 root 1.13 }
340    
341     sub EOF {
342 root 1.16 Carp::croak "Coro::Handle's don't support eof()";
343 root 1.13 }
344    
345 root 1.1 sub CLOSE {
346 root 1.13 &cleanup;
347 root 1.18 close $_[0][0]
348 root 1.13 }
349    
350     sub DESTROY {
351     &cleanup;
352 root 1.8 }
353    
354     sub FILENO {
355 root 1.18 fileno $_[0][0]
356 root 1.1 }
357    
358 root 1.13 # seems to be called for stringification (how weird), at least
359     # when DumpValue::dumpValue is used to print this.
360     sub FETCH {
361 root 1.18 "$_[0]<$_[0][1]>"
362 root 1.1 }
363    
364 root 1.75 sub _readable_anyevent {
365 root 1.67 my $cb = Coro::rouse_cb;
366 root 1.13
367 root 1.85 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
368     my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
369 root 1.13
370 root 1.85 Coro::rouse_wait
371 root 1.13 }
372    
373 root 1.75 sub _writable_anyevent {
374 root 1.67 my $cb = Coro::rouse_cb;
375 root 1.13
376 root 1.85 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
377     my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
378 root 1.15
379 root 1.85 Coro::rouse_wait
380 root 1.1 }
381    
382 root 1.75 sub _readable_coro {
383 root 1.18 ($_[0][5] ||= "Coro::Event"->io (
384 root 1.17 fd => $_[0][0],
385     desc => "fh $_[0][1] read watcher",
386     timeout => $_[0][2],
387     poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
388 root 1.26 ))->next->[4] & &Event::Watcher::R
389 root 1.17 }
390    
391 root 1.75 sub _writable_coro {
392 root 1.18 ($_[0][6] ||= "Coro::Event"->io (
393 root 1.17 fd => $_[0][0],
394     desc => "fh $_[0][1] write watcher",
395     timeout => $_[0][2],
396     poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
397 root 1.26 ))->next->[4] & &Event::Watcher::W
398 root 1.17 }
399    
400 root 1.75 #sub _readable_ev {
401 root 1.33 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
402     #}
403     #
404 root 1.75 #sub _writable_ev {
405 root 1.33 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
406     #}
407 root 1.30
408 root 1.21 # decide on event model at runtime
409 root 1.17 for my $rw (qw(readable writable)) {
410     *$rw = sub {
411 root 1.25 AnyEvent::detect;
412 root 1.75 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
413     *$rw = \&{"_$rw\_coro"};
414     *cleanup = sub {
415     eval {
416     $_[0][5]->cancel if $_[0][5];
417     $_[0][6]->cancel if $_[0][6];
418     };
419     @{$_[0]} = ();
420     };
421    
422     } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
423     *$rw = \&{"Coro::EV::_$rw\_ev"};
424 root 1.74 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
425 root 1.75
426 root 1.17 } else {
427 root 1.75 *$rw = \&{"_$rw\_anyevent"};
428 root 1.17 }
429 root 1.25 goto &$rw
430 root 1.17 };
431     };
432    
433 root 1.1 sub WRITE {
434     my $len = defined $_[2] ? $_[2] : length $_[1];
435     my $ofs = $_[3];
436     my $res = 0;
437    
438 root 1.16 while () {
439     my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
440 root 1.1 if (defined $r) {
441     $len -= $r;
442     $ofs += $r;
443     $res += $r;
444     last unless $len;
445 root 1.39 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
446 root 1.1 last;
447     }
448 root 1.13 last unless &writable;
449 root 1.1 }
450    
451     return $res;
452     }
453    
454     sub READ {
455     my $len = $_[2];
456     my $ofs = $_[3];
457     my $res = 0;
458    
459 root 1.7 # first deplete the read buffer
460 root 1.13 if (length $_[0][3]) {
461     my $l = length $_[0][3];
462 root 1.7 if ($l <= $len) {
463 root 1.18 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
464 root 1.7 $len -= $l;
465 root 1.13 $ofs += $l;
466 root 1.7 $res += $l;
467     return $res unless $len;
468     } else {
469 root 1.18 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
470     substr ($_[0][3], 0, $len) = "";
471 root 1.7 return $len;
472     }
473     }
474    
475 root 1.2 while() {
476 root 1.13 my $r = sysread $_[0][0], $_[1], $len, $ofs;
477 root 1.1 if (defined $r) {
478     $len -= $r;
479     $ofs += $r;
480     $res += $r;
481     last unless $len && $r;
482 root 1.39 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
483 root 1.1 last;
484     }
485 root 1.13 last if $_[0][8] || !&readable;
486 root 1.1 }
487    
488     return $res;
489     }
490    
491     sub READLINE {
492 root 1.13 my $irs = @_ > 1 ? $_[1] : $/;
493 root 1.82 my ($ofs, $len, $pos);
494 root 1.1
495 root 1.71 while () {
496 root 1.82 if (length $irs) {
497     $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
498    
499     return substr $_[0][3], 0, $pos + length $irs, ""
500     if $pos >= 0;
501    
502     $ofs = (length $_[0][3]) - (length $irs);
503     } elsif (defined $irs) {
504     $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
505    
506 root 1.24 if ($pos >= 0) {
507 root 1.82 my $res = substr $_[0][3], 0, $pos + 2, "";
508     $res =~ s/\A\n+//;
509 root 1.24 return $res;
510     }
511 root 1.51
512 root 1.82 $ofs = (length $_[0][3]) - 1;
513 root 1.1 }
514 root 1.13
515 root 1.52 $len = sysread $_[0][0], $_[0][3], $len + 4096, length $_[0][3];
516 root 1.82
517     unless ($len) {
518     if (defined $len) {
519     # EOF
520     return undef unless length $_[0][3];
521    
522     $_[0][3] =~ s/\A\n+//
523     if ! length $irs && defined $irs;
524    
525     return delete $_[0][3];
526     } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
527     return length $_[0][3] ? delete $_[0][3] : undef;
528     }
529 root 1.1 }
530     }
531 root 1.6 }
532    
533 root 1.13 1;
534 root 1.1
535 root 1.13 =back
536 root 1.1
537     =head1 BUGS
538    
539     - Perl's IO-Handle model is THE bug.
540    
541     =head1 AUTHOR
542    
543 root 1.13 Marc Lehmann <schmorp@schmorp.de>
544     http://home.schmorp.de/
545 root 1.1
546     =cut
547