ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Event/Handle.pm
Revision: 1.42
Committed: Sun Nov 5 02:01:24 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.41: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Handle - non-blocking io with a blocking interface.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Handle;
8    
9     =head1 DESCRIPTION
10    
11 root 1.41 This module implements IO-handles in a coroutine-compatible way, that is,
12 root 1.1 other coroutines can run while reads or writes block on the handle. It
13     does NOT inherit from IO::Handle but uses tied objects.
14    
15     =over 4
16    
17     =cut
18    
19     package Coro::Handle;
20    
21 pcg 1.23 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
22 root 1.9
23 root 1.1 use Errno ();
24     use base 'Exporter';
25    
26 root 1.39 $VERSION = 1.9;
27 root 1.1
28     @EXPORT = qw(unblock);
29    
30     =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
31    
32     Create a new non-blocking io-handle using the given
33     perl-filehandle. Returns undef if no fhandle is given. The only other
34     supported argument is "timeout", which sets a timeout for each operation.
35    
36     =cut
37    
sub new_from_fh {
   # Constructor. Wraps the given perl filehandle in a fresh glob that is
   # tied to Coro::Handle::FH and blessed into the invocant's class.
   # Returns nothing when no filehandle is passed. Any further arguments
   # are handed through to the tie call as key => value pairs.
   my ($class, $fh, @args) = @_;
   return unless $fh;

   # a private copy of a localised glob serves as the anonymous handle
   my $self = do { local *Coro::Handle };

   # describe the handle by the caller's file name (directories stripped)
   # and line number
   my (undef, $filename, $line) = caller;
   $filename =~ s/^.*[\/\\]//;

   tie $self, 'Coro::Handle::FH', fh => $fh, desc => "$filename:$line", @args;

   # select the new handle just long enough to set $| on it; the final
   # select restores the previous handle, and its return value is what
   # this constructor returns
   my $previous = select bless \$self, ref($class) || $class;
   $| = 1;
   select $previous;
}
50    
51     =item $fh = unblock $fh
52    
53     This is a convenience function that just calls C<new_from_fh> on the given
54     filehandle. Use it to replace a normal perl filehandle by a non-blocking
55     equivalent.
56    
57     =cut
58    
sub unblock($) {
   # Exported shorthand: build a Coro::Handle around the single argument.
   Coro::Handle->new_from_fh($_[0]);
}
62    
63     =item $fh->writable, $fh->readable
64    
65     Wait until the filehandle is readable or writable (and return true) or
66     until an error condition happens (and return false).
67    
68     =cut
69    
# Both helpers look up the tied implementation object behind the handle and
# hand it to the function of the same name in Coro::Handle::FH.
sub readable {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::readable($impl);
}

sub writable {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::writable($impl);
}
72 root 1.1
73     =item $fh->readline([$terminator])
74    
75     Like the builtin of the same name, but allows you to specify the input
76     record separator in a coroutine-safe manner (i.e. not using a global
77     variable).
78    
79     =cut
80    
sub readline {
   # Forward to the tied object's READLINE; an optional terminator argument
   # is passed along unchanged.
   my $handle = shift;
   my $impl   = tied ${$handle};
   $impl->READLINE(@_);
}
82    
83     =item $fh->autoflush([...])
84    
85     Always returns true, arguments are being ignored (exists for compatibility
86     only). Might change in the future.
87    
88     =cut
89    
sub autoflush {
   # Compatibility stub: arguments are ignored, the result is always true.
   return !!1;
}
91    
92 root 1.9 =item $fh->fileno, $fh->close,
93     $fh->read, $fh->sysread, $fh->syswrite,
94     $fh->print, $fh->printf
95    
96     Work like their function equivalents (except read, which works like
97     sysread). You should not use the read function with Coro::Handles; it will
98     work, but it is not efficient.
99 root 1.1
100     =cut
101    
# Method-style front ends for the tied implementation. Each one fetches the
# tied Coro::Handle::FH object and calls the matching function directly.
# Buffer arguments are passed as elements of @_ so that the callee still
# writes into the caller's variable.
sub read {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::READ($impl, $_[1], $_[2], $_[3]);
}

sub sysread {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::READ($impl, $_[1], $_[2], $_[3]);
}

sub syswrite {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::WRITE($impl, $_[1], $_[2], $_[3]);
}

sub print {
   my $impl = tied ${ shift() };
   Coro::Handle::FH::WRITE($impl, join "", @_);
}

sub printf {
   my $impl = tied ${ shift() };
   Coro::Handle::FH::PRINTF($impl, @_);
}

sub fileno {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::FILENO($impl);
}

