ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.34
Committed: Fri Apr 25 04:28:50 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.33: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
12 root 1.14 other coroutines can run while reads or writes block on the handle.
13    
14     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15     data, allowing other coroutines to run while one coroutine waits for I/O.
16    
17     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18 root 1.1
19 root 1.28 If at all possible, you should I<always> prefer method calls on the handle object over invoking
20     tied methods, i.e.:
21    
22     $fh->print ($str); # NOT print $fh $str;
23     my $line = $fh->readline; # NOT my $line = <$fh>;
24    
25     The reason is that perl recurses within the interpreter when invoking tie
26     magic, forcing the (temporary) allocation of a (big) stack. If you have
27     lots of socket connections and they happen to wait in e.g. <$fh>, then
28     they would all have a costly C coroutine associated with them.
29    
30 root 1.1 =over 4
31    
32     =cut
33    
34     package Coro::Handle;
35    
36 root 1.16 no warnings;
37     use strict;
38 root 1.13
39 root 1.16 use Carp ();
40 root 1.1 use Errno ();
41 root 1.5 use base 'Exporter';
42 root 1.1
43 root 1.34 our $VERSION = 4.6;
44 root 1.16 our @EXPORT = qw(unblock);
45 root 1.1
46 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
47 root 1.1
48     Create a new non-blocking io-handle using the given
49 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
50     other supported argument is "timeout", which sets a timeout for each
51     operation.
52 root 1.1
53     =cut
54    
# Constructor.  Wraps the given perl filehandle in a fresh anonymous glob
# that is tied to Coro::Handle::FH and blessed into the requested class.
# Returns nothing when no (true) filehandle is passed.  Any remaining
# key => value pairs are handed to the tie constructor unchanged.
# May be called on a class name or on an existing object.
sub new_from_fh {
   my ($class, $fh, @options) = @_;

   return unless $fh;

   # localising the glob and copying it out yields an anonymous glob value
   my $self = do { local *Coro::Handle };

   tie $self, 'Coro::Handle::FH', fh => $fh, @options;

   bless \$self, (ref $class) || $class
}
64    
65 root 1.5 =item $fh = unblock $fh
66    
67 root 1.22 This is a convenience function that just calls C<new_from_fh> on the
68     given filehandle. Use it to replace a normal perl filehandle by a
69     non-(coroutine-)blocking equivalent.
70 root 1.5
71     =cut
72    
# Exported shorthand: builds a Coro::Handle around the single filehandle
# argument by calling the new_from_fh class method.
sub unblock($) {
   return Coro::Handle->new_from_fh ($_[0]);
}
76    
77 root 1.1 =item $fh->writable, $fh->readable
78    
79     Wait until the filehandle is readable or writable (and return true) or
80     until an error condition happens (and return false).
81    
82     =cut
83    
# Both methods look up the tied implementation object behind the handle
# and pass it to the Coro::Handle::FH function of the same name.
sub readable {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::readable ($impl);
}

sub writable {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::writable ($impl);
}
86 root 1.1
87 root 1.16 =item $fh->readline ([$terminator])
88 root 1.4
89     Like the builtin of the same name, but allows you to specify the input
90 root 1.7 record separator in a coroutine-safe manner (i.e. not using a global
91 root 1.4 variable).
92    
93     =cut
94    
# Forwards to the READLINE method of the tied implementation object; any
# arguments after the handle (i.e. the optional terminator) are passed on.
sub readline {
   my $self = shift;
   return tied ($$self)->READLINE (@_);
}
96 root 1.4
97 root 1.16 =item $fh->autoflush ([...])
98 root 1.4
99     Always returns true, arguments are being ignored (exists for compatibility
100 root 1.8 only). Might change in the future.
101 root 1.4
102     =cut
103    
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush {
   return !0;
}
105    
106 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
107 root 1.8
108 root 1.13 Work like their function equivalents, except read, which works like
109 root 1.18 sysread. You should not use the read function with Coro::Handle objects: it
110 root 1.13 works, but it is not efficient.
111 root 1.8
112     =cut
113    
# Thin method wrappers around the tie methods in Coro::Handle::FH.  Each
# one fetches the tied implementation object and calls the matching
# function directly.  Buffer arguments are passed as elements of @_ so
# that they stay aliased to the caller's variables.

