ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.21
Committed: Sat Dec 2 03:40:38 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.20: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.13 This module implements IO-handles in a coroutine-compatible way, that is,
12 root 1.14 other coroutines can run while reads or writes block on the handle.
13    
14     It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15     data, allowing other coroutines to run while one coroutine waits for I/O.
16    
17     Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18 root 1.1
19     =over 4
20    
21     =cut
22    
23     package Coro::Handle;
24    
25 root 1.16 no warnings;
26     use strict;
27 root 1.13
28 root 1.16 use Carp ();
29 root 1.1 use Errno ();
30 root 1.5 use base 'Exporter';
31 root 1.1
32 root 1.16 our $VERSION = 2.5;
33     our @EXPORT = qw(unblock);
34 root 1.1
35 root 1.6 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
36 root 1.1
37     Create a new non-blocking io-handle using the given
38 root 1.19 perl-filehandle. Returns C<undef> if no filehandle is given. The only
39     other supported argument is "timeout", which sets a timeout for each
40     operation.
41 root 1.1
42     =cut
43    
44     sub new_from_fh {
45     my $class = shift;
46     my $fh = shift or return;
47     my $self = do { local *Coro::Handle };
48    
49 root 1.20 tie $self, 'Coro::Handle::FH', fh => $fh, @_;
50 root 1.1
51 root 1.19 my $old_fh = select bless \$self, ref $class ? ref $class : $class;
52     $| = 1;
53     select $old_fh
54 root 1.1 }
55    
56 root 1.5 =item $fh = unblock $fh
57    
58     This is a convinience function that just calls C<new_from_fh> on the given
59     filehandle. Use it to replace a normal perl filehandle by a non-blocking
60     equivalent.
61    
62     =cut
63    
64     sub unblock($) {
65 root 1.19 new_from_fh Coro::Handle $_[0]
66 root 1.5 }
67    
68 root 1.1 =item $fh->writable, $fh->readable
69    
70     Wait until the filehandle is readable or writable (and return true) or
71     until an error condition happens (and return false).
72    
73     =cut
74    
75 root 1.16 sub readable { Coro::Handle::FH::readable (tied ${$_[0]}) }
76     sub writable { Coro::Handle::FH::writable (tied ${$_[0]}) }
77 root 1.1
78 root 1.16 =item $fh->readline ([$terminator])
79 root 1.4
80     Like the builtin of the same name, but allows you to specify the input
81 root 1.7 record separator in a coroutine-safe manner (i.e. not using a global
82 root 1.4 variable).
83    
84     =cut
85    
86 root 1.18 sub readline { tied(${+shift})->READLINE (@_) }
87 root 1.4
88 root 1.16 =item $fh->autoflush ([...])
89 root 1.4
90     Always returns true, arguments are being ignored (exists for compatibility
91 root 1.8 only). Might change in the future.
92 root 1.4
93     =cut
94    
95     sub autoflush { !0 }
96    
97 root 1.13 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
98 root 1.8
99 root 1.13 Work like their function equivalents (except read, which works like
100 root 1.18 sysread. You should not use the read function with Coro::Handle's, it will
101 root 1.13 work but it's not efficient).
102 root 1.8
103     =cut
104    
105 root 1.13 sub read { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
106     sub sysread { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
107     sub syswrite { Coro::Handle::FH::WRITE (tied ${$_[0]}, $_[1], $_[2], $_[3]) }
108     sub print { Coro::Handle::FH::WRITE (tied ${+shift}, join "", @_) }
109     sub printf { Coro::Handle::FH::PRINTF (tied ${+shift}, @_) }
110     sub fileno { Coro::Handle::FH::FILENO (tied ${$_[0]}) }
111     sub close { Coro::Handle::FH::CLOSE (tied ${$_[0]}) }
112     sub blocking { !0 } # this handler always blocks the caller
113    
114     sub partial {
115     my $obj = tied ${$_[0]};
116    
117     my $retval = $obj->[8];
118     $obj->[8] = $_[1] if @_ > 1;
119     $retval
120     }
121 root 1.8
122 root 1.21 =item $fh->timeout ([...])
123 root 1.8
124 root 1.13 The optional argument sets the new timeout (in seconds) for this
125 root 1.8 handle. Returns the current (new) value.
126    
127     C<0> is a valid timeout, use C<undef> to disable the timeout.
128    
129     =cut
130    
131     sub timeout {
132 root 1.18 my $self = tied ${$_[0]};
133 root 1.13 if (@_ > 1) {
134     $self->[2] = $_[1];
135 root 1.18 $self->[5]->timeout ($_[1]) if $self->[5];
136     $self->[6]->timeout ($_[1]) if $self->[6];
137 root 1.8 }
138 root 1.21 $self->[2]
139 root 1.13 }
140    
141     =item $fh->fh
142    
143     Returns the "real" (non-blocking) filehandle. Use this if you want to
144     do operations on the file handle you cannot do using the Coro::Handle
145     interface.
146    
147     =item $fh->rbuf
148    
149     Returns the current contents of the read buffer (this is an lvalue, so you
150     can change the read buffer if you like).
151    
152     You can use this function to implement your own optimized reader when neither
153     readline nor sysread are viable candidates, like this:
154    
155     # first get the _real_ non-blocking filehandle
156     # and fetch a reference to the read buffer
157     my $nb_fh = $fh->fh;
158     my $buf = \$fh->rbuf;
159    
160 root 1.18 while () {
161 root 1.13 # now use buffer contents, modifying
162     # if necessary to reflect the removed data
163    
164     last if $$buf ne ""; # we have leftover data
165    
166     # read another buffer full of data
167     $fh->readable or die "end of file";
168     sysread $nb_fh, $$buf, 8192;
169     }
170    
171     =cut
172    
173     sub fh {
174     (tied ${$_[0]})->[0];
175     }
176    
177     sub rbuf : lvalue {
178     (tied ${$_[0]})->[3];
179     }
180    
181     sub DESTROY {
182     # nop
183     }
184    
185 root 1.16 our $AUTOLOAD;
186    
187 root 1.13 sub AUTOLOAD {
188     my $self = tied ${$_[0]};
189    
190     (my $func = $AUTOLOAD) =~ s/^(.*):://;
191    
192     my $forward = UNIVERSAL::can $self->[7], $func;
193    
194     $forward or
195     die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
196    
197     goto &$forward;
198 root 1.8 }
199    
200 root 1.1 package Coro::Handle::FH;
201    
202 root 1.16 no warnings;
203     use strict;
204 root 1.13
205 root 1.1 use Fcntl ();
206     use Errno ();
207 root 1.10 use Carp 'croak';
208 root 1.1
209 root 1.13 use AnyEvent;
210 root 1.1
211 root 1.13 # formerly a hash, but we are speed-critical, so try
212     # to be faster even if it hurts.
213     #
214     # 0 FH
215     # 1 desc
216     # 2 timeout
217     # 3 rb
218     # 4 wb # unused
219 root 1.17 # 5 read watcher, if Coro::Event used
220     # 6 write watcher, if Coro::Event used
221 root 1.13 # 7 forward class
222     # 8 blocking
223 root 1.1
224     sub TIEHANDLE {
225 root 1.13 my ($class, %arg) = @_;
226 root 1.1
227 root 1.13 my $self = bless [], $class;
228     $self->[0] = $arg{fh};
229     $self->[1] = $arg{desc};
230     $self->[2] = $arg{timeout};
231     $self->[3] = "";
232     $self->[4] = "";
233     $self->[7] = $arg{forward_class};
234     $self->[8] = $arg{partial};
235 root 1.1
236 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
237 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
238 root 1.6
239 root 1.13 $self
240     }
241    
242     sub cleanup {
243     $_[0][3] = "";
244     $_[0][4] = "";
245 root 1.1 }
246    
247     sub OPEN {
248 root 1.13 &cleanup;
249 root 1.1 my $self = shift;
250 root 1.13 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
251     : open $self->[0], $_[0], $_[1], $_[2];
252 root 1.18
253 root 1.1 if ($r) {
254 root 1.13 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
255 root 1.10 or croak "fcntl(O_NONBLOCK): $!";
256 root 1.1 }
257 root 1.18
258     $r
259 root 1.1 }
260    
261 root 1.13 sub PRINT {
262 root 1.18 WRITE (shift, join "", @_)
263 root 1.13 }
264    
265     sub PRINTF {
266 root 1.18 WRITE (shift, sprintf shift,@_)
267 root 1.13 }
268    
269     sub GETC {
270     my $buf;
271 root 1.16 READ ($_[0], $buf, 1);
272 root 1.18 $buf
273 root 1.13 }
274    
275     sub BINMODE {
276     binmode $_[0][0];
277     }
278    
279     sub TELL {
280 root 1.16 Carp::croak "Coro::Handle's don't support tell()";
281 root 1.13 }
282    
283     sub SEEK {
284 root 1.16 Carp::croak "Coro::Handle's don't support seek()";
285 root 1.13 }
286    
287     sub EOF {
288 root 1.16 Carp::croak "Coro::Handle's don't support eof()";
289 root 1.13 }
290    
291 root 1.1 sub CLOSE {
292 root 1.13 &cleanup;
293 root 1.18 close $_[0][0]
294 root 1.13 }
295    
296     sub DESTROY {
297     &cleanup;
298 root 1.8 }
299    
300     sub FILENO {
301 root 1.18 fileno $_[0][0]
302 root 1.1 }
303    
304 root 1.13 # seems to be called for stringification (how weird), at least
305     # when DumpValue::dumpValue is used to print this.
306     sub FETCH {
307 root 1.18 "$_[0]<$_[0][1]>"
308 root 1.1 }
309    
310 root 1.17 sub readable_anyevent {
311 root 1.13 my $current = $Coro::current;
312     my $io = 1;
313    
314     my $w = AnyEvent->io (
315 root 1.16 desc => "$_[0][1] read watcher",
316     fh => $_[0][0],
317     poll => 'r',
318     cb => sub {
319 root 1.13 $current->ready;
320 root 1.15 undef $current;
321 root 1.13 },
322     );
323    
324 root 1.16 my $t = (defined $_[0][2]) && AnyEvent->timer (
325     desc => "fh $_[0][1] read timeout",
326 root 1.13 after => $_[0][2],
327     cb => sub {
328     $io = 0;
329     $current->ready;
330 root 1.15 undef $current;
331 root 1.13 },
332     );
333    
334     &Coro::schedule;
335 root 1.15 &Coro::schedule while $current;
336    
337 root 1.13 $io
338     }
339    
340 root 1.17 sub writable_anyevent {
341 root 1.13 my $current = $Coro::current;
342     my $io = 1;
343    
344     my $w = AnyEvent->io (
345 root 1.16 desc => "fh $_[0][1] write watcher",
346     fh => $_[0][0],
347     poll => 'w',
348     cb => sub {
349 root 1.13 $current->ready;
350 root 1.15 undef $current;
351 root 1.13 },
352     );
353    
354 root 1.16 my $t = (defined $_[0][2]) && AnyEvent->timer (
355     desc => "fh $_[0][1] write timeout",
356 root 1.13 after => $_[0][2],
357     cb => sub {
358     $io = 0;
359     $current->ready;
360 root 1.15 undef $current;
361 root 1.13 },
362     );
363    
364 root 1.15 &Coro::schedule while $current;
365    
366 root 1.13 $io
367 root 1.1 }
368    
369 root 1.17 sub readable_coro {
370 root 1.18 ($_[0][5] ||= "Coro::Event"->io (
371 root 1.17 fd => $_[0][0],
372     desc => "fh $_[0][1] read watcher",
373     timeout => $_[0][2],
374     poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
375 root 1.18 ))->next->[5] & &Event::Watcher::R
376 root 1.17 }
377    
378     sub writable_coro {
379 root 1.18 ($_[0][6] ||= "Coro::Event"->io (
380 root 1.17 fd => $_[0][0],
381     desc => "fh $_[0][1] write watcher",
382     timeout => $_[0][2],
383     poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
384 root 1.18 ))->next->[5] & &Event::Watcher::W
385 root 1.17 }
386    
387 root 1.21 # decide on event model at runtime
388 root 1.17 for my $rw (qw(readable writable)) {
389     no strict 'refs';
390    
391     *$rw = sub {
392     my $res = &{"$rw\_anyevent"};
393     if ($AnyEvent::MODEL eq "AnyEvent::Impl::Coro" or $AnyEvent::MODEL eq "AnyEvent::Impl::Event") {
394     require Coro::Event;
395     *$rw = \&{"$rw\_coro"};
396     } else {
397     *$rw = \&{"$rw\_anyevent"};
398     }
399     $res
400     };
401     };
402    
403 root 1.1 sub WRITE {
404     my $len = defined $_[2] ? $_[2] : length $_[1];
405     my $ofs = $_[3];
406     my $res = 0;
407    
408 root 1.16 while () {
409     my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
410 root 1.1 if (defined $r) {
411     $len -= $r;
412     $ofs += $r;
413     $res += $r;
414     last unless $len;
415     } elsif ($! != Errno::EAGAIN) {
416     last;
417     }
418 root 1.13 last unless &writable;
419 root 1.1 }
420    
421     return $res;
422     }
423    
424     sub READ {
425     my $len = $_[2];
426     my $ofs = $_[3];
427     my $res = 0;
428    
429 root 1.7 # first deplete the read buffer
430 root 1.13 if (length $_[0][3]) {
431     my $l = length $_[0][3];
432 root 1.7 if ($l <= $len) {
433 root 1.18 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
434 root 1.7 $len -= $l;
435 root 1.13 $ofs += $l;
436 root 1.7 $res += $l;
437     return $res unless $len;
438     } else {
439 root 1.18 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
440     substr ($_[0][3], 0, $len) = "";
441 root 1.7 return $len;
442     }
443     }
444    
445 root 1.2 while() {
446 root 1.13 my $r = sysread $_[0][0], $_[1], $len, $ofs;
447 root 1.1 if (defined $r) {
448     $len -= $r;
449     $ofs += $r;
450     $res += $r;
451     last unless $len && $r;
452     } elsif ($! != Errno::EAGAIN) {
453     last;
454     }
455 root 1.13 last if $_[0][8] || !&readable;
456 root 1.1 }
457    
458     return $res;
459     }
460    
461     sub READLINE {
462 root 1.13 my $irs = @_ > 1 ? $_[1] : $/;
463 root 1.1
464     while() {
465 root 1.13 my $pos = index $_[0][3], $irs;
466 root 1.1 if ($pos >= 0) {
467 root 1.7 $pos += length $irs;
468 root 1.13 my $res = substr $_[0][3], 0, $pos;
469     substr ($_[0][3], 0, $pos) = "";
470 root 1.1 return $res;
471     }
472 root 1.13
473     my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
474 root 1.1 if (defined $r) {
475     return undef unless $r;
476 root 1.13 } elsif ($! != Errno::EAGAIN || !&readable) {
477 root 1.1 return undef;
478     }
479     }
480 root 1.6 }
481    
482 root 1.13 1;
483 root 1.1
484 root 1.13 =back
485 root 1.1
486     =head1 BUGS
487    
488     - Perl's IO-Handle model is THE bug.
489    
490     =head1 AUTHOR
491    
492 root 1.13 Marc Lehmann <schmorp@schmorp.de>
493     http://home.schmorp.de/
494 root 1.1
495     =cut
496