ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.30
Committed: Mon Oct 29 19:13:39 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
Changes since 1.29: +15 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
12 root 1.14 other coroutines can run while reads or writes block on the handle.
13    
14     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15     data, allowing other coroutines to run while one coroutine waits for I/O.
16    
17     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18 root 1.1
19 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
20     tied methods, i.e.:
21    
22     $fh->print ($str); # NOT print $fh $str;
23     my $line = $fh->readline; # NOT my $line = <$fh>;
24    
25     The reason is that perl recurses within the interpreter when invoking tie
26     magic, forcing the (temporary) allocation of a (big) stack. If you have
27     lots of socket connections and they happen to wait in e.g. <$fh>, then
28     they would all have a costly C coroutine associated with them.
29    
30 root 1.1 =over 4
31    
32     =cut
33    
34     package Coro::Handle;
35    
36 root 1.16 no warnings;
37     use strict;
38 root 1.13
39 root 1.16 use Carp ();
40 root 1.1 use Errno ();
41 root 1.5 use base 'Exporter';
42 root 1.1
43 root 1.22 our $VERSION = '3.0';
44 root 1.16 our @EXPORT = qw(unblock);
45 root 1.1
46 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
47 root 1.1
48     Create a new non-blocking io-handle using the given
49 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
50     other supported argument is "timeout", which sets a timeout for each
51     operation.
52 root 1.1
53     =cut
54    
# Constructor: wrap the perl filehandle $fh in a tied Coro::Handle object.
#
# Arguments: the class name (or an existing object, whose class is reused),
# the filehandle, and optional key => value pairs that are passed on
# unchanged to Coro::Handle::FH's TIEHANDLE.
# Returns the new object, or nothing when no (true) filehandle is given.
sub new_from_fh {
   my $class = shift;
   my $fh = shift or return;

   # a fresh, anonymous glob to carry the tie magic
   my $self = do { local *Coro::Handle };

   # Tie the *handle* stored in $self. Since perl 5.16 a plain "tie $self"
   # ties the scalar itself (and looks for TIESCALAR), so the glob has to
   # be dereferenced explicitly; older perls accept this form as well.
   tie *$self, 'Coro::Handle::FH', fh => $fh, @_;

   bless \$self, ref $class ? ref $class : $class
}
64    
65 root 1.5 =item $fh = unblock $fh
66    
67 root 1.22 This is a convenience function that just calls C<new_from_fh> on the
68     given filehandle. Use it to replace a normal perl filehandle by a
69     non-(coroutine-)blocking equivalent.
70 root 1.5
71     =cut
72    
# Exported shortcut: hands its single filehandle argument to
# Coro::Handle->new_from_fh and returns whatever that returns.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
76    
77 root 1.1 =item $fh->writable, $fh->readable
78    
79     Wait until the filehandle is readable or writable (and return true) or
80     until an error condition happens (and return false).
81    
82     =cut
83    
# Both wrappers fetch the tie object behind the handle and delegate to the
# function of the same name in Coro::Handle::FH.
#
# "tied *${...}" (explicit glob dereference) is required since perl 5.16,
# where "tied $scalar" no longer looks at the handle held by the scalar.
sub readable {
   Coro::Handle::FH::readable (tied *${$_[0]})
}