# read and sysread are the same operation for these handles
sub read {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::READ ($impl, $_[1], $_[2], $_[3]);
}

sub sysread {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::READ ($impl, $_[1], $_[2], $_[3]);
}

sub syswrite {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::WRITE ($impl, $_[1], $_[2], $_[3]);
}

# all arguments are concatenated (without separator) and written at once
sub print {
   my $self = shift;
   return Coro::Handle::FH::WRITE (tied ($$self), join "", @_);
}

sub printf {
   my $self = shift;
   return Coro::Handle::FH::PRINTF (tied ($$self), @_);
}

sub fileno {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::FILENO ($impl);
}

sub close {
   my $impl = tied ${$_[0]};
   return Coro::Handle::FH::CLOSE ($impl);
}

# this handler always blocks the caller
sub blocking {
   return !0;
}
122    
# Get or set the "partial" flag, stored in slot 8 of the tied
# implementation object.  With an argument the flag is replaced; the value
# returned is always the one that was in effect before the call.
sub partial {
   my $impl     = tied ${$_[0]};
   my $previous = $impl->[8];

   if (@_ >= 2) {
      $impl->[8] = $_[1];
   }

   return $previous;
}
130 root 1.8
131 root 1.27 =item connect, listen, bind, getsockopt, setsockopt,
132     send, recv, peername, sockname, shutdown, peerport, peerhost
133    
134     Do the same thing as the perl builtins or IO::Socket methods (but return
135     true on EINPROGRESS). Remember that these must be method calls.
136    
137     =cut
138    
# Socket methods.  Each one applies the perl builtin of the same name to
# the real filehandle stored in slot 0 of the tied implementation object.

# like the builtin, but an EINPROGRESS failure is reported as success
sub connect    { connect     tied(${$_[0]})->[0], $_[1] or $! == Errno::EINPROGRESS }
sub bind       { bind        tied(${$_[0]})->[0], $_[1] }
sub listen     { listen      tied(${$_[0]})->[0], $_[1] }
sub getsockopt { getsockopt  tied(${$_[0]})->[0], $_[1], $_[2] }
sub setsockopt { setsockopt  tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }

# $fh->send ($msg, $flags [, $to])
# The destination address is only passed to the builtin when the caller
# supplied one.  The builtin's prototype puts its fourth argument in scalar
# context, so a "cond ? $to : ()" expression there still counts as an
# argument and would select the sendto code path with an empty address.
sub send {
   my $sock = tied(${$_[0]})->[0];

   @_ > 3
      ? send ($sock, $_[1], $_[2], $_[3])
      : send ($sock, $_[1], $_[2])
}

