ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Event/Handle.pm
Revision: 1.42
Committed: Sun Nov 5 02:01:24 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.41: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking io with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module implements IO-handles in a coroutine-compatible way, that is,
12 other coroutines can run while reads or writes block on the handle. It
13 does NOT inherit from IO::Handle but uses tied objects.
14
15 =over 4
16
17 =cut
18
19 package Coro::Handle;
20
21 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
22
23 use Errno ();
24 use base 'Exporter';
25
26 $VERSION = 1.9;
27
28 @EXPORT = qw(unblock);
29
30 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
31
32 Create a new non-blocking io-handle using the given
33 perl-filehandle. Returns undef if no fhandle is given. The only other
34 supported argument is "timeout", which sets a timeout for each operation.
35
36 =cut
37
# Wrap a perl filehandle in a new Coro::Handle object.
#
#   $class - class name, or an existing object whose class is reused
#   $fh    - the filehandle to wrap; returns nothing when it is false
#   @_     - further key => value pairs passed on to the tie class
#            (e.g. timeout => $seconds)
#
# The description stored with the handle is "file:line" of the caller.
sub new_from_fh {
   my $class = shift;
   my $fh = shift or return;
   my $self = do { local *Coro::Handle };   # fresh anonymous glob

   my ($package, $filename, $line) = caller;
   $filename =~ s/^.*[\/\\]//;              # keep only the basename

   # Tie the glob itself. Since perl 5.14, tie on a scalar that merely
   # holds a glob ties the scalar (looking for TIESCALAR), not the handle.
   tie *$self, 'Coro::Handle::FH', fh => $fh, desc => "$filename:$line", @_;

   my $object = bless \$self, ref $class ? ref $class : $class;

   # switch on autoflush for the new handle, then restore the selected one
   my $previous = select $object; $| = 1; select $previous;

   $object
}
50
51 =item $fh = unblock $fh
52
53 This is a convenience function that just calls C<new_from_fh> on the given
54 filehandle. Use it to replace a normal perl filehandle by a non-blocking
55 equivalent.
56
57 =cut
58
# Exported convenience wrapper: returns whatever
# Coro::Handle->new_from_fh returns for the given filehandle.
# Uses an explicit arrow call instead of indirect-object syntax, whose
# parsing depends on which subs and packages are known at compile time.
sub unblock($) {
   Coro::Handle->new_from_fh($_[0]);
}
62
63 =item $fh->writable, $fh->readable
64
65 Wait until the filehandle is readable or writable (and return true) or
66 until an error condition happens (and return false).
67
68 =cut
69
# Forward to the readable/writable functions of the tie class, passing the
# object tied to this handle.
# "tied *" looks at the handle inside the glob; since perl 5.14 a plain
# "tied $scalar_holding_a_glob" inspects the scalar and finds nothing.
sub readable { Coro::Handle::FH::readable(tied *${$_[0]}) }
sub writable { Coro::Handle::FH::writable(tied *${$_[0]}) }
72
73 =item $fh->readline([$terminator])
74
75 Like the builtin of the same name, but allows you to specify the input
76 record separator in a coroutine-safe manner (i.e. not using a global
77 variable).
78
79 =cut
80
# Call READLINE on the object tied to this handle, passing along any
# remaining arguments (the optional terminator).
# "tied *" is required since perl 5.14, where tied on a scalar that holds a
# glob no longer looks at the handle.
sub readline { tied(*${+shift})->READLINE(@_) }
82
83 =item $fh->autoflush([...])
84
85 Always returns true, arguments are being ignored (exists for compatibility
86 only). Might change in the future.
87
88 =cut
89
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush { 1 }
91
92 =item $fh->fileno, $fh->close,
93 $fh->read, $fh->sysread, $fh->syswrite,
94 $fh->print, $fh->printf
95
96 Work like their function equivalents (except read, which works like
97 sysread. You should not use the read function with Coro::Handles, it will
98 work but it's not efficient).
99
100 =cut
101
# Method-style front ends: each one fetches the object tied to the handle
# and forwards to the matching Coro::Handle::FH function.
# read and sysread both map to READ; print joins its arguments and maps to
# WRITE. $_[1] is passed through unchanged so READ can fill the caller's
# buffer in place.
# "tied *" is used because, since perl 5.14, tied on a scalar holding a glob
# no longer finds the handle's tie.
sub read     { Coro::Handle::FH::READ  (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub sysread  { Coro::Handle::FH::READ  (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub syswrite { Coro::Handle::FH::WRITE (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub print    { Coro::Handle::FH::WRITE (tied *${+shift}, join "", @_) }
sub printf   { Coro::Handle::FH::PRINTF(tied *${+shift}, @_) }
sub fileno   { Coro::Handle::FH::FILENO(tied *${$_[0]}) }
sub close    { Coro::Handle::FH::CLOSE (tied *${$_[0]}) }
109
110 =item $fh->timeout([...])
111
112 The optional argument sets the new timeout (in seconds) for this
113 handle. Returns the current (new) value.
114
115 C<0> is a valid timeout, use C<undef> to disable the timeout.
116
117 =cut
118
# Get or set the timeout stored in slot 2 of the tied object.
# When an argument is given (undef included) it becomes the new value and is
# also pushed to the read and write watchers (slots 5 and 6) if they exist.
# Returns the value now stored.
sub timeout {
   # "tied *": since perl 5.14, tied on a scalar holding a glob does not
   # look at the handle any more.
   my $self = tied *${$_[0]};
   if (@_ > 1) {
      $self->[2] = $_[1];
      $self->[5]->timeout($_[1]) if $self->[5];
      $self->[6]->timeout($_[1]) if $self->[6];
   }
   $self->[2];
}
128
129 =item $fh->fh
130
131 Returns the "real" (non-blocking) filehandle. Use this if you want to
132 do operations on the file handle you cannot do using the Coro::Handle
133 interface.
134
135 =item $fh->rbuf
136
137 Returns the current contents of the read buffer (this is an lvalue, so you
138 can change the read buffer if you like).
139
140 You can use this function to implement your own optimized reader when neither
141 readline nor sysread are viable candidates, like this:
142
143 # first get the _real_ non-blocking filehandle
144 # and fetch a reference to the read buffer
145 my $nb_fh = $fh->fh;
146 my $buf = \$fh->rbuf;
147
148 for(;;) {
149 # now use buffer contents, modifying
150 # if necessary to reflect the removed data
151
152 last if $$buf ne ""; # we have leftover data
153
154 # read another buffer full of data
155 $fh->readable or die "end of file";
156 sysread $nb_fh, $$buf, 8192;
157 }
158
159 =cut
160
# Return slot 0 of the tied object: the underlying filehandle.
# "tied *" is needed since perl 5.14, where tied on a scalar that holds a
# glob no longer inspects the handle.
sub fh {
   (tied *${$_[0]})->[0];
}
164
# Lvalue accessor for slot 3 of the tied object, the read buffer; assigning
# to the result changes the buffer.
# "tied *" is needed since perl 5.14, where tied on a scalar that holds a
# glob no longer inspects the handle.
sub rbuf : lvalue {
   (tied *${$_[0]})->[3];
}
168
# Intentionally empty: nothing to release at this level.
sub DESTROY { }
172
# Forward unknown methods to the class stored in slot 7 of the tied object
# (the "forward_class" given at construction). Dies with a "Can't locate
# object method" message when that class cannot handle the method.
sub AUTOLOAD {
   # "tied *": since perl 5.14, tied on a scalar holding a glob does not
   # look at the handle any more.
   my $self = tied *${$_[0]};

   # strip the package qualifier from the requested method name
   (my $func = $AUTOLOAD) =~ s/^(.*):://;

   my $forward = UNIVERSAL::can($self->[7], $func);

   $forward or
      die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";

   # tail-call, so the target sees the original @_ and caller
   goto &$forward;
}
185
186 package Coro::Handle::FH;
187
188 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
189
190 use Fcntl ();
191 use Errno ();
192 use Carp 'croak';
193
194 use Coro::Event;
195 use Event::Watcher qw(R W E);
196
197 # formerly a hash, but we are speed-critical, so try
198 # to be faster even if it hurts.
199 #
200 # 0 FH
201 # 1 desc
202 # 2 timeout
203 # 3 rb
204 # 4 wb # unused
205 # 5 rw
206 # 6 ww
207 # 7 forward class
208
# Build the array-based state object for a tied handle and switch the
# underlying filehandle to non-blocking mode.
#
# Recognised arguments: fh, desc, timeout, forward_class.
# Croaks when the file status flags cannot be read or changed.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [], $class;
   $self->[0] = $arg{fh};
   $self->[1] = $arg{desc};
   $self->[2] = $arg{timeout};
   $self->[3] = "";                    # read buffer
   $self->[4] = "";                    # write buffer (unused)
   $self->[7] = $arg{forward_class};

   # Add O_NONBLOCK to the current status flags. Setting it alone would
   # silently clear flags such as O_APPEND.
   my $flags = fcntl $self->[0], Fcntl::F_GETFL(), 0;
   defined $flags
      or croak "fcntl(F_GETFL): $!";
   fcntl $self->[0], Fcntl::F_SETFL(), ($flags + 0) | Fcntl::O_NONBLOCK()
      or croak "fcntl(O_NONBLOCK): $!";

   $self
}
225
# Reset the per-handle state: empty both buffers (slots 3 and 4) and cancel
# and drop the read and write watchers (slots 5 and 6) if present.
sub cleanup {
   my $self = $_[0];

   $self->[3] = "";
   $self->[5]->cancel if defined $self->[5];
   $self->[5] = undef;

   $self->[4] = "";
   $self->[6]->cancel if defined $self->[6];
   $self->[6] = undef;
}
233
# Tie hook for open(): reset the handle state, reopen the underlying
# filehandle with the caller's arguments and make it non-blocking again.
# Returns the result of open; croaks if the status flags cannot be changed.
sub OPEN {
   &cleanup;
   my $self = shift;

   # Pass the arguments on in the form the caller used: a single argument is
   # a 2-arg open, anything longer is mode + path (+ list for pipe opens).
   my $r = @_ < 2 ? open($self->[0], $_[0])
                  : open($self->[0], $_[0], @_[1 .. $#_]);

   if ($r) {
      # Add O_NONBLOCK to the flags the new descriptor already has instead
      # of overwriting them (which would drop e.g. O_APPEND).
      my $flags = fcntl $self->[0], Fcntl::F_GETFL(), 0;
      defined $flags
         or croak "fcntl(F_GETFL): $!";
      fcntl $self->[0], Fcntl::F_SETFL(), ($flags + 0) | Fcntl::O_NONBLOCK()
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r;
}
245
# Tie hook for print(): concatenate all arguments and hand them to WRITE.
sub PRINT {
   my $self = shift;
   WRITE($self, join "", @_);
}
249
# Tie hook for printf(): format the arguments with sprintf and hand the
# result to WRITE.
sub PRINTF {
   my ($self, $format, @args) = @_;
   WRITE($self, sprintf($format, @args));
}
253
# Tie hook for getc(): read a single character through READ.
# Returns the character, or undef when READ delivered nothing, so that
# "while (defined(my $c = getc $fh))" terminates at end of file.
sub GETC {
   my $buf;
   READ($_[0], $buf, 1) ? $buf : undef;
}
259
# Tie hook for binmode(): applies plain binmode to the underlying
# filehandle (slot 0); any layer argument is not passed on.
sub BINMODE {
   my ($self) = @_;
   binmode $self->[0];
}
263
# Position-related tie hooks: none of them is supported, each one croaks.
sub TELL {
   require Carp;
   Carp::croak("Coro::Handle's don't support tell()");
}

sub SEEK {
   require Carp;
   Carp::croak("Coro::Handle's don't support seek()");
}

sub EOF {
   require Carp;
   Carp::croak("Coro::Handle's don't support eof()");
}
275
# Tie hook for close(): reset buffers and watchers via cleanup, then close
# the underlying filehandle and return close's result.
sub CLOSE {
   my ($self) = @_;
   cleanup($self);
   close $self->[0];
}
280
# Destructor: release buffers and watchers through cleanup.
sub DESTROY {
   cleanup($_[0]);
}
284
# Tie hook for fileno(): descriptor number of the underlying filehandle.
sub FILENO {
   my ($self) = @_;
   fileno $self->[0];
}
288
289 # seems to be called for stringification (how weird), at least
290 # when DumpValue::dumpValue is used to print this.
# Stringification helper: the object's own string form followed by its
# description (slot 1) in angle brackets.
sub FETCH {
   my $self = $_[0];
   $self . "<" . $self->[1] . ">";
}
294
# Wait for the next event on the read watcher (slot 5), creating the
# watcher on first use from the handle, description and timeout.
# Returns the R bit of element 5 of the event's {Coro::Event} entry
# (presumably the set of events that fired - confirm against Coro::Event).
sub readable {
   unless ($_[0][5]) {
      $_[0][5] = Coro::Event->io(
         fd      => $_[0][0],
         desc    => "$_[0][1] R",
         timeout => $_[0][2],
         poll    => R+E,
      );
   }

   $_[0][5]->next->{Coro::Event}[5] & R;
}
303
# Wait for the next event on the write watcher (slot 6), creating the
# watcher on first use from the handle, description and timeout.
# Returns the W bit of element 5 of the event's {Coro::Event} entry
# (presumably the set of events that fired - confirm against Coro::Event).
sub writable {
   unless ($_[0][6]) {
      $_[0][6] = Coro::Event->io(
         fd      => $_[0][0],
         desc    => "$_[0][1] W",
         timeout => $_[0][2],
         poll    => W+E,
      );
   }

   $_[0][6]->next->{Coro::Event}[5] & W;
}
312
# Write data to the underlying filehandle, waiting via writable whenever the
# handle would block.
#
#   $_[0] - tied state object
#   $_[1] - data
#   $_[2] - number of bytes to write (default: everything after the offset)
#   $_[3] - offset into the data (default 0; negative counts from the end)
#
# Returns the number of bytes written, which is short when an error other
# than EAGAIN occurs or writable reports failure.
sub WRITE {
   my $avail = defined $_[1] ? length $_[1] : 0;
   my $ofs   = defined $_[3] ? $_[3] : 0;
   my $len   = defined $_[2] ? $_[2] : $avail;
   my $res   = 0;

   # normalise a negative offset so the "$ofs += $r" bookkeeping below works
   $ofs += $avail if $ofs < 0;

   # Never try to write more than the data holds behind the offset; otherwise
   # syswrite keeps returning 0 once the data is used up and the loop spins.
   # Out-of-range offsets are left alone so syswrite reports them itself.
   my $max = $avail - $ofs;
   $len = $max if $ofs >= 0 && $max >= 0 && $len > $max;

   while() {
      my $r = syswrite $_[0][0], $_[1], $len, $ofs;
      if (defined $r) {
         $len -= $r;
         $ofs += $r;
         $res += $r;
         last unless $len && $r;   # done, or no progress possible
      } elsif ($! != Errno::EAGAIN) {
         last;
      }
      last unless &writable;
   }

   return $res;
}
333
# Read up to $_[2] bytes into the caller's buffer $_[1] at offset $_[3].
#
# Data left in the read buffer (slot 3) is handed out first. If the buffer
# holds more than requested, only the requested part is taken and nothing is
# read from the handle. Otherwise the handle is read until the request is
# satisfied, end of file is hit, an error other than EAGAIN occurs, or
# readable reports failure.
# Returns the number of bytes stored.
sub READ {
   my ($want, $pos) = @_[2, 3];
   my $total = 0;

   # serve from the read buffer first
   my $buffered = length $_[0][3];
   if ($buffered) {
      if ($buffered > $want) {
         # buffer alone satisfies the request
         substr($_[1], $pos) = substr($_[0][3], 0, $want);
         substr($_[0][3], 0, $want) = "";
         return $want;
      }

      # hand out the whole buffer and keep going if more is wanted
      substr($_[1], $pos) = $_[0][3];
      $_[0][3] = "";
      $want  -= $buffered;
      $pos   += $buffered;
      $total += $buffered;
      return $total unless $want;
   }

   for (;;) {
      my $got = sysread $_[0][0], $_[1], $want, $pos;
      if (!defined $got) {
         last if $! != Errno::EAGAIN;
      } else {
         $want  -= $got;
         $pos   += $got;
         $total += $got;
         last if !$want || !$got;   # request filled, or end of file
      }
      last unless &readable;
   }

   return $total;
}
370
# Return the next record from the handle.
#
#   $_[0] - tied state object (slot 0: filehandle, slot 3: read buffer)
#   $_[1] - optional record separator; defaults to $/
#
# Returns the record including its separator. At end of file the remaining
# buffered data is returned as a final, unterminated record; undef is
# returned once nothing is left, on a read error other than EAGAIN, or when
# readable reports failure.
# An undefined separator reads everything up to end of file. Paragraph mode
# is not implemented: an empty separator is treated like undef.
sub READLINE {
   my $irs = @_ > 1 ? $_[1] : $/;
   my $search = defined $irs && length $irs;

   while() {
      if ($search) {
         my $pos = index $_[0][3], $irs;
         if ($pos >= 0) {
            $pos += length $irs;
            my $res = substr $_[0][3], 0, $pos;
            substr ($_[0][3], 0, $pos) = "";
            return $res;
         }
      }

      my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
      if (defined $r) {
         unless ($r) {
            # end of file: hand out what is left instead of dropping it
            return undef unless length $_[0][3];
            my $rest = $_[0][3];
            $_[0][3] = "";
            return $rest;
         }
      } elsif ($! != Errno::EAGAIN || !&readable) {
         return undef;
      }
   }
}
391
392 1;
393
394 =back
395
396 =head1 BUGS
397
398 - Perl's IO-Handle model is THE bug.
399
400 =head1 AUTHOR
401
402 Marc Lehmann <schmorp@schmorp.de>
403 http://home.schmorp.de/
404
405 =cut
406