sub writable {
   Coro::Handle::FH::writable (tied *${$_[0]})
}
86 root 1.1
87 root 1.16 =item $fh->readline ([$terminator])
88 root 1.4
89     Like the builtin of the same name, but allows you to specify the input
90 root 1.7 record separator in a coroutine-safe manner (i.e. not using a global
91 root 1.4 variable).
92    
93     =cut
94    
# Calls READLINE on the tie object behind the handle, forwarding any
# remaining arguments (the optional terminator).
sub readline {
   my $self = shift;

   # explicit glob dereference: since perl 5.16 "tied $scalar" does not
   # find a tie that was applied to the handle held by the scalar
   tied (*$$self)->READLINE (@_)
}
96 root 1.4
97 root 1.16 =item $fh->autoflush ([...])
98 root 1.4
99     Always returns true, arguments are being ignored (exists for compatibility
100 root 1.8 only). Might change in the future.
101 root 1.4
102     =cut
103    
# Compatibility stub: ignores all arguments and always reports success.
sub autoflush {
   return !0;
}
105    
106 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
107 root 1.8
108 root 1.13 Work like their function equivalents (except read, which works like
109 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
110 root 1.13 work but it's not efficient).
111 root 1.8
112     =cut
113    
# Method-call front ends for the tied-handle implementation in
# Coro::Handle::FH. Each one looks up the tie object behind the handle and
# calls the matching FH function directly.
#
# The handle is dereferenced as a glob ("tied *${...}"): since perl 5.16
# "tied $scalar" no longer finds a tie on the handle held by the scalar.
#
# The buffer argument ($_[1]) is passed through as an alias on purpose, so
# that READ fills the caller's variable.

# read and sysread are the same operation here: both go to READ
sub read     { Coro::Handle::FH::READ   (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub sysread  { Coro::Handle::FH::READ   (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub syswrite { Coro::Handle::FH::WRITE  (tied *${$_[0]}, $_[1], $_[2], $_[3]) }

# print concatenates its arguments and writes them in one WRITE call
sub print    { Coro::Handle::FH::WRITE  (tied *${+shift}, join "", @_) }
sub printf   { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
sub fileno   { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
sub close    { Coro::Handle::FH::CLOSE  (tied *${$_[0]}) }
sub blocking { !0 } # this handler always blocks the caller
122    
# Get/set accessor for slot 8 of the tie object (the "partial" flag that
# READ consults). Returns the value the slot had *before* the call; a
# second argument, if present, becomes the new value.
sub partial {
   # explicit glob dereference, needed since perl 5.16 to reach a tie
   # that lives on the handle rather than on the scalar
   my $obj = tied *${$_[0]};

   my $retval = $obj->[8];
   $obj->[8] = $_[1] if @_ > 1;
   $retval
}
130 root 1.8
131 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
132     send, recv, peername, sockname, shutdown, peerport, peerhost
133    
134     Do the same thing as the perl builtins or IO::Socket methods (but return
135     true on EINPROGRESS). Remember that these must be method calls.
136    
137     =cut
138    
# Socket methods: each applies the perl builtin of (nearly) the same name
# to the real filehandle stored in slot 0 of the tie object.
#
# The handle is dereferenced as a glob ("tied(*${...})"): since perl 5.16
# "tied $scalar" no longer finds a tie on the handle held by the scalar.

# connect also counts EINPROGRESS as success
sub connect    { connect     tied(*${$_[0]})->[0], $_[1] or $! == Errno::EINPROGRESS }
sub bind       { bind        tied(*${$_[0]})->[0], $_[1] }
sub listen     { listen      tied(*${$_[0]})->[0], $_[1] }
sub getsockopt { getsockopt  tied(*${$_[0]})->[0], $_[1], $_[2] }
sub setsockopt { setsockopt  tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }
sub send       { send        tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
sub recv       { recv        tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
sub sockname   { getsockname tied(*${$_[0]})->[0] }
sub peername   { getpeername tied(*${$_[0]})->[0] }
sub shutdown   { shutdown    tied(*${$_[0]})->[0], $_[1] }
149    
150     =item ($fh, $peername) = $listen_fh->accept
151    
152     In scalar context, returns the newly accepted socket (or undef) and in
153     list context return the ($fh, $peername) pair (or nothing).
154    
155     =cut
156    
# Accept a connection on the listening handle, waiting via ->readable
# while the accept would block.
#
# On success returns the accepted socket wrapped by new_from_fh; in list
# context the peer name is returned as second element. Returns nothing when
# accept fails with anything other than EAGAIN, or when ->readable returns
# false.
sub accept {
   my ($peername, $fh);

   while () {
      # explicit glob dereference: needed since perl 5.16 to reach a tie
      # on the handle held by the scalar
      $peername = accept $fh, tied(*${$_[0]})->[0]
         and return wantarray
                    ? ($_[0]->new_from_fh($fh), $peername)
                    :  $_[0]->new_from_fh($fh);

      return unless $!{EAGAIN};

      $_[0]->readable or return;
   }
}
170    
171 root 1.21 =item $fh->timeout ([...])
172 root 1.8
173 root 1.13 The optional argument sets the new timeout (in seconds) for this
174 root 1.8 handle. Returns the current (new) value.
175    
176     C<0> is a valid timeout, use C<undef> to disable the timeout.
177    
178     =cut
179    
# Get/set the timeout stored in slot 2 of the tie object. When a new value
# is given it is also pushed to the watchers in slots 5 and 6, if those
# exist. Returns the (possibly new) timeout.
sub timeout {
   # explicit glob dereference, needed since perl 5.16 to reach a tie
   # that lives on the handle rather than on the scalar
   my $self = tied *${$_[0]};

   if (@_ > 1) {
      $self->[2] = $_[1];
      $self->[5]->timeout ($_[1]) if $self->[5];
      $self->[6]->timeout ($_[1]) if $self->[6];
   }

   $self->[2]
}
189    
190     =item $fh->fh
191    
192     Returns the "real" (non-blocking) filehandle. Use this if you want to
193     do operations on the file handle you cannot do using the Coro::Handle
194     interface.
195    
196     =item $fh->rbuf
197    
198     Returns the current contents of the read buffer (this is an lvalue, so you
199     can change the read buffer if you like).
200    
201     You can use this function to implement your own optimized reader when neither
202     readline nor sysread are viable candidates, like this:
203    
204     # first get the _real_ non-blocking filehandle
205     # and fetch a reference to the read buffer
206     my $nb_fh = $fh->fh;
207     my $buf = \$fh->rbuf;
208    
209 root 1.18 while () {
210 root 1.13 # now use buffer contents, modifying
211     # if necessary to reflect the removed data
212    
213     last if $$buf ne ""; # we have leftover data
214    
215     # read another buffer full of data
216     $fh->readable or die "end of file";
217     sysread $nb_fh, $$buf, 8192;
218     }
219    
220     =cut
221    
# Returns slot 0 of the tie object: the underlying perl filehandle.
sub fh {
   # explicit glob dereference, needed since perl 5.16 to reach a tie
   # that lives on the handle rather than on the scalar
   (tied *${$_[0]})->[0];
}
225    
# Lvalue accessor for slot 3 of the tie object (the read buffer), so that
# callers can both inspect and modify the buffered data.
sub rbuf : lvalue {
   # explicit glob dereference, needed since perl 5.16 to reach a tie
   # that lives on the handle rather than on the scalar
   (tied *${$_[0]})->[3];
}
229    
# Deliberately empty destructor. This package defines AUTOLOAD, so without
# an explicit DESTROY object destruction would be routed through AUTOLOAD.
sub DESTROY {
   return;
}
233    
our $AUTOLOAD;

# Fallback for methods this package does not define: the call is forwarded
# (with unchanged arguments) to the method of the same name in the forward
# class stored in slot 7 of the tie object. Dies with the usual
# "Can't locate object method" message when that class cannot do it.
sub AUTOLOAD {
   # explicit glob dereference, needed since perl 5.16 to reach a tie
   # that lives on the handle rather than on the scalar
   my $self = tied *${$_[0]};

   # strip the package qualifier from the requested method name
   (my $func = $AUTOLOAD) =~ s/^(.*):://;

   my $forward = UNIVERSAL::can ($self->[7], $func);

   $forward or
      die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";

   goto &$forward;
}
248    
249 root 1.1 package Coro::Handle::FH;
250    
251 root 1.16 no warnings;
252     use strict;
253 root 1.13
254 root 1.1 use Fcntl ();
255     use Errno ();
256 root 1.10 use Carp 'croak';
257 root 1.1
258 root 1.13 use AnyEvent;
259 root 1.1
260 root 1.13 # formerly a hash, but we are speed-critical, so try
261     # to be faster even if it hurts.
262     #
263     # 0 FH
264     # 1 desc
265     # 2 timeout
266     # 3 rb
267     # 4 wb # unused
268 root 1.17 # 5 read watcher, if Coro::Event used
269     # 6 write watcher, if Coro::Event used
270 root 1.13 # 7 forward class
271     # 8 partial (set from the "partial" argument; when true, READ stops after one sysread attempt)
272 root 1.1
# Tie constructor. Builds the array-based state object from the named
# arguments fh, desc, timeout, forward_class and partial, starts with empty
# read/write buffers and switches the filehandle to non-blocking mode.
# Croaks when fcntl fails.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [], $class;
   $self->[0] = $arg{fh};
   $self->[1] = $arg{desc};
   $self->[2] = $arg{timeout};
   $self->[3] = "";
   $self->[4] = "";
   $self->[7] = $arg{forward_class};
   $self->[8] = $arg{partial};

   # F_SETFL replaces the whole set of status flags, so fetch the current
   # ones first and only add O_NONBLOCK; otherwise flags such as O_APPEND
   # would be cleared as a side effect.
   my $flags = fcntl $self->[0], &Fcntl::F_GETFL, 0
      or croak "fcntl(F_GETFL): $!";
   fcntl $self->[0], &Fcntl::F_SETFL, $flags | &Fcntl::O_NONBLOCK
      or croak "fcntl(O_NONBLOCK): $!";

   $self
}
290    
# Cancels the watchers kept in slots 5 and 6 (when present) and then
# empties the whole state array.
sub cleanup {
   my $self = $_[0];

   for my $slot (5, 6) {
      my $watcher = $self->[$slot]
         or next;

      $watcher->cancel;
   }

   @$self = ();
}
296    
# Tied open: discards the previous state via cleanup, opens slot 0 with the
# arguments given to open() and, on success, puts the new handle into
# non-blocking mode. Returns open's result; croaks when fcntl fails.
#
# NOTE(review): cleanup empties the whole state array, so timeout, buffers
# and forward class are gone after a re-open - confirm this is intended.
sub OPEN {
   &cleanup;
   my $self = shift;

   # dispatch on the number of open() arguments: classic two-argument
   # open (one argument here), three-argument open, or the list form
   my $r = @_ == 1 ? open $self->[0], $_[0]
         : @_ == 2 ? open $self->[0], $_[0], $_[1]
         :           open $self->[0], $_[0], $_[1], @_[2 .. $#_];

   if ($r) {
      # F_SETFL replaces all status flags: keep the ones open() established
      # (e.g. O_APPEND) and only add O_NONBLOCK
      my $flags = fcntl $self->[0], &Fcntl::F_GETFL, 0
         or croak "fcntl(F_GETFL): $!";
      fcntl $self->[0], &Fcntl::F_SETFL, $flags | &Fcntl::O_NONBLOCK
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
310    
# Tied print: concatenates all arguments and passes the result to WRITE.
sub PRINT {
   my $self = shift;

   WRITE ($self, join "", @_)
}

# Tied printf: formats the arguments with sprintf and passes the result
# to WRITE.
sub PRINTF {
   my ($self, $format, @args) = @_;

   WRITE ($self, sprintf $format, @args)
}
318    
# Tied getc: asks READ for a single character and returns whatever READ
# left in the buffer.
sub GETC {
   my $char;

   READ ($_[0], $char, 1);

   return $char;
}
324    
# Tied binmode: applies binmode to the real filehandle in slot 0.
# An optional layer argument (e.g. ":utf8") is passed on; without one the
# handle is put into plain binary mode. Returns binmode's result.
sub BINMODE {
   @_ > 1 ? binmode $_[0][0], $_[1]
          : binmode $_[0][0]
}
328    
# Positioning and end-of-file queries are not implemented for these
# handles; each of the three tie methods simply croaks.

sub TELL {
   Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   Carp::croak ("Coro::Handle's don't support eof()");
}
340    
# Tied close: releases the watchers and state via cleanup and closes the
# real filehandle. Returns the result of close.
sub CLOSE {
   # cleanup empties the state array, so the filehandle has to be saved
   # first - otherwise close would be handed an undefined value and the
   # real handle would never be closed explicitly.
   my $fh = $_[0][0];

   &cleanup;

   close $fh
}
345    
# Destructor of the tie object: hands the object to cleanup.
sub DESTROY {
   cleanup (@_);
}
349    
# Tied fileno: returns the descriptor number of the real filehandle kept
# in slot 0.
sub FILENO {
   my ($self) = @_;

   return fileno $self->[0];
}
353    
# Apparently invoked when the tied handle gets stringified (observed when
# DumpValue::dumpValue prints it). Produces the stringified tie object
# followed by the description from slot 1 in angle brackets.
sub FETCH {
   sprintf "%s<%s>", $_[0], $_[0][1]
}
359    
# Generic AnyEvent implementation of "wait until readable".
#
# Registers an I/O watcher on the filehandle (slot 0) and, when a timeout
# is set (slot 2), a timer. Whichever fires first readies the waiting
# coroutine. Returns 1 unless the timer callback ran, in which case 0.
sub readable_anyevent {
   my $current = $Coro::current;
   my $io = 1;

   # ready the waiting coroutine once; $current doubles as "still waiting"
   my $wake = sub {
      $current->ready if $current;
      undef $current;
   };

   my $w = AnyEvent->io (
      desc => "$_[0][1] read watcher",
      fh   => $_[0][0],
      poll => 'r',
      cb   => $wake,
   );

   # the timer only exists when a timeout is configured
   my $t;
   if (defined $_[0][2]) {
      $t = AnyEvent->timer (
         desc  => "fh $_[0][1] read timeout",
         after => $_[0][2],
         cb    => sub {
            $io = 0;
            $wake->();
         },
      );
   }

   &Coro::schedule;
   # keep sleeping until one of the callbacks has cleared $current
   &Coro::schedule while $current;

   $io
}
389    
# Generic AnyEvent implementation of "wait until writable".
#
# Registers an I/O watcher on the filehandle (slot 0) and, when a timeout
# is set (slot 2), a timer. Whichever fires first readies the waiting
# coroutine. Returns 1 unless the timer callback ran, in which case 0.
sub writable_anyevent {
   my $current = $Coro::current;
   my $io = 1;

   # ready the waiting coroutine once; $current doubles as "still waiting"
   my $wake = sub {
      $current->ready if $current;
      undef $current;
   };

   my $w = AnyEvent->io (
      desc => "fh $_[0][1] write watcher",
      fh   => $_[0][0],
      poll => 'w',
      cb   => $wake,
   );

   # the timer only exists when a timeout is configured
   my $t;
   if (defined $_[0][2]) {
      $t = AnyEvent->timer (
         desc  => "fh $_[0][1] write timeout",
         after => $_[0][2],
         cb    => sub {
            $io = 0;
            $wake->();
         },
      );
   }

   # sleep until one of the callbacks has cleared $current
   &Coro::schedule while $current;

   $io
}
418    
# Coro::Event implementations of the readiness waits. The watcher is
# created on first use and then cached in the tie object (slot 5 for
# reading, slot 6 for writing). The result is element 4 of what the
# watcher's ->next returns, masked with the R (or W) bit.

sub readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}

sub writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
436    
# EV implementations of the readiness waits: call Coro::EV::once_timed_io
# with the descriptor number, the event mask and the timeout (slot 2), and
# report whether the value it returns equals the requested event.

sub readable_ev {
   my $revents = Coro::EV::once_timed_io (fileno $_[0][0], &EV::READ, $_[0][2]);

   $revents == &EV::READ
}

sub writable_ev {
   my $revents = Coro::EV::once_timed_io (fileno $_[0][0], &EV::WRITE, $_[0][2]);

   $revents == &EV::WRITE
}
444    
# Decide on the event model at runtime: "readable" and "writable" start out
# as stubs that, on their first call, let AnyEvent detect its backend,
# replace themselves with the matching implementation (_coro, _ev or
# _anyevent) and then jump into it with the original arguments.
for my $rw (qw(readable writable)) {
   no strict 'refs';

   *$rw = sub {
      AnyEvent::detect;

      my $model = $AnyEvent::MODEL;
      my $impl;

      if ($model eq "AnyEvent::Impl::Coro" or $model eq "AnyEvent::Impl::Event") {
         require Coro::Event;
         $impl = "${rw}_coro";
      } elsif ($model eq "EV::AnyEvent") {
         require Coro::EV;
         $impl = "${rw}_ev";
      } else {
         $impl = "${rw}_anyevent";
      }

      # install the chosen implementation under the generic name ...
      *$rw = \&$impl;

      # ... and continue in it
      goto &$rw
   };
}
463    
# Tied syswrite: write $_[1] to the filehandle in slot 0, starting at
# offset $_[3] and for $_[2] bytes (default: the length of the data).
#
# Keeps writing until everything is out, waiting via "writable" whenever
# more remains or the write failed with EAGAIN. Gives up on any other error
# or when "writable" returns false. Returns the number of bytes written.
sub WRITE {
   my $remaining = defined $_[2] ? $_[2] : length $_[1];
   my $offset    = $_[3];
   my $written   = 0;

   while () {
      my $n = syswrite ($_[0][0], $_[1], $remaining, $offset);

      if (!defined $n) {
         # anything but "would block" is a real error
         last if $! != Errno::EAGAIN;
      } else {
         $remaining -= $n;
         $offset    += $n;
         $written   += $n;

         last if !$remaining;
      }

      # @_ is passed on unchanged so that writable sees the tie object
      last unless &writable;
   }

   return $written;
}
484    
# Tied read/sysread: fill the caller's buffer $_[1] (at offset $_[3]) with
# up to $_[2] bytes.
#
# Data already sitting in the read buffer (slot 3) is handed out first.
# After that the filehandle is read, waiting via "readable" when needed,
# until the request is satisfied, end of file or an error other than EAGAIN
# occurs, "readable" returns false, or - with the partial flag (slot 8)
# set - after a single sysread attempt. Returns the number of bytes stored.
sub READ {
   my $want   = $_[2];
   my $offset = $_[3];
   my $total  = 0;

   # first deplete the read buffer
   if (my $buffered = length $_[0][3]) {
      if ($buffered > $want) {
         # the buffer alone satisfies the request
         substr ($_[1], $offset) = substr ($_[0][3], 0, $want);
         substr ($_[0][3], 0, $want) = "";
         return $want;
      }

      substr ($_[1], $offset) = $_[0][3];
      $_[0][3] = "";

      $want   -= $buffered;
      $offset += $buffered;
      $total  += $buffered;

      return $total unless $want;
   }

   while () {
      my $n = sysread $_[0][0], $_[1], $want, $offset;

      if (defined $n) {
         $want   -= $n;
         $offset += $n;
         $total  += $n;

         # done when the request is complete or end of file was hit
         last if !$want || !$n;
      } elsif ($! != Errno::EAGAIN) {
         last;
      }

      # @_ is passed on unchanged so that readable sees the tie object
      last if $_[0][8] || !&readable;
   }

   return $total;
}
521    
# Tied readline: return the next record from the handle.
#
# The record separator is $_[1] when given, otherwise the current value of
# $/. Data is collected in the read buffer (slot 3) in chunks of up to 8192
# bytes until the separator shows up. With an undefined separator
# everything up to end of file is returned. At end of file, on an error
# other than EAGAIN, or when "readable" returns false, whatever is left in
# the buffer is returned (undef if it is empty).
sub READLINE {
   my $self = $_[0];
   my $sep  = @_ > 1 ? $_[1] : $/;

   while () {
      if (defined $sep) {
         my $at = index $self->[3], $sep;

         unless ($at < 0) {
            # cut the record, separator included, off the buffer
            my $end  = $at + length $sep;
            my $line = substr $self->[3], 0, $end;
            substr ($self->[3], 0, $end) = "";
            return $line;
         }
      }

      my $got = sysread $self->[0], $self->[3], 8192, length $self->[3];

      # new data arrived: look for the separator again
      next if $got;

      # nothing available right now: wait (@_ is passed on unchanged) and retry
      next if !defined $got && $! == Errno::EAGAIN && &readable;

      # end of file, hard error or timeout: flush the remainder
      return length $self->[3] ? delete $self->[3] : undef;
   }
}
544    
545 root 1.13 1;
546 root 1.1
547 root 1.13 =back
548 root 1.1
549     =head1 BUGS
550    
551     - Perl's IO-Handle model is THE bug.
552    
553     =head1 AUTHOR
554    
555 root 1.13 Marc Lehmann <schmorp@schmorp.de>
556     http://home.schmorp.de/
557 root 1.1
558     =cut
559