ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.82
Committed: Mon Jul 20 23:46:28 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-5_16
Changes since 1.81: +28 -13 lines
Log Message:
5.16

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.65 Coro::Handle - non-blocking I/O with a blocking interface.
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.35 This module is an L<AnyEvent> user; you need to make sure that you use and
12     run a supported event loop.
13    
14 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
15 root 1.14 other coroutines can run while reads or writes block on the handle.
16    
17     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18     data, allowing other coroutines to run while one coroutine waits for I/O.
19    
20     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21 root 1.1
22 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23     tied methods, i.e.:
24    
25     $fh->print ($str); # NOT print $fh $str;
26     my $line = $fh->readline; # NOT my $line = <$fh>;
27    
28     The reason is that perl recurses within the interpreter when invoking tie
29     magic, forcing the (temporary) allocation of a (big) stack. If you have
30     lots of socket connections and they happen to wait in e.g. <$fh>, then
31     they would all have a costly C coroutine associated with them.
32    
33 root 1.1 =over 4
34    
35     =cut
36    
37     package Coro::Handle;
38    
39 root 1.16 no warnings;
40     use strict;
41 root 1.13
42 root 1.16 use Carp ();
43 root 1.38 use Errno qw(EAGAIN EINTR EINPROGRESS);
44 root 1.37
45 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
46 root 1.36
47 root 1.5 use base 'Exporter';
48 root 1.1
49 root 1.82 our $VERSION = 5.16;
50 root 1.16 our @EXPORT = qw(unblock);
51 root 1.1
52 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53 root 1.1
54     Create a new non-blocking io-handle using the given
55 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56     other supported argument is "timeout", which sets a timeout for each
57     operation.
58 root 1.1
59     =cut
60    
# Constructor. Ties a fresh glob to Coro::Handle::FH (passing fh => $fhandle
# followed by any extra key/value arguments) and returns a reference to it,
# blessed into the invocant's class. Returns nothing if no filehandle is given.
sub new_from_fh {
   my ($class, $fh, @args) = @_;

   return unless $fh;

   # obtain a glob value of our own to attach the tie to
   my $self = do { local *Coro::Handle };

   tie $self, 'Coro::Handle::FH', fh => $fh, @args;

   # works both as a class method and as an object method
   bless \$self, ref ($class) || $class
}
70    
71 root 1.5 =item $fh = unblock $fh
72    
73 root 1.22 This is a convenience function that just calls C<new_from_fh> on the
74     given filehandle. Use it to replace a normal perl filehandle by a
75     non-(coroutine-)blocking equivalent.
76 root 1.5
77     =cut
78    
# Exported convenience wrapper: hands the given filehandle to
# Coro::Handle->new_from_fh and returns whatever that returns.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
82    
83 root 1.1 =item $fh->writable, $fh->readable
84    
85     Wait until the filehandle is readable or writable (and return true) or
86     until an error condition happens (and return false).
87    
88     =cut
89    
# Forward to the tie object's readable implementation.
sub readable {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::readable ($impl)
}