# $fh->recv ($buffer, $length [, $flags]) - the builtin always takes four
# arguments; a missing flags value is passed as undef.
sub recv       { recv        tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }
sub sockname   { getsockname tied(${$_[0]})->[0] }
sub peername   { getpeername tied(${$_[0]})->[0] }
sub shutdown   { shutdown    tied(${$_[0]})->[0], $_[1] }
149    
150     =item ($fh, $peername) = $listen_fh->accept
151    
152     In scalar context, returns the newly accepted socket (or undef) and in
153     list context return the ($fh, $peername) pair (or nothing).
154    
155     =cut
156    
# Accept a connection on the listening handle, waiting (via ->readable)
# while the builtin accept fails with EAGAIN.  On success the new socket is
# wrapped with new_from_fh; list context additionally gets the peer name.
# Any other accept error, or a false result from ->readable, ends the call
# with an empty return.
sub accept {
   my ($new_fh, $peer);

   for (;;) {
      if ($peer = accept $new_fh, tied(${$_[0]})->[0]) {
         return wantarray
            ? ($_[0]->new_from_fh ($new_fh), $peer)
            : $_[0]->new_from_fh ($new_fh);
      }

      $!{EAGAIN} or return;

      return unless $_[0]->readable;
   }
}
170    
171 root 1.21 =item $fh->timeout ([...])
172 root 1.8
173 root 1.13 The optional argument sets the new timeout (in seconds) for this
174 root 1.8 handle. Returns the current (new) value.
175    
176     C<0> is a valid timeout, use C<undef> to disable the timeout.
177    
178     =cut
179    
# Get or set the per-operation timeout (slot 2 of the tied object).  When a
# new value is given it is also pushed to the read and write watchers
# (slots 5 and 6) if they already exist.  Returns the value now in effect.
sub timeout {
   my $impl = tied ${$_[0]};

   if (@_ >= 2) {
      my $seconds = $_[1];

      $impl->[2] = $seconds;

      for my $slot (5, 6) {
         my $watcher = $impl->[$slot] or next;
         $watcher->timeout ($seconds);
      }
   }

   return $impl->[2];
}
189    
190     =item $fh->fh
191    
192     Returns the "real" (non-blocking) filehandle. Use this if you want to
193     do operations on the file handle you cannot do using the Coro::Handle
194     interface.
195    
196     =item $fh->rbuf
197    
198     Returns the current contents of the read buffer (this is an lvalue, so you
199     can change the read buffer if you like).
200    
201     You can use this function to implement your own optimized reader when neither
202     readline nor sysread are viable candidates, like this:
203    
204     # first get the _real_ non-blocking filehandle
205     # and fetch a reference to the read buffer
206     my $nb_fh = $fh->fh;
207     my $buf = \$fh->rbuf;
208    
209 root 1.18 while () {
210 root 1.13 # now use buffer contents, modifying
211     # if necessary to reflect the removed data
212    
213     last if $$buf ne ""; # we have leftover data
214    
215     # read another buffer full of data
216     $fh->readable or die "end of file";
217     sysread $nb_fh, $$buf, 8192;
218     }
219    
220     =cut
221    
# Returns the real filehandle (slot 0 of the tied implementation object).
sub fh {
   my $impl = tied ${$_[0]};
   return $impl->[0];
}

# Lvalue accessor for the read buffer (slot 3); the element itself must be
# the last expression so that assignments reach the stored buffer.
sub rbuf : lvalue {
   tied (${$_[0]})->[3]
}

# Intentionally empty; defined so that object destruction is not routed
# through AUTOLOAD.
sub DESTROY {
}
233    
our $AUTOLOAD;

