ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.19
Committed: Sat Dec 2 03:36:42 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.18: +7 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
12 root 1.14 other coroutines can run while reads or writes block on the handle.
13    
14     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15     data, allowing other coroutines to run while one coroutine waits for I/O.
16    
17     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18 root 1.1
19     =over 4
20    
21     =cut
22    
23     package Coro::Handle;
24    
25 root 1.16 no warnings;
26     use strict;
27 root 1.13
28 root 1.16 use Carp ();
29 root 1.1 use Errno ();
30 root 1.5 use base 'Exporter';
31 root 1.1
32 root 1.16 our $VERSION = 2.5;
33     our @EXPORT = qw(unblock);
34 root 1.1
35 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
36 root 1.1
37     Create a new non-blocking io-handle using the given
38 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
39     other supported argument is "timeout", which sets a timeout for each
40     operation.
41 root 1.1
42     =cut
43    
44     sub new_from_fh {
45     my $class = shift;
46     my $fh = shift or return;
47     my $self = do { local *Coro::Handle };
48    
49 root 1.6 my ($package, $filename, $line) = caller;
50     $filename =~ s/^.*[\/\\]//;
51    
52 root 1.16 tie $self, 'Coro::Handle::FH', fh => $fh, desc => "$filename:$line", @_;
53 root 1.1
54 root 1.19 my $old_fh = select bless \$self, ref $class ? ref $class : $class;
55     $| = 1;
56     select $old_fh
57 root 1.1 }
58    
59 root 1.5 =item $fh = unblock $fh
60    
61     This is a convinience function that just calls C<new_from_fh> on the given
62     filehandle. Use it to replace a normal perl filehandle by a non-blocking
63     equivalent.
64    
65     =cut
66    
67     sub unblock($) {
68 root 1.19 new_from_fh Coro::Handle $_[0]
69 root 1.5 }
70    
71 root 1.1 =item $fh->writable, $fh->readable
72    
73     Wait until the filehandle is readable or writable (and return true) or
74     until an error condition happens (and return false).
75    
76     =cut
77    
78 root 1.16 sub readable { Coro::Handle::FH::readable (tied ${$_[0]}) }
79     sub writable { Coro::Handle::FH::writable (tied ${$_[0]}) }
80 root 1.1
81 root 1.16 =item $fh->readline ([$terminator])
82 root 1.4
83     Like the builtin of the same name, but allows you to specify the input
84 root 1.7 record separator in a coroutine-safe manner (i.e. not using a global
85 root 1.4 variable).
86    
87     =cut
88    
89 root 1.18 sub readline { tied(${+shift})->READLINE (@_) }
90 root 1.4
91 root 1.16 =item $fh->autoflush ([...])
92 root 1.4
93     Always returns true, arguments are being ignored (exists for compatibility
94 root 1.8 only). Might change in the future.
95 root 1.4
96     =cut
97    
98     sub autoflush { !0 }
99    
100 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
101 root 1.8
102 root 1.13 Work like their function equivalents (except read, which works like
103 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
104 root 1.13 work but it's not efficient).
105 root 1.8
106     =cut
107    
108 root 1.13 sub read { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
109     sub sysread { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
110     sub syswrite { Coro::Handle::FH::WRITE (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
111     sub print { Coro::Handle::FH::WRITE (tied ${+shift}, join "", @_) }
112     sub printf { Coro::Handle::FH::PRINTF (tied ${+shift}, @_) }
113     sub fileno { Coro::Handle::FH::FILENO (tied ${$_[0]}) }
114     sub close { Coro::Handle::FH::CLOSE (tied ${$_[0]}) }
115     sub blocking { !0 } # this handler always blocks the caller
116    
117     sub partial {
118     my $obj = tied ${$_[0]};
119    
120     my $retval = $obj->[8];
121     $obj->[8] = $_[1] if @_ > 1;
122     $retval
123     }
124 root 1.8
125     =item $fh->timeout([...])
126    
127 root 1.13 The optional argument sets the new timeout (in seconds) for this
128 root 1.8 handle. Returns the current (new) value.
129    
130     C<0> is a valid timeout, use C<undef> to disable the timeout.
131    
132     =cut
133    
134     sub timeout {
135 root 1.18 my $self = tied ${$_[0]};
136 root 1.13 if (@_ > 1) {
137     $self->[2] = $_[1];
138 root 1.18 $self->[5]->timeout ($_[1]) if $self->[5];
139     $self->[6]->timeout ($_[1]) if $self->[6];
140 root 1.8 }
141 root 1.13 $self->[2];
142     }
143    
144     =item $fh->fh
145    
146     Returns the "real" (non-blocking) filehandle. Use this if you want to
147     do operations on the file handle you cannot do using the Coro::Handle
148     interface.
149    
150     =item $fh->rbuf
151    
152     Returns the current contents of the read buffer (this is an lvalue, so you
153     can change the read buffer if you like).
154    
155     You can use this function to implement your own optimized reader when neither
156     readline nor sysread are viable candidates, like this:
157    
158     # first get the _real_ non-blocking filehandle
159     # and fetch a reference to the read buffer
160     my $nb_fh = $fh->fh;
161     my $buf = \$fh->rbuf;
162    
163 root 1.18 while () {
164 root 1.13 # now use buffer contents, modifying
165     # if necessary to reflect the removed data
166    
167     last if $$buf ne ""; # we have leftover data
168    
169     # read another buffer full of data
170     $fh->readable or die "end of file";
171     sysread $nb_fh, $$buf, 8192;
172     }
173    
174     =cut
175    
176     sub fh {
177     (tied ${$_[0]})->[0];
178     }
179    
180     sub rbuf : lvalue {
181     (tied ${$_[0]})->[3];
182     }
183    
184     sub DESTROY {
185     # nop
186     }
187    
188 root 1.16 our $AUTOLOAD;
189    
190 root 1.13 sub AUTOLOAD {
191     my $self = tied ${$_[0]};
192    
193     (my $func = $AUTOLOAD) =~ s/^(.*):://;
194    
195     my $forward = UNIVERSAL::can $self->[7], $func;
196    
197     $forward or
198     die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
199    
200     goto &$forward;
201 root 1.8 }
202    
203 root 1.1 package Coro::Handle::FH;
204    
205 root 1.16 no warnings;
206     use strict;
207 root 1.13
208 root 1.1 use Fcntl ();
209     use Errno ();
210 root 1.10 use Carp 'croak';
211 root 1.1
212 root 1.13 use AnyEvent;
213 root 1.1
214 root 1.13 # formerly a hash, but we are speed-critical, so try
215     # to be faster even if it hurts.
216     #
217     # 0 FH
218     # 1 desc
219     # 2 timeout
220     # 3 rb
221     # 4 wb # unused
222 root 1.17 # 5 read watcher, if Coro::Event used
223     # 6 write watcher, if Coro::Event used
224 root 1.13 # 7 forward class
225     # 8 blocking
226 root 1.1
227     sub TIEHANDLE {
228 root 1.13 my ($class, %arg) = @_;
229 root 1.1
230 root 1.13 my $self = bless [], $class;
231     $self->[0] = $arg{fh};
232     $self->[1] = $arg{desc};
233     $self->[2] = $arg{timeout};
234     $self->[3] = "";
235     $self->[4] = "";
236     $self->[7] = $arg{forward_class};
237     $self->[8] = $arg{partial};
238 root 1.1
239 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
240 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
241 root 1.6
242 root 1.13 $self
243     }
244    
245     sub cleanup {
246     $_[0][3] = "";
247     $_[0][4] = "";
248 root 1.1 }
249    
250     sub OPEN {
251 root 1.13 &cleanup;
252 root 1.1 my $self = shift;
253 root 1.13 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
254     : open $self->[0], $_[0], $_[1], $_[2];
255 root 1.18
256 root 1.1 if ($r) {
257 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
258 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
259 root 1.1 }
260 root 1.18
261     $r
262 root 1.1 }
263    
264 root 1.13 sub PRINT {
265 root 1.18 WRITE (shift, join "", @_)
266 root 1.13 }
267    
268     sub PRINTF {
269 root 1.18 WRITE (shift, sprintf shift,@_)
270 root 1.13 }
271    
272     sub GETC {
273     my $buf;
274 root 1.16 READ ($_[0], $buf, 1);
275 root 1.18 $buf
276 root 1.13 }
277    
278     sub BINMODE {
279     binmode $_[0][0];
280     }
281    
282     sub TELL {
283 root 1.16 Carp::croak "Coro::Handle's don't support tell()";
284 root 1.13 }
285    
286     sub SEEK {
287 root 1.16 Carp::croak "Coro::Handle's don't support seek()";
288 root 1.13 }
289    
290     sub EOF {
291 root 1.16 Carp::croak "Coro::Handle's don't support eof()";
292 root 1.13 }
293    
294 root 1.1 sub CLOSE {
295 root 1.13 &cleanup;
296 root 1.18 close $_[0][0]
297 root 1.13 }
298    
299     sub DESTROY {
300     &cleanup;
301 root 1.8 }
302    
303     sub FILENO {
304 root 1.18 fileno $_[0][0]
305 root 1.1 }
306    
307 root 1.13 # seems to be called for stringification (how weird), at least
308     # when DumpValue::dumpValue is used to print this.
309     sub FETCH {
310 root 1.18 "$_[0]<$_[0][1]>"
311 root 1.1 }
312    
313 root 1.17 sub readable_anyevent {
314 root 1.13 my $current = $Coro::current;
315     my $io = 1;
316    
317     my $w = AnyEvent->io (
318 root 1.16 desc => "$_[0][1] read watcher",
319     fh => $_[0][0],
320     poll => 'r',
321     cb => sub {
322 root 1.13 $current->ready;
323 root 1.15 undef $current;
324 root 1.13 },
325     );
326    
327 root 1.16 my $t = (defined $_[0][2]) && AnyEvent->timer (
328     desc => "fh $_[0][1] read timeout",
329 root 1.13 after => $_[0][2],
330     cb => sub {
331     $io = 0;
332     $current->ready;
333 root 1.15 undef $current;
334 root 1.13 },
335     );
336    
337     &Coro::schedule;
338 root 1.15 &Coro::schedule while $current;
339    
340 root 1.13 $io
341     }
342    
343 root 1.17 sub writable_anyevent {
344 root 1.13 my $current = $Coro::current;
345     my $io = 1;
346    
347     my $w = AnyEvent->io (
348 root 1.16 desc => "fh $_[0][1] write watcher",
349     fh => $_[0][0],
350     poll => 'w',
351     cb => sub {
352 root 1.13 $current->ready;
353 root 1.15 undef $current;
354 root 1.13 },
355     );
356    
357 root 1.16 my $t = (defined $_[0][2]) && AnyEvent->timer (
358     desc => "fh $_[0][1] write timeout",
359 root 1.13 after => $_[0][2],
360     cb => sub {
361     $io = 0;
362     $current->ready;
363 root 1.15 undef $current;
364 root 1.13 },
365     );
366    
367 root 1.15 &Coro::schedule while $current;
368    
369 root 1.13 $io
370 root 1.1 }
371    
372 root 1.17 sub readable_coro {
373 root 1.18 ($_[0][5] ||= "Coro::Event"->io (
374 root 1.17 fd => $_[0][0],
375     desc => "fh $_[0][1] read watcher",
376     timeout => $_[0][2],
377     poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
378 root 1.18 ))->next->[5] & &Event::Watcher::R
379 root 1.17 }
380    
381     sub writable_coro {
382 root 1.18 ($_[0][6] ||= "Coro::Event"->io (
383 root 1.17 fd => $_[0][0],
384     desc => "fh $_[0][1] write watcher",
385     timeout => $_[0][2],
386     poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
387 root 1.18 ))->next->[5] & &Event::Watcher::W
388 root 1.17 }
389    
390     for my $rw (qw(readable writable)) {
391     no strict 'refs';
392    
393     *$rw = sub {
394     my $res = &{"$rw\_anyevent"};
395     if ($AnyEvent::MODEL eq "AnyEvent::Impl::Coro" or $AnyEvent::MODEL eq "AnyEvent::Impl::Event") {
396     require Coro::Event;
397     *$rw = \&{"$rw\_coro"};
398     } else {
399     *$rw = \&{"$rw\_anyevent"};
400     }
401     $res
402     };
403     };
404    
405 root 1.1 sub WRITE {
406     my $len = defined $_[2] ? $_[2] : length $_[1];
407     my $ofs = $_[3];
408     my $res = 0;
409    
410 root 1.16 while () {
411     my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
412 root 1.1 if (defined $r) {
413     $len -= $r;
414     $ofs += $r;
415     $res += $r;
416     last unless $len;
417     } elsif ($! != Errno::EAGAIN) {
418     last;
419     }
420 root 1.13 last unless &writable;
421 root 1.1 }
422    
423     return $res;
424     }
425    
426     sub READ {
427     my $len = $_[2];
428     my $ofs = $_[3];
429     my $res = 0;
430    
431 root 1.7 # first deplete the read buffer
432 root 1.13 if (length $_[0][3]) {
433     my $l = length $_[0][3];
434 root 1.7 if ($l <= $len) {
435 root 1.18 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
436 root 1.7 $len -= $l;
437 root 1.13 $ofs += $l;
438 root 1.7 $res += $l;
439     return $res unless $len;
440     } else {
441 root 1.18 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
442     substr ($_[0][3], 0, $len) = "";
443 root 1.7 return $len;
444     }
445     }
446    
447 root 1.2 while() {
448 root 1.13 my $r = sysread $_[0][0], $_[1], $len, $ofs;
449 root 1.1 if (defined $r) {
450     $len -= $r;
451     $ofs += $r;
452     $res += $r;
453     last unless $len && $r;
454     } elsif ($! != Errno::EAGAIN) {
455     last;
456     }
457 root 1.13 last if $_[0][8] || !&readable;
458 root 1.1 }
459    
460     return $res;
461     }
462    
463     sub READLINE {
464 root 1.13 my $irs = @_ > 1 ? $_[1] : $/;
465 root 1.1
466     while() {
467 root 1.13 my $pos = index $_[0][3], $irs;
468 root 1.1 if ($pos >= 0) {
469 root 1.7 $pos += length $irs;
470 root 1.13 my $res = substr $_[0][3], 0, $pos;
471     substr ($_[0][3], 0, $pos) = "";
472 root 1.1 return $res;
473     }
474 root 1.13
475     my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
476 root 1.1 if (defined $r) {
477     return undef unless $r;
478 root 1.13 } elsif ($! != Errno::EAGAIN || !&readable) {
479 root 1.1 return undef;
480     }
481     }
482 root 1.6 }
483    
484 root 1.13 1;
485 root 1.1
486 root 1.13 =back
487 root 1.1
488     =head1 BUGS
489    
490     - Perl's IO-Handle model is THE bug.
491    
492     =head1 AUTHOR
493    
494 root 1.13 Marc Lehmann <schmorp@schmorp.de>
495     http://home.schmorp.de/
496 root 1.1
497     =cut
498