ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.29
Committed: Thu Oct 25 08:14:00 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
Changes since 1.28: +3 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
12 root 1.14 other coroutines can run while reads or writes block on the handle.
13    
14     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15     data, allowing other coroutines to run while one coroutine waits for I/O.
16    
17     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18 root 1.1
19 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
20     tied methods, i.e.:
21    
22     $fh->print ($str); # NOT print $fh $str;
23     my $line = $fh->readline; # NOT my $line = <$fh>;
24    
25     The reason is that perl recurses within the interpreter when invoking tie
26     magic, forcing the (temporary) allocation of a (big) stack. If you have
27     lots of socket connections and they happen to wait in e.g. <$fh>, then
28     they would all have a costly C coroutine associated with them.
29    
30 root 1.1 =over 4
31    
32     =cut
33    
34     package Coro::Handle;
35    
36 root 1.16 no warnings;
37     use strict;
38 root 1.13
39 root 1.16 use Carp ();
40 root 1.1 use Errno ();
41 root 1.5 use base 'Exporter';
42 root 1.1
43 root 1.22 our $VERSION = '3.0';
44 root 1.16 our @EXPORT = qw(unblock);
45 root 1.1
46 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
47 root 1.1
48     Create a new non-blocking io-handle using the given
49 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
50     other supported argument is "timeout", which sets a timeout for each
51     operation.
52 root 1.1
53     =cut
54    
# Constructor: wrap an existing perl filehandle in a Coro::Handle object.
#
# A fresh anonymous glob is tied to Coro::Handle::FH (the filehandle and
# any extra key/value arguments are handed to the tie constructor) and a
# blessed reference to that glob is returned.  May be invoked on a class
# name or on an existing object.  Returns nothing if no filehandle (or a
# false one) is given.
sub new_from_fh {
    my ($class, $fh, @args) = @_;

    return unless $fh;

    my $glob = do { local *Coro::Handle };

    tie $glob, 'Coro::Handle::FH', fh => $fh, @args;

    return bless \$glob, (ref ($class) || $class);
}
64    
65 root 1.5 =item $fh = unblock $fh
66    
67 root 1.22 This is a convenience function that just calls C<new_from_fh> on the
68     given filehandle. Use it to replace a normal perl filehandle by a
69     non-(coroutine-)blocking equivalent.
70 root 1.5
71     =cut
72    
# Exported convenience wrapper: hands its single argument to
# Coro::Handle->new_from_fh and returns whatever that returns.
sub unblock($) {
    return Coro::Handle->new_from_fh ($_[0]);
}
76    
77 root 1.1 =item $fh->writable, $fh->readable
78    
79     Wait until the filehandle is readable or writable (and return true) or
80     until an error condition happens (and return false).
81    
82     =cut
83    
84 root 1.16 sub readable { Coro::Handle::FH::readable (tied ${$_[0]}) }
85     sub writable { Coro::Handle::FH::writable (tied ${$_[0]}) }
86 root 1.1
87 root 1.16 =item $fh->readline ([$terminator])
88 root 1.4
89     Like the builtin of the same name, but allows you to specify the input
90 root 1.7 record separator in a coroutine-safe manner (i.e. not using a global
91 root 1.4 variable).
92    
93     =cut
94    
95 root 1.18 sub readline { tied(${+shift})->READLINE (@_) }
96 root 1.4
97 root 1.16 =item $fh->autoflush ([...])
98 root 1.4
99     Always returns true, arguments are being ignored (exists for compatibility
100 root 1.8 only). Might change in the future.
101 root 1.4
102     =cut
103    
104     sub autoflush { !0 }
105    
106 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
107 root 1.8
108 root 1.13 Work like their function equivalents (except read, which works like
109 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
110 root 1.13 work but it's not efficient).
111 root 1.8
112     =cut
113    
114 root 1.13 sub read { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
115     sub sysread { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
116     sub syswrite { Coro::Handle::FH::WRITE (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
117     sub print { Coro::Handle::FH::WRITE (tied ${+shift}, join "", @_) }
118     sub printf { Coro::Handle::FH::PRINTF (tied ${+shift}, @_) }
119     sub fileno { Coro::Handle::FH::FILENO (tied ${$_[0]}) }
120     sub close { Coro::Handle::FH::CLOSE (tied ${$_[0]}) }
121     sub blocking { !0 } # this handler always blocks the caller
122    
# Accessor for slot 8 of the tied state array (set from the "partial"
# constructor argument).  With an argument, stores it as the new value.
# Always returns the value that was in effect *before* the call.
sub partial {
    my $state    = tied ${$_[0]};
    my $previous = $state->[8];

    if (@_ > 1) {
        $state->[8] = $_[1];
    }

    return $previous;
}
130 root 1.8
131 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
132     send, recv, peername, sockname, shutdown, peerport, peerhost
133    
134     Do the same thing as the perl builtins or IO::Socket methods (but return
135     true on EINPROGRESS). Remember that these must be method calls.
136    
137     =cut
138    
139     sub connect { connect tied(${$_[0]})->[0], $_[1] or $! == Errno::EINPROGRESS }
140     sub bind { bind tied(${$_[0]})->[0], $_[1] }
141     sub listen { listen tied(${$_[0]})->[0], $_[1] }
142     sub getsockopt { getsockopt tied(${$_[0]})->[0], $_[1], $_[2] }
143     sub setsockopt { setsockopt tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }
144     sub send { send tied(${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
145     sub recv { recv tied(${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
146     sub sockname { getsockname tied(${$_[0]})->[0] }
147     sub peername { getpeername tied(${$_[0]})->[0] }
148     sub shutdown { shutdown tied(${$_[0]})->[0], $_[1] }
149    
150     =item ($fh, $peername) = $listen_fh->accept
151    
152     In scalar context, returns the newly accepted socket (or undef), and in
153     list context returns the ($fh, $peername) pair (or nothing).
154    
155     =cut
156    
# Accept a connection on the underlying (non-blocking) socket.
#
# Loops until the builtin accept succeeds.  On success the new socket is
# wrapped via new_from_fh; list context yields ($handle, $peername),
# scalar context just the handle.  Any failure other than EAGAIN, or a
# false return from ->readable while waiting, ends the loop with an
# empty return.
sub accept {
    my ($peername, $fh);

    for (;;) {
        if ($peername = accept $fh, tied(${$_[0]})->[0]) {
            my $wrapped = $_[0]->new_from_fh($fh);
            return wantarray ? ($wrapped, $peername) : $wrapped;
        }

        # only "would block" is worth waiting for
        return unless $!{EAGAIN};

        return unless $_[0]->readable;
    }
}
170    
171 root 1.21 =item $fh->timeout ([...])
172 root 1.8
173 root 1.13 The optional argument sets the new timeout (in seconds) for this
174 root 1.8 handle. Returns the current (new) value.
175    
176     C<0> is a valid timeout, use C<undef> to disable the timeout.
177    
178     =cut
179    
# Get or set the per-operation timeout (slot 2 of the tied state).
#
# With an argument, the new value is stored and also pushed to the
# cached read/write watchers (slots 5 and 6) if they exist.  Returns the
# value in effect after the call.
sub timeout {
    my $state = tied ${$_[0]};

    if (@_ > 1) {
        my $seconds = $_[1];

        $state->[2] = $seconds;

        for my $slot (5, 6) {
            my $watcher = $state->[$slot]
                or next;
            $watcher->timeout ($seconds);
        }
    }

    return $state->[2];
}
189    
190     =item $fh->fh
191    
192     Returns the "real" (non-blocking) filehandle. Use this if you want to
193     do operations on the file handle you cannot do using the Coro::Handle
194     interface.
195    
196     =item $fh->rbuf
197    
198     Returns the current contents of the read buffer (this is an lvalue, so you
199     can change the read buffer if you like).
200    
201     You can use this function to implement your own optimized reader when neither
202     readline nor sysread are viable candidates, like this:
203    
204     # first get the _real_ non-blocking filehandle
205     # and fetch a reference to the read buffer
206     my $nb_fh = $fh->fh;
207     my $buf = \$fh->rbuf;
208    
209 root 1.18 while () {
210 root 1.13 # now use buffer contents, modifying
211     # if necessary to reflect the removed data
212    
213     last if $$buf ne ""; # we have leftover data
214    
215     # read another buffer full of data
216     $fh->readable or die "end of file";
217     sysread $nb_fh, $$buf, 8192;
218     }
219    
220     =cut
221    
222     sub fh {
223     (tied ${$_[0]})->[0];
224     }
225    
226     sub rbuf : lvalue {
227     (tied ${$_[0]})->[3];
228     }
229    
230     sub DESTROY {
231     # nop
232     }
233    
234 root 1.16 our $AUTOLOAD;
235    
# Fallback for methods Coro::Handle does not define itself: the method
# name is looked up (via UNIVERSAL::can) on the forward class stored in
# slot 7 of the tied state, and control is transferred to it with the
# original argument list.  Dies if the forward class cannot do it.
sub AUTOLOAD {
    my $state = tied ${$_[0]};

    # strip the package qualifier from the requested method name
    my $method = $AUTOLOAD;
    $method =~ s/^(.*):://;

    my $target = UNIVERSAL::can ($state->[7], $method);

    die "Can't locate object method \"$method\" via package \"" . (ref $state) . "\""
        unless $target;

    goto &$target;
}
248    
249 root 1.1 package Coro::Handle::FH;
250    
251 root 1.16 no warnings;
252     use strict;
253 root 1.13
254 root 1.1 use Fcntl ();
255     use Errno ();
256 root 1.10 use Carp 'croak';
257 root 1.1
258 root 1.13 use AnyEvent;
259 root 1.1
260 root 1.13 # formerly a hash, but we are speed-critical, so try
261     # to be faster even if it hurts.
262     #
263     # 0 FH
264     # 1 desc
265     # 2 timeout
266     # 3 rb
267     # 4 wb # unused
268 root 1.17 # 5 read watcher, if Coro::Event used
269     # 6 write watcher, if Coro::Event used
270 root 1.13 # 7 forward class
271     # 8 partial flag (when true, READ does not wait for more data)
272 root 1.1
# Tie constructor.  Builds the array-based state object from the named
# arguments (fh, desc, timeout, forward_class, partial), starts with
# empty read/write buffers and no watchers, and switches the filehandle
# to non-blocking mode.  Croaks if the fcntl call fails.
sub TIEHANDLE {
    my ($class, %arg) = @_;

    my $self = bless [
        $arg{fh},               # 0 filehandle
        $arg{desc},             # 1 description
        $arg{timeout},          # 2 timeout
        "",                     # 3 read buffer
        "",                     # 4 write buffer (unused)
        undef,                  # 5 read watcher, created on demand
        undef,                  # 6 write watcher, created on demand
        $arg{forward_class},    # 7 forward class
        $arg{partial},          # 8 partial flag
    ], $class;

    fcntl ($self->[0], Fcntl::F_SETFL (), Fcntl::O_NONBLOCK ())
        or croak "fcntl(O_NONBLOCK): $!";

    return $self;
}
290    
291     sub cleanup {
292 root 1.29 $_[0][5]->cancel if $_[0][5];
293     $_[0][6]->cancel if $_[0][6];
294     @{$_[0]} = ();
295 root 1.1 }
296    
297     sub OPEN {
298 root 1.13 &cleanup;
299 root 1.1 my $self = shift;
300 root 1.13 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
301     : open $self->[0], $_[0], $_[1], $_[2];
302 root 1.18
303 root 1.1 if ($r) {
304 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
305 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
306 root 1.1 }
307 root 1.18
308     $r
309 root 1.1 }
310    
311 root 1.13 sub PRINT {
312 root 1.18 WRITE (shift, join "", @_)
313 root 1.13 }
314    
315     sub PRINTF {
316 root 1.18 WRITE (shift, sprintf shift,@_)
317 root 1.13 }
318    
319     sub GETC {
320     my $buf;
321 root 1.16 READ ($_[0], $buf, 1);
322 root 1.18 $buf
323 root 1.13 }
324    
325     sub BINMODE {
326     binmode $_[0][0];
327     }
328    
329     sub TELL {
330 root 1.16 Carp::croak "Coro::Handle's don't support tell()";
331 root 1.13 }
332    
333     sub SEEK {
334 root 1.16 Carp::croak "Coro::Handle's don't support seek()";
335 root 1.13 }
336    
337     sub EOF {
338 root 1.16 Carp::croak "Coro::Handle's don't support eof()";
339 root 1.13 }
340    
# Tie handler for close(): cancel any watchers, drop the buffered state
# and close the underlying filehandle.  Returns the result of close.
sub CLOSE {
    # cleanup empties the whole state array, so the filehandle has to be
    # fetched *before* calling it; otherwise close would be given undef
    # and the real handle would never be closed explicitly.
    my $fh = $_[0][0];

    &cleanup;

    close $fh
}
345    
346     sub DESTROY {
347     &cleanup;
348 root 1.8 }
349    
350     sub FILENO {
351 root 1.18 fileno $_[0][0]
352 root 1.1 }
353    
354 root 1.13 # seems to be called for stringification (how weird), at least
355     # when DumpValue::dumpValue is used to print this.
356     sub FETCH {
357 root 1.18 "$_[0]<$_[0][1]>"
358 root 1.1 }
359    
# AnyEvent-based wait for readability.
#
# Registers an I/O watcher on the filehandle (slot 0) and, if a timeout
# (slot 2) is defined, a timer.  The current coroutine then schedules
# away until one of the callbacks has readied it again.  Returns 1 when
# woken by the I/O watcher, 0 when the timer fired.
sub readable_anyevent {
    my $waiter = $Coro::current;
    my $got_io = 1;

    # wake the waiting coroutine and mark it as woken
    my $wake = sub {
        $waiter->ready;
        undef $waiter;
    };

    # both watchers are kept in lexicals so they stay registered until
    # this sub returns
    my $io_watcher = AnyEvent->io (
        desc => "$_[0][1] read watcher",
        fh   => $_[0][0],
        poll => 'r',
        cb   => $wake,
    );

    my $timer = (defined $_[0][2]) && AnyEvent->timer (
        desc  => "fh $_[0][1] read timeout",
        after => $_[0][2],
        cb    => sub {
            $got_io = 0;
            $wake->();
        },
    );

    # schedule at least once, then again for as long as nobody woke us
    do { &Coro::schedule } while $waiter;

    return $got_io;
}
389    
# AnyEvent-based wait for writability.
#
# Registers an I/O watcher on the filehandle (slot 0) and, if a timeout
# (slot 2) is defined, a timer, then schedules away until one of the
# callbacks has readied this coroutine.  Returns 1 when woken by the I/O
# watcher, 0 when the timer fired.
sub writable_anyevent {
    my $waiter = $Coro::current;
    my $got_io = 1;

    my $on_io = sub {
        $waiter->ready;
        undef $waiter;
    };

    my $on_timeout = sub {
        $got_io = 0;
        $waiter->ready;
        undef $waiter;
    };

    # both watchers are kept in lexicals so they stay registered until
    # this sub returns
    my $io_watcher = AnyEvent->io (
        desc => "fh $_[0][1] write watcher",
        fh   => $_[0][0],
        poll => 'w',
        cb   => $on_io,
    );

    my $timer = (defined $_[0][2]) && AnyEvent->timer (
        desc  => "fh $_[0][1] write timeout",
        after => $_[0][2],
        cb    => $on_timeout,
    );

    while ($waiter) {
        &Coro::schedule;
    }

    return $got_io;
}
418    
419 root 1.17 sub readable_coro {
420 root 1.18 ($_[0][5] ||= "Coro::Event"->io (
421 root 1.17 fd => $_[0][0],
422     desc => "fh $_[0][1] read watcher",
423     timeout => $_[0][2],
424     poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
425 root 1.26 ))->next->[4] & &Event::Watcher::R
426 root 1.17 }
427    
428     sub writable_coro {
429 root 1.18 ($_[0][6] ||= "Coro::Event"->io (
430 root 1.17 fd => $_[0][0],
431     desc => "fh $_[0][1] write watcher",
432     timeout => $_[0][2],
433     poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
434 root 1.26 ))->next->[4] & &Event::Watcher::W
435 root 1.17 }
436    
437 root 1.21 # decide on event model at runtime
438 root 1.17 for my $rw (qw(readable writable)) {
439     no strict 'refs';
440    
441     *$rw = sub {
442 root 1.25 AnyEvent::detect;
443 root 1.17 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Coro" or $AnyEvent::MODEL eq "AnyEvent::Impl::Event") {
444     require Coro::Event;
445     *$rw = \&{"$rw\_coro"};
446     } else {
447     *$rw = \&{"$rw\_anyevent"};
448     }
449 root 1.25 goto &$rw
450 root 1.17 };
451     };
452    
# Write $_[1] to the underlying filehandle, waiting (via writable) when
# the handle would block.
#
# Arguments: state object, data, optional length (defaults to the length
# of the data), optional offset.  Keeps calling syswrite until the
# requested length has been written, a non-EAGAIN error occurs, or
# writable returns false.  Returns the number of bytes written so far.
sub WRITE {
    my $remaining = defined $_[2] ? $_[2] : length $_[1];
    my $offset    = $_[3];
    my $written   = 0;

    while (1) {
        my $count = syswrite ($_[0][0], $_[1], $remaining, $offset);

        if (defined $count) {
            $written   += $count;
            $offset    += $count;
            $remaining -= $count;

            return $written unless $remaining;
        }
        elsif ($! != Errno::EAGAIN) {
            # hard error: report what was written up to now
            return $written;
        }

        # &writable sees our @_, i.e. the state object in $_[0]
        return $written unless &writable;
    }
}
473    
# Read up to $_[2] bytes into $_[1] (at offset $_[3]).
#
# Data left in the read buffer (slot 3) is consumed first.  After that
# sysread is called on the filehandle until the requested length has been
# read, sysread returns 0, or a non-EAGAIN error occurs.  Between
# attempts the loop stops if the partial flag (slot 8) is set or if
# readable returns false.  Returns the number of bytes stored.
sub READ {
    my $want   = $_[2];
    my $offset = $_[3];
    my $total  = 0;

    # first deplete the read buffer
    my $buffered = length $_[0][3];

    if ($buffered) {
        if ($buffered > $want) {
            # the buffer alone satisfies the request
            substr ($_[1], $offset) = substr ($_[0][3], 0, $want);
            substr ($_[0][3], 0, $want) = "";
            return $want;
        }

        substr ($_[1], $offset) = $_[0][3];
        $_[0][3] = "";

        $want   -= $buffered;
        $offset += $buffered;
        $total  += $buffered;

        return $total unless $want;
    }

    while (1) {
        my $count = sysread $_[0][0], $_[1], $want, $offset;

        if (defined $count) {
            $want   -= $count;
            $offset += $count;
            $total  += $count;

            # done when everything was read or sysread returned 0
            last unless $want && $count;
        }
        elsif ($! != Errno::EAGAIN) {
            last;
        }

        # &readable sees our @_, i.e. the state object in $_[0]
        last if $_[0][8] || !&readable;
    }

    return $total;
}
510    
# Read one record from the handle.
#
# The record separator is $_[1] if given, otherwise $/.  Data is
# collected in the read buffer (slot 3) in chunks of up to 8192 bytes
# until the separator is found; the record including its separator is
# then removed from the buffer and returned.  An undefined separator
# never matches, so everything up to EOF is returned.  When sysread
# returns 0, fails with something other than EAGAIN, or readable returns
# false, whatever is left in the buffer is returned (undef if the buffer
# is empty).
sub READLINE {
    my $irs = @_ > 1 ? $_[1] : $/;

    while () {
        if (defined $irs) {
            my $pos = index $_[0][3], $irs;
            if ($pos >= 0) {
                $pos += length $irs;
                my $res = substr $_[0][3], 0, $pos;
                substr ($_[0][3], 0, $pos) = "";
                return $res;
            }
        }

        my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];

        # keep going after a successful read, or after EAGAIN once the
        # handle has become readable again
        next if defined $r ? $r : ($! == Errno::EAGAIN && &readable);

        # EOF, hard error or timeout: hand out the leftover data.  The
        # buffer slot is reset to "" rather than delete()d, so it stays a
        # defined string for later calls and for users of ->rbuf.
        my $rest = $_[0][3];
        $_[0][3] = "";

        return length $rest ? $rest : undef;
    }
}
533    
534 root 1.13 1;
535 root 1.1
536 root 1.13 =back
537 root 1.1
538     =head1 BUGS
539    
540     - Perl's IO-Handle model is THE bug.
541    
542     =head1 AUTHOR
543    
544 root 1.13 Marc Lehmann <schmorp@schmorp.de>
545     http://home.schmorp.de/
546 root 1.1
547     =cut
548