# Forward to the tie object's writable implementation.
sub writable {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::writable ($impl)
}
92 root 1.1
93 root 1.16 =item $fh->readline ([$terminator])
94 root 1.4
95 root 1.79 Similar to the builtin of the same name, but allows you to specify the
96     input record separator in a coroutine-safe manner (i.e. not using a global
97     variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98     effect.
99 root 1.4
100     =cut
101    
# Method form of readline: any remaining arguments (the optional terminator)
# are passed on to the tie object's READLINE.
sub readline {
   my $handle = shift;
   tied ($$handle)->READLINE (@_)
}
103 root 1.4
104 root 1.16 =item $fh->autoflush ([...])
105 root 1.4
106     Always returns true, arguments are being ignored (exists for compatibility
107 root 1.8 only). Might change in the future.
108 root 1.4
109     =cut
110    
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush {
   return !0;
}
112    
113 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114 root 1.8
115 root 1.13 Work like their function equivalents, except read, which works like
116 root 1.18 sysread. You should not use the read function with Coro::Handles; it will
117 root 1.13 work, but it is not efficient.
118 root 1.8
119     =cut
120    
# Method-call wrappers around the tied implementation in Coro::Handle::FH.
# The buffer arguments are passed on as $_[N] so the callee still sees the
# caller's own variables.

# read and sysread are the same operation here.
sub read {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::READ ($impl, $_[1], $_[2], $_[3])
}

sub sysread {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::READ ($impl, $_[1], $_[2], $_[3])
}

sub syswrite {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::WRITE ($impl, $_[1], $_[2], $_[3])
}

# all arguments are concatenated and written in one go
sub print {
   my $impl = tied ${+shift};
   Coro::Handle::FH::WRITE ($impl, join "", @_)
}

sub printf {
   my $impl = tied ${+shift};
   Coro::Handle::FH::PRINTF ($impl, @_)
}

sub fileno {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::FILENO ($impl)
}

sub close {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::CLOSE ($impl)
}

# this handler always blocks the caller
sub blocking {
   !0
}
129    
# Get/set accessor for slot 8 of the tie object (the flag READ consults to
# decide whether to stop after a single read attempt). Always returns the
# value that was in effect *before* the call; with an argument the slot is
# then updated.
sub partial {
   my $impl     = tied ${$_[0]};
   my $previous = $impl->[8];

   if (@_ > 1) {
      $impl->[8] = $_[1];
   }

   $previous
}
137 root 1.8
138 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
139     send, recv, peername, sockname, shutdown, peerport, peerhost
140    
141     Do the same thing as the perl builtins or IO::Socket methods (but return
142     true on EINPROGRESS). Remember that these must be method calls.
143    
144     =cut
145    
# Thin method wrappers around the perl builtins of the same name, applied to
# the underlying filehandle (slot 0 of the tie object).

# A connect that is merely "in progress" / "would block" counts as success.
sub connect {
   my $sock = tied (${$_[0]})->[0];

   connect ($sock, $_[1])
      || $! == EINPROGRESS
      || $! == EAGAIN
      || $! == WSAEWOULDBLOCK
}

sub bind {
   my $sock = tied (${$_[0]})->[0];
   bind ($sock, $_[1])
}

sub listen {
   my $sock = tied (${$_[0]})->[0];
   listen ($sock, $_[1])
}

sub getsockopt {
   my $sock = tied (${$_[0]})->[0];
   getsockopt ($sock, $_[1], $_[2])
}

sub setsockopt {
   my $sock = tied (${$_[0]})->[0];
   setsockopt ($sock, $_[1], $_[2], $_[3])
}
# $fh->send ($msg, $flags [, $to])
#
# Wrapper around the builtin send on the underlying filehandle.
#
# The builtin chooses between its "send" and "sendto" behaviour by whether a
# fourth argument is present in the call, and that argument slot is evaluated
# in scalar context. A conditional expression yielding () therefore still
# passes a TO (as undef). To really omit TO, two separate calls are needed.
sub send {
   my $sock = tied (${$_[0]})->[0];

   @_ > 3
      ? send ($sock, $_[1], $_[2], $_[3])
      : send ($sock, $_[1], $_[2])
}
# More builtin wrappers operating on the underlying filehandle.

# The receive buffer is passed as $_[1] so the caller's variable is filled.
# Flags are optional; a missing value reaches the builtin as undef.
sub recv {
   my $sock = tied (${$_[0]})->[0];
   recv ($sock, $_[1], $_[2], $_[3])
}

sub sockname {
   my $sock = tied (${$_[0]})->[0];
   getsockname ($sock)
}

sub peername {
   my $sock = tied (${$_[0]})->[0];
   getpeername ($sock)
}

sub shutdown {
   my $sock = tied (${$_[0]})->[0];
   shutdown ($sock, $_[1])
}
156    
157     =item ($fh, $peername) = $listen_fh->accept
158    
159     In scalar context, returns the newly accepted socket (or undef) and in
160     list context return the ($fh, $peername) pair (or nothing).
161    
162     =cut
163    
# Accept a connection on the listening handle, waiting via ->readable while
# none is pending. Returns the new Coro::Handle (scalar context) or the
# ($handle, $peername) pair (list context); returns nothing on a hard error
# or when ->readable reports failure.
sub accept {
   my ($peername, $fh);

   while () {
      if ($peername = accept ($fh, tied (${$_[0]})->[0])) {
         my $client = $_[0]->new_from_fh ($fh);
         return wantarray ? ($client, $peername) : $client;
      }

      # only "try again" style errors are worth waiting for
      return
         unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;

      return
         unless $_[0]->readable;
   }
}
177    
178 root 1.21 =item $fh->timeout ([...])
179 root 1.8
180 root 1.13 The optional argument sets the new timeout (in seconds) for this
181 root 1.8 handle. Returns the current (new) value.
182    
183     C<0> is a valid timeout, use C<undef> to disable the timeout.
184    
185     =cut
186    
# Get/set the per-operation timeout (slot 2). When setting, the new value is
# also pushed to the cached read/write watchers (slots 5 and 6) if they exist.
# Returns the current (possibly just updated) value.
sub timeout {
   my $impl = tied ${$_[0]};

   return $impl->[2]
      unless @_ > 1;

   $impl->[2] = $_[1];

   for my $slot (5, 6) {
      my $watcher = $impl->[$slot]
         or next;
      $watcher->timeout ($_[1]);
   }

   $impl->[2]
}
196    
197     =item $fh->fh
198    
199     Returns the "real" (non-blocking) filehandle. Use this if you want to
200     do operations on the file handle you cannot do using the Coro::Handle
201     interface.
202    
203     =item $fh->rbuf
204    
205     Returns the current contents of the read buffer (this is an lvalue, so you
206     can change the read buffer if you like).
207    
208     You can use this function to implement your own optimized reader when neither
209     readline nor sysread are viable candidates, like this:
210    
211     # first get the _real_ non-blocking filehandle
212     # and fetch a reference to the read buffer
213     my $nb_fh = $fh->fh;
214     my $buf = \$fh->rbuf;
215    
216 root 1.18 while () {
217 root 1.13 # now use buffer contents, modifying
218     # if necessary to reflect the removed data
219    
220     last if $$buf ne ""; # we have leftover data
221    
222     # read another buffer full of data
223     $fh->readable or die "end of file";
224     sysread $nb_fh, $$buf, 8192;
225     }
226    
227     =cut
228    
# The underlying filehandle (slot 0 of the tie object).
sub fh {
   my $impl = tied ${$_[0]};
   $impl->[0]
}

# The read buffer (slot 3), returned as an lvalue so callers can take a
# reference to it or assign to it.
sub rbuf : lvalue {
   tied (${$_[0]})->[3]
}

# Intentionally empty; defining it keeps object destruction from being
# routed through AUTOLOAD.
sub DESTROY {
}
240    
our $AUTOLOAD;

# Unknown methods are looked up in the "forward class" (slot 7 of the tie
# object) and, if found there, invoked with the original argument list.
# Otherwise dies with perl's usual "Can't locate object method" wording.
sub AUTOLOAD {
   my $impl = tied ${$_[0]};

   # strip the package qualification, keeping only the method name
   my $method = $AUTOLOAD;
   $method =~ s/^.*:://;

   my $target = UNIVERSAL::can ($impl->[7], $method);

   die "Can't locate object method \"$method\" via package \"" . (ref $impl) . "\""
      unless $target;

   goto &$target;
}
255    
256 root 1.1 package Coro::Handle::FH;
257    
258 root 1.16 no warnings;
259     use strict;
260 root 1.13
261 root 1.10 use Carp 'croak';
262 root 1.37 use Errno qw(EAGAIN EINTR);
263 root 1.1
264 root 1.39 use AnyEvent::Util qw(WSAEWOULDBLOCK);
265 root 1.1
266 root 1.66 use Coro::AnyEvent;
267    
# The tie object used to be a hash, but this code is speed-critical, so it
# is a plain array now. Slot layout:
#
# 0 FH (the underlying perl filehandle)
# 1 desc
# 2 timeout
# 3 rb (read buffer)
# 4 wb (unused)
# 5 read watcher, if Coro::Event|EV used
# 6 write watcher, if Coro::Event|EV used
# 7 forward class
# 8 partial flag (filled from the "partial" argument)

# Build the tie object from key/value arguments (fh, desc, timeout,
# forward_class, partial) and switch the filehandle to non-blocking mode.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [], $class;

   @$self[0 .. 4] = (@arg{qw(fh desc timeout)}, "", "");
   @$self[7, 8]   = @arg{qw(forward_class partial)};

   AnyEvent::Util::fh_nonblocking ($self->[0], 1);

   $self
}
297    
# Drop all state held by the tie object.
# (This default gets overridden when Coro::Event is in use.)
sub cleanup {
   my ($self) = @_;
   @$self = ();
}
302    
# Tie hook for open(). Discards the current state, (re)opens slot 0 with the
# arguments given to open and, on success, switches the new handle to
# non-blocking mode (croaks if that fails). Returns open's result.
#
# The form of open used mirrors the caller's: one argument after the object
# is the two-argument open, two are MODE, EXPR, anything more is the list
# form with all trailing arguments passed through.
sub OPEN {
   &cleanup;
   my $self = shift;

   my $r = @_ == 1 ? open ($self->[0], $_[0])
         : @_ == 2 ? open ($self->[0], $_[0], $_[1])
         :           open ($self->[0], $_[0], $_[1], @_[2 .. $#_]);

   if ($r) {
      fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
316    
# Tie hook for print: concatenates all items and writes them via WRITE.
sub PRINT {
   my $self = shift;
   WRITE ($self, join "", @_)
}

# Tie hook for printf: formats with sprintf, then writes via WRITE.
sub PRINTF {
   my ($self, $format, @values) = @_;
   WRITE ($self, sprintf $format, @values)
}
324    
# Tie hook for getc: reads a single unit through READ and returns it
# (whatever READ left in the buffer variable, undef if nothing).
sub GETC {
   my ($self) = @_;

   READ ($self, my $char, 1);

   $char
}
330    
# Tie hook for binmode: applies plain binmode to the underlying filehandle.
# Note that any layer argument passed to binmode is not forwarded.
sub BINMODE {
   my ($self) = @_;
   binmode $self->[0];
}
334    
# Positioning and end-of-file queries are not available on Coro::Handles;
# each of these tie hooks simply croaks.

sub TELL {
   Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   Carp::croak ("Coro::Handle's don't support eof()");
}
346    
# Tie hook for close: tears down the tie object's state and closes the
# underlying filehandle, returning close's result.
sub CLOSE {
   # cleanup empties the state array, so grab the handle first; otherwise
   # close would be applied to an undefined value and the real handle
   # would stay open for as long as anybody else references it
   my $fh = $_[0][0];

   &cleanup;

   close $fh
}
351    
# Release the tie object's state when it goes away.
sub DESTROY {
   cleanup (@_);
}

# Tie hook for fileno: descriptor number of the underlying filehandle.
sub FILENO {
   my ($self) = @_;
   fileno $self->[0]
}
359    
# Seems to get called for stringification (how weird), at least when
# DumpValue::dumpValue is used to print this. Yields the object's own
# stringification followed by the description (slot 1) in angle brackets.
sub FETCH {
   my ($self) = @_;
   join "", $self, "<", $self->[1], ">"
}
365    
# Generic AnyEvent implementation of "wait until readable".
# Suspends the current coroutine until either the io watcher or, if a
# timeout is configured (slot 2), the timer fires. Returns 1 when woken by
# the io watcher and 0 when woken by the timer.
sub _readable_anyevent {
   my $wake  = Coro::rouse_cb ();
   my $ready = 1;

   # both watchers only need to live until this sub returns
   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'r',
      cb   => $wake,
   );

   my $timer;
   $timer = AnyEvent->timer (
      after => $_[0][2],
      cb    => sub {
         $ready = 0;
         $wake->();
      },
   ) if defined $_[0][2];

   Coro::rouse_wait ();

   $ready
}
388    
# Generic AnyEvent implementation of "wait until writable".
# Suspends the current coroutine until either the io watcher or, if a
# timeout is configured (slot 2), the timer fires. Returns 1 when woken by
# the io watcher and 0 when woken by the timer.
sub _writable_anyevent {
   my $wake  = Coro::rouse_cb ();
   my $ready = 1;

   # both watchers only need to live until this sub returns
   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'w',
      cb   => $wake,
   );

   my $timer;
   $timer = AnyEvent->timer (
      after => $_[0][2],
      cb    => sub {
         $ready = 0;
         $wake->();
      },
   ) if defined $_[0][2];

   Coro::rouse_wait ();

   $ready
}
411    
# Coro::Event implementations. The watcher is created on first use, cached
# in the tie object (slot 5 for reading, slot 6 for writing) and reused
# afterwards. The result is the R resp. W bit of element 4 of what the
# watcher's ->next returns.

sub _readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}

sub _writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
429    
430 root 1.75 #sub _readable_ev {
431 root 1.33 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
432     #}
433     #
434 root 1.75 #sub _writable_ev {
435 root 1.33 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
436     #}
437 root 1.30
# Decide on the event model at runtime: readable and writable start out as
# bootstrap stubs. The first call detects the AnyEvent model, installs the
# matching implementation under the same name and then continues into it.
for my $rw (qw(readable writable)) {
   no strict 'refs';

   *$rw = sub {
      AnyEvent::detect ();

      my $model = $AnyEvent::MODEL;

      if ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
         *$rw = \&{"_$rw\_coro"};

         # the cached watchers have to be cancelled before the state is wiped
         *cleanup = sub {
            eval {
               for my $slot (5, 6) {
                  $_[0][$slot]->cancel if $_[0][$slot];
               }
            };
            @{$_[0]} = ();
         };

      } elsif ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
         *$rw = \&{"Coro::EV::_$rw\_ev"};
         return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only

      } else {
         *$rw = \&{"_$rw\_anyevent"};
      }

      goto &$rw
   };
}
464    
# WRITE ($self, $data [, $length [, $offset]])
#
# Writes $length bytes of $data (default: all of it), starting at $offset,
# to the underlying filehandle, waiting via &writable whenever the handle
# would block. Returns the number of bytes actually written, which is less
# than requested if a hard error occurred, &writable reported failure or
# there was no more data to write.
sub WRITE {
   my $len = defined $_[2] ? $_[2] : length $_[1];
   my $ofs = $_[3];
   my $res = 0;

   while () {
      my $r = syswrite ($_[0][0], $_[1], $len, $ofs);

      if (defined $r) {
         $len -= $r;
         $ofs += $r;
         $res += $r;

         # Finished when everything has been written. Also stop when a
         # write made no progress: that happens when $len exceeds the data
         # available after $ofs, and retrying would loop forever.
         last unless $len && $r;
      } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
         last;
      }

      # &writable deliberately shares our @_ (it only looks at $_[0])
      last unless &writable;
   }

   return $res;
}
485    
# READ ($self, $buffer, $length [, $offset])
#
# Fills the caller's buffer ($_[1]) starting at $offset: first from the read
# buffer (slot 3), then from the underlying filehandle, waiting via
# &readable when the handle would block. Reading from the handle stops after
# one attempt when the partial flag (slot 8) is set. Returns the number of
# bytes delivered.
sub READ {
   my ($wanted, $pos, $total) = ($_[2], $_[3], 0);

   # first deplete the read buffer
   if (my $buffered = length $_[0][3]) {
      if ($buffered > $wanted) {
         # the buffer alone satisfies the request: hand out its head
         substr ($_[1], $pos) = substr ($_[0][3], 0, $wanted, "");
         return $wanted;
      }

      substr ($_[1], $pos) = $_[0][3];
      $_[0][3] = "";

      $wanted -= $buffered;
      $pos    += $buffered;
      $total  += $buffered;

      return $total unless $wanted;
   }

   while () {
      my $got = sysread $_[0][0], $_[1], $wanted, $pos;

      if (defined $got) {
         $wanted -= $got;
         $pos    += $got;
         $total  += $got;

         # request satisfied, or end of file
         last if !$wanted || !$got;
      } else {
         # only "try again" style errors are worth waiting for
         last unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
      }

      # &readable deliberately shares our @_ (it only looks at $_[0])
      last if $_[0][8] || !&readable;
   }

   return $total;
}
522    
# READLINE ($self [, $terminator])
#
# Returns the next record from the read buffer (slot 3), pulling more data
# from the underlying filehandle as needed and waiting via &readable when it
# would block. The terminator defaults to $/. A non-empty terminator ends a
# record; an empty string splits on "\n\n" and strips leading newlines; undef
# reads until end of file. At end of file (or on error) whatever is buffered
# is returned, or undef if nothing is left.
sub READLINE {
   my $sep       = @_ > 1 ? $_[1] : $/;
   my $paragraph = defined $sep && !length $sep;
   my ($scan_from, $chunk, $hit);

   while () {
      if (length $sep) {
         # resume searching where a terminator could still start
         $hit = index $_[0][3], $sep, $scan_from < 0 ? 0 : $scan_from;

         return substr ($_[0][3], 0, $hit + length ($sep), "")
            if $hit >= 0;

         $scan_from = (length $_[0][3]) - (length $sep);
      } elsif ($paragraph) {
         $hit = index $_[0][3], "\n\n", $scan_from < 1 ? 1 : $scan_from;

         if ($hit >= 0) {
            my $record = substr ($_[0][3], 0, $hit + 2, "");
            $record =~ s/\A\n+//;
            return $record;
         }

         $scan_from = (length $_[0][3]) - 1;
      }

      # append to the buffer; the request size follows the previous read's size
      $chunk = sysread $_[0][0], $_[0][3], $chunk + 4096, length $_[0][3];

      next if $chunk;

      if (defined $chunk) {
         # EOF
         return undef unless length $_[0][3];

         $_[0][3] =~ s/\A\n+//
            if $paragraph;

         return delete $_[0][3];
      }

      # read error: give up unless it is transient and the handle becomes readable
      if (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
         return length $_[0][3] ? delete $_[0][3] : undef;
      }
   }
}
564    
565 root 1.13 1;
566 root 1.1
567 root 1.13 =back
568 root 1.1
569     =head1 BUGS
570    
571     - Perl's IO-Handle model is THE bug.
572    
573     =head1 AUTHOR
574    
575 root 1.13 Marc Lehmann <schmorp@schmorp.de>
576     http://home.schmorp.de/
577 root 1.1
578     =cut
579