# Fallback for methods not defined in this package: the unqualified method
# name is looked up in the "forward class" (slot 7 of the tied object) and,
# if found, control is transferred to it with the original argument list.
# Dies when the forward class cannot provide the method.
sub AUTOLOAD {
   my $self = tied ${$_[0]};

   # strip the package qualifier from the requested name
   my $func = $AUTOLOAD;
   $func =~ s/^(.*):://;

   my $forward = UNIVERSAL::can ($self->[7], $func);

   unless ($forward) {
      die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
   }

   goto &$forward;
}
248    
249 root 1.1 package Coro::Handle::FH;
250    
251 root 1.16 no warnings;
252     use strict;
253 root 1.13
254 root 1.1 use Fcntl ();
255     use Errno ();
256 root 1.10 use Carp 'croak';
257 root 1.1
258 root 1.13 use AnyEvent;
259 root 1.1
260 root 1.13 # formerly a hash, but we are speed-critical, so try
261     # to be faster even if it hurts.
262     #
263     # 0 FH
264     # 1 desc
265     # 2 timeout
266     # 3 rb
267     # 4 wb # unused
268 root 1.17 # 5 read watcher, if Coro::Event used
269     # 6 write watcher, if Coro::Event used
270 root 1.13 # 7 forward class
271     # 8 partial flag (when true, READ returns after one read attempt instead of waiting for more data)
272 root 1.1
# Tie constructor.  Recognised arguments: fh, desc, timeout, forward_class
# and partial; they are stored in slots 0, 1, 2, 7 and 8 respectively.
# The read and write buffers (slots 3 and 4) start out empty.
#
# The filehandle is switched to non-blocking mode.  The current status
# flags are fetched first and O_NONBLOCK is OR-ed in, so that flags already
# set on the handle (such as O_APPEND) are kept rather than overwritten.
# Croaks if either fcntl call fails.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [], $class;
   $self->[0] = $arg{fh};
   $self->[1] = $arg{desc};
   $self->[2] = $arg{timeout};
   $self->[3] = "";
   $self->[4] = "";
   $self->[7] = $arg{forward_class};
   $self->[8] = $arg{partial};

   my $flags = fcntl $self->[0], Fcntl::F_GETFL (), 0;
   defined $flags
      or croak "fcntl(F_GETFL): $!";

   # "+ 0" turns a possible "0 but true" into a plain number
   fcntl $self->[0], Fcntl::F_SETFL (), ($flags + 0) | Fcntl::O_NONBLOCK ()
      or croak "fcntl(O_NONBLOCK): $!";

   $self
}
290    
# Cancels the read and write watchers (slots 5 and 6) if they exist, then
# empties the implementation object completely.
sub cleanup {
   my $self = $_[0];

   for my $slot (5, 6) {
      my $watcher = $self->[$slot] or next;
      $watcher->cancel;
   }

   @$self = ();
}
296    
# Tie method behind open() on the handle.
#
# Any existing watchers are cancelled, the old filehandle reference is
# dropped and the buffers are reset, but the handle's settings (desc,
# timeout, forward class, partial flag) are kept across the reopen.
#
# All arguments are forwarded to the builtin open, so the two-argument
# form, the three-argument form and the list form all work.  On success the
# new handle is switched to non-blocking mode, preserving its other status
# flags; croaks if that fails.  Returns the result of open.
sub OPEN {
   my $self = shift;

   for my $slot (5, 6) {
      $self->[$slot]->cancel if $self->[$slot];
      $self->[$slot] = undef;
   }

   # let open autovivify a fresh handle instead of reusing the old one
   $self->[0] = undef;
   $self->[3] = "";
   $self->[4] = "";

   my $r = @_ > 1 ? open $self->[0], $_[0], @_[1 .. $#_]
                  : open $self->[0], $_[0];

   if ($r) {
      my $flags = fcntl $self->[0], Fcntl::F_GETFL (), 0;
      defined $flags
         or croak "fcntl(F_GETFL): $!";

      fcntl $self->[0], Fcntl::F_SETFL (), ($flags + 0) | Fcntl::O_NONBLOCK ()
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
310    
# Tie method for print: concatenates all arguments (no separator) and
# writes the result with WRITE.
sub PRINT {
   my $self = shift;
   return WRITE ($self, join "", @_);
}

# Tie method for printf: formats with sprintf, then writes with WRITE.
sub PRINTF {
   my ($self, $format, @args) = @_;
   return WRITE ($self, sprintf $format, @args);
}
318    
# Tie method for getc: reads a single character through READ.
# Returns the character, or undef when READ delivered nothing (end of file
# or error), so that "while (defined (my $c = getc $fh))" loops terminate.
sub GETC {
   my $buf;

   return READ ($_[0], $buf, 1) ? $buf : undef;
}
324    
# Tie method for binmode: applies binmode to the real filehandle.  A layer
# argument, when given, is passed through instead of being dropped.
sub BINMODE {
   @_ > 1
      ? binmode ($_[0][0], $_[1])
      : binmode ($_[0][0])
}
328    
# Positioning and end-of-file queries are not implemented for these
# handles; each tie method croaks with a fixed message.
sub TELL {
   Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   Carp::croak ("Coro::Handle's don't support eof()");
}
340    
# Tie method for close.  The filehandle is fetched *before* cleanup runs,
# because cleanup empties the object; closing $_[0][0] afterwards would
# operate on undef and always report failure.
# Returns the result of closing the real filehandle.
sub CLOSE {
   my $fh = $_[0][0];

   &cleanup;

   close $fh
}
345    
# Destructor: releases watchers and empties the object via cleanup, which
# is called with the current argument list.
sub DESTROY {
   &cleanup;
}

# Tie method for fileno: file descriptor number of the real filehandle.
sub FILENO {
   my $self = $_[0];
   return fileno $self->[0];
}

# seems to be called for stringification (how weird), at least
# when DumpValue::dumpValue is used to print this.
# Yields the stringified object followed by its description in <>.
sub FETCH {
   return "$_[0]<$_[0][1]>";
}
359    
# Generic AnyEvent implementation of "wait until readable".
# Registers an I/O watcher on the filehandle and, if a timeout is set
# (slot 2), a timer; whichever callback runs first readies the waiting
# coroutine and clears $current.  The coroutine keeps scheduling until
# $current has been cleared.  Returns 1 if the I/O watcher fired and 0 if
# the timer fired first.
sub readable_anyevent {
   my $current = $Coro::current;
   my $io      = 1;

   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'r',
      cb   => sub {
         $current->ready if $current;
         undef $current;
      },
   );

   # the watcher objects only need to stay alive until this sub returns
   my $timer;
   if (defined $_[0][2]) {
      $timer = AnyEvent->timer (
         after => $_[0][2],
         cb    => sub {
            $io = 0;
            $current->ready if $current;
            undef $current;
         },
      );
   }

   # schedule at least once, then again for as long as nobody woke us
   do {
      &Coro::schedule;
   } while $current;

   $io
}
387    
# Generic AnyEvent implementation of "wait until writable".
# Registers an I/O watcher on the filehandle and, if a timeout is set
# (slot 2), a timer; whichever callback runs first readies the waiting
# coroutine and clears $current.  The coroutine keeps scheduling until
# $current has been cleared.  Returns 1 if the I/O watcher fired and 0 if
# the timer fired first.
sub writable_anyevent {
   my $current = $Coro::current;
   my $io      = 1;

   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'w',
      cb   => sub {
         $current->ready if $current;
         undef $current;
      },
   );

   # the watcher objects only need to stay alive until this sub returns
   my $timer;
   if (defined $_[0][2]) {
      $timer = AnyEvent->timer (
         after => $_[0][2],
         cb    => sub {
            $io = 0;
            $current->ready if $current;
            undef $current;
         },
      );
   }

   while ($current) {
      &Coro::schedule;
   }

   $io
}
414    
# Coro::Event implementation of "wait until readable".  The watcher is
# created on first use and cached in slot 5; the result is the R bit of
# the event data returned by ->next.
sub readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}