sub close {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::CLOSE($impl);
}
109 root 1.1
110     =item $fh->timeout([...])
111    
112     The optional argument sets the new timeout (in seconds) for this
113     handle. Returns the current (new) value.
114    
115     C<0> is a valid timeout, use C<undef> to disable the timeout.
116    
117     =cut
118    
sub timeout {
   # Getter/setter for the per-handle timeout kept in slot 2 of the tied
   # object. With an argument, the value is stored and also pushed to the
   # read watcher (slot 5) and write watcher (slot 6) when they exist.
   # Returns the value now stored.
   my $state = tied ${$_[0]};

   return $state->[2] unless @_ > 1;

   my $seconds = $_[1];
   $state->[2] = $seconds;

   for my $slot (5, 6) {
      my $watcher = $state->[$slot] or next;
      $watcher->timeout($seconds);
   }

   return $state->[2];
}
128    
129     =item $fh->fh
130    
131     Returns the "real" (non-blocking) filehandle. Use this if you want to
132     do operations on the file handle you cannot do using the Coro::Handle
133     interface.
134    
135 root 1.11 =item $fh->rbuf
136 root 1.10
137 root 1.13 Returns the current contents of the read buffer (this is an lvalue, so you
138 root 1.11 can change the read buffer if you like).
139    
140 root 1.13 You can use this function to implement your own optimized reader when neither
141 root 1.11 readline nor sysread are viable candidates, like this:
142    
143     # first get the _real_ non-blocking filehandle
144 root 1.15 # and fetch a reference to the read buffer
145 root 1.11 my $nb_fh = $fh->fh;
146 root 1.15 my $buf = \$fh->rbuf;
147 root 1.11
148     for(;;) {
149     # now use buffer contents, modifying
150     # if necessary to reflect the removed data
151    
152 root 1.15 last if $$buf ne ""; # we have leftover data
153 root 1.11
154     # read another buffer full of data
155     $fh->readable or die "end of file";
156 root 1.15 sysread $nb_fh, $$buf, 8192;
157 root 1.11 }
158 root 1.10
159 root 1.4 =cut
160    
sub fh {
   # Slot 0 of the tied object holds the underlying filehandle.
   my $state = tied ${$_[0]};
   $state->[0];
}
164    
sub rbuf : lvalue {
   # Slot 3 of the tied object is the read buffer. It is the last
   # expression evaluated so the sub yields the element itself and callers
   # can assign to it or take a reference to it.
   my $state = tied ${$_[0]};
   $state->[3];
}
168    
sub DESTROY {
   # Intentionally empty. NOTE(review): presumably defined so that object
   # destruction is not routed through AUTOLOAD in this package.
}
172    
sub AUTOLOAD {
   # Fallback for methods this package does not define: look the method up
   # in the forward class stored in slot 7 of the tied object and jump to
   # it with the original argument list. Dies when that class cannot do it.
   our $AUTOLOAD;

   my $self = tied ${$_[0]};

   # strip the package qualifier to get the bare method name
   my $func = $AUTOLOAD;
   $func =~ s/^(.*):://;

   my $forward = UNIVERSAL::can($self->[7], $func)
      or die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";

   goto &{$forward};
}
185    
186     package Coro::Handle::FH;
187    
188 pcg 1.23 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
189 root 1.9
190 root 1.1 use Fcntl ();
191     use Errno ();
192     use Carp 'croak';
193    
194     use Coro::Event;
195     use Event::Watcher qw(R W E);
196    
197 root 1.6 # formerly a hash, but we are speed-critical, so try
198     # to be faster even if it hurts.
199     #
200     # 0 FH
201     # 1 desc
202     # 2 timeout
203     # 3 rb
204 root 1.10 # 4 wb # unused
205 root 1.6 # 5 rw
206     # 6 ww
207 root 1.41 # 7 forward class
208 root 1.6
sub TIEHANDLE {
   # Build the array-based implementation object for a tied handle.
   #
   # Arguments (key => value): fh, desc, timeout, forward_class. They fill
   # slots 0, 1, 2 and 7; the buffers in slots 3 and 4 start out empty.
   # Croaks when the handle cannot be switched to non-blocking mode.
   my ($class, %arg) = @_;

   my $self = bless [], $class;
   $self->[0] = $arg{fh};
   $self->[1] = $arg{desc};
   $self->[2] = $arg{timeout};
   $self->[3] = "";
   $self->[4] = "";
   $self->[7] = $arg{forward_class};

   # F_SETFL replaces the whole status-flag set, so read the current flags
   # first and OR in O_NONBLOCK; otherwise flags such as O_APPEND are lost.
   my $flags = fcntl($self->[0], Fcntl::F_GETFL(), 0);
   defined $flags
      or croak "fcntl(F_GETFL): $!";

   # "+ 0" turns fcntl's "0 but true" into a plain number before the OR
   fcntl($self->[0], Fcntl::F_SETFL(), ($flags + 0) | Fcntl::O_NONBLOCK())
      or croak "fcntl(O_NONBLOCK): $!";

   $self
}
225    
sub cleanup {
   # Reset the object given as first argument: empty both buffers, cancel
   # any existing watcher and forget it.
   my ($self) = @_;

   # read side: buffer in slot 3, watcher in slot 5
   $self->[3] = "";
   if (defined(my $read_watcher = $self->[5])) {
      $read_watcher->cancel;
   }
   $self->[5] = undef;

   # write side: buffer in slot 4, watcher in slot 6
   $self->[4] = "";
   if (defined(my $write_watcher = $self->[6])) {
      $write_watcher->cancel;
   }
   $self->[6] = undef;
}
233    
sub OPEN {
   # Tied-handle OPEN: re-open the underlying filehandle (slot 0) with the
   # arguments given to open() and switch it to non-blocking mode again.
   # Returns whatever open() returned; croaks if fcntl fails afterwards.
   &cleanup;   # shares @_, so this resets the object in $_[0]
   my $self = shift;

   # Pick the open() form from the number of remaining arguments. The
   # two-argument form passes only one argument here and used to end up in
   # the list form with undef padding; extra list-form arguments were lost.
   my $r = @_ <= 1 ? open($self->[0], $_[0])
         : @_ == 2 ? open($self->[0], $_[0], $_[1])
         :           open($self->[0], $_[0], $_[1], @_[2 .. $#_]);

   if ($r) {
      # F_SETFL replaces all status flags, so keep the ones open() just
      # established (e.g. O_APPEND) and only add O_NONBLOCK.
      my $flags = fcntl($self->[0], Fcntl::F_GETFL(), 0);
      defined $flags
         or croak "fcntl(F_GETFL): $!";

      # "+ 0" turns fcntl's "0 but true" into a plain number before the OR
      fcntl($self->[0], Fcntl::F_SETFL(), ($flags + 0) | Fcntl::O_NONBLOCK())
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r;
}
245    
sub PRINT {
   # Concatenate all items without a separator and write them in one call.
   my $self = shift;
   WRITE($self, join "", @_);
}
249    
sub PRINTF {
   # Format the arguments with sprintf, then write the resulting string.
   my ($self, $format, @args) = @_;
   WRITE($self, sprintf($format, @args));
}
253    
sub GETC {
   # Read a single character through READ.
   # Returns the character, or undef when READ delivered nothing (end of
   # file or error). The old code returned "" at end of file, which a
   # caller looping on defined(getc ...) could never distinguish from data.
   my $buf;
   READ($_[0], $buf, 1) ? $buf : undef;
}
259    
sub BINMODE {
   # Apply binmode to the underlying filehandle (slot 0).
   # An optional layer argument (e.g. ":utf8") is forwarded; it used to be
   # dropped, so binmode($fh, LAYER) behaved like plain binmode($fh).
   # Returns the result of the builtin.
   my $self = shift;
   @_ ? binmode($self->[0], $_[0])
      : binmode($self->[0]);
}
263    
sub TELL {
   # Not supported on these handles: always croaks.
   require Carp;
   Carp::croak("Coro::Handle's don't support tell()");
}
267    
sub SEEK {
   # Not supported on these handles: always croaks.
   require Carp;
   Carp::croak("Coro::Handle's don't support seek()");
}
271    
sub EOF {
   # Not supported on these handles: always croaks.
   require Carp;
   Carp::croak("Coro::Handle's don't support eof()");
}
275    
sub CLOSE {
   # Drop buffers and watchers first, then close the underlying handle;
   # the result of close is the return value.
   cleanup($_[0]);
   return close $_[0][0];
}
280    
sub DESTROY {
   # Release buffers and cancel watchers when the object goes away.
   cleanup($_[0]);
}
284    
sub FILENO {
   # File descriptor number of the underlying handle (slot 0).
   my ($self) = @_;
   return fileno $self->[0];
}
288    
# Appears to be invoked when the tied value is stringified (odd as that is),
# at least when DumpValue::dumpValue is used to print the handle.
sub FETCH {
   # Yields the object's own stringification followed by "<desc>".
   my ($self) = @_;
   return join "", $self, "<", $self->[1], ">";
}
294    
sub readable {
   # Wait for the next event on the read watcher, creating and caching the
   # watcher in slot 5 on first use, and return the R bit of the result.
   # @_ is left untouched because READ and READLINE call this as &readable.
   my $watcher = $_[0][5] ||= Coro::Event->io(
      fd      => $_[0][0],
      desc    => "$_[0][1] R",
      timeout => $_[0][2],
      poll    => R+E,
   );

   $watcher->next->{Coro::Event}[5] & R;
}
303    
sub writable {
   # Wait for the next event on the write watcher, creating and caching the
   # watcher in slot 6 on first use, and return the W bit of the result.
   # @_ is left untouched because WRITE calls this as &writable.
   my $watcher = $_[0][6] ||= Coro::Event->io(
      fd      => $_[0][0],
      desc    => "$_[0][1] W",
      timeout => $_[0][2],
      poll    => W+E,
   );

   $watcher->next->{Coro::Event}[5] & W;
}
312    
sub WRITE {
   # Write $_[1] (optionally limited by length $_[2] and offset $_[3]) to
   # the underlying handle, repeating until everything is written, a
   # non-EAGAIN error occurs, or waiting for writability fails.
   # Returns the number of bytes written.
   my $remaining = defined $_[2] ? $_[2] : length $_[1];
   my $offset    = $_[3];
   my $written   = 0;

   for (;;) {
      my $n = syswrite $_[0][0], $_[1], $remaining, $offset;

      if (!defined $n) {
         # only EAGAIN is worth waiting for
         return $written if $! != Errno::EAGAIN;
      }
      else {
         $remaining -= $n;
         $offset    += $n;
         $written   += $n;
         return $written unless $remaining;
      }

      # &-call without parens: writable sees this sub's @_
      return $written unless &writable;
   }
}
333    
sub READ {
   # Fill the caller's buffer $_[1] with up to $_[2] bytes at offset $_[3].
   # Data left in the read buffer (slot 3) is used first; after that the
   # handle is read until the request is satisfied, end of file, a
   # non-EAGAIN error, or until waiting for readability fails.
   # Returns the number of bytes delivered.
   my ($want, $offset) = @_[2, 3];
   my $got = 0;

   # first use up what is already buffered
   if (my $buffered = length $_[0][3]) {
      if ($buffered > $want) {
         # the buffer alone satisfies the request: hand over a prefix
         substr($_[1], $offset) = substr $_[0][3], 0, $want, "";
         return $want;
      }

      substr($_[1], $offset) = $_[0][3];
      $_[0][3] = "";

      $want   -= $buffered;
      $offset += $buffered;
      $got    += $buffered;
      return $got unless $want;
   }

   for (;;) {
      my $n = sysread $_[0][0], $_[1], $want, $offset;

      if (defined $n) {
         $want   -= $n;
         $offset += $n;
         $got    += $n;
         # stop when satisfied or at end of file
         last unless $want && $n;
      }
      elsif ($! != Errno::EAGAIN) {
         last;
      }

      # &-call without parens: readable sees this sub's @_
      last unless &readable;
   }

   return $got;
}
370    
sub READLINE {
   # Return the next record from the handle, or undef when none is left.
   #
   # $_[1], when passed, is the record separator; otherwise $/ is used.
   #   non-empty string - records end with that string
   #   ""               - paragraph mode: records end at blank lines
   #   undef            - slurp mode: everything up to end of file
   #
   # Fixes over the previous version: data left without a trailing
   # separator is returned at end of file instead of being dropped, and an
   # undef or empty separator no longer yields "" forever.
   # On a read error or failed wait, undef is returned and unread data
   # stays in the buffer (slot 3), as before.
   my $irs = @_ > 1 ? $_[1] : $/;
   my $scan_from = 0;

   while () {
      if (length $irs) {
         my $pos = index $_[0][3], $irs, $scan_from;
         if ($pos >= 0) {
            $pos += length $irs;
            my $res = substr $_[0][3], 0, $pos;
            substr($_[0][3], 0, $pos) = "";
            return $res;
         }

         # nothing found so far: the next search only needs to cover the
         # tail where a separator could still straddle old and new data
         $scan_from = length($_[0][3]) - length($irs) + 1;
         $scan_from = 0 if $scan_from < 0;
      }
      elsif (defined $irs) {
         # paragraph mode: skip leading newlines, then cut at a blank line
         $_[0][3] =~ s/\A\n+//;
         return "$1\n\n"
            if $_[0][3] =~ s/\A(.*?)\n\n+//s;
      }

      my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
      if (defined $r) {
         next if $r;

         # end of file: hand out what is left, if anything
         return undef unless length $_[0][3];
         my $rest = $_[0][3];
         $_[0][3] = "";
         return $rest;
      }
      elsif ($! != Errno::EAGAIN || !&readable) {
         return undef;
      }
   }
}
391    
392     1;
393    
394 root 1.40 =back
395    
396 root 1.1 =head1 BUGS
397    
398     - Perl's IO-Handle model is THE bug.
399    
400     =head1 AUTHOR
401    
402 root 1.31 Marc Lehmann <schmorp@schmorp.de>
403 root 1.29 http://home.schmorp.de/
404 root 1.1
405     =cut
406