ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.16
Committed: Fri Dec 1 19:41:06 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.15: +34 -28 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking io with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module implements IO-handles in a coroutine-compatible way, that is,
12 other coroutines can run while reads or writes block on the handle.
13
14 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15 data, allowing other coroutines to run while one coroutine waits for I/O.
16
17 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18
19 =over 4
20
21 =cut
22
23 package Coro::Handle;
24
25 no warnings;
26 use strict;
27
28 use Carp ();
29 use Errno ();
30 use base 'Exporter';
31
32 our $VERSION = 2.5;
33 our @EXPORT = qw(unblock);
34
35 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
36
37 Create a new non-blocking io-handle using the given
38 perl-filehandle. Returns undef if no fhandle is given. The only other
39 supported argument is "timeout", which sets a timeout for each operation.
40
41 =cut
42
# Constructor: wrap the perl filehandle $fh in a new Coro::Handle.
#
#   $class  - class name or an existing object (its class is reused)
#   $fh     - the filehandle to wrap; when false, nothing is returned
#   @_      - further key => value pairs passed through to the tie class
#             (e.g. timeout => $seconds)
#
# Returns the blessed handle object (a reference to a scalar holding a
# tied glob).
sub new_from_fh {
    my $class = shift;
    my $fh    = shift or return;

    # a fresh, anonymous glob that will carry the tie
    my $self = do { local *Coro::Handle };

    # remember where the handle was created, for watcher descriptions
    my (undef, $filename, $line) = caller;
    $filename =~ s/^.*[\/\\]//;

    # Tie the *handle*, not the scalar that holds the glob: since perl 5.16
    # a plain "tie $self" ties the scalar itself (and looks for TIESCALAR).
    # "tie *$self" means the same thing on every perl version.
    tie *$self, 'Coro::Handle::FH', fh => $fh, desc => "$filename:$line", @_;

    my $handle = bless \$self, ref $class ? ref $class : $class;

    # switch autoflush on for the new handle, then restore the previously
    # selected output handle
    my $previous = select $handle;
    $| = 1;
    select $previous;

    # return the object explicitly instead of relying on what select returns
    $handle
}
55
56 =item $fh = unblock $fh
57
58 This is a convenience function that just calls C<new_from_fh> on the given
59 filehandle. Use it to replace a normal perl filehandle by a non-blocking
60 equivalent.
61
62 =cut
63
# Exported convenience wrapper: turn the given perl filehandle into a
# Coro::Handle by calling Coro::Handle->new_from_fh on it.
# Written as an explicit method call rather than indirect-object syntax,
# whose parsing depends on which names are known at compile time.
sub unblock($) {
    Coro::Handle->new_from_fh ($_[0]);
}
67
68 =item $fh->writable, $fh->readable
69
70 Wait until the filehandle is readable or writable (and return true) or
71 until an error condition happens (and return false).
72
73 =cut
74
# Block the calling coroutine until the handle becomes readable/writable.
# Both simply forward to the function of the same name in the tie class and
# return its result.
#
# "tied *$glob" (with the asterisk) asks for the object tied to the handle.
# Since perl 5.16 a plain "tied $scalar" only looks at the scalar that
# holds the glob and therefore finds nothing.
sub readable { Coro::Handle::FH::readable (tied *${$_[0]}) }
sub writable { Coro::Handle::FH::writable (tied *${$_[0]}) }
77
78 =item $fh->readline ([$terminator])
79
80 Like the builtin of the same name, but allows you to specify the input
81 record separator in a coroutine-safe manner (i.e. not using a global
82 variable).
83
84 =cut
85
# Read one record from the handle; an optional argument overrides the input
# record separator for this call only. Forwards to the READLINE method of
# the object tied to the handle.
sub readline {
    my $handle = shift;

    # "tied *..." inspects the handle, not the scalar holding the glob
    # (the two differ since perl 5.16)
    tied (*$$handle)->READLINE (@_)
}
87
88 =item $fh->autoflush ([...])
89
90 Always returns true, arguments are being ignored (exists for compatibility
91 only). Might change in the future.
92
93 =cut
94
# Compatibility stub: ignores every argument and always reports success.
sub autoflush {
    return !!1;
}
96
97 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
98
99 Work like their function equivalents (except read, which works like
100 sysread. You should not use the read function with Coro::Handles, it will
101 work but it's not efficient).
102
103 =cut
104
# Method forms of the usual I/O functions. Each one fetches the object tied
# to the handle and forwards to the matching function in Coro::Handle::FH.
#
# "tied *..." is used so the handle (not the scalar holding the glob) is
# inspected; the two are distinct since perl 5.16.
#
# The buffer argument is passed on as $_[1] so the callee works on the
# caller's variable (no copy is made).
sub read     { Coro::Handle::FH::READ   (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub sysread  { Coro::Handle::FH::READ   (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub syswrite { Coro::Handle::FH::WRITE  (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
sub print    { Coro::Handle::FH::WRITE  (tied *${+shift}, join "", @_) }
sub printf   { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
sub fileno   { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
sub close    { Coro::Handle::FH::CLOSE  (tied *${$_[0]}) }
sub blocking { !0 } # this handler always blocks the caller
113
# Get (and optionally set) the handle's "partial" flag, stored in slot 8 of
# the tie object. When the flag is true, READ stops after its first read
# attempt instead of waiting until the requested length has arrived.
#
# Returns the value the flag had *before* this call.
sub partial {
    # "tied *..." inspects the handle rather than the scalar holding the
    # glob (the two differ since perl 5.16)
    my $obj = tied *${$_[0]};

    my $previous = $obj->[8];
    $obj->[8] = $_[1] if @_ > 1;

    $previous
}
121
122 =item $fh->timeout([...])
123
124 The optional argument sets the new timeout (in seconds) for this
125 handle. Returns the current (new) value.
126
127 C<0> is a valid timeout, use C<undef> to disable the timeout.
128
129 =cut
130
# Get or set the per-operation timeout (seconds) stored in slot 2 of the
# tie object. With an argument the timeout is replaced first; undef
# disables it. Always returns the current (possibly new) value.
#
# Slots 5 and 6 are never filled by anything in this file (watchers are
# created afresh for every wait), so the former calls on them are gone.
sub timeout {
    # "tied *..." inspects the handle rather than the scalar holding the
    # glob (the two differ since perl 5.16)
    my $self = tied *${$_[0]};

    $self->[2] = $_[1] if @_ > 1;

    $self->[2]
}
140
141 =item $fh->fh
142
143 Returns the "real" (non-blocking) filehandle. Use this if you want to
144 do operations on the file handle you cannot do using the Coro::Handle
145 interface.
146
147 =item $fh->rbuf
148
149 Returns the current contents of the read buffer (this is an lvalue, so you
150 can change the read buffer if you like).
151
152 You can use this function to implement your own optimized reader when neither
153 readline nor sysread are viable candidates, like this:
154
155 # first get the _real_ non-blocking filehandle
156 # and fetch a reference to the read buffer
157 my $nb_fh = $fh->fh;
158 my $buf = \$fh->rbuf;
159
160 for(;;) {
161 # now use buffer contents, modifying
162 # if necessary to reflect the removed data
163
164 last if $$buf ne ""; # we have leftover data
165
166 # read another buffer full of data
167 $fh->readable or die "end of file";
168 sysread $nb_fh, $$buf, 8192;
169 }
170
171 =cut
172
# Return the underlying perl filehandle (slot 0 of the tie object).
sub fh {
    # "tied *..." inspects the handle rather than the scalar holding the
    # glob (the two differ since perl 5.16)
    (tied *${$_[0]})->[0];
}
176
# Lvalue accessor for the read buffer (slot 3 of the tie object): the
# result can be read, assigned to, or referenced.
sub rbuf : lvalue {
    # "tied *..." inspects the handle rather than the scalar holding the
    # glob (the two differ since perl 5.16)
    (tied *${$_[0]})->[3];
}
180
# Deliberately empty. Because it is defined here, object destruction is not
# routed through the AUTOLOAD handler of this package.
sub DESTROY {
    return;
}
184
our $AUTOLOAD;

# Fallback for methods this package does not implement: look the method up
# in the "forward class" stored in slot 7 of the tie object and jump to it
# with the original argument list. Dies when no such method exists.
sub AUTOLOAD {
    # "tied *..." inspects the handle rather than the scalar holding the
    # glob (the two differ since perl 5.16)
    my $self = tied *${$_[0]};

    # strip the package qualifier from the requested method name
    (my $func = $AUTOLOAD) =~ s/^.*:://;

    my $forward = UNIVERSAL::can ($self->[7], $func)
        or die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";

    # tail call: the target sees the caller's @_ and replaces this frame
    goto &$forward;
}
199
200 package Coro::Handle::FH;
201
202 no warnings;
203 use strict;
204
205 use Fcntl ();
206 use Errno ();
207 use Carp 'croak';
208
209 use AnyEvent;
210
211 # formerly a hash, but we are speed-critical, so try
212 # to be faster even if it hurts.
213 #
214 # 0 FH
215 # 1 desc
216 # 2 timeout
217 # 3 rb
218 # 4 wb # unused
219 # 5 unused
220 # 6 unused
221 # 7 forward class
222 # 8 partial
223
# Tie constructor. Recognised arguments: fh, desc, timeout, forward_class
# and partial. Builds the array-based object, clears both buffers and
# switches the filehandle to non-blocking mode.
#
# Croaks when the file status flags cannot be read or changed.
sub TIEHANDLE {
    my ($class, %arg) = @_;

    my $self = bless [], $class;
    @$self[0 .. 4] = ($arg{fh}, $arg{desc}, $arg{timeout}, "", "");
    $self->[7] = $arg{forward_class};
    $self->[8] = $arg{partial};

    # Add O_NONBLOCK to the existing status flags. Writing O_NONBLOCK alone
    # would clear flags such as O_APPEND that the caller opened with.
    my $flags = fcntl $self->[0], Fcntl::F_GETFL (), 0
        or croak "fcntl(F_GETFL): $!";
    fcntl $self->[0], Fcntl::F_SETFL (), ($flags + 0) | Fcntl::O_NONBLOCK ()
        or croak "fcntl(O_NONBLOCK): $!";

    $self
}
241
# Empty the read buffer (slot 3) and the write buffer (slot 4) of the tie
# object passed as first argument. @_ is left untouched because callers
# invoke this as "&cleanup;" and share their own @_ with it.
sub cleanup {
    @{ $_[0] }[3, 4] = ("", "");
    ""
}
246
# Tied open: discard buffered data, reopen the underlying filehandle with
# the caller's arguments and, on success, put it back into non-blocking
# mode. Returns the result of open; croaks if fcntl fails.
sub OPEN {
    &cleanup; # shares @_, so the tie object is still $_[0] afterwards
    my $self = shift;

    # Mirror the form of open the caller used: one remaining argument is
    # the two-argument open, two the three-argument form, and any further
    # arguments are passed on as the trailing list.
    my $r = @_ == 1 ? open ($self->[0], $_[0])
          : @_ == 2 ? open ($self->[0], $_[0], $_[1])
          :           open ($self->[0], $_[0], $_[1], @_[2 .. $#_]);

    if ($r) {
        # add O_NONBLOCK without clearing flags set by the open mode
        # (e.g. O_APPEND)
        my $flags = fcntl $self->[0], Fcntl::F_GETFL (), 0
            or croak "fcntl(F_GETFL): $!";
        fcntl $self->[0], Fcntl::F_SETFL (), ($flags + 0) | Fcntl::O_NONBLOCK ()
            or croak "fcntl(O_NONBLOCK): $!";
    }

    $r;
}
258
# Tied print: concatenate all arguments and hand them to WRITE.
sub PRINT {
    my $self = shift;
    return WRITE ($self, join "", @_);
}

# Tied printf: format the arguments with sprintf and hand the result to
# WRITE.
sub PRINTF {
    my $self   = shift;
    my $format = shift;
    return WRITE ($self, sprintf $format, @_);
}
266
# Tied getc: request one byte through READ and return whatever ended up in
# the buffer (undef when nothing was read).
sub GETC {
    my $char;

    READ ($_[0], $char, 1);

    return $char;
}
272
# Tied binmode: apply binmode to the underlying filehandle. An optional
# layer argument (e.g. ":utf8") is passed on instead of being ignored.
# Returns the result of binmode.
sub BINMODE {
    my $self = shift;

    @_ ? binmode ($self->[0], $_[0])
       : binmode ($self->[0])
}
276
# Positioning and end-of-file queries are not implemented for these
# handles; each of the three handlers below croaks unconditionally.

sub TELL {
    Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
    Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
    Carp::croak ("Coro::Handle's don't support eof()");
}
288
# Tied close: drop any buffered data, then close the underlying filehandle
# and return the result of close.
sub CLOSE {
    cleanup ($_[0]);
    return close $_[0][0];
}
293
# Destructor of the tie object: just empties both buffers.
sub DESTROY {
    cleanup ($_[0]);
}
297
# Tied fileno: report the descriptor number of the underlying filehandle
# (slot 0).
sub FILENO {
    my ($self) = @_;
    return fileno $self->[0];
}
301
# Apparently invoked when the tied object gets stringified (odd as that
# is) - at least DumpValue::dumpValue triggers it when printing a handle.
# Yields the object's default string form followed by its description in
# angle brackets.
sub FETCH {
    my ($self) = @_;
    return $self . "<" . $self->[1] . ">";
}
307
# Suspend the current coroutine until the filehandle (slot 0) is readable
# or the timeout (slot 2, if defined) has elapsed.
#
# Returns 1 when the handle became readable, 0 when the timeout fired
# first.
sub readable {
    my $current = $Coro::current;
    my $io = 1;

    # Shared wake-up: only the first event counts. Without the guard, an io
    # event and a timeout that both fire before the coroutine runs again
    # would call ->ready on an already cleared $current and die inside the
    # event callback.
    my $wake = sub {
        return unless $current;
        $io = $_[0];
        $current->ready;
        undef $current;
    };

    my $w = AnyEvent->io (
        desc => "$_[0][1] read watcher",
        fh   => $_[0][0],
        poll => 'r',
        cb   => sub { $wake->(1) },
    );

    my $t = (defined $_[0][2]) && AnyEvent->timer (
        desc  => "fh $_[0][1] read timeout",
        after => $_[0][2],
        cb    => sub { $wake->(0) },
    );

    # other wake-ups may resume us early; keep yielding until one of our
    # own watchers has fired (it clears $current)
    &Coro::schedule while $current;

    $io
}
337
# Suspend the current coroutine until the filehandle (slot 0) is writable
# or the timeout (slot 2, if defined) has elapsed.
#
# Returns 1 when the handle became writable, 0 when the timeout fired
# first.
sub writable {
    my $current = $Coro::current;
    my $io = 1;

    # Shared wake-up: only the first event counts. Without the guard, an io
    # event and a timeout that both fire before the coroutine runs again
    # would call ->ready on an already cleared $current and die inside the
    # event callback.
    my $wake = sub {
        return unless $current;
        $io = $_[0];
        $current->ready;
        undef $current;
    };

    my $w = AnyEvent->io (
        desc => "fh $_[0][1] write watcher",
        fh   => $_[0][0],
        poll => 'w',
        cb   => sub { $wake->(1) },
    );

    my $t = (defined $_[0][2]) && AnyEvent->timer (
        desc  => "fh $_[0][1] write timeout",
        after => $_[0][2],
        cb    => sub { $wake->(0) },
    );

    # other wake-ups may resume us early; keep yielding until one of our
    # own watchers has fired (it clears $current)
    &Coro::schedule while $current;

    $io
}
366
# Write data from $_[1] to the filehandle, waiting via writable() whenever
# the write is incomplete or would block.
#
#   $_[0] - tie object
#   $_[1] - data (used in place, never copied)
#   $_[2] - optional length, defaults to the length of the data
#   $_[3] - optional offset into the data
#
# Returns the number of bytes written, which is short if an error other
# than EAGAIN occurred or writable() returned false.
sub WRITE {
    my $remaining = defined $_[2] ? $_[2] : length $_[1];
    my $offset    = $_[3];
    my $written   = 0;

    for (;;) {
        my $n = syswrite ($_[0][0], $_[1], $remaining, $offset);

        if (defined $n) {
            $remaining -= $n;
            $offset    += $n;
            $written   += $n;
            last if !$remaining;
        }
        elsif ($! != Errno::EAGAIN ()) {
            # a real error, give up
            last;
        }

        # wait until more can be written; stop when the wait fails
        writable ($_[0]) or last;
    }

    $written
}
387
# Read up to $_[2] bytes into $_[1] at offset $_[3].
#
#   $_[0] - tie object
#   $_[1] - target buffer (modified in place)
#   $_[2] - number of bytes wanted
#   $_[3] - optional offset into the target buffer
#
# Data left in the read buffer (slot 3) is handed out first. After that the
# filehandle is read until the request is satisfied, end of file or an
# error other than EAGAIN occurs, readable() returns false, or - when the
# "partial" flag (slot 8) is set - after the first read attempt.
#
# Returns the number of bytes stored.
sub READ {
    my $len = $_[2];
    my $ofs = $_[3] || 0;
    my $res = 0;

    # first deplete the read buffer
    if (length $_[0][3]) {
        # Behave like sysread when the offset lies beyond the end of the
        # target: pad with NUL bytes. The lvalue substr below would
        # otherwise die with "substr outside of string".
        $_[1] = "" unless defined $_[1];
        $_[1] .= "\0" x ($ofs - length $_[1]) if $ofs > length $_[1];

        my $buffered = length $_[0][3];

        if ($buffered <= $len) {
            # everything buffered fits: hand it all out and go on reading
            substr ($_[1], $ofs) = $_[0][3];
            $_[0][3] = "";
            $len -= $buffered;
            $ofs += $buffered;
            $res += $buffered;
            return $res unless $len;
        } else {
            # the buffer alone satisfies the request
            substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
            substr ($_[0][3], 0, $len) = "";
            return $len;
        }
    }

    while () {
        my $r = sysread $_[0][0], $_[1], $len, $ofs;
        if (defined $r) {
            $len -= $r;
            $ofs += $r;
            $res += $r;
            # done when the request is filled or end of file was reached
            last unless $len && $r;
        } elsif ($! != Errno::EAGAIN ()) {
            last;
        }
        last if $_[0][8] || !&readable;
    }

    return $res;
}
424
# Read one record from the handle.
#
#   $_[0] - tie object
#   $_[1] - optional record separator; defaults to the current $/
#
# A non-empty separator ends the record and is included in the result. An
# empty separator selects paragraph mode (records end at a run of blank
# lines). An undefined separator reads everything up to end of file.
#
# At end of file any buffered data is returned as the final, unterminated
# record. Returns undef once nothing is left, on a read error other than
# EAGAIN, or when readable() returns false; in the last two cases buffered
# data stays in the read buffer (slot 3).
sub READLINE {
    my $irs = @_ > 1 ? $_[1] : $/;

    while () {
        if (length $irs) {
            my $pos = index $_[0][3], $irs;
            if ($pos >= 0) {
                $pos += length $irs;
                my $res = substr $_[0][3], 0, $pos;
                substr ($_[0][3], 0, $pos) = "";
                return $res;
            }
        } elsif (defined $irs) {
            # paragraph mode: skip leading newlines, then return up to and
            # including the blank-line run, normalised to two newlines
            $_[0][3] =~ s/^\n+//;
            return "$1\n\n" if $_[0][3] =~ s/^(.*?)\n\n+//s;
        }
        # an undefined separator never matches: keep reading until EOF

        my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
        if (defined $r) {
            next if $r;

            # end of file: hand out what is left instead of dropping it
            return undef unless length $_[0][3];
            my $rest = $_[0][3];
            $_[0][3] = "";
            return $rest;
        } elsif ($! != Errno::EAGAIN () || !&readable) {
            return undef;
        }
    }
}
445
446 1;
447
448 =back
449
450 =head1 BUGS
451
452 - Perl's IO-Handle model is THE bug.
453
454 =head1 AUTHOR
455
456 Marc Lehmann <schmorp@schmorp.de>
457 http://home.schmorp.de/
458
459 =cut
460