# Coro::Event implementation of "wait until writable".  The watcher is
# created on first use and cached in slot 6; the result is the W bit of
# the event data returned by ->next.
sub writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
432    
433 root 1.33 #sub readable_ev {
434     # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
435     #}
436     #
437     #sub writable_ev {
438     # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
439     #}
440 root 1.30
441 root 1.21 # decide on event model at runtime
# Install self-replacing stubs for "readable" and "writable".  On first
# call a stub asks AnyEvent which event model is in use, picks the matching
# implementation (Coro::Event based, Coro::EV based, or the generic
# AnyEvent one), installs it under the stub's own name and then jumps to
# it with the original arguments.
for my $rw (qw(readable writable)) {
   no strict 'refs';

   *$rw = sub {
      AnyEvent::detect;

      my $model = $AnyEvent::MODEL;
      my $impl;

      if ($model eq "AnyEvent::Impl::Coro" || $model eq "AnyEvent::Impl::Event") {
         require Coro::Event;
         $impl = "$rw\_coro";
      } elsif ($model eq "AnyEvent::Impl::CoroEV" || $model eq "AnyEvent::Impl::EV") {
         require Coro::EV;
         $impl = "Coro::EV::$rw\_ev";
      } else {
         $impl = "$rw\_anyevent";
      }

      *$rw = \&{$impl};

      goto &$rw
   };
}
459    
# Tie method for syswrite/print: WRITE this, $data [, $length [, $offset]]
#
# Writes $length characters of $data starting at $offset (defaults: the
# whole string, offset 0), waiting for writability whenever the handle
# would block.  Returns the number written; this is less than requested
# when a non-EAGAIN error occurs or waiting for writability fails.
#
# The requested length is clamped to the data actually available after the
# offset.  Without this, a length larger than the remaining data could
# never count down to zero and the loop would not terminate normally.
# Out-of-range offsets are left untouched so syswrite reports them itself.
sub WRITE {
   my $len = defined $_[2] ? $_[2] : length $_[1];
   my $ofs = $_[3] || 0;
   my $res = 0;

   my $size  = length $_[1];
   my $start = $ofs < 0 ? $ofs + $size : $ofs; # negative offsets count from the end

   if ($start >= 0 && $start <= $size) {
      my $avail = $size - $start;
      $len = $avail if $len > $avail;
   }

   while () {
      my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
      if (defined $r) {
         $len -= $r;
         $ofs += $r;
         $res += $r;
         last unless $len;
      } elsif ($! != Errno::EAGAIN) {
         last;
      }
      last unless &writable;
   }

   return $res;
}
480    
# Tie method for read/sysread: READ this, $buffer, $length [, $offset]
#
# Data left in the read buffer (slot 3) is delivered first.  If that does
# not satisfy the request, the real filehandle is read with sysread,
# waiting for readability whenever it would block.  Reading stops when the
# request is complete, at end of file, on a non-EAGAIN error, when the
# partial flag (slot 8) is set, or when waiting for readability fails.
# Returns the number of characters stored in $buffer.
sub READ {
   my ($want, $pos) = @_[2, 3];
   my $got = 0;

   # first deplete the read buffer
   my $buffered = length $_[0][3];
   if ($buffered) {
      if ($buffered > $want) {
         # the buffer alone satisfies the request
         substr ($_[1], $pos) = substr ($_[0][3], 0, $want);
         substr ($_[0][3], 0, $want) = "";
         return $want;
      }

      substr ($_[1], $pos) = $_[0][3];
      $_[0][3] = "";

      $want -= $buffered;
      $pos  += $buffered;
      $got  += $buffered;

      return $got unless $want;
   }

   while () {
      my $n = sysread $_[0][0], $_[1], $want, $pos;

      if (defined $n) {
         $want -= $n;
         $pos  += $n;
         $got  += $n;
         last if !$want || !$n;
      } elsif ($! != Errno::EAGAIN) {
         last;
      }

      last if $_[0][8] || !&readable;
   }

   return $got;
}
517    
# Tie method for readline: READLINE this [, $terminator]
#
# Returns the next record from the read buffer (slot 3), reading more data
# from the real filehandle as needed.  The terminator defaults to $/.
#   - non-empty terminator: record ends after the first occurrence of it
#   - empty string: paragraph mode; leading newlines are skipped and a
#     record ends after the first run of two newlines
#   - undef: everything up to end of file
# At end of file, on a non-EAGAIN error, or when waiting for readability
# fails, the remaining buffer contents are returned, or undef if the
# buffer is empty.
#
# The search resumes where the previous one stopped instead of rescanning
# the whole buffer after every read.
sub READLINE {
   my $irs  = @_ > 1 ? $_[1] : $/;
   my $from = 0; # everything before this offset is known not to match

   while () {
      if (defined $irs && length $irs) {
         my $pos = index $_[0][3], $irs, $from;

         return substr ($_[0][3], 0, $pos + length ($irs), "")
            if $pos >= 0;

         # a terminator may straddle old and newly read data
         $from = length ($_[0][3]) - length ($irs) + 1;
         $from = 0 if $from < 0;
      } elsif (defined $irs) {
         # paragraph mode: an empty terminator must never be looked up with
         # index, which would match at offset 0 and yield "" forever
         $_[0][3] =~ s/\A\n+//;

         return substr ($_[0][3], 0, $+[0], "")
            if $_[0][3] =~ /\n\n/;
      }

      my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];

      if (defined $r) {
         next if $r;
      } elsif ($! == Errno::EAGAIN && &readable) {
         next;
      }

      # end of file, hard error or timeout: hand out whatever is left.
      # undef is returned explicitly, also in list context, and the buffer
      # is left as an empty string rather than removed.
      return undef unless length $_[0][3];
      return substr ($_[0][3], 0, length ($_[0][3]), "");
   }
}
540    
541 root 1.13 1;
542 root 1.1
543 root 1.13 =back
544 root 1.1
545     =head1 BUGS
546    
547     - Perl's IO-Handle model is THE bug.
548    
549     =head1 AUTHOR
550    
551 root 1.13 Marc Lehmann <schmorp@schmorp.de>
552     http://home.schmorp.de/
553 root 1.1
554     =cut
555